diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-wfo.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-wfo.xml index 0ff5550767..6f514f1e9b 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-wfo.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-wfo.xml @@ -35,14 +35,15 @@ - + + - + class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.DeserializeRetrievedDataFromIngest"> + diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromDirectory.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngest.java similarity index 68% rename from edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromDirectory.java rename to edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngest.java index 871658ad90..d4767f5e7e 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromDirectory.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngest.java @@ -19,21 +19,18 @@ **/ package com.raytheon.uf.edex.datadelivery.retrieval.handlers; -import java.io.File; -import java.io.FileFilter; +import java.util.concurrent.ConcurrentLinkedQueue; import javax.xml.bind.JAXBException; import com.raytheon.edex.esb.Headers; import com.raytheon.uf.common.datadelivery.registry.Coverage; import com.raytheon.uf.common.serialization.JAXBManager; -import com.raytheon.uf.common.util.CollectionUtil; -import com.raytheon.uf.common.util.FileUtil; import com.raytheon.uf.edex.datadelivery.retrieval.opendap.OpenDapRetrievalResponse; import com.raytheon.uf.edex.wmo.message.WMOMessage; /** - * Deserializes the retrieved data in a directory. + * Deserializes the retrieved data in a retrievalQueue. * *
  * 
@@ -43,30 +40,25 @@ import com.raytheon.uf.edex.wmo.message.WMOMessage;
  * ------------ ---------- ----------- --------------------------
  * Feb 01, 2013 1543       djohnson     Initial creation
  * Mar 05, 2013 1647       djohnson     Remove WMO header.
+ * Mar 19, 2013 1794       djohnson     Read from a queue rather than the file system.
  * 
  * 
* * @author djohnson * @version 1.0 */ -public class DeserializeRetrievedDataFromDirectory implements IRetrievalsFinder { +public class DeserializeRetrievedDataFromIngest implements IRetrievalsFinder { - private static final FileFilter NO_DIRECTORIES = new FileFilter() { - @Override - public boolean accept(File pathname) { - return pathname.isFile(); - } - }; - - private final File directory; + private final ConcurrentLinkedQueue retrievalQueue; private final JAXBManager jaxbManager; /** - * @param directory + * @param retrievalQueue */ - public DeserializeRetrievedDataFromDirectory(File directory) { - this.directory = directory; + public DeserializeRetrievedDataFromIngest( + ConcurrentLinkedQueue retrievalQueue) { + this.retrievalQueue = retrievalQueue; try { this.jaxbManager = new JAXBManager(RetrievalResponseXml.class, OpenDapRetrievalResponse.class, Coverage.class); @@ -81,22 +73,16 @@ public class DeserializeRetrievedDataFromDirectory implements IRetrievalsFinder */ @Override public RetrievalResponseXml findRetrievals() throws Exception { + String xml = retrievalQueue.poll(); - final File[] files = directory.listFiles(NO_DIRECTORIES); - - if (CollectionUtil.isNullOrEmpty(files)) { + if (xml == null) { return null; - } - - final File file = files[0]; - try { - WMOMessage message = new WMOMessage(FileUtil.file2bytes(file), - new Headers()); + } else { + WMOMessage message = new WMOMessage(xml, new Headers()); return (RetrievalResponseXml) jaxbManager .unmarshalFromXml(new String(message.getMessageBody())); - } finally { - file.delete(); } + } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/StoreRetrievedData.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/StoreRetrievedData.java index e1af37e927..a4cd38cc9b 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/StoreRetrievedData.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/StoreRetrievedData.java @@ -32,13 +32,16 @@ import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval; import com.raytheon.uf.common.datadelivery.retrieval.xml.RetrievalAttribute; import com.raytheon.uf.common.dataplugin.PluginDataObject; import com.raytheon.uf.common.event.EventBus; +import com.raytheon.uf.common.serialization.SerializationException; import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.util.CollectionUtil; import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory; import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter; +import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter.TranslationException; import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao; import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord; +import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK; import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse; import com.raytheon.uf.edex.datadelivery.retrieval.util.RetrievalPersistUtil; @@ -89,10 +92,17 @@ public class StoreRetrievedData implements IRetrievalPluginDataObjectsProcessor @Override public void processRetrievedPluginDataObjects( RetrievalResponseXml retrievalPluginDataObjects) - throws Exception { + throws SerializationException, TranslationException { Map pluginDataObjects = Maps.newHashMap(); - final RetrievalRequestRecord requestRecord = retrievalDao - .getById(retrievalPluginDataObjects.getRequestRecord()); + final RetrievalRequestRecordPK id = retrievalPluginDataObjects + .getRequestRecord(); + final RetrievalRequestRecord requestRecord = retrievalDao.getById(id); + + if (requestRecord == null) { + statusHandler.warn("Unable to find retrieval by id [" + id + + "]! Retrieval will not be processed..."); + return; + } final List retrievalAttributePluginDataObjects = retrievalPluginDataObjects .getRetrievalAttributePluginDataObjects(); diff --git a/edexOsgi/com.raytheon.uf.edex.distribution/src/com/raytheon/uf/edex/distribution/DistributionSrv.java b/edexOsgi/com.raytheon.uf.edex.distribution/src/com/raytheon/uf/edex/distribution/DistributionSrv.java index 8854f77e41..6d0cd35200 100644 --- a/edexOsgi/com.raytheon.uf.edex.distribution/src/com/raytheon/uf/edex/distribution/DistributionSrv.java +++ b/edexOsgi/com.raytheon.uf.edex.distribution/src/com/raytheon/uf/edex/distribution/DistributionSrv.java @@ -22,7 +22,6 @@ package com.raytheon.uf.edex.distribution; import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,6 +60,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority; * 09/01/2010 4293 cjeanbap Logging of unknown Weather Products. * Feb 27, 2013 1638 mschenke Cleaned up localization code to fix null pointer * when no distribution files present + * Mar 19, 2013 1794 djohnson PatternWrapper is immutable, add toString() to it for debugging. * * * @@ -70,15 +70,38 @@ import com.raytheon.uf.common.status.UFStatus.Priority; public class DistributionSrv { - private static final transient IUFStatusHandler statusHandler = UFStatus + private static final IUFStatusHandler statusHandler = UFStatus .getHandler(DistributionSrv.class); private static class PatternWrapper { - String plugin; + private final String plugin; - RequestPatterns patterns; + private final RequestPatterns patterns; - String route; + private final String route; + + private final String displayString; + + private PatternWrapper(String plugin, String route, + RequestPatterns patterns) { + this.plugin = plugin; + this.route = route; + this.patterns = patterns; + this.displayString = createDisplayString(); + } + + private String createDisplayString() { + StringBuilder sb = new StringBuilder(); + sb.append("plugin=").append(plugin).append(", "); + sb.append("route=").append(route).append(", "); + sb.append("patterns=").append(patterns); + return sb.toString(); + } + + @Override + public String toString() { + return displayString; + } } protected transient Log logger = LogFactory.getLog("Ingest"); @@ -86,12 +109,12 @@ public class DistributionSrv { protected transient Log routeFailedLogger = LogFactory .getLog("RouteFailedLog"); - private List pluginPatterns = new ArrayList( + private final List pluginPatterns = new ArrayList( 100); - private ConcurrentMap patternMap = new ConcurrentHashMap(); + private final ConcurrentMap patternMap = new ConcurrentHashMap(); - private ConcurrentMap modifiedTimes = new ConcurrentHashMap(); + private final ConcurrentMap modifiedTimes = new ConcurrentHashMap(); public DistributionSrv() { for (File file : getDistributionFiles()) { @@ -118,7 +141,9 @@ public class DistributionSrv { "Change to distribution file detected. " + file.getName() + " has been modified. Reloading distribution patterns"); - wrapper.patterns = loadPatterns(file, plugin); + wrapper = new PatternWrapper(wrapper.plugin, + wrapper.route, loadPatterns(file, plugin)); + patternMap.put(plugin, wrapper); modifiedTimes.put(file.getName(), file.lastModified()); } catch (DistributionException e) { statusHandler.handle(Priority.PROBLEM, @@ -168,18 +193,18 @@ public class DistributionSrv { + " does not have an accompanying patterns file in localization."); } - PatternWrapper wrapper = new PatternWrapper(); - wrapper.plugin = pluginName; - wrapper.route = destination; File modelFile = new File(path); File siteModelFile = new File(sitePath); + RequestPatterns patterns = null; if (siteModelFile.exists()) { - wrapper.patterns = loadPatterns(siteModelFile, pluginName); + patterns = loadPatterns(siteModelFile, pluginName); } else if (modelFile.exists()) { - wrapper.patterns = loadPatterns(modelFile, pluginName); + patterns = loadPatterns(modelFile, pluginName); } else { - wrapper.patterns = new RequestPatterns(); + patterns = new RequestPatterns(); } + PatternWrapper wrapper = new PatternWrapper(pluginName, destination, + patterns); patternMap.put(wrapper.plugin, wrapper); pluginPatterns.add(wrapper); return this; @@ -252,8 +277,8 @@ public class DistributionSrv { throws DistributionException { RequestPatterns patternSet = null; try { - patternSet = (RequestPatterns) SerializationUtil - .jaxbUnmarshalFromXmlFile(modelFile.getPath()); + patternSet = SerializationUtil + .jaxbUnmarshalFromXmlFile(RequestPatterns.class, modelFile.getPath()); } catch (Exception e) { throw new DistributionException("File " + modelFile.getAbsolutePath() diff --git a/edexOsgi/com.raytheon.uf.edex.distribution/src/com/raytheon/uf/edex/distribution/RequestPatterns.java b/edexOsgi/com.raytheon.uf.edex.distribution/src/com/raytheon/uf/edex/distribution/RequestPatterns.java index ef636ed2b3..139602ca98 100644 --- a/edexOsgi/com.raytheon.uf.edex.distribution/src/com/raytheon/uf/edex/distribution/RequestPatterns.java +++ b/edexOsgi/com.raytheon.uf.edex.distribution/src/com/raytheon/uf/edex/distribution/RequestPatterns.java @@ -20,6 +20,7 @@ package com.raytheon.uf.edex.distribution; import java.util.ArrayList; +import java.util.List; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; @@ -35,25 +36,26 @@ import org.apache.commons.logging.LogFactory; import com.raytheon.uf.common.serialization.ISerializableObject; /** - * A container of regular expressions, both original strings and - * the compiled patterns. Used by the DistributionSrv bean to - * store regex patterns for plugins. It is important to note that - * no validation is done on duplicate regex patterns. + * A container of regular expressions, both original strings and the compiled + * patterns. Used by the DistributionSrv bean to store regex patterns for + * plugins. It is important to note that no validation is done on duplicate + * regex patterns. * *
- *
+ * 
  * SOFTWARE HISTORY
- *
+ * 
  * Date         Ticket#    Engineer    Description
  * ------------ ---------- ----------- --------------------------
  * Oct 20, 2009            brockwoo     Initial creation
  * May 16, 2011 7317       cjeanbap     Added try-catch statement
  *                                      for PatternSyntaxException.
- *
+ * Mar 19, 2013 1794       djohnson     Add toString() for debugging.
+ * 
  * 
- * + * * @author brockwoo - * @version 1.0 + * @version 1.0 */ @XmlRootElement(name = "requestPatterns") @@ -64,9 +66,9 @@ public class RequestPatterns implements ISerializableObject{ * List of patterns requested by a plugin. */ @XmlElements( { @XmlElement(name = "regex", type = String.class) }) - private ArrayList patterns; + private List patterns = new ArrayList(); - private ArrayList compiledPatterns; + private final List compiledPatterns = new ArrayList(); protected transient Log patternFailedLogger = LogFactory.getLog("PatternFailedLog"); @@ -74,8 +76,6 @@ public class RequestPatterns implements ISerializableObject{ * Creates a new instance of the container. */ public RequestPatterns(){ - this.patterns = new ArrayList(); - this.compiledPatterns = new ArrayList(); } /** @@ -83,7 +83,7 @@ public class RequestPatterns implements ISerializableObject{ * * @return a list of regex pattern strings */ - public ArrayList getPatterns() { + public List getPatterns() { return patterns; } @@ -92,7 +92,7 @@ public class RequestPatterns implements ISerializableObject{ * * @param patterns an arraylist of regex strings */ - public void setPatterns(ArrayList patterns) { + public void setPatterns(List patterns) { this.patterns = patterns; } @@ -140,4 +140,9 @@ public class RequestPatterns implements ISerializableObject{ } return isFound; } + + @Override + public String toString() { + return patterns.toString(); + } } diff --git a/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromDirectoryTest.java b/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngestTest.java similarity index 62% rename from tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromDirectoryTest.java rename to tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngestTest.java index 3667a9d91e..72afe2b0ed 100644 --- a/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromDirectoryTest.java +++ b/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngestTest.java @@ -19,22 +19,28 @@ **/ package com.raytheon.uf.edex.datadelivery.retrieval.handlers; -import static com.raytheon.uf.common.util.Matchers.hasNoFiles; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import org.junit.BeforeClass; import org.junit.Test; import com.raytheon.uf.common.localization.PathManagerFactoryTest; +import com.raytheon.uf.common.serialization.SerializationException; +import com.raytheon.uf.common.util.FileUtil; import com.raytheon.uf.common.util.TestUtil; +import com.raytheon.uf.common.util.file.FilenameFilters; /** - * Test {@link DeserializeRetrievedDataFromDirectory}. + * Test {@link DeserializeRetrievedDataFromIngest}. * *
  * 
@@ -46,18 +52,21 @@ import com.raytheon.uf.common.util.TestUtil;
  * Feb 12, 2013 1543       djohnson     Can only test the retrieval response is now not null.
  * Feb 15, 2013 1543       djohnson     Some renames.
  * Mar 05, 2013 1647       djohnson     Pass wmo header strategy to constructor.
+ * Mar 19, 2013 1794       djohnson     Read from a queue rather than the file system.
  * 
  * 
* * @author djohnson * @version 1.0 */ -public class DeserializeRetrievedDataFromDirectoryTest { +public class DeserializeRetrievedDataFromIngestTest { private final File directory = TestUtil - .setupTestClassDir(DeserializeRetrievedDataFromDirectoryTest.class); + .setupTestClassDir(DeserializeRetrievedDataFromIngestTest.class); - private final DeserializeRetrievedDataFromDirectory service = new DeserializeRetrievedDataFromDirectory( - directory); + private final ConcurrentLinkedQueue retrievalQueue = new ConcurrentLinkedQueue(); + + private final DeserializeRetrievedDataFromIngest service = new DeserializeRetrievedDataFromIngest( + retrievalQueue); @BeforeClass public static void classSetUp() { @@ -65,28 +74,40 @@ public class DeserializeRetrievedDataFromDirectoryTest { } @Test - public void deserializesRetrievedDataFromAFileInTheTargetDirectory() + public void deserializesRetrievedDataFromTheQueue() throws Exception { - RetrievalResponseXml retrievalPluginDataObjects = RetrievalPluginDataObjectsFixture.INSTANCE - .get(); + addRetrievalToQueue(); - new SerializeRetrievedDataToDirectory(directory, - new AlwaysSameWmoHeader("SMYG10 LYBM 280000")) - .processRetrievedPluginDataObjects(retrievalPluginDataObjects); - - final RetrievalResponseXml restored = service - .findRetrievals(); + final RetrievalResponseXml restored = service.findRetrievals(); // Just make sure the payload is present assertThat(restored.getRetrievalAttributePluginDataObjects().get(0) .getRetrievalResponse(), is(notNullValue())); } + @Test - public void deletesFileAfterRetrievingFromTheTargetDirectory() + public void removesFromQueueWhileRetrieving() throws Exception { + addRetrievalToQueue(); + + service.findRetrievals(); + + assertThat(retrievalQueue, is(empty())); + } + + @Test + public void returnsNullWhenNothingInTheQueue() throws Exception { + + final RetrievalResponseXml restored = service.findRetrievals(); + + assertNull(restored); + } + + private void addRetrievalToQueue() throws SerializationException, + IOException { RetrievalResponseXml retrievalPluginDataObjects = RetrievalPluginDataObjectsFixture.INSTANCE .get(); @@ -94,25 +115,8 @@ public class DeserializeRetrievedDataFromDirectoryTest { new AlwaysSameWmoHeader("SMYG10 LYBM 280000")) .processRetrievedPluginDataObjects(retrievalPluginDataObjects); - service.findRetrievals(); - - assertThat(directory, hasNoFiles()); - } - - @Test - public void ignoresSubDirectories() throws Exception { - - new File(directory, "subDir1").mkdirs(); - - service.findRetrievals(); - } - - @Test - public void returnsNullWhenNoFileInTheTargetDirectory() throws Exception { - - final RetrievalResponseXml restored = service - .findRetrievals(); - - assertNull(restored); + final List files = FileUtil.listFiles(directory, + FilenameFilters.ACCEPT_FILES, false); + retrievalQueue.add(FileUtil.file2String(files.get(0))); } } diff --git a/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTaskTest.java b/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTaskTest.java index 28e7e1c663..29b8f7f087 100644 --- a/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTaskTest.java +++ b/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTaskTest.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; import org.junit.Before; import org.junit.Ignore; @@ -53,8 +54,10 @@ import com.raytheon.uf.common.event.EventBus; import com.raytheon.uf.common.localization.PathManagerFactoryTest; import com.raytheon.uf.common.registry.handler.RegistryHandlerException; import com.raytheon.uf.common.serialization.SerializationException; +import com.raytheon.uf.common.util.FileUtil; import com.raytheon.uf.common.util.SpringFiles; import com.raytheon.uf.common.util.TestUtil; +import com.raytheon.uf.common.util.file.FilenameFilters; import com.raytheon.uf.edex.database.DataAccessLayerException; import com.raytheon.uf.edex.database.dao.DatabaseUtil; import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory; @@ -78,6 +81,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse * Feb 12, 2013 1543 djohnson Retrieval responses are now sent further down the chain. * Feb 15, 2013 1543 djohnson Class renames. * Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor. + * Mar 19, 2013 1794 djohnson RetrievalTasks integrate at a queue. * * * @@ -221,17 +225,17 @@ public class RetrievalTaskTest { IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder( Network.OPSNET, dao); + final ConcurrentLinkedQueue retrievalQueue = new ConcurrentLinkedQueue(); final File testDirectory = TestUtil .setupTestClassDir(RetrievalTaskTest.class); IRetrievalPluginDataObjectsProcessor serializeToDirectory = new SerializeRetrievedDataToDirectory( - testDirectory, new AlwaysSameWmoHeader( - "SMYG10 LYBM 280000")); + testDirectory, new AlwaysSameWmoHeader("SMYG10 LYBM 280000")); RetrievalTask downloadTask = new RetrievalTask(Network.OPSNET, retrievalDataFinder, serializeToDirectory, mock(IRetrievalResponseCompleter.class), dao); RetrievalTask readDownloadsTask = new RetrievalTask(Network.OPSNET, - new DeserializeRetrievedDataFromDirectory(testDirectory), + new DeserializeRetrievedDataFromIngest(retrievalQueue), retrievedDataProcessor, new RetrievalResponseCompleter( mock(SubscriptionNotifyTask.class), dao), dao); @@ -242,6 +246,11 @@ public class RetrievalTaskTest { assertThat(request.getState(), is(State.RUNNING)); } + for (File file : FileUtil.listFiles(testDirectory, + FilenameFilters.ACCEPT_FILES, false)) { + retrievalQueue.add(FileUtil.file2String(file)); + } + readDownloadsTask.run(); final List allRetrievals = dao.getAll();