Issue #1794 Use a queue to hold ncf retrieval transfers

Amend:
  Peer review comments

Change-Id: Id200527b4f901134bf79f60a9137ffda484b8d21

Former-commit-id: 120bd1f8278c78878bb499ec6b08932174dae1a6
This commit is contained in:
Dustin Johnson 2013-03-19 16:02:15 -05:00
parent e5bcff21cb
commit b29eb94780
7 changed files with 145 additions and 105 deletions

View file

@ -35,14 +35,15 @@
<constructor-arg ref="retrievalDao" />
</bean>
<!-- Pick up SBN retrievals from the drop-off directory -->
<!-- Pick up SBN retrievals from the drop-off point -->
<bean id="sbnRetrievalQueue" class="java.util.concurrent.ConcurrentLinkedQueue" />
<bean id="sbnRetrievalTask"
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTask">
<constructor-arg value="SBN" />
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.DeserializeRetrievedDataFromDirectory">
<constructor-arg value="${sbn.retrieval.transfer.directory}" />
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.DeserializeRetrievedDataFromIngest">
<constructor-arg ref="sbnRetrievalQueue" />
</bean>
</constructor-arg>
<constructor-arg>

View file

@ -19,21 +19,18 @@
**/
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.xml.bind.JAXBException;
import com.raytheon.edex.esb.Headers;
import com.raytheon.uf.common.datadelivery.registry.Coverage;
import com.raytheon.uf.common.serialization.JAXBManager;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.edex.datadelivery.retrieval.opendap.OpenDapRetrievalResponse;
import com.raytheon.uf.edex.wmo.message.WMOMessage;
/**
* Deserializes the retrieved data in a directory.
* Deserializes the retrieved data in a retrievalQueue.
*
* <pre>
*
@ -43,30 +40,25 @@ import com.raytheon.uf.edex.wmo.message.WMOMessage;
* ------------ ---------- ----------- --------------------------
* Feb 01, 2013 1543 djohnson Initial creation
* Mar 05, 2013 1647 djohnson Remove WMO header.
* Mar 19, 2013 1794 djohnson Read from a queue rather than the file system.
*
* </pre>
*
* @author djohnson
* @version 1.0
*/
public class DeserializeRetrievedDataFromDirectory implements IRetrievalsFinder {
public class DeserializeRetrievedDataFromIngest implements IRetrievalsFinder {
private static final FileFilter NO_DIRECTORIES = new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.isFile();
}
};
private final File directory;
private final ConcurrentLinkedQueue<String> retrievalQueue;
private final JAXBManager jaxbManager;
/**
* @param directory
* @param retrievalQueue
*/
public DeserializeRetrievedDataFromDirectory(File directory) {
this.directory = directory;
public DeserializeRetrievedDataFromIngest(
ConcurrentLinkedQueue<String> retrievalQueue) {
this.retrievalQueue = retrievalQueue;
try {
this.jaxbManager = new JAXBManager(RetrievalResponseXml.class,
OpenDapRetrievalResponse.class, Coverage.class);
@ -81,22 +73,16 @@ public class DeserializeRetrievedDataFromDirectory implements IRetrievalsFinder
*/
@Override
public RetrievalResponseXml findRetrievals() throws Exception {
String xml = retrievalQueue.poll();
final File[] files = directory.listFiles(NO_DIRECTORIES);
if (CollectionUtil.isNullOrEmpty(files)) {
if (xml == null) {
return null;
}
final File file = files[0];
try {
WMOMessage message = new WMOMessage(FileUtil.file2bytes(file),
new Headers());
} else {
WMOMessage message = new WMOMessage(xml, new Headers());
return (RetrievalResponseXml) jaxbManager
.unmarshalFromXml(new String(message.getMessageBody()));
} finally {
file.delete();
}
}
}

View file

@ -32,13 +32,16 @@ import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval;
import com.raytheon.uf.common.datadelivery.retrieval.xml.RetrievalAttribute;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.event.EventBus;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory;
import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter;
import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter.TranslationException;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse;
import com.raytheon.uf.edex.datadelivery.retrieval.util.RetrievalPersistUtil;
@ -89,10 +92,17 @@ public class StoreRetrievedData implements IRetrievalPluginDataObjectsProcessor
@Override
public void processRetrievedPluginDataObjects(
RetrievalResponseXml retrievalPluginDataObjects)
throws Exception {
throws SerializationException, TranslationException {
Map<String, PluginDataObject[]> pluginDataObjects = Maps.newHashMap();
final RetrievalRequestRecord requestRecord = retrievalDao
.getById(retrievalPluginDataObjects.getRequestRecord());
final RetrievalRequestRecordPK id = retrievalPluginDataObjects
.getRequestRecord();
final RetrievalRequestRecord requestRecord = retrievalDao.getById(id);
if (requestRecord == null) {
statusHandler.warn("Unable to find retrieval by id [" + id
+ "]! Retrieval will not be processed...");
return;
}
final List<RetrievalResponseWrapper> retrievalAttributePluginDataObjects = retrievalPluginDataObjects
.getRetrievalAttributePluginDataObjects();

View file

@ -22,7 +22,6 @@ package com.raytheon.uf.edex.distribution;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -61,6 +60,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* 09/01/2010 4293 cjeanbap Logging of unknown Weather Products.
* Feb 27, 2013 1638 mschenke Cleaned up localization code to fix null pointer
* when no distribution files present
* Mar 19, 2013 1794 djohnson PatternWrapper is immutable, add toString() to it for debugging.
*
* </pre>
*
@ -70,15 +70,38 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
public class DistributionSrv {
private static final transient IUFStatusHandler statusHandler = UFStatus
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(DistributionSrv.class);
private static class PatternWrapper {
String plugin;
private final String plugin;
RequestPatterns patterns;
private final RequestPatterns patterns;
String route;
private final String route;
private final String displayString;
private PatternWrapper(String plugin, String route,
RequestPatterns patterns) {
this.plugin = plugin;
this.route = route;
this.patterns = patterns;
this.displayString = createDisplayString();
}
private String createDisplayString() {
StringBuilder sb = new StringBuilder();
sb.append("plugin=").append(plugin).append(", ");
sb.append("route=").append(route).append(", ");
sb.append("patterns=").append(patterns);
return sb.toString();
}
@Override
public String toString() {
return displayString;
}
}
protected transient Log logger = LogFactory.getLog("Ingest");
@ -86,12 +109,12 @@ public class DistributionSrv {
protected transient Log routeFailedLogger = LogFactory
.getLog("RouteFailedLog");
private List<PatternWrapper> pluginPatterns = new ArrayList<PatternWrapper>(
private final List<PatternWrapper> pluginPatterns = new ArrayList<PatternWrapper>(
100);
private ConcurrentMap<String, PatternWrapper> patternMap = new ConcurrentHashMap<String, PatternWrapper>();
private final ConcurrentMap<String, PatternWrapper> patternMap = new ConcurrentHashMap<String, PatternWrapper>();
private ConcurrentMap<String, Long> modifiedTimes = new ConcurrentHashMap<String, Long>();
private final ConcurrentMap<String, Long> modifiedTimes = new ConcurrentHashMap<String, Long>();
public DistributionSrv() {
for (File file : getDistributionFiles()) {
@ -118,7 +141,9 @@ public class DistributionSrv {
"Change to distribution file detected. "
+ file.getName()
+ " has been modified. Reloading distribution patterns");
wrapper.patterns = loadPatterns(file, plugin);
wrapper = new PatternWrapper(wrapper.plugin,
wrapper.route, loadPatterns(file, plugin));
patternMap.put(plugin, wrapper);
modifiedTimes.put(file.getName(), file.lastModified());
} catch (DistributionException e) {
statusHandler.handle(Priority.PROBLEM,
@ -168,18 +193,18 @@ public class DistributionSrv {
+ " does not have an accompanying patterns file in localization.");
}
PatternWrapper wrapper = new PatternWrapper();
wrapper.plugin = pluginName;
wrapper.route = destination;
File modelFile = new File(path);
File siteModelFile = new File(sitePath);
RequestPatterns patterns = null;
if (siteModelFile.exists()) {
wrapper.patterns = loadPatterns(siteModelFile, pluginName);
patterns = loadPatterns(siteModelFile, pluginName);
} else if (modelFile.exists()) {
wrapper.patterns = loadPatterns(modelFile, pluginName);
patterns = loadPatterns(modelFile, pluginName);
} else {
wrapper.patterns = new RequestPatterns();
patterns = new RequestPatterns();
}
PatternWrapper wrapper = new PatternWrapper(pluginName, destination,
patterns);
patternMap.put(wrapper.plugin, wrapper);
pluginPatterns.add(wrapper);
return this;
@ -252,8 +277,8 @@ public class DistributionSrv {
throws DistributionException {
RequestPatterns patternSet = null;
try {
patternSet = (RequestPatterns) SerializationUtil
.jaxbUnmarshalFromXmlFile(modelFile.getPath());
patternSet = SerializationUtil
.jaxbUnmarshalFromXmlFile(RequestPatterns.class, modelFile.getPath());
} catch (Exception e) {
throw new DistributionException("File "
+ modelFile.getAbsolutePath()

View file

@ -20,6 +20,7 @@
package com.raytheon.uf.edex.distribution;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@ -35,25 +36,26 @@ import org.apache.commons.logging.LogFactory;
import com.raytheon.uf.common.serialization.ISerializableObject;
/**
* A container of regular expressions, both original strings and
* the compiled patterns. Used by the DistributionSrv bean to
* store regex patterns for plugins. It is important to note that
* no validation is done on duplicate regex patterns.
* A container of regular expressions, both original strings and the compiled
* patterns. Used by the DistributionSrv bean to store regex patterns for
* plugins. It is important to note that no validation is done on duplicate
* regex patterns.
*
* <pre>
*
*
* SOFTWARE HISTORY
*
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Oct 20, 2009 brockwoo Initial creation
* May 16, 2011 7317 cjeanbap Added try-catch statement
* for PatternSyntaxException.
*
* Mar 19, 2013 1794 djohnson Add toString() for debugging.
*
* </pre>
*
*
* @author brockwoo
* @version 1.0
* @version 1.0
*/
@XmlRootElement(name = "requestPatterns")
@ -64,9 +66,9 @@ public class RequestPatterns implements ISerializableObject{
* List of patterns requested by a plugin.
*/
@XmlElements( { @XmlElement(name = "regex", type = String.class) })
private ArrayList<String> patterns;
private List<String> patterns = new ArrayList<String>();
private ArrayList<Pattern> compiledPatterns;
private final List<Pattern> compiledPatterns = new ArrayList<Pattern>();
protected transient Log patternFailedLogger = LogFactory.getLog("PatternFailedLog");
@ -74,8 +76,6 @@ public class RequestPatterns implements ISerializableObject{
* Creates a new instance of the container.
*/
public RequestPatterns(){
this.patterns = new ArrayList<String>();
this.compiledPatterns = new ArrayList<Pattern>();
}
/**
@ -83,7 +83,7 @@ public class RequestPatterns implements ISerializableObject{
*
* @return a list of regex pattern strings
*/
public ArrayList<String> getPatterns() {
public List<String> getPatterns() {
return patterns;
}
@ -92,7 +92,7 @@ public class RequestPatterns implements ISerializableObject{
*
* @param patterns an arraylist of regex strings
*/
public void setPatterns(ArrayList<String> patterns) {
public void setPatterns(List<String> patterns) {
this.patterns = patterns;
}
@ -140,4 +140,9 @@ public class RequestPatterns implements ISerializableObject{
}
return isFound;
}
@Override
public String toString() {
return patterns.toString();
}
}

View file

@ -19,22 +19,28 @@
**/
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
import static com.raytheon.uf.common.util.Matchers.hasNoFiles;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.BeforeClass;
import org.junit.Test;
import com.raytheon.uf.common.localization.PathManagerFactoryTest;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.common.util.TestUtil;
import com.raytheon.uf.common.util.file.FilenameFilters;
/**
* Test {@link DeserializeRetrievedDataFromDirectory}.
* Test {@link DeserializeRetrievedDataFromIngest}.
*
* <pre>
*
@ -46,18 +52,21 @@ import com.raytheon.uf.common.util.TestUtil;
* Feb 12, 2013 1543 djohnson Can only test the retrieval response is now not null.
* Feb 15, 2013 1543 djohnson Some renames.
* Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor.
* Mar 19, 2013 1794 djohnson Read from a queue rather than the file system.
*
* </pre>
*
* @author djohnson
* @version 1.0
*/
public class DeserializeRetrievedDataFromDirectoryTest {
public class DeserializeRetrievedDataFromIngestTest {
private final File directory = TestUtil
.setupTestClassDir(DeserializeRetrievedDataFromDirectoryTest.class);
.setupTestClassDir(DeserializeRetrievedDataFromIngestTest.class);
private final DeserializeRetrievedDataFromDirectory service = new DeserializeRetrievedDataFromDirectory(
directory);
private final ConcurrentLinkedQueue<String> retrievalQueue = new ConcurrentLinkedQueue<String>();
private final DeserializeRetrievedDataFromIngest service = new DeserializeRetrievedDataFromIngest(
retrievalQueue);
@BeforeClass
public static void classSetUp() {
@ -65,28 +74,40 @@ public class DeserializeRetrievedDataFromDirectoryTest {
}
@Test
public void deserializesRetrievedDataFromAFileInTheTargetDirectory()
public void deserializesRetrievedDataFromTheQueue()
throws Exception {
RetrievalResponseXml retrievalPluginDataObjects = RetrievalPluginDataObjectsFixture.INSTANCE
.get();
addRetrievalToQueue();
new SerializeRetrievedDataToDirectory(directory,
new AlwaysSameWmoHeader("SMYG10 LYBM 280000"))
.processRetrievedPluginDataObjects(retrievalPluginDataObjects);
final RetrievalResponseXml restored = service
.findRetrievals();
final RetrievalResponseXml restored = service.findRetrievals();
// Just make sure the payload is present
assertThat(restored.getRetrievalAttributePluginDataObjects().get(0)
.getRetrievalResponse(), is(notNullValue()));
}
@Test
public void deletesFileAfterRetrievingFromTheTargetDirectory()
public void removesFromQueueWhileRetrieving()
throws Exception {
addRetrievalToQueue();
service.findRetrievals();
assertThat(retrievalQueue, is(empty()));
}
@Test
public void returnsNullWhenNothingInTheQueue() throws Exception {
final RetrievalResponseXml restored = service.findRetrievals();
assertNull(restored);
}
private void addRetrievalToQueue() throws SerializationException,
IOException {
RetrievalResponseXml retrievalPluginDataObjects = RetrievalPluginDataObjectsFixture.INSTANCE
.get();
@ -94,25 +115,8 @@ public class DeserializeRetrievedDataFromDirectoryTest {
new AlwaysSameWmoHeader("SMYG10 LYBM 280000"))
.processRetrievedPluginDataObjects(retrievalPluginDataObjects);
service.findRetrievals();
assertThat(directory, hasNoFiles());
}
@Test
public void ignoresSubDirectories() throws Exception {
new File(directory, "subDir1").mkdirs();
service.findRetrievals();
}
@Test
public void returnsNullWhenNoFileInTheTargetDirectory() throws Exception {
final RetrievalResponseXml restored = service
.findRetrievals();
assertNull(restored);
final List<File> files = FileUtil.listFiles(directory,
FilenameFilters.ACCEPT_FILES, false);
retrievalQueue.add(FileUtil.file2String(files.get(0)));
}
}

View file

@ -31,6 +31,7 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.Before;
import org.junit.Ignore;
@ -53,8 +54,10 @@ import com.raytheon.uf.common.event.EventBus;
import com.raytheon.uf.common.localization.PathManagerFactoryTest;
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.common.util.SpringFiles;
import com.raytheon.uf.common.util.TestUtil;
import com.raytheon.uf.common.util.file.FilenameFilters;
import com.raytheon.uf.edex.database.DataAccessLayerException;
import com.raytheon.uf.edex.database.dao.DatabaseUtil;
import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory;
@ -78,6 +81,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
* Feb 12, 2013 1543 djohnson Retrieval responses are now sent further down the chain.
* Feb 15, 2013 1543 djohnson Class renames.
* Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor.
* Mar 19, 2013 1794 djohnson RetrievalTasks integrate at a queue.
*
* </pre>
*
@ -221,17 +225,17 @@ public class RetrievalTaskTest {
IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder(
Network.OPSNET, dao);
final ConcurrentLinkedQueue<String> retrievalQueue = new ConcurrentLinkedQueue<String>();
final File testDirectory = TestUtil
.setupTestClassDir(RetrievalTaskTest.class);
IRetrievalPluginDataObjectsProcessor serializeToDirectory = new SerializeRetrievedDataToDirectory(
testDirectory, new AlwaysSameWmoHeader(
"SMYG10 LYBM 280000"));
testDirectory, new AlwaysSameWmoHeader("SMYG10 LYBM 280000"));
RetrievalTask downloadTask = new RetrievalTask(Network.OPSNET,
retrievalDataFinder, serializeToDirectory,
mock(IRetrievalResponseCompleter.class), dao);
RetrievalTask readDownloadsTask = new RetrievalTask(Network.OPSNET,
new DeserializeRetrievedDataFromDirectory(testDirectory),
new DeserializeRetrievedDataFromIngest(retrievalQueue),
retrievedDataProcessor, new RetrievalResponseCompleter(
mock(SubscriptionNotifyTask.class), dao), dao);
@ -242,6 +246,11 @@ public class RetrievalTaskTest {
assertThat(request.getState(), is(State.RUNNING));
}
for (File file : FileUtil.listFiles(testDirectory,
FilenameFilters.ACCEPT_FILES, false)) {
retrievalQueue.add(FileUtil.file2String(file));
}
readDownloadsTask.run();
final List<RetrievalRequestRecord> allRetrievals = dao.getAll();