Issue #1794 Use a queue to hold ncf retrieval transfers
Amend: Peer review comments Change-Id: Id200527b4f901134bf79f60a9137ffda484b8d21 Former-commit-id:ebce91c15d
[formerlyb29eb94780
[formerly 120bd1f8278c78878bb499ec6b08932174dae1a6]] Former-commit-id:b29eb94780
Former-commit-id:3d87218f1a
This commit is contained in:
parent
28cd17759e
commit
e8c9726d51
7 changed files with 145 additions and 105 deletions
|
@ -35,14 +35,15 @@
|
|||
<constructor-arg ref="retrievalDao" />
|
||||
</bean>
|
||||
|
||||
<!-- Pick up SBN retrievals from the drop-off directory -->
|
||||
<!-- Pick up SBN retrievals from the drop-off point -->
|
||||
<bean id="sbnRetrievalQueue" class="java.util.concurrent.ConcurrentLinkedQueue" />
|
||||
<bean id="sbnRetrievalTask"
|
||||
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTask">
|
||||
<constructor-arg value="SBN" />
|
||||
<constructor-arg>
|
||||
<bean
|
||||
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.DeserializeRetrievedDataFromDirectory">
|
||||
<constructor-arg value="${sbn.retrieval.transfer.directory}" />
|
||||
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.DeserializeRetrievedDataFromIngest">
|
||||
<constructor-arg ref="sbnRetrievalQueue" />
|
||||
</bean>
|
||||
</constructor-arg>
|
||||
<constructor-arg>
|
||||
|
|
|
@ -19,21 +19,18 @@
|
|||
**/
|
||||
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileFilter;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import javax.xml.bind.JAXBException;
|
||||
|
||||
import com.raytheon.edex.esb.Headers;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Coverage;
|
||||
import com.raytheon.uf.common.serialization.JAXBManager;
|
||||
import com.raytheon.uf.common.util.CollectionUtil;
|
||||
import com.raytheon.uf.common.util.FileUtil;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.opendap.OpenDapRetrievalResponse;
|
||||
import com.raytheon.uf.edex.wmo.message.WMOMessage;
|
||||
|
||||
/**
|
||||
* Deserializes the retrieved data in a directory.
|
||||
* Deserializes the retrieved data in a retrievalQueue.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
|
@ -43,30 +40,25 @@ import com.raytheon.uf.edex.wmo.message.WMOMessage;
|
|||
* ------------ ---------- ----------- --------------------------
|
||||
* Feb 01, 2013 1543 djohnson Initial creation
|
||||
* Mar 05, 2013 1647 djohnson Remove WMO header.
|
||||
* Mar 19, 2013 1794 djohnson Read from a queue rather than the file system.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author djohnson
|
||||
* @version 1.0
|
||||
*/
|
||||
public class DeserializeRetrievedDataFromDirectory implements IRetrievalsFinder {
|
||||
public class DeserializeRetrievedDataFromIngest implements IRetrievalsFinder {
|
||||
|
||||
private static final FileFilter NO_DIRECTORIES = new FileFilter() {
|
||||
@Override
|
||||
public boolean accept(File pathname) {
|
||||
return pathname.isFile();
|
||||
}
|
||||
};
|
||||
|
||||
private final File directory;
|
||||
private final ConcurrentLinkedQueue<String> retrievalQueue;
|
||||
|
||||
private final JAXBManager jaxbManager;
|
||||
|
||||
/**
|
||||
* @param directory
|
||||
* @param retrievalQueue
|
||||
*/
|
||||
public DeserializeRetrievedDataFromDirectory(File directory) {
|
||||
this.directory = directory;
|
||||
public DeserializeRetrievedDataFromIngest(
|
||||
ConcurrentLinkedQueue<String> retrievalQueue) {
|
||||
this.retrievalQueue = retrievalQueue;
|
||||
try {
|
||||
this.jaxbManager = new JAXBManager(RetrievalResponseXml.class,
|
||||
OpenDapRetrievalResponse.class, Coverage.class);
|
||||
|
@ -81,22 +73,16 @@ public class DeserializeRetrievedDataFromDirectory implements IRetrievalsFinder
|
|||
*/
|
||||
@Override
|
||||
public RetrievalResponseXml findRetrievals() throws Exception {
|
||||
String xml = retrievalQueue.poll();
|
||||
|
||||
final File[] files = directory.listFiles(NO_DIRECTORIES);
|
||||
|
||||
if (CollectionUtil.isNullOrEmpty(files)) {
|
||||
if (xml == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final File file = files[0];
|
||||
try {
|
||||
WMOMessage message = new WMOMessage(FileUtil.file2bytes(file),
|
||||
new Headers());
|
||||
} else {
|
||||
WMOMessage message = new WMOMessage(xml, new Headers());
|
||||
return (RetrievalResponseXml) jaxbManager
|
||||
.unmarshalFromXml(new String(message.getMessageBody()));
|
||||
} finally {
|
||||
file.delete();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -32,13 +32,16 @@ import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval;
|
|||
import com.raytheon.uf.common.datadelivery.retrieval.xml.RetrievalAttribute;
|
||||
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
||||
import com.raytheon.uf.common.event.EventBus;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.util.CollectionUtil;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter.TranslationException;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.util.RetrievalPersistUtil;
|
||||
|
||||
|
@ -89,10 +92,17 @@ public class StoreRetrievedData implements IRetrievalPluginDataObjectsProcessor
|
|||
@Override
|
||||
public void processRetrievedPluginDataObjects(
|
||||
RetrievalResponseXml retrievalPluginDataObjects)
|
||||
throws Exception {
|
||||
throws SerializationException, TranslationException {
|
||||
Map<String, PluginDataObject[]> pluginDataObjects = Maps.newHashMap();
|
||||
final RetrievalRequestRecord requestRecord = retrievalDao
|
||||
.getById(retrievalPluginDataObjects.getRequestRecord());
|
||||
final RetrievalRequestRecordPK id = retrievalPluginDataObjects
|
||||
.getRequestRecord();
|
||||
final RetrievalRequestRecord requestRecord = retrievalDao.getById(id);
|
||||
|
||||
if (requestRecord == null) {
|
||||
statusHandler.warn("Unable to find retrieval by id [" + id
|
||||
+ "]! Retrieval will not be processed...");
|
||||
return;
|
||||
}
|
||||
|
||||
final List<RetrievalResponseWrapper> retrievalAttributePluginDataObjects = retrievalPluginDataObjects
|
||||
.getRetrievalAttributePluginDataObjects();
|
||||
|
|
|
@ -22,7 +22,6 @@ package com.raytheon.uf.edex.distribution;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -61,6 +60,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
|
|||
* 09/01/2010 4293 cjeanbap Logging of unknown Weather Products.
|
||||
* Feb 27, 2013 1638 mschenke Cleaned up localization code to fix null pointer
|
||||
* when no distribution files present
|
||||
* Mar 19, 2013 1794 djohnson PatternWrapper is immutable, add toString() to it for debugging.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -70,15 +70,38 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
|
|||
|
||||
public class DistributionSrv {
|
||||
|
||||
private static final transient IUFStatusHandler statusHandler = UFStatus
|
||||
private static final IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(DistributionSrv.class);
|
||||
|
||||
private static class PatternWrapper {
|
||||
String plugin;
|
||||
private final String plugin;
|
||||
|
||||
RequestPatterns patterns;
|
||||
private final RequestPatterns patterns;
|
||||
|
||||
String route;
|
||||
private final String route;
|
||||
|
||||
private final String displayString;
|
||||
|
||||
private PatternWrapper(String plugin, String route,
|
||||
RequestPatterns patterns) {
|
||||
this.plugin = plugin;
|
||||
this.route = route;
|
||||
this.patterns = patterns;
|
||||
this.displayString = createDisplayString();
|
||||
}
|
||||
|
||||
private String createDisplayString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("plugin=").append(plugin).append(", ");
|
||||
sb.append("route=").append(route).append(", ");
|
||||
sb.append("patterns=").append(patterns);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return displayString;
|
||||
}
|
||||
}
|
||||
|
||||
protected transient Log logger = LogFactory.getLog("Ingest");
|
||||
|
@ -86,12 +109,12 @@ public class DistributionSrv {
|
|||
protected transient Log routeFailedLogger = LogFactory
|
||||
.getLog("RouteFailedLog");
|
||||
|
||||
private List<PatternWrapper> pluginPatterns = new ArrayList<PatternWrapper>(
|
||||
private final List<PatternWrapper> pluginPatterns = new ArrayList<PatternWrapper>(
|
||||
100);
|
||||
|
||||
private ConcurrentMap<String, PatternWrapper> patternMap = new ConcurrentHashMap<String, PatternWrapper>();
|
||||
private final ConcurrentMap<String, PatternWrapper> patternMap = new ConcurrentHashMap<String, PatternWrapper>();
|
||||
|
||||
private ConcurrentMap<String, Long> modifiedTimes = new ConcurrentHashMap<String, Long>();
|
||||
private final ConcurrentMap<String, Long> modifiedTimes = new ConcurrentHashMap<String, Long>();
|
||||
|
||||
public DistributionSrv() {
|
||||
for (File file : getDistributionFiles()) {
|
||||
|
@ -118,7 +141,9 @@ public class DistributionSrv {
|
|||
"Change to distribution file detected. "
|
||||
+ file.getName()
|
||||
+ " has been modified. Reloading distribution patterns");
|
||||
wrapper.patterns = loadPatterns(file, plugin);
|
||||
wrapper = new PatternWrapper(wrapper.plugin,
|
||||
wrapper.route, loadPatterns(file, plugin));
|
||||
patternMap.put(plugin, wrapper);
|
||||
modifiedTimes.put(file.getName(), file.lastModified());
|
||||
} catch (DistributionException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
|
@ -168,18 +193,18 @@ public class DistributionSrv {
|
|||
+ " does not have an accompanying patterns file in localization.");
|
||||
}
|
||||
|
||||
PatternWrapper wrapper = new PatternWrapper();
|
||||
wrapper.plugin = pluginName;
|
||||
wrapper.route = destination;
|
||||
File modelFile = new File(path);
|
||||
File siteModelFile = new File(sitePath);
|
||||
RequestPatterns patterns = null;
|
||||
if (siteModelFile.exists()) {
|
||||
wrapper.patterns = loadPatterns(siteModelFile, pluginName);
|
||||
patterns = loadPatterns(siteModelFile, pluginName);
|
||||
} else if (modelFile.exists()) {
|
||||
wrapper.patterns = loadPatterns(modelFile, pluginName);
|
||||
patterns = loadPatterns(modelFile, pluginName);
|
||||
} else {
|
||||
wrapper.patterns = new RequestPatterns();
|
||||
patterns = new RequestPatterns();
|
||||
}
|
||||
PatternWrapper wrapper = new PatternWrapper(pluginName, destination,
|
||||
patterns);
|
||||
patternMap.put(wrapper.plugin, wrapper);
|
||||
pluginPatterns.add(wrapper);
|
||||
return this;
|
||||
|
@ -252,8 +277,8 @@ public class DistributionSrv {
|
|||
throws DistributionException {
|
||||
RequestPatterns patternSet = null;
|
||||
try {
|
||||
patternSet = (RequestPatterns) SerializationUtil
|
||||
.jaxbUnmarshalFromXmlFile(modelFile.getPath());
|
||||
patternSet = SerializationUtil
|
||||
.jaxbUnmarshalFromXmlFile(RequestPatterns.class, modelFile.getPath());
|
||||
} catch (Exception e) {
|
||||
throw new DistributionException("File "
|
||||
+ modelFile.getAbsolutePath()
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package com.raytheon.uf.edex.distribution;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.regex.PatternSyntaxException;
|
||||
|
||||
|
@ -35,10 +36,10 @@ import org.apache.commons.logging.LogFactory;
|
|||
import com.raytheon.uf.common.serialization.ISerializableObject;
|
||||
|
||||
/**
|
||||
* A container of regular expressions, both original strings and
|
||||
* the compiled patterns. Used by the DistributionSrv bean to
|
||||
* store regex patterns for plugins. It is important to note that
|
||||
* no validation is done on duplicate regex patterns.
|
||||
* A container of regular expressions, both original strings and the compiled
|
||||
* patterns. Used by the DistributionSrv bean to store regex patterns for
|
||||
* plugins. It is important to note that no validation is done on duplicate
|
||||
* regex patterns.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
|
@ -49,6 +50,7 @@ import com.raytheon.uf.common.serialization.ISerializableObject;
|
|||
* Oct 20, 2009 brockwoo Initial creation
|
||||
* May 16, 2011 7317 cjeanbap Added try-catch statement
|
||||
* for PatternSyntaxException.
|
||||
* Mar 19, 2013 1794 djohnson Add toString() for debugging.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -64,9 +66,9 @@ public class RequestPatterns implements ISerializableObject{
|
|||
* List of patterns requested by a plugin.
|
||||
*/
|
||||
@XmlElements( { @XmlElement(name = "regex", type = String.class) })
|
||||
private ArrayList<String> patterns;
|
||||
private List<String> patterns = new ArrayList<String>();
|
||||
|
||||
private ArrayList<Pattern> compiledPatterns;
|
||||
private final List<Pattern> compiledPatterns = new ArrayList<Pattern>();
|
||||
|
||||
protected transient Log patternFailedLogger = LogFactory.getLog("PatternFailedLog");
|
||||
|
||||
|
@ -74,8 +76,6 @@ public class RequestPatterns implements ISerializableObject{
|
|||
* Creates a new instance of the container.
|
||||
*/
|
||||
public RequestPatterns(){
|
||||
this.patterns = new ArrayList<String>();
|
||||
this.compiledPatterns = new ArrayList<Pattern>();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -83,7 +83,7 @@ public class RequestPatterns implements ISerializableObject{
|
|||
*
|
||||
* @return a list of regex pattern strings
|
||||
*/
|
||||
public ArrayList<String> getPatterns() {
|
||||
public List<String> getPatterns() {
|
||||
return patterns;
|
||||
}
|
||||
|
||||
|
@ -92,7 +92,7 @@ public class RequestPatterns implements ISerializableObject{
|
|||
*
|
||||
* @param patterns an arraylist of regex strings
|
||||
*/
|
||||
public void setPatterns(ArrayList<String> patterns) {
|
||||
public void setPatterns(List<String> patterns) {
|
||||
this.patterns = patterns;
|
||||
}
|
||||
|
||||
|
@ -140,4 +140,9 @@ public class RequestPatterns implements ISerializableObject{
|
|||
}
|
||||
return isFound;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return patterns.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,22 +19,28 @@
|
|||
**/
|
||||
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
|
||||
|
||||
import static com.raytheon.uf.common.util.Matchers.hasNoFiles;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.raytheon.uf.common.localization.PathManagerFactoryTest;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
import com.raytheon.uf.common.util.FileUtil;
|
||||
import com.raytheon.uf.common.util.TestUtil;
|
||||
import com.raytheon.uf.common.util.file.FilenameFilters;
|
||||
|
||||
/**
|
||||
* Test {@link DeserializeRetrievedDataFromDirectory}.
|
||||
* Test {@link DeserializeRetrievedDataFromIngest}.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
|
@ -46,18 +52,21 @@ import com.raytheon.uf.common.util.TestUtil;
|
|||
* Feb 12, 2013 1543 djohnson Can only test the retrieval response is now not null.
|
||||
* Feb 15, 2013 1543 djohnson Some renames.
|
||||
* Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor.
|
||||
* Mar 19, 2013 1794 djohnson Read from a queue rather than the file system.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author djohnson
|
||||
* @version 1.0
|
||||
*/
|
||||
public class DeserializeRetrievedDataFromDirectoryTest {
|
||||
public class DeserializeRetrievedDataFromIngestTest {
|
||||
private final File directory = TestUtil
|
||||
.setupTestClassDir(DeserializeRetrievedDataFromDirectoryTest.class);
|
||||
.setupTestClassDir(DeserializeRetrievedDataFromIngestTest.class);
|
||||
|
||||
private final DeserializeRetrievedDataFromDirectory service = new DeserializeRetrievedDataFromDirectory(
|
||||
directory);
|
||||
private final ConcurrentLinkedQueue<String> retrievalQueue = new ConcurrentLinkedQueue<String>();
|
||||
|
||||
private final DeserializeRetrievedDataFromIngest service = new DeserializeRetrievedDataFromIngest(
|
||||
retrievalQueue);
|
||||
|
||||
@BeforeClass
|
||||
public static void classSetUp() {
|
||||
|
@ -65,28 +74,40 @@ public class DeserializeRetrievedDataFromDirectoryTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void deserializesRetrievedDataFromAFileInTheTargetDirectory()
|
||||
public void deserializesRetrievedDataFromTheQueue()
|
||||
throws Exception {
|
||||
|
||||
RetrievalResponseXml retrievalPluginDataObjects = RetrievalPluginDataObjectsFixture.INSTANCE
|
||||
.get();
|
||||
addRetrievalToQueue();
|
||||
|
||||
new SerializeRetrievedDataToDirectory(directory,
|
||||
new AlwaysSameWmoHeader("SMYG10 LYBM 280000"))
|
||||
.processRetrievedPluginDataObjects(retrievalPluginDataObjects);
|
||||
|
||||
final RetrievalResponseXml restored = service
|
||||
.findRetrievals();
|
||||
final RetrievalResponseXml restored = service.findRetrievals();
|
||||
|
||||
// Just make sure the payload is present
|
||||
assertThat(restored.getRetrievalAttributePluginDataObjects().get(0)
|
||||
.getRetrievalResponse(), is(notNullValue()));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void deletesFileAfterRetrievingFromTheTargetDirectory()
|
||||
public void removesFromQueueWhileRetrieving()
|
||||
throws Exception {
|
||||
|
||||
addRetrievalToQueue();
|
||||
|
||||
service.findRetrievals();
|
||||
|
||||
assertThat(retrievalQueue, is(empty()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void returnsNullWhenNothingInTheQueue() throws Exception {
|
||||
|
||||
final RetrievalResponseXml restored = service.findRetrievals();
|
||||
|
||||
assertNull(restored);
|
||||
}
|
||||
|
||||
private void addRetrievalToQueue() throws SerializationException,
|
||||
IOException {
|
||||
RetrievalResponseXml retrievalPluginDataObjects = RetrievalPluginDataObjectsFixture.INSTANCE
|
||||
.get();
|
||||
|
||||
|
@ -94,25 +115,8 @@ public class DeserializeRetrievedDataFromDirectoryTest {
|
|||
new AlwaysSameWmoHeader("SMYG10 LYBM 280000"))
|
||||
.processRetrievedPluginDataObjects(retrievalPluginDataObjects);
|
||||
|
||||
service.findRetrievals();
|
||||
|
||||
assertThat(directory, hasNoFiles());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ignoresSubDirectories() throws Exception {
|
||||
|
||||
new File(directory, "subDir1").mkdirs();
|
||||
|
||||
service.findRetrievals();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void returnsNullWhenNoFileInTheTargetDirectory() throws Exception {
|
||||
|
||||
final RetrievalResponseXml restored = service
|
||||
.findRetrievals();
|
||||
|
||||
assertNull(restored);
|
||||
final List<File> files = FileUtil.listFiles(directory,
|
||||
FilenameFilters.ACCEPT_FILES, false);
|
||||
retrievalQueue.add(FileUtil.file2String(files.get(0)));
|
||||
}
|
||||
}
|
|
@ -31,6 +31,7 @@ import java.util.Arrays;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
|
@ -53,8 +54,10 @@ import com.raytheon.uf.common.event.EventBus;
|
|||
import com.raytheon.uf.common.localization.PathManagerFactoryTest;
|
||||
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
import com.raytheon.uf.common.util.FileUtil;
|
||||
import com.raytheon.uf.common.util.SpringFiles;
|
||||
import com.raytheon.uf.common.util.TestUtil;
|
||||
import com.raytheon.uf.common.util.file.FilenameFilters;
|
||||
import com.raytheon.uf.edex.database.DataAccessLayerException;
|
||||
import com.raytheon.uf.edex.database.dao.DatabaseUtil;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory;
|
||||
|
@ -78,6 +81,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
|
|||
* Feb 12, 2013 1543 djohnson Retrieval responses are now sent further down the chain.
|
||||
* Feb 15, 2013 1543 djohnson Class renames.
|
||||
* Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor.
|
||||
* Mar 19, 2013 1794 djohnson RetrievalTasks integrate at a queue.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -221,17 +225,17 @@ public class RetrievalTaskTest {
|
|||
IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder(
|
||||
Network.OPSNET, dao);
|
||||
|
||||
final ConcurrentLinkedQueue<String> retrievalQueue = new ConcurrentLinkedQueue<String>();
|
||||
final File testDirectory = TestUtil
|
||||
.setupTestClassDir(RetrievalTaskTest.class);
|
||||
IRetrievalPluginDataObjectsProcessor serializeToDirectory = new SerializeRetrievedDataToDirectory(
|
||||
testDirectory, new AlwaysSameWmoHeader(
|
||||
"SMYG10 LYBM 280000"));
|
||||
testDirectory, new AlwaysSameWmoHeader("SMYG10 LYBM 280000"));
|
||||
|
||||
RetrievalTask downloadTask = new RetrievalTask(Network.OPSNET,
|
||||
retrievalDataFinder, serializeToDirectory,
|
||||
mock(IRetrievalResponseCompleter.class), dao);
|
||||
RetrievalTask readDownloadsTask = new RetrievalTask(Network.OPSNET,
|
||||
new DeserializeRetrievedDataFromDirectory(testDirectory),
|
||||
new DeserializeRetrievedDataFromIngest(retrievalQueue),
|
||||
retrievedDataProcessor, new RetrievalResponseCompleter(
|
||||
mock(SubscriptionNotifyTask.class), dao), dao);
|
||||
|
||||
|
@ -242,6 +246,11 @@ public class RetrievalTaskTest {
|
|||
assertThat(request.getState(), is(State.RUNNING));
|
||||
}
|
||||
|
||||
for (File file : FileUtil.listFiles(testDirectory,
|
||||
FilenameFilters.ACCEPT_FILES, false)) {
|
||||
retrievalQueue.add(FileUtil.file2String(file));
|
||||
}
|
||||
|
||||
readDownloadsTask.run();
|
||||
|
||||
final List<RetrievalRequestRecord> allRetrievals = dao.getAll();
|
||||
|
|
Loading…
Add table
Reference in a new issue