Issue #1794 Use a queue to hold ncf retrieval transfers
Amend: Peer review comments Change-Id: Id200527b4f901134bf79f60a9137ffda484b8d21 Former-commit-id:3d87218f1a
[formerlyb29eb94780
] [formerly3d87218f1a
[formerlyb29eb94780
] [formerlyebce91c15d
[formerly 120bd1f8278c78878bb499ec6b08932174dae1a6]]] Former-commit-id:ebce91c15d
Former-commit-id:e3a5458f1a
[formerlyc88d9dd6cf
] Former-commit-id:67f055ca39
This commit is contained in:
parent
f1337b3561
commit
eaf2a061b5
7 changed files with 145 additions and 105 deletions
|
@ -35,14 +35,15 @@
|
||||||
<constructor-arg ref="retrievalDao" />
|
<constructor-arg ref="retrievalDao" />
|
||||||
</bean>
|
</bean>
|
||||||
|
|
||||||
<!-- Pick up SBN retrievals from the drop-off directory -->
|
<!-- Pick up SBN retrievals from the drop-off point -->
|
||||||
|
<bean id="sbnRetrievalQueue" class="java.util.concurrent.ConcurrentLinkedQueue" />
|
||||||
<bean id="sbnRetrievalTask"
|
<bean id="sbnRetrievalTask"
|
||||||
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTask">
|
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTask">
|
||||||
<constructor-arg value="SBN" />
|
<constructor-arg value="SBN" />
|
||||||
<constructor-arg>
|
<constructor-arg>
|
||||||
<bean
|
<bean
|
||||||
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.DeserializeRetrievedDataFromDirectory">
|
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.DeserializeRetrievedDataFromIngest">
|
||||||
<constructor-arg value="${sbn.retrieval.transfer.directory}" />
|
<constructor-arg ref="sbnRetrievalQueue" />
|
||||||
</bean>
|
</bean>
|
||||||
</constructor-arg>
|
</constructor-arg>
|
||||||
<constructor-arg>
|
<constructor-arg>
|
||||||
|
|
|
@ -19,21 +19,18 @@
|
||||||
**/
|
**/
|
||||||
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
|
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
|
||||||
|
|
||||||
import java.io.File;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.io.FileFilter;
|
|
||||||
|
|
||||||
import javax.xml.bind.JAXBException;
|
import javax.xml.bind.JAXBException;
|
||||||
|
|
||||||
import com.raytheon.edex.esb.Headers;
|
import com.raytheon.edex.esb.Headers;
|
||||||
import com.raytheon.uf.common.datadelivery.registry.Coverage;
|
import com.raytheon.uf.common.datadelivery.registry.Coverage;
|
||||||
import com.raytheon.uf.common.serialization.JAXBManager;
|
import com.raytheon.uf.common.serialization.JAXBManager;
|
||||||
import com.raytheon.uf.common.util.CollectionUtil;
|
|
||||||
import com.raytheon.uf.common.util.FileUtil;
|
|
||||||
import com.raytheon.uf.edex.datadelivery.retrieval.opendap.OpenDapRetrievalResponse;
|
import com.raytheon.uf.edex.datadelivery.retrieval.opendap.OpenDapRetrievalResponse;
|
||||||
import com.raytheon.uf.edex.wmo.message.WMOMessage;
|
import com.raytheon.uf.edex.wmo.message.WMOMessage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deserializes the retrieved data in a directory.
|
* Deserializes the retrieved data in a retrievalQueue.
|
||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
*
|
*
|
||||||
|
@ -43,30 +40,25 @@ import com.raytheon.uf.edex.wmo.message.WMOMessage;
|
||||||
* ------------ ---------- ----------- --------------------------
|
* ------------ ---------- ----------- --------------------------
|
||||||
* Feb 01, 2013 1543 djohnson Initial creation
|
* Feb 01, 2013 1543 djohnson Initial creation
|
||||||
* Mar 05, 2013 1647 djohnson Remove WMO header.
|
* Mar 05, 2013 1647 djohnson Remove WMO header.
|
||||||
|
* Mar 19, 2013 1794 djohnson Read from a queue rather than the file system.
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
* @author djohnson
|
* @author djohnson
|
||||||
* @version 1.0
|
* @version 1.0
|
||||||
*/
|
*/
|
||||||
public class DeserializeRetrievedDataFromDirectory implements IRetrievalsFinder {
|
public class DeserializeRetrievedDataFromIngest implements IRetrievalsFinder {
|
||||||
|
|
||||||
private static final FileFilter NO_DIRECTORIES = new FileFilter() {
|
private final ConcurrentLinkedQueue<String> retrievalQueue;
|
||||||
@Override
|
|
||||||
public boolean accept(File pathname) {
|
|
||||||
return pathname.isFile();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
private final File directory;
|
|
||||||
|
|
||||||
private final JAXBManager jaxbManager;
|
private final JAXBManager jaxbManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param directory
|
* @param retrievalQueue
|
||||||
*/
|
*/
|
||||||
public DeserializeRetrievedDataFromDirectory(File directory) {
|
public DeserializeRetrievedDataFromIngest(
|
||||||
this.directory = directory;
|
ConcurrentLinkedQueue<String> retrievalQueue) {
|
||||||
|
this.retrievalQueue = retrievalQueue;
|
||||||
try {
|
try {
|
||||||
this.jaxbManager = new JAXBManager(RetrievalResponseXml.class,
|
this.jaxbManager = new JAXBManager(RetrievalResponseXml.class,
|
||||||
OpenDapRetrievalResponse.class, Coverage.class);
|
OpenDapRetrievalResponse.class, Coverage.class);
|
||||||
|
@ -81,22 +73,16 @@ public class DeserializeRetrievedDataFromDirectory implements IRetrievalsFinder
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public RetrievalResponseXml findRetrievals() throws Exception {
|
public RetrievalResponseXml findRetrievals() throws Exception {
|
||||||
|
String xml = retrievalQueue.poll();
|
||||||
|
|
||||||
final File[] files = directory.listFiles(NO_DIRECTORIES);
|
if (xml == null) {
|
||||||
|
|
||||||
if (CollectionUtil.isNullOrEmpty(files)) {
|
|
||||||
return null;
|
return null;
|
||||||
}
|
} else {
|
||||||
|
WMOMessage message = new WMOMessage(xml, new Headers());
|
||||||
final File file = files[0];
|
|
||||||
try {
|
|
||||||
WMOMessage message = new WMOMessage(FileUtil.file2bytes(file),
|
|
||||||
new Headers());
|
|
||||||
return (RetrievalResponseXml) jaxbManager
|
return (RetrievalResponseXml) jaxbManager
|
||||||
.unmarshalFromXml(new String(message.getMessageBody()));
|
.unmarshalFromXml(new String(message.getMessageBody()));
|
||||||
} finally {
|
}
|
||||||
file.delete();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -32,13 +32,16 @@ import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval;
|
||||||
import com.raytheon.uf.common.datadelivery.retrieval.xml.RetrievalAttribute;
|
import com.raytheon.uf.common.datadelivery.retrieval.xml.RetrievalAttribute;
|
||||||
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
||||||
import com.raytheon.uf.common.event.EventBus;
|
import com.raytheon.uf.common.event.EventBus;
|
||||||
|
import com.raytheon.uf.common.serialization.SerializationException;
|
||||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||||
import com.raytheon.uf.common.status.UFStatus;
|
import com.raytheon.uf.common.status.UFStatus;
|
||||||
import com.raytheon.uf.common.util.CollectionUtil;
|
import com.raytheon.uf.common.util.CollectionUtil;
|
||||||
import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory;
|
import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory;
|
||||||
import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter;
|
import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter;
|
||||||
|
import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter.TranslationException;
|
||||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
|
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
|
||||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
|
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
|
||||||
|
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
|
||||||
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse;
|
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse;
|
||||||
import com.raytheon.uf.edex.datadelivery.retrieval.util.RetrievalPersistUtil;
|
import com.raytheon.uf.edex.datadelivery.retrieval.util.RetrievalPersistUtil;
|
||||||
|
|
||||||
|
@ -89,10 +92,17 @@ public class StoreRetrievedData implements IRetrievalPluginDataObjectsProcessor
|
||||||
@Override
|
@Override
|
||||||
public void processRetrievedPluginDataObjects(
|
public void processRetrievedPluginDataObjects(
|
||||||
RetrievalResponseXml retrievalPluginDataObjects)
|
RetrievalResponseXml retrievalPluginDataObjects)
|
||||||
throws Exception {
|
throws SerializationException, TranslationException {
|
||||||
Map<String, PluginDataObject[]> pluginDataObjects = Maps.newHashMap();
|
Map<String, PluginDataObject[]> pluginDataObjects = Maps.newHashMap();
|
||||||
final RetrievalRequestRecord requestRecord = retrievalDao
|
final RetrievalRequestRecordPK id = retrievalPluginDataObjects
|
||||||
.getById(retrievalPluginDataObjects.getRequestRecord());
|
.getRequestRecord();
|
||||||
|
final RetrievalRequestRecord requestRecord = retrievalDao.getById(id);
|
||||||
|
|
||||||
|
if (requestRecord == null) {
|
||||||
|
statusHandler.warn("Unable to find retrieval by id [" + id
|
||||||
|
+ "]! Retrieval will not be processed...");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final List<RetrievalResponseWrapper> retrievalAttributePluginDataObjects = retrievalPluginDataObjects
|
final List<RetrievalResponseWrapper> retrievalAttributePluginDataObjects = retrievalPluginDataObjects
|
||||||
.getRetrievalAttributePluginDataObjects();
|
.getRetrievalAttributePluginDataObjects();
|
||||||
|
|
|
@ -22,7 +22,6 @@ package com.raytheon.uf.edex.distribution;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -61,6 +60,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||||
* 09/01/2010 4293 cjeanbap Logging of unknown Weather Products.
|
* 09/01/2010 4293 cjeanbap Logging of unknown Weather Products.
|
||||||
* Feb 27, 2013 1638 mschenke Cleaned up localization code to fix null pointer
|
* Feb 27, 2013 1638 mschenke Cleaned up localization code to fix null pointer
|
||||||
* when no distribution files present
|
* when no distribution files present
|
||||||
|
* Mar 19, 2013 1794 djohnson PatternWrapper is immutable, add toString() to it for debugging.
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
|
@ -70,15 +70,38 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||||
|
|
||||||
public class DistributionSrv {
|
public class DistributionSrv {
|
||||||
|
|
||||||
private static final transient IUFStatusHandler statusHandler = UFStatus
|
private static final IUFStatusHandler statusHandler = UFStatus
|
||||||
.getHandler(DistributionSrv.class);
|
.getHandler(DistributionSrv.class);
|
||||||
|
|
||||||
private static class PatternWrapper {
|
private static class PatternWrapper {
|
||||||
String plugin;
|
private final String plugin;
|
||||||
|
|
||||||
RequestPatterns patterns;
|
private final RequestPatterns patterns;
|
||||||
|
|
||||||
String route;
|
private final String route;
|
||||||
|
|
||||||
|
private final String displayString;
|
||||||
|
|
||||||
|
private PatternWrapper(String plugin, String route,
|
||||||
|
RequestPatterns patterns) {
|
||||||
|
this.plugin = plugin;
|
||||||
|
this.route = route;
|
||||||
|
this.patterns = patterns;
|
||||||
|
this.displayString = createDisplayString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String createDisplayString() {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("plugin=").append(plugin).append(", ");
|
||||||
|
sb.append("route=").append(route).append(", ");
|
||||||
|
sb.append("patterns=").append(patterns);
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return displayString;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected transient Log logger = LogFactory.getLog("Ingest");
|
protected transient Log logger = LogFactory.getLog("Ingest");
|
||||||
|
@ -86,12 +109,12 @@ public class DistributionSrv {
|
||||||
protected transient Log routeFailedLogger = LogFactory
|
protected transient Log routeFailedLogger = LogFactory
|
||||||
.getLog("RouteFailedLog");
|
.getLog("RouteFailedLog");
|
||||||
|
|
||||||
private List<PatternWrapper> pluginPatterns = new ArrayList<PatternWrapper>(
|
private final List<PatternWrapper> pluginPatterns = new ArrayList<PatternWrapper>(
|
||||||
100);
|
100);
|
||||||
|
|
||||||
private ConcurrentMap<String, PatternWrapper> patternMap = new ConcurrentHashMap<String, PatternWrapper>();
|
private final ConcurrentMap<String, PatternWrapper> patternMap = new ConcurrentHashMap<String, PatternWrapper>();
|
||||||
|
|
||||||
private ConcurrentMap<String, Long> modifiedTimes = new ConcurrentHashMap<String, Long>();
|
private final ConcurrentMap<String, Long> modifiedTimes = new ConcurrentHashMap<String, Long>();
|
||||||
|
|
||||||
public DistributionSrv() {
|
public DistributionSrv() {
|
||||||
for (File file : getDistributionFiles()) {
|
for (File file : getDistributionFiles()) {
|
||||||
|
@ -118,7 +141,9 @@ public class DistributionSrv {
|
||||||
"Change to distribution file detected. "
|
"Change to distribution file detected. "
|
||||||
+ file.getName()
|
+ file.getName()
|
||||||
+ " has been modified. Reloading distribution patterns");
|
+ " has been modified. Reloading distribution patterns");
|
||||||
wrapper.patterns = loadPatterns(file, plugin);
|
wrapper = new PatternWrapper(wrapper.plugin,
|
||||||
|
wrapper.route, loadPatterns(file, plugin));
|
||||||
|
patternMap.put(plugin, wrapper);
|
||||||
modifiedTimes.put(file.getName(), file.lastModified());
|
modifiedTimes.put(file.getName(), file.lastModified());
|
||||||
} catch (DistributionException e) {
|
} catch (DistributionException e) {
|
||||||
statusHandler.handle(Priority.PROBLEM,
|
statusHandler.handle(Priority.PROBLEM,
|
||||||
|
@ -168,18 +193,18 @@ public class DistributionSrv {
|
||||||
+ " does not have an accompanying patterns file in localization.");
|
+ " does not have an accompanying patterns file in localization.");
|
||||||
}
|
}
|
||||||
|
|
||||||
PatternWrapper wrapper = new PatternWrapper();
|
|
||||||
wrapper.plugin = pluginName;
|
|
||||||
wrapper.route = destination;
|
|
||||||
File modelFile = new File(path);
|
File modelFile = new File(path);
|
||||||
File siteModelFile = new File(sitePath);
|
File siteModelFile = new File(sitePath);
|
||||||
|
RequestPatterns patterns = null;
|
||||||
if (siteModelFile.exists()) {
|
if (siteModelFile.exists()) {
|
||||||
wrapper.patterns = loadPatterns(siteModelFile, pluginName);
|
patterns = loadPatterns(siteModelFile, pluginName);
|
||||||
} else if (modelFile.exists()) {
|
} else if (modelFile.exists()) {
|
||||||
wrapper.patterns = loadPatterns(modelFile, pluginName);
|
patterns = loadPatterns(modelFile, pluginName);
|
||||||
} else {
|
} else {
|
||||||
wrapper.patterns = new RequestPatterns();
|
patterns = new RequestPatterns();
|
||||||
}
|
}
|
||||||
|
PatternWrapper wrapper = new PatternWrapper(pluginName, destination,
|
||||||
|
patterns);
|
||||||
patternMap.put(wrapper.plugin, wrapper);
|
patternMap.put(wrapper.plugin, wrapper);
|
||||||
pluginPatterns.add(wrapper);
|
pluginPatterns.add(wrapper);
|
||||||
return this;
|
return this;
|
||||||
|
@ -252,8 +277,8 @@ public class DistributionSrv {
|
||||||
throws DistributionException {
|
throws DistributionException {
|
||||||
RequestPatterns patternSet = null;
|
RequestPatterns patternSet = null;
|
||||||
try {
|
try {
|
||||||
patternSet = (RequestPatterns) SerializationUtil
|
patternSet = SerializationUtil
|
||||||
.jaxbUnmarshalFromXmlFile(modelFile.getPath());
|
.jaxbUnmarshalFromXmlFile(RequestPatterns.class, modelFile.getPath());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new DistributionException("File "
|
throw new DistributionException("File "
|
||||||
+ modelFile.getAbsolutePath()
|
+ modelFile.getAbsolutePath()
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package com.raytheon.uf.edex.distribution;
|
package com.raytheon.uf.edex.distribution;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.regex.PatternSyntaxException;
|
import java.util.regex.PatternSyntaxException;
|
||||||
|
|
||||||
|
@ -35,10 +36,10 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import com.raytheon.uf.common.serialization.ISerializableObject;
|
import com.raytheon.uf.common.serialization.ISerializableObject;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A container of regular expressions, both original strings and
|
* A container of regular expressions, both original strings and the compiled
|
||||||
* the compiled patterns. Used by the DistributionSrv bean to
|
* patterns. Used by the DistributionSrv bean to store regex patterns for
|
||||||
* store regex patterns for plugins. It is important to note that
|
* plugins. It is important to note that no validation is done on duplicate
|
||||||
* no validation is done on duplicate regex patterns.
|
* regex patterns.
|
||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
*
|
*
|
||||||
|
@ -49,6 +50,7 @@ import com.raytheon.uf.common.serialization.ISerializableObject;
|
||||||
* Oct 20, 2009 brockwoo Initial creation
|
* Oct 20, 2009 brockwoo Initial creation
|
||||||
* May 16, 2011 7317 cjeanbap Added try-catch statement
|
* May 16, 2011 7317 cjeanbap Added try-catch statement
|
||||||
* for PatternSyntaxException.
|
* for PatternSyntaxException.
|
||||||
|
* Mar 19, 2013 1794 djohnson Add toString() for debugging.
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
|
@ -64,9 +66,9 @@ public class RequestPatterns implements ISerializableObject{
|
||||||
* List of patterns requested by a plugin.
|
* List of patterns requested by a plugin.
|
||||||
*/
|
*/
|
||||||
@XmlElements( { @XmlElement(name = "regex", type = String.class) })
|
@XmlElements( { @XmlElement(name = "regex", type = String.class) })
|
||||||
private ArrayList<String> patterns;
|
private List<String> patterns = new ArrayList<String>();
|
||||||
|
|
||||||
private ArrayList<Pattern> compiledPatterns;
|
private final List<Pattern> compiledPatterns = new ArrayList<Pattern>();
|
||||||
|
|
||||||
protected transient Log patternFailedLogger = LogFactory.getLog("PatternFailedLog");
|
protected transient Log patternFailedLogger = LogFactory.getLog("PatternFailedLog");
|
||||||
|
|
||||||
|
@ -74,8 +76,6 @@ public class RequestPatterns implements ISerializableObject{
|
||||||
* Creates a new instance of the container.
|
* Creates a new instance of the container.
|
||||||
*/
|
*/
|
||||||
public RequestPatterns(){
|
public RequestPatterns(){
|
||||||
this.patterns = new ArrayList<String>();
|
|
||||||
this.compiledPatterns = new ArrayList<Pattern>();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -83,7 +83,7 @@ public class RequestPatterns implements ISerializableObject{
|
||||||
*
|
*
|
||||||
* @return a list of regex pattern strings
|
* @return a list of regex pattern strings
|
||||||
*/
|
*/
|
||||||
public ArrayList<String> getPatterns() {
|
public List<String> getPatterns() {
|
||||||
return patterns;
|
return patterns;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,7 +92,7 @@ public class RequestPatterns implements ISerializableObject{
|
||||||
*
|
*
|
||||||
* @param patterns an arraylist of regex strings
|
* @param patterns an arraylist of regex strings
|
||||||
*/
|
*/
|
||||||
public void setPatterns(ArrayList<String> patterns) {
|
public void setPatterns(List<String> patterns) {
|
||||||
this.patterns = patterns;
|
this.patterns = patterns;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -140,4 +140,9 @@ public class RequestPatterns implements ISerializableObject{
|
||||||
}
|
}
|
||||||
return isFound;
|
return isFound;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return patterns.toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,22 +19,28 @@
|
||||||
**/
|
**/
|
||||||
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
|
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
|
||||||
|
|
||||||
import static com.raytheon.uf.common.util.Matchers.hasNoFiles;
|
import static org.hamcrest.Matchers.empty;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.raytheon.uf.common.localization.PathManagerFactoryTest;
|
import com.raytheon.uf.common.localization.PathManagerFactoryTest;
|
||||||
|
import com.raytheon.uf.common.serialization.SerializationException;
|
||||||
|
import com.raytheon.uf.common.util.FileUtil;
|
||||||
import com.raytheon.uf.common.util.TestUtil;
|
import com.raytheon.uf.common.util.TestUtil;
|
||||||
|
import com.raytheon.uf.common.util.file.FilenameFilters;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test {@link DeserializeRetrievedDataFromDirectory}.
|
* Test {@link DeserializeRetrievedDataFromIngest}.
|
||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
*
|
*
|
||||||
|
@ -46,18 +52,21 @@ import com.raytheon.uf.common.util.TestUtil;
|
||||||
* Feb 12, 2013 1543 djohnson Can only test the retrieval response is now not null.
|
* Feb 12, 2013 1543 djohnson Can only test the retrieval response is now not null.
|
||||||
* Feb 15, 2013 1543 djohnson Some renames.
|
* Feb 15, 2013 1543 djohnson Some renames.
|
||||||
* Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor.
|
* Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor.
|
||||||
|
* Mar 19, 2013 1794 djohnson Read from a queue rather than the file system.
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
* @author djohnson
|
* @author djohnson
|
||||||
* @version 1.0
|
* @version 1.0
|
||||||
*/
|
*/
|
||||||
public class DeserializeRetrievedDataFromDirectoryTest {
|
public class DeserializeRetrievedDataFromIngestTest {
|
||||||
private final File directory = TestUtil
|
private final File directory = TestUtil
|
||||||
.setupTestClassDir(DeserializeRetrievedDataFromDirectoryTest.class);
|
.setupTestClassDir(DeserializeRetrievedDataFromIngestTest.class);
|
||||||
|
|
||||||
private final DeserializeRetrievedDataFromDirectory service = new DeserializeRetrievedDataFromDirectory(
|
private final ConcurrentLinkedQueue<String> retrievalQueue = new ConcurrentLinkedQueue<String>();
|
||||||
directory);
|
|
||||||
|
private final DeserializeRetrievedDataFromIngest service = new DeserializeRetrievedDataFromIngest(
|
||||||
|
retrievalQueue);
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void classSetUp() {
|
public static void classSetUp() {
|
||||||
|
@ -65,28 +74,40 @@ public class DeserializeRetrievedDataFromDirectoryTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void deserializesRetrievedDataFromAFileInTheTargetDirectory()
|
public void deserializesRetrievedDataFromTheQueue()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
||||||
RetrievalResponseXml retrievalPluginDataObjects = RetrievalPluginDataObjectsFixture.INSTANCE
|
addRetrievalToQueue();
|
||||||
.get();
|
|
||||||
|
|
||||||
new SerializeRetrievedDataToDirectory(directory,
|
final RetrievalResponseXml restored = service.findRetrievals();
|
||||||
new AlwaysSameWmoHeader("SMYG10 LYBM 280000"))
|
|
||||||
.processRetrievedPluginDataObjects(retrievalPluginDataObjects);
|
|
||||||
|
|
||||||
final RetrievalResponseXml restored = service
|
|
||||||
.findRetrievals();
|
|
||||||
|
|
||||||
// Just make sure the payload is present
|
// Just make sure the payload is present
|
||||||
assertThat(restored.getRetrievalAttributePluginDataObjects().get(0)
|
assertThat(restored.getRetrievalAttributePluginDataObjects().get(0)
|
||||||
.getRetrievalResponse(), is(notNullValue()));
|
.getRetrievalResponse(), is(notNullValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void deletesFileAfterRetrievingFromTheTargetDirectory()
|
public void removesFromQueueWhileRetrieving()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
||||||
|
addRetrievalToQueue();
|
||||||
|
|
||||||
|
service.findRetrievals();
|
||||||
|
|
||||||
|
assertThat(retrievalQueue, is(empty()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void returnsNullWhenNothingInTheQueue() throws Exception {
|
||||||
|
|
||||||
|
final RetrievalResponseXml restored = service.findRetrievals();
|
||||||
|
|
||||||
|
assertNull(restored);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addRetrievalToQueue() throws SerializationException,
|
||||||
|
IOException {
|
||||||
RetrievalResponseXml retrievalPluginDataObjects = RetrievalPluginDataObjectsFixture.INSTANCE
|
RetrievalResponseXml retrievalPluginDataObjects = RetrievalPluginDataObjectsFixture.INSTANCE
|
||||||
.get();
|
.get();
|
||||||
|
|
||||||
|
@ -94,25 +115,8 @@ public class DeserializeRetrievedDataFromDirectoryTest {
|
||||||
new AlwaysSameWmoHeader("SMYG10 LYBM 280000"))
|
new AlwaysSameWmoHeader("SMYG10 LYBM 280000"))
|
||||||
.processRetrievedPluginDataObjects(retrievalPluginDataObjects);
|
.processRetrievedPluginDataObjects(retrievalPluginDataObjects);
|
||||||
|
|
||||||
service.findRetrievals();
|
final List<File> files = FileUtil.listFiles(directory,
|
||||||
|
FilenameFilters.ACCEPT_FILES, false);
|
||||||
assertThat(directory, hasNoFiles());
|
retrievalQueue.add(FileUtil.file2String(files.get(0)));
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void ignoresSubDirectories() throws Exception {
|
|
||||||
|
|
||||||
new File(directory, "subDir1").mkdirs();
|
|
||||||
|
|
||||||
service.findRetrievals();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void returnsNullWhenNoFileInTheTargetDirectory() throws Exception {
|
|
||||||
|
|
||||||
final RetrievalResponseXml restored = service
|
|
||||||
.findRetrievals();
|
|
||||||
|
|
||||||
assertNull(restored);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -31,6 +31,7 @@ import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
|
@ -53,8 +54,10 @@ import com.raytheon.uf.common.event.EventBus;
|
||||||
import com.raytheon.uf.common.localization.PathManagerFactoryTest;
|
import com.raytheon.uf.common.localization.PathManagerFactoryTest;
|
||||||
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
|
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
|
||||||
import com.raytheon.uf.common.serialization.SerializationException;
|
import com.raytheon.uf.common.serialization.SerializationException;
|
||||||
|
import com.raytheon.uf.common.util.FileUtil;
|
||||||
import com.raytheon.uf.common.util.SpringFiles;
|
import com.raytheon.uf.common.util.SpringFiles;
|
||||||
import com.raytheon.uf.common.util.TestUtil;
|
import com.raytheon.uf.common.util.TestUtil;
|
||||||
|
import com.raytheon.uf.common.util.file.FilenameFilters;
|
||||||
import com.raytheon.uf.edex.database.DataAccessLayerException;
|
import com.raytheon.uf.edex.database.DataAccessLayerException;
|
||||||
import com.raytheon.uf.edex.database.dao.DatabaseUtil;
|
import com.raytheon.uf.edex.database.dao.DatabaseUtil;
|
||||||
import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory;
|
import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory;
|
||||||
|
@ -78,6 +81,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
|
||||||
* Feb 12, 2013 1543 djohnson Retrieval responses are now sent further down the chain.
|
* Feb 12, 2013 1543 djohnson Retrieval responses are now sent further down the chain.
|
||||||
* Feb 15, 2013 1543 djohnson Class renames.
|
* Feb 15, 2013 1543 djohnson Class renames.
|
||||||
* Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor.
|
* Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor.
|
||||||
|
* Mar 19, 2013 1794 djohnson RetrievalTasks integrate at a queue.
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
|
@ -221,17 +225,17 @@ public class RetrievalTaskTest {
|
||||||
IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder(
|
IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder(
|
||||||
Network.OPSNET, dao);
|
Network.OPSNET, dao);
|
||||||
|
|
||||||
|
final ConcurrentLinkedQueue<String> retrievalQueue = new ConcurrentLinkedQueue<String>();
|
||||||
final File testDirectory = TestUtil
|
final File testDirectory = TestUtil
|
||||||
.setupTestClassDir(RetrievalTaskTest.class);
|
.setupTestClassDir(RetrievalTaskTest.class);
|
||||||
IRetrievalPluginDataObjectsProcessor serializeToDirectory = new SerializeRetrievedDataToDirectory(
|
IRetrievalPluginDataObjectsProcessor serializeToDirectory = new SerializeRetrievedDataToDirectory(
|
||||||
testDirectory, new AlwaysSameWmoHeader(
|
testDirectory, new AlwaysSameWmoHeader("SMYG10 LYBM 280000"));
|
||||||
"SMYG10 LYBM 280000"));
|
|
||||||
|
|
||||||
RetrievalTask downloadTask = new RetrievalTask(Network.OPSNET,
|
RetrievalTask downloadTask = new RetrievalTask(Network.OPSNET,
|
||||||
retrievalDataFinder, serializeToDirectory,
|
retrievalDataFinder, serializeToDirectory,
|
||||||
mock(IRetrievalResponseCompleter.class), dao);
|
mock(IRetrievalResponseCompleter.class), dao);
|
||||||
RetrievalTask readDownloadsTask = new RetrievalTask(Network.OPSNET,
|
RetrievalTask readDownloadsTask = new RetrievalTask(Network.OPSNET,
|
||||||
new DeserializeRetrievedDataFromDirectory(testDirectory),
|
new DeserializeRetrievedDataFromIngest(retrievalQueue),
|
||||||
retrievedDataProcessor, new RetrievalResponseCompleter(
|
retrievedDataProcessor, new RetrievalResponseCompleter(
|
||||||
mock(SubscriptionNotifyTask.class), dao), dao);
|
mock(SubscriptionNotifyTask.class), dao), dao);
|
||||||
|
|
||||||
|
@ -242,6 +246,11 @@ public class RetrievalTaskTest {
|
||||||
assertThat(request.getState(), is(State.RUNNING));
|
assertThat(request.getState(), is(State.RUNNING));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (File file : FileUtil.listFiles(testDirectory,
|
||||||
|
FilenameFilters.ACCEPT_FILES, false)) {
|
||||||
|
retrievalQueue.add(FileUtil.file2String(file));
|
||||||
|
}
|
||||||
|
|
||||||
readDownloadsTask.run();
|
readDownloadsTask.run();
|
||||||
|
|
||||||
final List<RetrievalRequestRecord> allRetrievals = dao.getAll();
|
final List<RetrievalRequestRecord> allRetrievals = dao.getAll();
|
||||||
|
|
Loading…
Add table
Reference in a new issue