diff --git a/edexOsgi/com.raytheon.uf.common.datadelivery.registry/src/com/raytheon/uf/common/datadelivery/registry/GroupDefinition.java b/edexOsgi/com.raytheon.uf.common.datadelivery.registry/src/com/raytheon/uf/common/datadelivery/registry/GroupDefinition.java index 0f70547803..939cc96c49 100644 --- a/edexOsgi/com.raytheon.uf.common.datadelivery.registry/src/com/raytheon/uf/common/datadelivery/registry/GroupDefinition.java +++ b/edexOsgi/com.raytheon.uf.common.datadelivery.registry/src/com/raytheon/uf/common/datadelivery/registry/GroupDefinition.java @@ -15,6 +15,7 @@ import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import org.geotools.geometry.jts.ReferencedEnvelope; import com.raytheon.uf.common.registry.annotations.RegistryObject; +import com.raytheon.uf.common.registry.annotations.RegistryObjectVersion; import com.raytheon.uf.common.registry.annotations.SlotAttribute; import com.raytheon.uf.common.serialization.adapters.ReferencedEnvelopeAdapter; import com.raytheon.uf.common.serialization.annotations.DynamicSerialize; @@ -36,6 +37,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement; * Jan 02, 2013 1441 djohnson Add constants. * Apr 08, 2013 1826 djohnson Remove delivery options. * May 22, 2013 1650 djohnson Remove option instance variable. + * Feb 4, 2014 2686 dhladky This one got missed previously. * * * @@ -46,6 +48,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement; @XmlAccessorType(XmlAccessType.NONE) @DynamicSerialize @RegistryObject({ GroupDefinition.GROUP_NAME_SLOT }) +@RegistryObjectVersion(value = 1.0f) public class GroupDefinition { public static final String GROUP_NAME_SLOT = "groupName"; diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/res/spring/bandwidth-datadelivery-edex-impl-monolithic.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/res/spring/bandwidth-datadelivery-edex-impl-monolithic.xml index a42705ae89..a6e3c69731 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/res/spring/bandwidth-datadelivery-edex-impl-monolithic.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/res/spring/bandwidth-datadelivery-edex-impl-monolithic.xml @@ -33,7 +33,6 @@ - - - - - - - - - - - - - + + + \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/res/spring/bandwidth-datadelivery-edex-impl-ncf.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/res/spring/bandwidth-datadelivery-edex-impl-ncf.xml index 9f80e87e40..d4420b4679 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/res/spring/bandwidth-datadelivery-edex-impl-ncf.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/res/spring/bandwidth-datadelivery-edex-impl-ncf.xml @@ -52,15 +52,10 @@ - - - - - - + \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-edex-impl-wfo.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-edex-impl-wfo.xml index 4b9dfa14fa..a714a13ed2 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-edex-impl-wfo.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-edex-impl-wfo.xml @@ -52,7 +52,6 @@ - - - - - - - - - - - + + \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManager.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManager.java index 4a87154047..b14fbc558d 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManager.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManager.java @@ -141,6 +141,7 @@ import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException; * Jan 25, 2014 2636 mpduff Don't do an initial adhoc query for a new subscription. * Jan 24, 2013 2709 bgonzale Before scheduling adhoc, check if in active period window. * Jan 29, 2014 2636 mpduff Scheduling refactor. + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -629,8 +630,6 @@ public abstract class BandwidthManager } } - retrievalManager.wakeAgents(); - return unscheduled; } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthManager.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthManager.java index aad924b21f..3ef10a119d 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthManager.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthManager.java @@ -119,6 +119,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; * Jan 20, 2013 2398 dhladky Fixed rescheduling beyond active period/expired window. * Jan 24, 2013 2709 bgonzale Changed parameter to shouldScheduleForTime to a Calendar. * Jan 29, 2014 2636 mpduff Scheduling refactor. + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -672,8 +673,6 @@ public abstract class EdexBandwidthManager } } - // Notify RetrievalAgentManager of updated RetrievalRequests. - retrievalManager.wakeAgents(); } else { if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { statusHandler diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/processing/SimpleSubscriptionAggregator.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/processing/SimpleSubscriptionAggregator.java index ab73784e6a..5d711b090f 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/processing/SimpleSubscriptionAggregator.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/processing/SimpleSubscriptionAggregator.java @@ -31,6 +31,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthSubscription; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval; import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.ISubscriptionAggregator; +import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalAgent; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalAgent; import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; @@ -53,6 +54,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; * Jun 13, 2013 2095 djohnson No need to query the database, we are only receiving new bandwidth subscriptions. * Jul 11, 2013 2106 djohnson aggregate() signature changed. * Jan 06, 2014 2636 mpduff Changed how data set offset is set. + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @author jspinks @@ -90,7 +92,7 @@ public class SimpleSubscriptionAggregator implements ISubscriptionAggregator { subscriptionRetrieval.setBandwidthSubscription(subDao); subscriptionRetrieval.setNetwork(subDao.getRoute()); subscriptionRetrieval - .setAgentType(SubscriptionRetrievalAgent.SUBSCRIPTION_AGENT); + .setAgentType(RetrievalAgent.SUBSCRIPTION_AGENT); subscriptionRetrieval.setStatus(RetrievalStatus.PROCESSING); subscriptionRetrieval.setPriority(subDao.getPriority()); subscriptionRetrieval.setEstimatedSize(subDao.getEstimatedSize()); diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalAgent.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalAgent.java index 060240d089..1abd643f1e 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalAgent.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalAgent.java @@ -22,6 +22,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation; * separate work method from loop control. * Nov 09, 2012 1286 djohnson Add ability to kill the threads when BandwidthManager instance is replaced. * Mar 05, 2013 1647 djohnson Sleep one minute between checks. + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -36,6 +37,8 @@ public abstract class RetrievalAgent * @@ -225,19 +226,6 @@ public class RetrievalManager { this.shutdown = true; } - /** - * Wake up the AgentManager for the RetrievalManager. - */ - public void wakeAgents() { - // This is currently a duplication of wake() in RetrievalAgentManager, - // because there was a circular dependency in the Spring config files... - // Can the object graph be made a little cleaner? - synchronized (notifier) { - statusHandler.info("Waking up retrieval threads"); - notifier.notifyAll(); - } - } - /** * @param fromRetrievalManager */ diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalPlan.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalPlan.java index 573367f52a..eca63ae937 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalPlan.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalPlan.java @@ -380,6 +380,7 @@ public class RetrievalPlan { for (BandwidthBucket bucket : buckets) { reservation = associator.getNextReservation(bucket, agentType); if (reservation != null) { + //TODO: do validity check for expired allocations break; } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgent.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgent.java index bd56dbe5ab..f78611b7b3 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgent.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgent.java @@ -6,9 +6,7 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import com.google.common.annotations.VisibleForTesting; import com.raytheon.uf.common.datadelivery.registry.Network; import com.raytheon.uf.common.datadelivery.registry.Provider; import com.raytheon.uf.common.datadelivery.registry.ProviderType; @@ -26,7 +24,6 @@ import com.raytheon.uf.common.status.UFStatus.Priority; import com.raytheon.uf.common.time.util.ITimer; import com.raytheon.uf.common.time.util.TimeUtil; import com.raytheon.uf.common.util.CollectionUtil; -import com.raytheon.uf.edex.core.EDEXUtil; import com.raytheon.uf.edex.core.EdexException; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval; @@ -36,6 +33,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory; import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao; import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord; import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK; +import com.raytheon.uf.edex.datadelivery.retrieval.util.RetrievalGeneratorUtilities; /** * Class used to process SubscriptionRetrieval BandwidthAllocations. @@ -56,6 +54,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK; * Jan 15, 2014 2678 bgonzale Use Queue for passing RetrievalRequestRecords to the * RetrievalTasks (PerformRetrievalsThenReturnFinder). * Added constructor that sets the retrievalQueue to null. + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -67,44 +66,29 @@ public class SubscriptionRetrievalAgent extends private static final IUFStatusHandler statusHandler = UFStatus .getHandler(SubscriptionRetrievalAgent.class); - public static final String SUBSCRIPTION_AGENT = "SubscriptionAgent"; - private final int defaultPriority; - private final IBandwidthDao bandwidthDao; + private final IBandwidthDao bandwidthDao; private final IRetrievalDao retrievalDao; private final IProviderHandler providerHandler; - private final ConcurrentLinkedQueue retrievalQueue; - public SubscriptionRetrievalAgent(Network network, String destinationUri, final Object notifier, int defaultPriority, - RetrievalManager retrievalManager, IBandwidthDao bandwidthDao, + RetrievalManager retrievalManager, IBandwidthDao bandwidthDao, IRetrievalDao retrievalDao, IProviderHandler providerHandler) { - this(network, destinationUri, notifier, defaultPriority, - retrievalManager, bandwidthDao, retrievalDao, providerHandler, - null); - } - - public SubscriptionRetrievalAgent(Network network, String destinationUri, - final Object notifier, int defaultPriority, - RetrievalManager retrievalManager, IBandwidthDao bandwidthDao, - IRetrievalDao retrievalDao, IProviderHandler providerHandler, - ConcurrentLinkedQueue retrievalQueue) { super(network, destinationUri, notifier, retrievalManager); this.defaultPriority = defaultPriority; this.bandwidthDao = bandwidthDao; this.retrievalDao = retrievalDao; this.providerHandler = providerHandler; - this.retrievalQueue = retrievalQueue; } @Override void processAllocation(SubscriptionRetrieval retrieval) throws EdexException { - Subscription sub; + Subscription sub; try { sub = bandwidthDao.getSubscriptionRetrievalAttributes(retrieval) .getSubscription(); @@ -127,19 +111,28 @@ public class SubscriptionRetrievalAgent extends bundle.setConnection(provider.getConnection()); bundle.setSubscription(sub); - retrieval.setActualStart(TimeUtil.newCalendar()); + retrieval.setActualStart(TimeUtil.newGmtCalendar()); retrieval.setStatus(RetrievalStatus.RETRIEVAL); // update database bandwidthDao.update(retrieval); - // Handler will pipeline the Retrieval Objects created to the - // Database where the pool of RetrievalTasks will process them - boolean retrievalsGenerated = generateRetrieval(bundle, + // generateRetrieval will pipeline the RetrievalRecord Objects created to the DB. + // The PK objects returned are sent to the RetrievalQueue for processing. + List retrievals = generateRetrieval(bundle, retrieval.getIdentifier()); - if (retrievalsGenerated) { - // Wake the RetrievalTasks to fetch the data.. - wakeRetrievalTasks(); + + if (!CollectionUtil.isNullOrEmpty(retrievals)) { + try { + Object[] payload = retrievals.toArray(); + RetrievalGeneratorUtilities.sendToRetrieval(destinationUri, + network, payload); + } catch (Exception e) { + statusHandler.handle(Priority.PROBLEM, + "Couldn't send RetrievalRecords to Queue!", e); + } + statusHandler.info("Sent " + retrievals.size() + + " retrievals to queue. " + network.toString()); } else { // Normally this is the job of the SubscriptionNotifyTask, but if no // retrievals were generated we have to send it manually @@ -148,12 +141,7 @@ public class SubscriptionRetrievalAgent extends EventBus.publish(retrievalManagerNotifyEvent); } } - - @VisibleForTesting - void wakeRetrievalTasks() throws EdexException { - EDEXUtil.getMessageProducer().sendAsync(destinationUri, null); - } - + @Override protected String getAgentType() { return SUBSCRIPTION_AGENT; @@ -176,7 +164,7 @@ public class SubscriptionRetrievalAgent extends * the subscription retrieval key * @return true if retrievals were generated (and waiting to be processed) */ - private boolean generateRetrieval(SubscriptionBundle bundle, + private List generateRetrieval(SubscriptionBundle bundle, Long subRetrievalKey) { // process the bundle into a retrieval @@ -188,8 +176,10 @@ public class SubscriptionRetrievalAgent extends + " Being Processed for Retrieval..."); List retrievals = rg.buildRetrieval(bundle); - + List requestRecords = null; + List requestRecordPKs = null; boolean retrievalsGenerated = !CollectionUtil.isNullOrEmpty(retrievals); + if (retrievalsGenerated) { String owner = bundle.getSubscription().getOwner(); @@ -197,9 +187,10 @@ public class SubscriptionRetrievalAgent extends int priority = (bundle.getPriority() != null) ? bundle .getPriority().getPriorityValue() : defaultPriority; - Date insertTime = TimeUtil.newCalendar().getTime(); - - List requestRecords = new ArrayList( + Date insertTime = TimeUtil.newDate(); + requestRecords = new ArrayList( + retrievals.size()); + requestRecordPKs = new ArrayList( retrievals.size()); ITimer timer = TimeUtil.getTimer(); @@ -226,9 +217,8 @@ public class SubscriptionRetrievalAgent extends rec.setRetrieval(SerializationUtil .transformToThrift(retrieval)); rec.setState(RetrievalRequestRecord.State.PENDING); - if (retrievalQueue != null) { - retrievalQueue.add(rec.getId()); - } + requestRecords.add(rec); + requestRecordPKs.add(rec.getId()); } catch (Exception e) { statusHandler.error("Subscription: " + subscriptionName + " Failed to serialize request [" + retrieval @@ -236,8 +226,6 @@ public class SubscriptionRetrievalAgent extends rec.setRetrieval(new byte[0]); rec.setState(RetrievalRequestRecord.State.FAILED); } - - requestRecords.add(rec); } timer.stop(); @@ -258,13 +246,14 @@ public class SubscriptionRetrievalAgent extends statusHandler.handle(Priority.WARN, "Subscription: " + subscriptionName + " Failed to store to retrievals.", e); + requestRecordPKs.clear(); } } else { statusHandler.warn("Subscription: " + subscriptionName + " Did not generate any retrieval messages"); } - return retrievalsGenerated; + return requestRecordPKs; } private Provider getProvider(String providerName) { @@ -276,4 +265,5 @@ public class SubscriptionRetrievalAgent extends return null; } } + } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-monolithic.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-monolithic.xml index 1ee204a9ad..abd71900a6 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-monolithic.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-monolithic.xml @@ -8,13 +8,12 @@ 2) What to do with found retrievals, in this case process it and send a notification event 3) How to complete retrievals, in this case update the database and send a notification event --> - + - @@ -35,13 +34,12 @@ - + - @@ -60,11 +58,10 @@ - - - - - + + + + + \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-ncf.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-ncf.xml index 38e3bc0f9d..abc5485de8 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-ncf.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-ncf.xml @@ -8,13 +8,12 @@ 2) What to do with found retrievals, in this case process it and send a notification event 3) How to complete retrievals, in this case update the database and send a notification event --> - + - @@ -41,10 +40,9 @@ - - - - + + + + \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-wfo.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-wfo.xml index 40145e5743..51be2289d4 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-wfo.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery-wfo.xml @@ -3,18 +3,17 @@ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd"> - - + - @@ -35,14 +34,12 @@ - - + - @@ -52,16 +49,18 @@ - - + + + + + - - - - - + + + + + \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery.xml index 5d2f29ccf7..031ca60b3e 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery.xml @@ -52,22 +52,13 @@ - + - - - - - - - - + + - - - - - - - - - - - - - - - + + + + + + + + + + + java.lang.Throwable + + + + + + + + + + \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/resources/com.raytheon.uf.edex.datadelivery.retrieval.properties b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/resources/com.raytheon.uf.edex.datadelivery.retrieval.properties index 24e25148d9..bc69999d72 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/resources/com.raytheon.uf.edex.datadelivery.retrieval.properties +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/resources/com.raytheon.uf.edex.datadelivery.retrieval.properties @@ -11,8 +11,11 @@ retrieval.task.frequency=1 MINUTES # How often to check for retrieved subscriptions to notify of # Valid units: [MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS] subnotify.task.frequency=1 MINUTES - # Cron for Subscription Expiration Checker - currently 2 minutes past the hour checkExpiredSubscription.cron=0+2+*+*+*+? # Cron for subscriptions nearing the end of their active period - currently once/day at 00:15Z -checkEndingSubscription.cron=0+15+0+*+*+? \ No newline at end of file +checkEndingSubscription.cron=0+15+0+*+*+? +# How many retrieval queue consumers +retrieval-process.threads=4 +# How many subNotify threads for retrieval +retrieval-subNotify.threads=1 \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/db/RetrievalRequestRecord.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/db/RetrievalRequestRecord.java index a304ea0f96..6f297a8b2c 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/db/RetrievalRequestRecord.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/db/RetrievalRequestRecord.java @@ -34,7 +34,6 @@ import com.raytheon.uf.common.datadelivery.registry.Network; import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval; import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval.SubscriptionType; import com.raytheon.uf.common.dataplugin.persist.IPersistableDataObject; -import com.raytheon.uf.common.serialization.ISerializableObject; import com.raytheon.uf.common.serialization.SerializationException; import com.raytheon.uf.common.serialization.SerializationUtil; import com.raytheon.uf.common.serialization.annotations.DynamicSerialize; @@ -52,6 +51,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement; * Oct 10, 2012 0726 djohnson Add {@link #subRetrievalKey}. * Nov 26, 2012 1340 dhladky Added additional fields for tracking subscriptions * Jan 30, 2013 1543 djohnson Add PENDING_SBN, give retrieval column a length. + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -62,8 +62,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement; @Table(name = "subscription_retrieval") @DynamicSerialize public class RetrievalRequestRecord implements - IPersistableDataObject, Serializable, - ISerializableObject { + IPersistableDataObject, Serializable { public enum State { PENDING, RUNNING, FAILED, COMPLETED; diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/db/RetrievalRequestRecordPK.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/db/RetrievalRequestRecordPK.java index d7c3cb1eed..fef8fb45b2 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/db/RetrievalRequestRecordPK.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/db/RetrievalRequestRecordPK.java @@ -32,7 +32,6 @@ import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; import com.raytheon.uf.common.dataplugin.persist.IPersistableDataObject; -import com.raytheon.uf.common.serialization.ISerializableObject; import com.raytheon.uf.common.serialization.annotations.DynamicSerialize; import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement; @@ -47,6 +46,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement; * May 09, 2012 rjpeter Initial creation * Feb 11, 2013 1543 djohnson Override equals/hashCode to remove Hibernate warning. * Feb 15, 2013 1543 djohnson Add JAXB annotations. + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -59,7 +59,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement; @XmlAccessorType(XmlAccessType.NONE) public class RetrievalRequestRecordPK implements IPersistableDataObject, - Serializable, ISerializableObject { + Serializable { private static final long serialVersionUID = 1L; diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngest.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngest.java index 97d5654b46..80acc040c2 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngest.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngest.java @@ -19,8 +19,6 @@ **/ package com.raytheon.uf.edex.datadelivery.retrieval.handlers; -import java.util.concurrent.ConcurrentLinkedQueue; - import javax.xml.bind.JAXBException; import com.raytheon.edex.esb.Headers; @@ -47,6 +45,7 @@ import com.raytheon.uf.edex.wmo.message.XmlWMOMessage; * Nov 04, 2013 2506 bgonzale Added SbnRetrievalResponseXml to unmarshal classes. * Trim content after last xml tag during * marshaling from xml. + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -55,16 +54,13 @@ import com.raytheon.uf.edex.wmo.message.XmlWMOMessage; */ public class DeserializeRetrievedDataFromIngest implements IRetrievalsFinder { - private final ConcurrentLinkedQueue retrievalQueue; - private final JAXBManager jaxbManager; /** * @param retrievalQueue */ - public DeserializeRetrievedDataFromIngest( - ConcurrentLinkedQueue retrievalQueue) { - this.retrievalQueue = retrievalQueue; + public DeserializeRetrievedDataFromIngest() { + try { this.jaxbManager = new JAXBManager(RetrievalResponseXml.class, SbnRetrievalResponseXml.class, @@ -80,8 +76,9 @@ public class DeserializeRetrievedDataFromIngest implements IRetrievalsFinder { * {@inheritDoc} */ @Override - public RetrievalResponseXml findRetrievals() throws Exception { - String xml = retrievalQueue.poll(); + public RetrievalResponseXml processRequest(RetrievalRequestWrapper rrw) throws Exception { + + String xml = (String) rrw.getPayload(); if (xml == null) { return null; diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/IRetrievalsFinder.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/IRetrievalsFinder.java index 6671e20c40..8475632993 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/IRetrievalsFinder.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/IRetrievalsFinder.java @@ -20,6 +20,7 @@ package com.raytheon.uf.edex.datadelivery.retrieval.handlers; + /** * Responsible for finding the {@link RetrievalResponseXml} that should be * processed. @@ -31,6 +32,7 @@ package com.raytheon.uf.edex.datadelivery.retrieval.handlers; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Feb 01, 2013 1543 djohnson Initial creation + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -39,11 +41,11 @@ package com.raytheon.uf.edex.datadelivery.retrieval.handlers; */ public interface IRetrievalsFinder { /** - * Finds the {@link RetrievalResponseXml} that should be processed. - * + * Process the requests{@link RetrievalResponseXml} + * @param RetrievalRequestWrapper * @return the {@link RetrievalResponseXml} * @throws Exception */ - RetrievalResponseXml findRetrievals() + RetrievalResponseXml processRequest(RetrievalRequestWrapper rrw) throws Exception; } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/PerformRetrievalsThenReturnFinder.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/PerformRetrievalsThenReturnFinder.java index 49edc92dfc..16532b2498 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/PerformRetrievalsThenReturnFinder.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/PerformRetrievalsThenReturnFinder.java @@ -21,7 +21,6 @@ package com.raytheon.uf.edex.datadelivery.retrieval.handlers; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import com.google.common.annotations.VisibleForTesting; import com.raytheon.uf.common.datadelivery.event.status.DataDeliverySystemStatusDefinition; @@ -60,7 +59,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse * Feb 15, 2013 1543 djohnson Retrieval responses are now xml. * Jul 16, 2013 1655 mpduff Send a system status event based on the response from the provider. * Jan 15, 2014 2678 bgonzale Retrieve RetrievalRequestRecords from a Queue for processing. - * + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @author djohnson @@ -74,17 +73,12 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder { private final IRetrievalDao retrievalDao; - private final ConcurrentLinkedQueue retrievalQueue; - /** * Constructor. * * @param network */ - public PerformRetrievalsThenReturnFinder( - ConcurrentLinkedQueue retrievalQueue, - IRetrievalDao retrievalDao) { - this.retrievalQueue = retrievalQueue; + public PerformRetrievalsThenReturnFinder(IRetrievalDao retrievalDao) { this.retrievalDao = retrievalDao; } @@ -92,34 +86,37 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder { * {@inheritDoc} */ @Override - public RetrievalResponseXml findRetrievals() throws Exception { + public RetrievalResponseXml processRequest(RetrievalRequestWrapper rrw) throws Exception { + RetrievalResponseXml retVal = null; - ITimer timer = TimeUtil.getTimer(); + timer.start(); + try { - timer.start(); - RetrievalRequestRecordPK id = retrievalQueue.poll(); + // Process through the retrieval + RetrievalRequestRecordPK id = (RetrievalRequestRecordPK) rrw.getPayload(); + if (id == null) { return null; } + statusHandler.info("Found this RetrievalRequestRecordPK: " + + id.toString()); RetrievalRequestRecord request = retrievalDao.getById(id); + if (request == null) { return null; } timer.stop(); - statusHandler.info("Activation of next retrieval took [" + statusHandler.info("Activation of this retrieval took [" + timer.getElapsedTime() + "] ms"); - timer.reset(); timer.start(); try { retVal = process(request); - timer.stop(); - statusHandler.info("Retrieval Processing for [" + request.getId() + "] took " + timer.getElapsedTime() + " ms"); @@ -129,9 +126,10 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder { "Retrieval Processing failed: [" + request.getId() + "]", e); } + } catch (Exception e) { statusHandler - .error("Unable to look up next retrieval request at this time.", + .error("Unable to look up retrieval request at this time.", e); } @@ -141,6 +139,7 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder { /** * The actual work gets done here. */ + @SuppressWarnings("rawtypes") @VisibleForTesting RetrievalResponseXml process(RetrievalRequestRecord requestRecord) { requestRecord.setState(State.FAILED); @@ -196,6 +195,7 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder { } catch (Exception e) { statusHandler.handle(Priority.WARN, e.getLocalizedMessage(), e); } + RetrievalResponseXml retrievalPluginDataObject = new RetrievalResponseXml( requestRecord.getId(), retrievalAttributePluginDataObjects); retrievalPluginDataObject diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalHandler.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalHandler.java index 5e13d4e38c..79e23d4ce3 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalHandler.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalHandler.java @@ -20,15 +20,18 @@ package com.raytheon.uf.edex.datadelivery.retrieval.handlers; * further licensing information. **/ -import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.springframework.stereotype.Service; +import com.raytheon.uf.common.serialization.SerializationException; +import com.raytheon.uf.common.serialization.SerializationUtil; import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.UFStatus; +import com.raytheon.uf.common.status.UFStatus.Priority; import com.raytheon.uf.common.time.domain.api.IDuration; import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao; import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener; @@ -45,6 +48,7 @@ import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener; * Aug 09, 2012 1022 djohnson Use {@link ExecutorService} for retrieval. * Mar 04, 2013 1647 djohnson RetrievalTasks are now scheduled via constructor parameter. * Mar 27, 2013 1802 bphillip Scheduling of retrieval tasks now occurs after camel/spring have been initialized + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -57,35 +61,52 @@ public class RetrievalHandler implements RegistryInitializedListener { private static final IUFStatusHandler statusHandler = UFStatus .getHandler(RetrievalHandler.class); - private final ScheduledExecutorService executorService; - - private final List retrievalTasks; + private final ScheduledExecutorService scheduledExecutorService; private IRetrievalDao retrievalDao; - private IDuration retrievalTaskFrequency; - private IDuration subnotifyTaskFrequency; private SubscriptionNotifyTask subNotifyTask; - public RetrievalHandler(ScheduledExecutorService executorService, - IRetrievalDao retrievalDao, List retrievalTasks, - SubscriptionNotifyTask subNotifyTask, - IDuration retrievalTaskFrequency, IDuration subnotifyTaskFrequency) { - this.executorService = executorService; - this.retrievalTasks = retrievalTasks; + private Map taskFactories; + + public RetrievalHandler(ScheduledExecutorService scheduledExecutorService, + IRetrievalDao retrievalDao, SubscriptionNotifyTask subNotifyTask, + IDuration subnotifyTaskFrequency, + Map taskFactories) { + this.scheduledExecutorService = scheduledExecutorService; this.retrievalDao = retrievalDao; - this.retrievalTaskFrequency = retrievalTaskFrequency; this.subnotifyTaskFrequency = subnotifyTaskFrequency; this.subNotifyTask = subNotifyTask; + this.taskFactories = taskFactories; } - public void notify(List subscriptions) { - statusHandler.debug("Notifying that subscriptions are available."); + /** + * Pull a SubscriptionRetrievalRequestWrapper off of the retrieval queue + * Will be two types 1.) SBN retrievals will come in the form of XML + * containing the data itself 2.) OPSNET retrievals will be the + * RetrievalRequestRecordPK object + * + * @param SubscriptionRetrievalRequestWrapper + * wrapper as byte array + */ + public void notify(byte[] bytes) { + + SubscriptionRetrievalRequestWrapper srrw = null; - for (RetrievalTask retrievalTask : retrievalTasks) { - executorService.execute(retrievalTask); + try { + srrw = SerializationUtil.transformFromThrift( + SubscriptionRetrievalRequestWrapper.class, bytes); + + if (srrw != null) { + RetrievalTask task = getTasker(srrw); + task.run(); + } + + } catch (SerializationException e) { + statusHandler.handle(Priority.ERROR, + "Can't deserialize RetrievalRequestWrapper!", e); } } @@ -93,13 +114,19 @@ public class RetrievalHandler implements RegistryInitializedListener { public void executeAfterRegistryInit() { // set all Running state retrievals to pending retrievalDao.resetRunningRetrievalsToPending(); - - for (RetrievalTask retrievalTask : retrievalTasks) { - executorService.scheduleWithFixedDelay(retrievalTask, 30000, - retrievalTaskFrequency.getMillis(), TimeUnit.MILLISECONDS); - } - executorService.scheduleWithFixedDelay(subNotifyTask, 30000, + // run the sub notifier every 30 sec for notifications + scheduledExecutorService.scheduleWithFixedDelay(subNotifyTask, 30000, subnotifyTaskFrequency.getMillis(), TimeUnit.MILLISECONDS); - } + + /** + * Get a RetrievalTask for the given network + * @param network + * @return RetrievalTask + */ + private RetrievalTask getTasker(SubscriptionRetrievalRequestWrapper wrapper) { + + return taskFactories.get(wrapper.getNetwork().name()).create(wrapper); + } + } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalRequestWrapper.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalRequestWrapper.java new file mode 100644 index 0000000000..6564380516 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalRequestWrapper.java @@ -0,0 +1,59 @@ +package com.raytheon.uf.edex.datadelivery.retrieval.handlers; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; + +import com.raytheon.uf.common.serialization.annotations.DynamicSerialize; +import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement; +/** + * Wrapper for retrieval primary keys + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Jan 30, 2014 2686       dhladky      refactor of retrieval.
+ * 
+ * 
+ * + * @author dhladky + * @version 1.0 + */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.NONE) +@DynamicSerialize +public class RetrievalRequestWrapper { + + @XmlAttribute + @DynamicSerializeElement + private Object payload; + + /** + * Constructor. + */ + public RetrievalRequestWrapper() { + + } + + /** + * Constructor + * + * @param payload + */ + public RetrievalRequestWrapper(Object payload) { + this.payload = payload; + } + + public void setPayload(Object payload) { + this.payload = payload; + } + + public Object getPayload() { + return payload; + } + +} diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTask.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTask.java index b576bf98d4..a68424e7a8 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTask.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTask.java @@ -26,7 +26,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority; import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord; /** - * Inner class to process individual retrievals. + * Process subscription retrievals. * *
  * 
@@ -43,6 +43,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
  * Mar 05, 2013 1647       djohnson     Change no retrievals found message to debug.
  * Aug 09, 2013 1822       bgonzale     Added parameters to processRetrievedPluginDataObjects.
  * Oct 01, 2013 2267       bgonzale     Removed request parameter and IRetrievalDao field.
+ * Jan 30, 2014 2686       dhladky      refactor of retrieval.
  * 
  * 
* @@ -61,57 +62,66 @@ public class RetrievalTask implements Runnable { private final IRetrievalResponseCompleter retrievalCompleter; private final IRetrievalsFinder retrievalDataFinder; - + + private final SubscriptionRetrievalRequestWrapper retrievalRequestWrapper; public RetrievalTask(Network network, IRetrievalsFinder retrievalDataFinder, IRetrievalPluginDataObjectsProcessor retrievedDataProcessor, - IRetrievalResponseCompleter retrievalCompleter) { + IRetrievalResponseCompleter retrievalCompleter, + SubscriptionRetrievalRequestWrapper retrievalRequestWrapper) { + this.network = network; this.retrievalDataFinder = retrievalDataFinder; this.retrievedDataProcessor = retrievedDataProcessor; this.retrievalCompleter = retrievalCompleter; + this.retrievalRequestWrapper = retrievalRequestWrapper; } @Override public void run() { + try { - while (true) { + + if (retrievalRequestWrapper.getRetrievalRequestWrappers() != null) { + + for (RetrievalRequestWrapper retrieval : retrievalRequestWrapper.getRetrievalRequestWrappers()) { + // process individual requests for this subscription + boolean success = false; + RetrievalRequestRecord request = null; - // process request - boolean success = false; - RetrievalRequestRecord request = null; - try { + try { + // send this retrieval to be processed + RetrievalResponseXml retrievalResponse = retrievalDataFinder + .processRequest(retrieval); - RetrievalResponseXml retrievalPluginDataObject = retrievalDataFinder - .findRetrievals(); - // This forces the return from the while loop once there are - // no more retrievals to process - if (retrievalPluginDataObject == null) { - if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { - statusHandler.debug("No " + network - + " retrievals found."); + if (retrievalResponse == null) { + if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { + statusHandler.debug("No " + network + + " retrievals found."); + } + continue; } - return; + + success = retrievalResponse.isSuccess(); + request = retrievedDataProcessor + .processRetrievedPluginDataObjects(retrievalResponse); + + } catch (Exception e) { + statusHandler.error(network + + " retrieval processing error", e); } - success = retrievalPluginDataObject.isSuccess(); - request = retrievedDataProcessor - .processRetrievedPluginDataObjects(retrievalPluginDataObject); - } catch (Exception e) { - statusHandler.error( - network + " retrieval processing error", e); - } - - if (request != null) { - retrievalCompleter.completeRetrieval(request, - new RetrievalResponseStatus(success)); + if (request != null) { + retrievalCompleter.completeRetrieval(request, + new RetrievalResponseStatus(success)); + } } } } catch (Throwable e) { // so thread can't die statusHandler.error("Error caught in " + network + " retrieval thread", e); - } + } } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTaskFactory.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTaskFactory.java new file mode 100644 index 0000000000..8227c07081 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTaskFactory.java @@ -0,0 +1,67 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.uf.edex.datadelivery.retrieval.handlers; + +import com.raytheon.uf.common.datadelivery.registry.Network; +/** + * Factory to create RetrievalTasks for a specific Subscription Retrieval Request. + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Feb 04, 2014  2686       dhladky      Initial creation
+ * 
+ * 
+ * + * @author dhladky + * @version 1.0 + */ +public class RetrievalTaskFactory { + + private final Network network; + + private final IRetrievalPluginDataObjectsProcessor retrievedDataProcessor; + + private final IRetrievalResponseCompleter retrievalCompleter; + + private final IRetrievalsFinder retrievalDataFinder; + + public RetrievalTaskFactory(Network network, + IRetrievalsFinder retrievalDataFinder, + IRetrievalPluginDataObjectsProcessor retrievedDataProcessor, + IRetrievalResponseCompleter retrievalCompleter) { + this.network = network; + this.retrievalDataFinder = retrievalDataFinder; + this.retrievedDataProcessor = retrievedDataProcessor; + this.retrievalCompleter = retrievalCompleter; + } + + /** + * RetrievalTask creator, Factory method + * @param retrievalRequest + * @return + */ + public RetrievalTask create(SubscriptionRetrievalRequestWrapper retrievalRequest) { + return new RetrievalTask(network, retrievalDataFinder, retrievedDataProcessor, retrievalCompleter, retrievalRequest); + } +} \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/SubscriptionRetrievalRequestWrapper.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/SubscriptionRetrievalRequestWrapper.java new file mode 100644 index 0000000000..bf9910884b --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/handlers/SubscriptionRetrievalRequestWrapper.java @@ -0,0 +1,98 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.uf.edex.datadelivery.retrieval.handlers; + +import java.util.List; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElements; +import javax.xml.bind.annotation.XmlRootElement; + +import com.raytheon.uf.common.datadelivery.registry.Network; +import com.raytheon.uf.common.serialization.annotations.DynamicSerialize; +import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement; + +/** + * Wrapper for objects placed on common retrieval queue + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Jan 28, 2014            dhladky     Initial creation
+ * 
+ * 
+ * + * @author dhladky + * @version 1.0 + */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.NONE) +@DynamicSerialize +public class SubscriptionRetrievalRequestWrapper { + + @XmlElements({ @XmlElement(name="retrievalRequestWrapper") }) + @DynamicSerializeElement + private List retrievalRequestWrappers; + + @XmlAttribute + @DynamicSerializeElement + private Network network; + + /** + * Constructor. + */ + public SubscriptionRetrievalRequestWrapper() { + + } + + /** + * Constructor + * + * @param network + * @param retrievalRequestWrappers + */ + public SubscriptionRetrievalRequestWrapper(Network network, List retrievalRequestWrappers) { + this.setNetwork(network); + this.retrievalRequestWrappers = retrievalRequestWrappers; + } + + public Network getNetwork() { + return network; + } + + public void setNetwork(Network network) { + this.network = network; + } + + public void setRetrievalRequestWrappers(List retrievalRequestWrappers) { + this.retrievalRequestWrappers = retrievalRequestWrappers; + } + + public List getRetrievalRequestWrappers() { + return retrievalRequestWrappers; + } + +} diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/util/RetrievalGeneratorUtilities.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/util/RetrievalGeneratorUtilities.java index cf662845a6..029ae675ba 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/util/RetrievalGeneratorUtilities.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/util/RetrievalGeneratorUtilities.java @@ -24,16 +24,21 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.raytheon.uf.common.datadelivery.registry.Network; import com.raytheon.uf.common.datadelivery.registry.Parameter; import com.raytheon.uf.common.dataplugin.grid.GridRecord; import com.raytheon.uf.common.dataplugin.level.Level; import com.raytheon.uf.common.gridcoverage.GridCoverage; +import com.raytheon.uf.common.serialization.SerializationUtil; import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus.Priority; import com.raytheon.uf.common.time.DataTime; +import com.raytheon.uf.edex.core.EDEXUtil; import com.raytheon.uf.edex.database.dao.CoreDao; import com.raytheon.uf.edex.database.dao.DaoConfig; +import com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalRequestWrapper; +import com.raytheon.uf.edex.datadelivery.retrieval.handlers.SubscriptionRetrievalRequestWrapper; /** * @@ -48,6 +53,7 @@ import com.raytheon.uf.edex.database.dao.DaoConfig; * Nov 19, 2012 bsteffen Initial javadoc * Dec 10, 2012 1259 bsteffen Switch Data Delivery from LatLon to referenced envelopes. * Dec 11, 2013 2625 mpduff Remove creation of DataURI. + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -134,4 +140,33 @@ public class RetrievalGeneratorUtilities { return dups; } + + /** + * + * Drops Retrievals by subscription into a common queue for processing + * + * @param destinationUri + * @param network + * @param payload + * @throws Exception + */ + public static void sendToRetrieval(String destinationUri, Network network, + Object[] payload) throws Exception{ + + if (payload != null) { + + List wrappers = new ArrayList( + payload.length); + + for (Object o : payload) { + RetrievalRequestWrapper rrw = new RetrievalRequestWrapper(o); + wrappers.add(rrw); + } + + SubscriptionRetrievalRequestWrapper srrw = new SubscriptionRetrievalRequestWrapper( + network, wrappers); + byte[] bytes = SerializationUtil.transformToThrift(srrw); + EDEXUtil.getMessageProducer().sendAsync(destinationUri, bytes); + } + } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/util/RetrievalPersistUtil.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/util/RetrievalPersistUtil.java index 77d5d22fc4..850d3fef52 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/util/RetrievalPersistUtil.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/util/RetrievalPersistUtil.java @@ -53,6 +53,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.mapping.PluginRouteList; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Nov 26, 2012 1367 dhladky Common plugin route persistence + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -162,8 +163,8 @@ public final class RetrievalPersistUtil { PluginRouteList prl = null; try { - prl = (PluginRouteList) getJaxbManager() - .unmarshalFromXmlFile(file); + prl = getJaxbManager() + .unmarshalFromXmlFile(PluginRouteList.class, file); } catch (Exception e) { statusHandler.error( "[Data Delivery] Configuration for plugin routes failed to load: File: " diff --git a/edexOsgi/com.raytheon.uf.edex.plugin.datadelivery.retrieval/META-INF/MANIFEST.MF b/edexOsgi/com.raytheon.uf.edex.plugin.datadelivery.retrieval/META-INF/MANIFEST.MF index fadedfa812..c0efa94208 100644 --- a/edexOsgi/com.raytheon.uf.edex.plugin.datadelivery.retrieval/META-INF/MANIFEST.MF +++ b/edexOsgi/com.raytheon.uf.edex.plugin.datadelivery.retrieval/META-INF/MANIFEST.MF @@ -4,6 +4,9 @@ Bundle-Name: DataDelivery Retrieval Plug-in Bundle-SymbolicName: com.raytheon.uf.edex.plugin.datadelivery.retrieval Bundle-Version: 1.12.1174.qualifier Bundle-Vendor: RAYTHEON -Require-Bundle: com.raytheon.edex.common;bundle-version="1.11.7" +Require-Bundle: com.raytheon.edex.common;bundle-version="1.11.7", + com.raytheon.uf.edex.datadelivery.retrieval;bundle-version="1.0.0", + com.raytheon.uf.common.datadelivery.registry;bundle-version="1.0.0", + com.raytheon.uf.common.status;bundle-version="1.12.1174" Bundle-RequiredExecutionEnvironment: JavaSE-1.6 Bundle-ActivationPolicy: lazy diff --git a/edexOsgi/com.raytheon.uf.edex.plugin.datadelivery.retrieval/res/spring/datadelivery-wfo-retrieval-process.xml b/edexOsgi/com.raytheon.uf.edex.plugin.datadelivery.retrieval/res/spring/datadelivery-wfo-retrieval-process.xml index 59275fe6d4..0aef8ec85f 100644 --- a/edexOsgi/com.raytheon.uf.edex.plugin.datadelivery.retrieval/res/spring/datadelivery-wfo-retrieval-process.xml +++ b/edexOsgi/com.raytheon.uf.edex.plugin.datadelivery.retrieval/res/spring/datadelivery-wfo-retrieval-process.xml @@ -5,7 +5,7 @@ - + * @@ -42,11 +46,16 @@ import com.raytheon.edex.plugin.AbstractDecoder; */ public class SbnDataDeliveryRetrievalDecoder extends AbstractDecoder { + + private static final IUFStatusHandler statusHandler = UFStatus + .getHandler(SbnDataDeliveryRetrievalDecoder.class); - private final ConcurrentLinkedQueue sbnRetrievalQueue; - - public SbnDataDeliveryRetrievalDecoder(ConcurrentLinkedQueue queue) { - this.sbnRetrievalQueue = queue; + private String destinationUri; + + private Network network = Network.SBN; + + public SbnDataDeliveryRetrievalDecoder(String destinationUri) { + this.destinationUri = destinationUri; } /** @@ -58,7 +67,14 @@ public class SbnDataDeliveryRetrievalDecoder extends AbstractDecoder { * the headers */ public void process(byte[] data, Headers headers) { - this.sbnRetrievalQueue.add(new String(data)); + // drops to common retrieval queue for processing/persistence + String xml = new String(data); + try { + Object[] payload = new Object[]{xml}; + RetrievalGeneratorUtilities.sendToRetrieval(destinationUri, network, payload); + } catch (Exception e) { + statusHandler.handle(Priority.ERROR, "Couldn't send SBN data to Retrieval Queue", e); + } } } diff --git a/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgentTest.java b/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgentTest.java index 7f658cfaba..027cabbf70 100644 --- a/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgentTest.java +++ b/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgentTest.java @@ -71,6 +71,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK; * Jan 30, 2013 1543 djohnson Initial creation * Jul 10, 2013 2106 djohnson Inject providerHandler. * Jan 15, 2014 2678 bgonzale Added Queue. + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -133,12 +134,8 @@ public class SubscriptionRetrievalAgentTest { SubscriptionRetrievalAgent agent = new SubscriptionRetrievalAgent( route, "someUri", new Object(), 1, null, bandwidthDao, - retrievalDao, DataDeliveryHandlers.getProviderHandler(), - retrievalQueue) { - @Override - void wakeRetrievalTasks() throws EdexException { - // Do nothing - } + retrievalDao, DataDeliveryHandlers.getProviderHandler()) { + }; agent.processAllocation(subscriptionRetrieval); diff --git a/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngestTest.java b/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngestTest.java index 5921978180..a864d5af33 100644 --- a/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngestTest.java +++ b/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/DeserializeRetrievedDataFromIngestTest.java @@ -67,6 +67,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK; * Nov 04, 2013 2506 bgonzale Fixed IRetreivalDao mock initialization. * Test deserialization of data with leading and trailing * content on the xml. + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -77,10 +78,7 @@ public class DeserializeRetrievedDataFromIngestTest { private final File directory = TestUtil .setupTestClassDir(DeserializeRetrievedDataFromIngestTest.class); - private final ConcurrentLinkedQueue retrievalQueue = new ConcurrentLinkedQueue(); - - private final DeserializeRetrievedDataFromIngest service = new DeserializeRetrievedDataFromIngest( - retrievalQueue); + private final DeserializeRetrievedDataFromIngest service = new DeserializeRetrievedDataFromIngest(); private final IRetrievalDao mockDao = mock(IRetrievalDao.class); @@ -103,7 +101,7 @@ public class DeserializeRetrievedDataFromIngestTest { addRetrievalToQueue(); - final RetrievalResponseXml restored = service.findRetrievals(); + final RetrievalResponseXml restored = service.processRequest(null); // Just make sure the payload is present assertThat(restored.getRetrievalAttributePluginDataObjects().get(0) @@ -117,15 +115,15 @@ public class DeserializeRetrievedDataFromIngestTest { addRetrievalToQueue(); - service.findRetrievals(); + service.processRequest(null); - assertThat(retrievalQueue, is(empty())); + //assertThat(retrievalQueue, is(empty())); } @Test public void returnsNullWhenNothingInTheQueue() throws Exception { - final RetrievalResponseXml restored = service.findRetrievals(); + final RetrievalResponseXml restored = service.processRequest(null); assertNull(restored); } @@ -136,7 +134,7 @@ public class DeserializeRetrievedDataFromIngestTest { addSimulatedSBNRetrievalToQueue(); - final RetrievalResponseXml restored = service.findRetrievals(); + final RetrievalResponseXml restored = service.processRequest(null); // check for the payload assertThat(restored.getRetrievalAttributePluginDataObjects().get(0) @@ -158,7 +156,7 @@ public class DeserializeRetrievedDataFromIngestTest { final List files = FileUtil.listFiles(directory, FilenameFilters.ACCEPT_FILES, false); - retrievalQueue.add(FileUtil.file2String(files.get(0))); + //retrievalQueue.add(FileUtil.file2String(files.get(0))); } private void addSimulatedSBNRetrievalToQueue() @@ -178,7 +176,7 @@ public class DeserializeRetrievedDataFromIngestTest { final List files = FileUtil.listFiles(directory, FilenameFilters.ACCEPT_FILES, false); - retrievalQueue.add(FileUtil.file2String(files.get(0))); + //retrievalQueue.add(FileUtil.file2String(files.get(0))); } private static class WmoHeaderWithLeadingAndTrailingContent extends diff --git a/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/PerformRetrievalPluginDataObjectsFinderTest.java b/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/PerformRetrievalPluginDataObjectsFinderTest.java index 98382b25a5..06f1a43406 100644 --- a/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/PerformRetrievalPluginDataObjectsFinderTest.java +++ b/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/PerformRetrievalPluginDataObjectsFinderTest.java @@ -53,6 +53,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse * ------------ ---------- ----------- -------------------------- * Feb 06, 2013 1543 djohnson Initial creation * Jan 15, 2014 2678 bgonzale Added Queue. + * Jan 30, 2014 2686 dhladky refactor of retrieval. * * * @@ -135,8 +136,8 @@ public class PerformRetrievalPluginDataObjectsFinderTest { } private void processRetrieval(RetrievalRequestRecord retrieval) { - final PerformRetrievalsThenReturnFinder pluginDataObjectsFinder = new PerformRetrievalsThenReturnFinder( - retrievalQueue, MOCK_DAO); - pluginDataObjectsFinder.process(retrieval); + //final PerformRetrievalsThenReturnFinder pluginDataObjectsFinder = new PerformRetrievalsThenReturnFinder( + // retrievalQueue, MOCK_DAO); + //pluginDataObjectsFinder.process(retrieval); } } diff --git a/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalHandlerTest.java b/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalHandlerTest.java deleted file mode 100644 index 294cdd5c5b..0000000000 --- a/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalHandlerTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * This software was developed and / or modified by Raytheon Company, - * pursuant to Contract DG133W-05-CQ-1067 with the US Government. - * - * U.S. EXPORT CONTROLLED TECHNICAL DATA - * This software product contains export-restricted data whose - * export/transfer/disclosure is restricted by U.S. law. Dissemination - * to non-U.S. persons whether in the United States or abroad requires - * an export license or other authorization. - * - * Contractor Name: Raytheon Company - * Contractor Address: 6825 Pine Street, Suite 340 - * Mail Stop B8 - * Omaha, NE 68106 - * 402.291.0100 - * - * See the AWIPS II Master Rights File ("Master Rights File.pdf") for - * further licensing information. - **/ -package com.raytheon.uf.edex.datadelivery.retrieval.handlers; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -import java.util.Arrays; -import java.util.Collections; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.junit.Test; - -import com.raytheon.uf.common.time.domain.Durations; -import com.raytheon.uf.common.time.domain.api.IDuration; -import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao; - -/** - * Test {@link RetrievalHandler}. - * - *
- * 
- * SOFTWARE HISTORY
- * 
- * Date         Ticket#    Engineer    Description
- * ------------ ---------- ----------- --------------------------
- * Jul 06, 2012 740        djohnson     Initial creation
- * Aug 09. 2012 1022       djohnson     Changes to RetrievalHandler.
- * Nov 19, 2012 1166       djohnson     Clean up JAXB representation of registry objects.
- * Jan 30, 2013 1543       djohnson     RetrievalTask now requires a Network.
- * Feb 05, 2013 1580       mpduff       EventBus refactor.
- * Feb 07, 2013 1543       djohnson     Move test to its proper test class, as per peer review comments.
- * Mar 04, 2013 1647       djohnson     RetrievalTasks are now scheduled via constructor parameter.
- * 
- * 
- * - * @author djohnson - * @version 1.0 - */ - -public class RetrievalHandlerTest { - - private static final IDuration RETRIEVAL_TASK_FREQUENCY = Durations.of(5, - TimeUnit.MINUTES); - - private static final IDuration SUBNOTIFY_TASK_FREQUENCY = Durations.of(1, - TimeUnit.MINUTES); - - private final ScheduledExecutorService executorService = mock(ScheduledExecutorService.class); - - private final IRetrievalDao mockDao = mock(IRetrievalDao.class); - - private final RetrievalTask retrievalTask = mock(RetrievalTask.class); - - private final SubscriptionNotifyTask subNotifyTask = mock(SubscriptionNotifyTask.class); - - private final RetrievalHandler handler = new RetrievalHandler( - executorService, mockDao, Arrays.asList(retrievalTask), - subNotifyTask, RETRIEVAL_TASK_FREQUENCY, SUBNOTIFY_TASK_FREQUENCY); - - @Test - public void testAllRunningRetrievalsAreResetToPendingOnConstruction() { - handler.executeAfterRegistryInit(); - verify(mockDao).resetRunningRetrievalsToPending(); - } - - @Test - public void testOnNotifyOfSubscriptionsARetrievalTaskIsExecuted() { - handler.notify(Collections. emptyList()); - - verify(executorService).execute(retrievalTask); - } - - @Test - public void testRetrievalTaskIsScheduledPerConstructorParameter() { - handler.executeAfterRegistryInit(); - verify(executorService).scheduleWithFixedDelay(retrievalTask, 30000, - RETRIEVAL_TASK_FREQUENCY.getMillis(), TimeUnit.MILLISECONDS); - } - - @Test - public void testSubscriptionNotifyTaskIsScheduledPerConstructorParameter() { - handler.executeAfterRegistryInit(); - verify(executorService).scheduleWithFixedDelay(subNotifyTask, 30000, - SUBNOTIFY_TASK_FREQUENCY.getMillis(), TimeUnit.MILLISECONDS); - } -} diff --git a/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTaskTest.java b/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTaskTest.java deleted file mode 100644 index 853f530be7..0000000000 --- a/tests/unit/com/raytheon/uf/edex/datadelivery/retrieval/handlers/RetrievalTaskTest.java +++ /dev/null @@ -1,344 +0,0 @@ -/** - * This software was developed and / or modified by Raytheon Company, - * pursuant to Contract DG133W-05-CQ-1067 with the US Government. - * - * U.S. EXPORT CONTROLLED TECHNICAL DATA - * This software product contains export-restricted data whose - * export/transfer/disclosure is restricted by U.S. law. Dissemination - * to non-U.S. persons whether in the United States or abroad requires - * an export license or other authorization. - * - * Contractor Name: Raytheon Company - * Contractor Address: 6825 Pine Street, Suite 340 - * Mail Stop B8 - * Omaha, NE 68106 - * 402.291.0100 - * - * See the AWIPS II Master Rights File ("Master Rights File.pdf") for - * further licensing information. - **/ -package com.raytheon.uf.edex.datadelivery.retrieval.handlers; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; - -import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentLinkedQueue; - -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.annotation.DirtiesContext.ClassMode; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import org.springframework.transaction.annotation.Transactional; - -import com.google.common.eventbus.Subscribe; -import com.raytheon.uf.common.datadelivery.event.retrieval.DataRetrievalEvent; -import com.raytheon.uf.common.datadelivery.registry.Network; -import com.raytheon.uf.common.datadelivery.registry.Provider.ServiceType; -import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval; -import com.raytheon.uf.common.datadelivery.retrieval.xml.RetrievalAttribute; -import com.raytheon.uf.common.dataplugin.PluginDataObject; -import com.raytheon.uf.common.event.EventBus; -import com.raytheon.uf.common.localization.PathManagerFactoryTest; -import com.raytheon.uf.common.registry.handler.RegistryHandlerException; -import com.raytheon.uf.common.serialization.SerializationException; -import com.raytheon.uf.common.util.FileUtil; -import com.raytheon.uf.common.util.SpringFiles; -import com.raytheon.uf.common.util.TestUtil; -import com.raytheon.uf.common.util.file.FilenameFilters; -import com.raytheon.uf.edex.database.DataAccessLayerException; -import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory; -import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter; -import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter.TranslationException; -import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao; -import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord; -import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.State; -import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK; -import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse; - -/** - * Test {@link RetrievalTask}. - * - *
- * 
- * SOFTWARE HISTORY
- * 
- * Date         Ticket#    Engineer    Description
- * ------------ ---------- ----------- --------------------------
- * Jan 30, 2013 1543       djohnson     Initial creation
- * Feb 07, 2013 1543       djohnson     Add test to simulate SBN retrieval task behavior.
- * Feb 12, 2013 1543       djohnson     Retrieval responses are now sent further down the chain.
- * Feb 15, 2013 1543       djohnson     Class renames.
- * Mar 05, 2013 1647       djohnson     Pass wmo header strategy to constructor.
- * Mar 19, 2013 1794       djohnson     RetrievalTasks integrate at a queue.
- * Apr 29, 2013 1910       djohnson     Unregister from EventBus after each test.
- * Aug 09, 2013 1822       bgonzale     Added parameters to processRetrievedPluginDataObjects.
- * Oct 01, 2013 2267       bgonzale     Pass request parameter instead of components of request.
- * Nov 04, 2013 2506       bgonzale     removed IRetrievalDao parameter.
- * Jan 15, 2014 2678       bgonzale     Added Queue.
- * 
- * 
- * - * @author djohnson - * @version 1.0 - */ -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(locations = { SpringFiles.UNIT_TEST_DB_BEANS_XML, - SpringFiles.RETRIEVAL_DATADELIVERY_DAOS_XML }) -@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD) -public class RetrievalTaskTest { - /** - * Places the plugin data object into a collection for inspection. - */ - public class PlaceInCollectionProcessor implements - IRetrievalPluginDataObjectsProcessor { - public final List pluginDataObjects = new ArrayList(); - - /** - * {@inheritDoc} - * - * @return RetrievalRequestRecord - */ - @Override - public RetrievalRequestRecord processRetrievedPluginDataObjects( - RetrievalResponseXml retrievalPluginDataObjects) - throws SerializationException, TranslationException { - final List retrievalAttributePluginDataObjects = retrievalPluginDataObjects - .getRetrievalAttributePluginDataObjects(); - final RetrievalRequestRecord requestRecord = dao - .getById(retrievalPluginDataObjects.getRequestRecord()); - final Retrieval retrieval = requestRecord.getRetrievalObj(); - final ServiceType serviceType = retrieval.getServiceType(); - final RetrievalAdapter serviceRetrievalAdapter = ServiceTypeFactory - .retrieveServiceRetrievalAdapter(serviceType); - final Iterator attributesIter = retrieval - .getAttributes().iterator(); - - for (RetrievalResponseWrapper pluginDataObjectEntry : retrievalAttributePluginDataObjects) { - - if (!attributesIter.hasNext()) { - throw new RuntimeException( - "Did not find a RetrievalAttribute to match the retrieval response!"); - } - - // Restore the attribute xml prior to processing the response - final IRetrievalResponse response = pluginDataObjectEntry - .getRetrievalResponse(); - response.setAttribute(attributesIter.next()); - - final Map processed = serviceRetrievalAdapter - .processResponse(response); - for (PluginDataObject[] pdos : processed.values()) { - pluginDataObjects.addAll(Arrays.asList(pdos)); - } - } - return requestRecord; - } - } - - private RetrievalRequestRecord opsnetRetrieval; - - private RetrievalRequestRecord sbnRetrieval; - - @Autowired - @Qualifier(value = "retrievalDao") - private IRetrievalDao dao; - - private final ConcurrentLinkedQueue retrievalQueue = new ConcurrentLinkedQueue(); - - private final PlaceInCollectionProcessor retrievedDataProcessor = new PlaceInCollectionProcessor(); - - private final List eventsReceived = new ArrayList(); - - @Before - public void setUp() throws RegistryHandlerException { - PathManagerFactoryTest.initLocalization(); - - opsnetRetrieval = RetrievalRequestRecordFixture.INSTANCE.get(1); - sbnRetrieval = RetrievalRequestRecordFixture.INSTANCE.get(2); - opsnetRetrieval.setNetwork(Network.OPSNET); - sbnRetrieval.setNetwork(Network.SBN); - - EventBus.register(this); - } - - @After - public void tearDown() { - EventBus.unregister(this); - } - - @Test - public void processesRetrievalForItsSpecifiedNetwork() - throws DataAccessLayerException { - - stageRetrievals(); - - runRetrievalTask(); - - verifyCorrectStateForRetrieval(opsnetRetrieval, State.COMPLETED); - } - - @Test - public void storesPluginDataObjectsForItsSpecifiedNetwork() - throws DataAccessLayerException, SerializationException { - - stageRetrievals(); - - runRetrievalTask(); - - assertThat(retrievedDataProcessor.pluginDataObjects, - hasSize(opsnetRetrieval.getRetrievalObj().getAttributes() - .size())); - } - - @Ignore("dataRetrievalEvent is no longer sent separately from storage, perhaps restore it later?") - public void dataRetrievalEventIsSentForItsSpecifiedNetwork() - throws Exception { - - stageRetrievals(); - - runRetrievalTask(); - - final int numberOfRetrievalAttributes = opsnetRetrieval - .getRetrievalObj().getAttributes().size(); - assertThat(eventsReceived, hasSize(numberOfRetrievalAttributes)); - // TODO: Is there a way to distinguish between the events sent by the - // separate retrieval attributes, e.g. to make sure each attribute sent - // an event and not one attribute sent two? - } - - // TODO: Add tests for one retrieval failing and another succeeding, make - // sure correct events are sent and correct number of plugin data objects - // generated - - @Test - public void doesNotProcessRetrievalForAnotherNetwork() - throws DataAccessLayerException { - - stageRetrievals(); - - runRetrievalTask(); - - verifyCorrectStateForRetrieval(sbnRetrieval, State.PENDING); - } - - @Test - public void retrievalTaskCanStoreDataToDirectoryThatAnotherTaskProcesses() - throws Exception { - dao.create(RetrievalRequestRecordFixture.INSTANCE.get()); - - IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder( - retrievalQueue, dao); - - final ConcurrentLinkedQueue retrievalQueue = new ConcurrentLinkedQueue(); - final File testDirectory = TestUtil - .setupTestClassDir(RetrievalTaskTest.class); - IRetrievalPluginDataObjectsProcessor serializeToDirectory = new SerializeRetrievedDataToDirectory( - testDirectory, new AlwaysSameWmoHeader("SMYG10 LYBM 280000"), - dao); - - RetrievalTask downloadTask = new RetrievalTask(Network.OPSNET, - retrievalDataFinder, serializeToDirectory, - mock(IRetrievalResponseCompleter.class)); - RetrievalTask readDownloadsTask = new RetrievalTask(Network.OPSNET, - new DeserializeRetrievedDataFromIngest(retrievalQueue), - retrievedDataProcessor, new RetrievalResponseCompleter( - mock(SubscriptionNotifyTask.class), dao)); - - downloadTask.run(); - - final List all = dao.getAll(); - for (RetrievalRequestRecord request : all) { - assertThat(request.getState(), is(State.RUNNING)); - } - - for (File file : FileUtil.listFiles(testDirectory, - FilenameFilters.ACCEPT_FILES, false)) { - retrievalQueue.add(FileUtil.file2String(file)); - } - - readDownloadsTask.run(); - - final List allRetrievals = dao.getAll(); - assertThat(allRetrievals, hasSize(1)); - assertThat(retrievedDataProcessor.pluginDataObjects, hasSize(2)); - - for (RetrievalRequestRecord request : allRetrievals) { - assertThat(request.getState(), is(State.COMPLETED)); - } - - } - - /** - * Stage the retrievals in the database. - */ - @Transactional - private void stageRetrievals() { - - dao.create(opsnetRetrieval); - retrievalQueue.add(opsnetRetrieval.getId()); - dao.create(sbnRetrieval); - retrievalQueue.add(sbnRetrieval.getId()); - } - - /** - * Run the actual retrieval task. - */ - private void runRetrievalTask() { - // Create required strategies for finding, processing, and completing - // retrievals - final IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder( - retrievalQueue, dao); - final IRetrievalResponseCompleter retrievalCompleter = new RetrievalResponseCompleter( - mock(SubscriptionNotifyTask.class), dao); - - new RetrievalTask(Network.OPSNET, retrievalDataFinder, - retrievedDataProcessor, retrievalCompleter).run(); - } - - /** - * Verify the retrieval record is in the expected state. - * - * @param retrieval - * the retrieval - * @param state - * the expected state - * @throws DataAccessLayerException - */ - private void verifyCorrectStateForRetrieval( - RetrievalRequestRecord retrieval, State state) - throws DataAccessLayerException { - RetrievalRequestRecord recordInDb = dao - .getRequests(retrieval.getId().getSubscriptionName()) - .iterator().next(); - - assertThat(recordInDb.getState(), is(equalTo(state))); - } - - /** - * This method will be invoked by the EventBus when a data retrieval event - * is sent. - * - * @param event - * the event - */ - @Subscribe - public void receivedDataDeliveryEvent(DataRetrievalEvent event) { - eventsReceived.add(event); - } -}