Issue #2686 Refactor of retrieval

Change-Id: Iea3444b1ddcac336c6f5dc8e0d0c34dfe81f8558

Former-commit-id: 1c8f6fb36d [formerly 7fb7e10cda] [formerly 6172c83afc] [formerly 1c8f6fb36d [formerly 7fb7e10cda] [formerly 6172c83afc] [formerly 39f97723d6 [formerly 6172c83afc [formerly 0b85e00cecdca6c5dcb6deee138f1a002515ac3c]]]]
Former-commit-id: 39f97723d6
Former-commit-id: 14f2c99e46 [formerly 39f5579103] [formerly d5d37d7b55d86fa82a363075d901e0a1583e45de [formerly 2b998409f5]]
Former-commit-id: d275e71a9e574bd23d396da26d9a5eaa6435dea7 [formerly 8e2eb4d255]
Former-commit-id: 59590fdf36
This commit is contained in:
Dave Hladky 2014-02-03 09:26:24 -06:00
parent c64e6a75da
commit 36bc408bc2
36 changed files with 557 additions and 730 deletions

View file

@ -15,6 +15,7 @@ import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.geotools.geometry.jts.ReferencedEnvelope;
import com.raytheon.uf.common.registry.annotations.RegistryObject;
import com.raytheon.uf.common.registry.annotations.RegistryObjectVersion;
import com.raytheon.uf.common.registry.annotations.SlotAttribute;
import com.raytheon.uf.common.serialization.adapters.ReferencedEnvelopeAdapter;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
@ -36,6 +37,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
* Jan 02, 2013 1441 djohnson Add constants.
* Apr 08, 2013 1826 djohnson Remove delivery options.
* May 22, 2013 1650 djohnson Remove option instance variable.
* Feb 4, 2014 2686 dhladky This one got missed previously.
*
* </pre>
*
@ -46,6 +48,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
@XmlAccessorType(XmlAccessType.NONE)
@DynamicSerialize
@RegistryObject({ GroupDefinition.GROUP_NAME_SLOT })
@RegistryObjectVersion(value = 1.0f)
public class GroupDefinition {
public static final String GROUP_NAME_SLOT = "groupName";

View file

@ -33,7 +33,6 @@
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="ProviderHandler" />
<constructor-arg ref="retrievalQueue" />
</bean>
<bean id="sbnSubscriptionRetrievalAgentPrototype"
@ -47,20 +46,11 @@
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="ProviderHandler" />
<constructor-arg ref="retrievalQueue" />
</bean>
<util:map id="retrievalAgents">
<entry key="Retrievals-1" value-ref="opsnetSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-2" value-ref="opsnetSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-3" value-ref="opsnetSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-4" value-ref="opsnetSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-5" value-ref="opsnetSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-6" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-7" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-8" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-9" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-10" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
</util:map>
<entry key="Retrievals-OPSNET" value-ref="opsnetSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-SBN" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
</util:map>
</beans>

View file

@ -52,15 +52,10 @@
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="ProviderHandler" />
<constructor-arg ref="retrievalQueue" />
</bean>
<util:map id="retrievalAgents">
<entry key="Retrievals-1" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-2" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-3" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-4" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-5" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-SBN" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
</util:map>
</beans>

View file

@ -52,7 +52,6 @@
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="ProviderHandler" />
<constructor-arg ref="retrievalQueue" />
</bean>
<bean id="sbnSubscriptionRetrievalAgentPrototype"
@ -69,16 +68,8 @@
</bean>
<util:map id="retrievalAgents">
<entry key="Retrievals-1" value-ref="opsnetSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-2" value-ref="opsnetSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-3" value-ref="opsnetSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-4" value-ref="opsnetSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-5" value-ref="opsnetSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-6" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-7" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-8" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-9" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-10" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-OPSNET" value-ref="opsnetSubscriptionRetrievalAgentPrototype" />
<entry key="Retrievals-SBN" value-ref="sbnSubscriptionRetrievalAgentPrototype" />
</util:map>
</beans>

View file

@ -141,6 +141,7 @@ import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
* Jan 25, 2014 2636 mpduff Don't do an initial adhoc query for a new subscription.
* Jan 24, 2013 2709 bgonzale Before scheduling adhoc, check if in active period window.
* Jan 29, 2014 2636 mpduff Scheduling refactor.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -629,8 +630,6 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
}
}
retrievalManager.wakeAgents();
return unscheduled;
}

View file

@ -119,6 +119,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Jan 20, 2013 2398 dhladky Fixed rescheduling beyond active period/expired window.
* Jan 24, 2013 2709 bgonzale Changed parameter to shouldScheduleForTime to a Calendar.
* Jan 29, 2014 2636 mpduff Scheduling refactor.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -672,8 +673,6 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
}
}
// Notify RetrievalAgentManager of updated RetrievalRequests.
retrievalManager.wakeAgents();
} else {
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler

View file

@ -31,6 +31,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthSubscription;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.ISubscriptionAggregator;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalAgent;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalAgent;
import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
@ -53,6 +54,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Jun 13, 2013 2095 djohnson No need to query the database, we are only receiving new bandwidth subscriptions.
* Jul 11, 2013 2106 djohnson aggregate() signature changed.
* Jan 06, 2014 2636 mpduff Changed how data set offset is set.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
* </pre>
*
* @author jspinks
@ -90,7 +92,7 @@ public class SimpleSubscriptionAggregator implements ISubscriptionAggregator {
subscriptionRetrieval.setBandwidthSubscription(subDao);
subscriptionRetrieval.setNetwork(subDao.getRoute());
subscriptionRetrieval
.setAgentType(SubscriptionRetrievalAgent.SUBSCRIPTION_AGENT);
.setAgentType(RetrievalAgent.SUBSCRIPTION_AGENT);
subscriptionRetrieval.setStatus(RetrievalStatus.PROCESSING);
subscriptionRetrieval.setPriority(subDao.getPriority());
subscriptionRetrieval.setEstimatedSize(subDao.getEstimatedSize());

View file

@ -22,6 +22,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
* separate work method from loop control.
* Nov 09, 2012 1286 djohnson Add ability to kill the threads when BandwidthManager instance is replaced.
* Mar 05, 2013 1647 djohnson Sleep one minute between checks.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -36,6 +37,8 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(RetrievalAgent.class);
public static final String SUBSCRIPTION_AGENT = "SubscriptionAgent";
private final Object notifier;
@ -71,7 +74,7 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
public void run() {
try {
// don't start immediately
Thread.sleep(60000);
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
// ignore
}
@ -85,7 +88,7 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
.error("Unable to look up next retrieval request. Sleeping for 1 minute before trying again.",
e);
try {
Thread.sleep(60000);
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e1) {
// ignore
}
@ -99,7 +102,7 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
* @throws EdexException
*/
public void doRun() throws EdexException {
statusHandler.info("Checking for bandwidth allocations to process...");
statusHandler.info(network+ ": Checking for bandwidth allocations to process...");
BandwidthAllocation reservation = retrievalManager.nextAllocation(
network, getAgentType());
@ -113,14 +116,14 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
if (reservation != null) {
ALLOCATION_TYPE allocation = getAllocationTypeClass().cast(
reservation);
statusHandler.info("Processing allocation id ["
statusHandler.info(network+": Processing allocation id ["
+ allocation.getId() + "]");
processAllocation(allocation);
} else {
synchronized (notifier) {
try {
statusHandler.info("None found, sleeping for ["
statusHandler.info(network+": None found, sleeping for ["
+ SLEEP_TIME + "]");
notifier.wait(SLEEP_TIME);
@ -156,4 +159,11 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
*/
abstract void processAllocation(ALLOCATION_TYPE allocation)
throws EdexException;
/**
* Get the network
* @return
*/
public Network getNetwork() {
return network;
}
}

View file

@ -39,6 +39,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent;
* Jun 25, 2013 2106 djohnson Copy state from another instance, add ability to check for proposed bandwidth throughput changes.
* Jul 09, 2013 2106 djohnson Only needs to unregister from the EventBus when used in an EDEX instance, so handled in EdexBandwidthManager.
* Oct 03, 2013 2267 bgonzale Added check for no retrieval plan matching in the proposed retrieval plans.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -225,19 +226,6 @@ public class RetrievalManager {
this.shutdown = true;
}
/**
* Wake up the AgentManager for the RetrievalManager.
*/
public void wakeAgents() {
// This is currently a duplication of wake() in RetrievalAgentManager,
// because there was a circular dependency in the Spring config files...
// Can the object graph be made a little cleaner?
synchronized (notifier) {
statusHandler.info("Waking up retrieval threads");
notifier.notifyAll();
}
}
/**
* @param fromRetrievalManager
*/

View file

@ -380,6 +380,7 @@ public class RetrievalPlan {
for (BandwidthBucket bucket : buckets) {
reservation = associator.getNextReservation(bucket, agentType);
if (reservation != null) {
//TODO: do validity check for expired allocations
break;
}
}

View file

@ -6,9 +6,7 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.annotations.VisibleForTesting;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.Provider;
import com.raytheon.uf.common.datadelivery.registry.ProviderType;
@ -26,7 +24,6 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.ITimer;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
@ -36,6 +33,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
import com.raytheon.uf.edex.datadelivery.retrieval.util.RetrievalGeneratorUtilities;
/**
* Class used to process SubscriptionRetrieval BandwidthAllocations.
@ -56,6 +54,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
* Jan 15, 2014 2678 bgonzale Use Queue for passing RetrievalRequestRecords to the
* RetrievalTasks (PerformRetrievalsThenReturnFinder).
* Added constructor that sets the retrievalQueue to null.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -67,44 +66,29 @@ public class SubscriptionRetrievalAgent extends
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(SubscriptionRetrievalAgent.class);
public static final String SUBSCRIPTION_AGENT = "SubscriptionAgent";
private final int defaultPriority;
private final IBandwidthDao bandwidthDao;
private final IBandwidthDao<?, ?> bandwidthDao;
private final IRetrievalDao retrievalDao;
private final IProviderHandler providerHandler;
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue;
public SubscriptionRetrievalAgent(Network network, String destinationUri,
final Object notifier, int defaultPriority,
RetrievalManager retrievalManager, IBandwidthDao bandwidthDao,
RetrievalManager retrievalManager, IBandwidthDao<?, ?> bandwidthDao,
IRetrievalDao retrievalDao, IProviderHandler providerHandler) {
this(network, destinationUri, notifier, defaultPriority,
retrievalManager, bandwidthDao, retrievalDao, providerHandler,
null);
}
public SubscriptionRetrievalAgent(Network network, String destinationUri,
final Object notifier, int defaultPriority,
RetrievalManager retrievalManager, IBandwidthDao bandwidthDao,
IRetrievalDao retrievalDao, IProviderHandler providerHandler,
ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue) {
super(network, destinationUri, notifier, retrievalManager);
this.defaultPriority = defaultPriority;
this.bandwidthDao = bandwidthDao;
this.retrievalDao = retrievalDao;
this.providerHandler = providerHandler;
this.retrievalQueue = retrievalQueue;
}
@Override
void processAllocation(SubscriptionRetrieval retrieval)
throws EdexException {
Subscription sub;
Subscription<?, ?> sub;
try {
sub = bandwidthDao.getSubscriptionRetrievalAttributes(retrieval)
.getSubscription();
@ -127,19 +111,28 @@ public class SubscriptionRetrievalAgent extends
bundle.setConnection(provider.getConnection());
bundle.setSubscription(sub);
retrieval.setActualStart(TimeUtil.newCalendar());
retrieval.setActualStart(TimeUtil.newGmtCalendar());
retrieval.setStatus(RetrievalStatus.RETRIEVAL);
// update database
bandwidthDao.update(retrieval);
// Handler will pipeline the Retrieval Objects created to the
// Database where the pool of RetrievalTasks will process them
boolean retrievalsGenerated = generateRetrieval(bundle,
// generateRetrieval will pipeline the RetrievalRecord Objects created to the DB.
// The PK objects returned are sent to the RetrievalQueue for processing.
List<RetrievalRequestRecordPK> retrievals = generateRetrieval(bundle,
retrieval.getIdentifier());
if (retrievalsGenerated) {
// Wake the RetrievalTasks to fetch the data..
wakeRetrievalTasks();
if (!CollectionUtil.isNullOrEmpty(retrievals)) {
try {
Object[] payload = retrievals.toArray();
RetrievalGeneratorUtilities.sendToRetrieval(destinationUri,
network, payload);
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM,
"Couldn't send RetrievalRecords to Queue!", e);
}
statusHandler.info("Sent " + retrievals.size()
+ " retrievals to queue. " + network.toString());
} else {
// Normally this is the job of the SubscriptionNotifyTask, but if no
// retrievals were generated we have to send it manually
@ -148,12 +141,7 @@ public class SubscriptionRetrievalAgent extends
EventBus.publish(retrievalManagerNotifyEvent);
}
}
@VisibleForTesting
void wakeRetrievalTasks() throws EdexException {
EDEXUtil.getMessageProducer().sendAsync(destinationUri, null);
}
@Override
protected String getAgentType() {
return SUBSCRIPTION_AGENT;
@ -176,7 +164,7 @@ public class SubscriptionRetrievalAgent extends
* the subscription retrieval key
* @return true if retrievals were generated (and waiting to be processed)
*/
private boolean generateRetrieval(SubscriptionBundle bundle,
private List<RetrievalRequestRecordPK> generateRetrieval(SubscriptionBundle bundle,
Long subRetrievalKey) {
// process the bundle into a retrieval
@ -188,8 +176,10 @@ public class SubscriptionRetrievalAgent extends
+ " Being Processed for Retrieval...");
List<Retrieval> retrievals = rg.buildRetrieval(bundle);
List<RetrievalRequestRecord> requestRecords = null;
List<RetrievalRequestRecordPK> requestRecordPKs = null;
boolean retrievalsGenerated = !CollectionUtil.isNullOrEmpty(retrievals);
if (retrievalsGenerated) {
String owner = bundle.getSubscription().getOwner();
@ -197,9 +187,10 @@ public class SubscriptionRetrievalAgent extends
int priority = (bundle.getPriority() != null) ? bundle
.getPriority().getPriorityValue() : defaultPriority;
Date insertTime = TimeUtil.newCalendar().getTime();
List<RetrievalRequestRecord> requestRecords = new ArrayList<RetrievalRequestRecord>(
Date insertTime = TimeUtil.newDate();
requestRecords = new ArrayList<RetrievalRequestRecord>(
retrievals.size());
requestRecordPKs = new ArrayList<RetrievalRequestRecordPK>(
retrievals.size());
ITimer timer = TimeUtil.getTimer();
@ -226,9 +217,8 @@ public class SubscriptionRetrievalAgent extends
rec.setRetrieval(SerializationUtil
.transformToThrift(retrieval));
rec.setState(RetrievalRequestRecord.State.PENDING);
if (retrievalQueue != null) {
retrievalQueue.add(rec.getId());
}
requestRecords.add(rec);
requestRecordPKs.add(rec.getId());
} catch (Exception e) {
statusHandler.error("Subscription: " + subscriptionName
+ " Failed to serialize request [" + retrieval
@ -236,8 +226,6 @@ public class SubscriptionRetrievalAgent extends
rec.setRetrieval(new byte[0]);
rec.setState(RetrievalRequestRecord.State.FAILED);
}
requestRecords.add(rec);
}
timer.stop();
@ -258,13 +246,14 @@ public class SubscriptionRetrievalAgent extends
statusHandler.handle(Priority.WARN, "Subscription: "
+ subscriptionName + " Failed to store to retrievals.",
e);
requestRecordPKs.clear();
}
} else {
statusHandler.warn("Subscription: " + subscriptionName
+ " Did not generate any retrieval messages");
}
return retrievalsGenerated;
return requestRecordPKs;
}
private Provider getProvider(String providerName) {
@ -276,4 +265,5 @@ public class SubscriptionRetrievalAgent extends
return null;
}
}
}

View file

@ -8,13 +8,12 @@
2) What to do with found retrievals, in this case process it and send a notification event
3) How to complete retrievals, in this case update the database and send a notification event
-->
<bean id="opsnetRetrievalTask"
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTask">
<bean id="opsnetRetrievalTaskFactory"
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTaskFactory">
<constructor-arg value="OPSNET" />
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.PerformRetrievalsThenReturnFinder">
<constructor-arg ref="retrievalQueue" />
<constructor-arg ref="retrievalDao" />
</bean>
</constructor-arg>
@ -35,13 +34,12 @@
</bean>
<!-- Pick up SBN retrievals from the drop-off point -->
<bean id="sbnRetrievalTask"
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTask">
<bean id="sbnRetrievalTaskFactory"
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTaskFactory">
<constructor-arg value="SBN" />
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.PerformRetrievalsThenReturnFinder">
<constructor-arg ref="retrievalQueue" />
<constructor-arg ref="retrievalDao" />
</bean>
</constructor-arg>
@ -60,11 +58,10 @@
</bean>
</constructor-arg>
</bean>
<util:list id="retrievalTaskList"
value-type="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTask.RetrievalTask">
<ref bean="opsnetRetrievalTask" />
<ref bean="sbnRetrievalTask" />
</util:list>
<util:map id="retrievalTaskFactoryMap" value-type="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTaskFactory">
<entry key="SBN" value-ref="sbnRetrievalTaskFactory"/>
<entry key="OPSNET" value-ref="opsnetRetrievalTaskFactory"/>
</util:map>
</beans>

View file

@ -8,13 +8,12 @@
2) What to do with found retrievals, in this case process it and send a notification event
3) How to complete retrievals, in this case update the database and send a notification event
-->
<bean id="sbnRetrievalTask"
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTask">
<bean id="sbnRetrievalTaskFactory"
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTaskFactory">
<constructor-arg value="SBN" />
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.PerformRetrievalsThenReturnFinder">
<constructor-arg ref="retrievalQueue" />
<constructor-arg ref="retrievalDao" />
</bean>
</constructor-arg>
@ -41,10 +40,9 @@
</bean>
</constructor-arg>
</bean>
<util:list id="retrievalTaskList"
value-type="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTask.RetrievalTask">
<ref bean="sbnRetrievalTask" />
</util:list>
<util:map id="retrievalTaskFactoryMap" value-type="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTaskFactory">
<entry key="SBN" value-ref="sbnRetrievalTaskFactory"/>
</util:map>
</beans>

View file

@ -3,18 +3,17 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd">
<!-- A RetrievalTask takes three constructor arguments:
<!-- A RetrievalTask takes three constructor arguments:
1) How to find retrievals, in this case perform the actual retrieval and return it
2) What to do with found retrievals, in this case process it and send a notification event
3) How to complete retrievals, in this case update the database and send a notification event
-->
<bean id="opsnetRetrievalTask"
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTask">
<bean id="opsnetRetrievalTaskFactory"
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTaskFactory">
<constructor-arg value="OPSNET" />
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.PerformRetrievalsThenReturnFinder">
<constructor-arg ref="retrievalQueue" />
<constructor-arg ref="retrievalDao" />
</bean>
</constructor-arg>
@ -35,14 +34,12 @@
</bean>
<!-- Pick up SBN retrievals from the drop-off point -->
<bean id="sbnRetrievalQueue" class="java.util.concurrent.ConcurrentLinkedQueue" />
<bean id="sbnRetrievalTask"
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTask">
<bean id="sbnRetrievalTaskFactory"
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTaskFactory">
<constructor-arg value="SBN" />
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.DeserializeRetrievedDataFromIngest">
<constructor-arg ref="sbnRetrievalQueue" />
</bean>
</constructor-arg>
<constructor-arg>
@ -52,16 +49,18 @@
<constructor-arg ref="retrievalDao" />
</bean>
</constructor-arg>
<constructor-arg>
<util:constant
static-field="com.raytheon.uf.edex.datadelivery.retrieval.handlers.IRetrievalResponseCompleter.NULL" />
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalResponseCompleter" >
<constructor-arg ref="subNotifyTask" />
<constructor-arg ref="retrievalDao" />
</bean>
</constructor-arg>
</bean>
<util:list id="retrievalTaskList"
value-type="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTask.RetrievalTask">
<ref bean="opsnetRetrievalTask" />
<ref bean="sbnRetrievalTask" />
</util:list>
<util:map id="retrievalTaskFactoryMap" value-type="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalTaskFactory">
<entry key="SBN" value-ref="sbnRetrievalTaskFactory"/>
<entry key="OPSNET" value-ref="opsnetRetrievalTaskFactory"/>
</util:map>
</beans>

View file

@ -52,22 +52,13 @@
<bean id="scheduledExecutorService" class="java.util.concurrent.Executors"
factory-method="newScheduledThreadPool">
<constructor-arg value="3" />
<constructor-arg value="${retrieval-subNotify.threads}" />
</bean>
<bean id="retrievalHandler"
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalHandler">
<constructor-arg ref="scheduledExecutorService" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="retrievalTaskList" />
<constructor-arg ref="subNotifyTask" />
<!-- How often to execute the retrieval tasks -->
<constructor-arg>
<bean class="com.raytheon.uf.common.time.domain.Durations"
factory-method="fromString">
<constructor-arg value="${retrieval.task.frequency}" />
</bean>
</constructor-arg>
<!-- How often to execute the subscription notify tasks -->
<constructor-arg>
<bean class="com.raytheon.uf.common.time.domain.Durations"
@ -75,23 +66,33 @@
<constructor-arg value="${subnotify.task.frequency}" />
</bean>
</constructor-arg>
<constructor-arg ref="subNotifyTask" />
<constructor-arg ref="retrievalTaskFactoryMap" />
</bean>
<bean id="retrievalQueue" class="java.util.concurrent.ConcurrentLinkedQueue" />
<camelContext id="dataDeliveryNotify-camel"
xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="errorHandler">
<!-- Wake retrieval threads if were not running -->
<route id="notifyRetrieval">
<!-- If data delivery clustered, move this to topic -->
<from uri="direct-vm:notifyRetrieval" />
<bean ref="retrievalHandler" method="notify" />
</route>
<route id="dataDeliveryNotify">
<from uri="direct-vm:dataDeliveryNotify" />
<to uri="direct-vm:stageNotification" />
</route>
</camelContext>
<camelContext id="dataDeliveryNotify-camel" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler">
<endpoint id="notifyRetrievalQueue"
uri="jms-durable:queue:notifyRetrieval?concurrentConsumers=${retrieval-process.threads}" />
<!-- SendToRetrievals method from RetrievalGeneratorUtilities links to this
queue -->
<route id="notifyRetrieval">
<from ref="notifyRetrievalQueue" />
<doTry>
<pipeline>
<bean ref="retrievalHandler" method="notify" />
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:notifyRetrieval?level=ERROR" />
</doCatch>
</doTry>
</route>
<!-- event bus messages -->
<route id="dataDeliveryNotify">
<from uri="direct-vm:dataDeliveryNotify" />
<to uri="direct-vm:stageNotification" />
</route>
</camelContext>
</beans>

View file

@ -11,8 +11,11 @@ retrieval.task.frequency=1 MINUTES
# How often to check for retrieved subscriptions to notify of
# Valid units: [MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS]
subnotify.task.frequency=1 MINUTES
# Cron for Subscription Expiration Checker - currently 2 minutes past the hour
checkExpiredSubscription.cron=0+2+*+*+*+?
# Cron for subscriptions nearing the end of their active period - currently once/day at 00:15Z
checkEndingSubscription.cron=0+15+0+*+*+?
checkEndingSubscription.cron=0+15+0+*+*+?
# How many retrieval queue consumers
retrieval-process.threads=4
# How many subNotify threads for retrieval
retrieval-subNotify.threads=1

View file

@ -34,7 +34,6 @@ import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval;
import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval.SubscriptionType;
import com.raytheon.uf.common.dataplugin.persist.IPersistableDataObject;
import com.raytheon.uf.common.serialization.ISerializableObject;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
@ -52,6 +51,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
* Oct 10, 2012 0726 djohnson Add {@link #subRetrievalKey}.
* Nov 26, 2012 1340 dhladky Added additional fields for tracking subscriptions
* Jan 30, 2013 1543 djohnson Add PENDING_SBN, give retrieval column a length.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -62,8 +62,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
@Table(name = "subscription_retrieval")
@DynamicSerialize
public class RetrievalRequestRecord implements
IPersistableDataObject<RetrievalRequestRecordPK>, Serializable,
ISerializableObject {
IPersistableDataObject<RetrievalRequestRecordPK>, Serializable {
public enum State {
PENDING, RUNNING, FAILED, COMPLETED;

View file

@ -32,7 +32,6 @@ import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import com.raytheon.uf.common.dataplugin.persist.IPersistableDataObject;
import com.raytheon.uf.common.serialization.ISerializableObject;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
@ -47,6 +46,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
* May 09, 2012 rjpeter Initial creation
* Feb 11, 2013 1543 djohnson Override equals/hashCode to remove Hibernate warning.
* Feb 15, 2013 1543 djohnson Add JAXB annotations.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -59,7 +59,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
@XmlAccessorType(XmlAccessType.NONE)
public class RetrievalRequestRecordPK implements
IPersistableDataObject<RetrievalRequestRecordPK>,
Serializable, ISerializableObject {
Serializable {
private static final long serialVersionUID = 1L;

View file

@ -19,8 +19,6 @@
**/
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.xml.bind.JAXBException;
import com.raytheon.edex.esb.Headers;
@ -47,6 +45,7 @@ import com.raytheon.uf.edex.wmo.message.XmlWMOMessage;
* Nov 04, 2013 2506 bgonzale Added SbnRetrievalResponseXml to unmarshal classes.
* Trim content after last xml tag during
* marshaling from xml.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -55,16 +54,13 @@ import com.raytheon.uf.edex.wmo.message.XmlWMOMessage;
*/
public class DeserializeRetrievedDataFromIngest implements IRetrievalsFinder {
private final ConcurrentLinkedQueue<String> retrievalQueue;
private final JAXBManager jaxbManager;
/**
* @param retrievalQueue
*/
public DeserializeRetrievedDataFromIngest(
ConcurrentLinkedQueue<String> retrievalQueue) {
this.retrievalQueue = retrievalQueue;
public DeserializeRetrievedDataFromIngest() {
try {
this.jaxbManager = new JAXBManager(RetrievalResponseXml.class,
SbnRetrievalResponseXml.class,
@ -80,8 +76,9 @@ public class DeserializeRetrievedDataFromIngest implements IRetrievalsFinder {
* {@inheritDoc}
*/
@Override
public RetrievalResponseXml findRetrievals() throws Exception {
String xml = retrievalQueue.poll();
public RetrievalResponseXml processRequest(RetrievalRequestWrapper rrw) throws Exception {
String xml = (String) rrw.getPayload();
if (xml == null) {
return null;

View file

@ -20,6 +20,7 @@
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
/**
* Responsible for finding the {@link RetrievalResponseXml} that should be
* processed.
@ -31,6 +32,7 @@ package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 01, 2013 1543 djohnson Initial creation
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -39,11 +41,11 @@ package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
*/
public interface IRetrievalsFinder {
/**
* Finds the {@link RetrievalResponseXml} that should be processed.
*
* Process the requests{@link RetrievalResponseXml}
* @param RetrievalRequestWrapper
* @return the {@link RetrievalResponseXml}
* @throws Exception
*/
RetrievalResponseXml findRetrievals()
RetrievalResponseXml processRequest(RetrievalRequestWrapper rrw)
throws Exception;
}

View file

@ -21,7 +21,6 @@ package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.annotations.VisibleForTesting;
import com.raytheon.uf.common.datadelivery.event.status.DataDeliverySystemStatusDefinition;
@ -60,7 +59,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
* Feb 15, 2013 1543 djohnson Retrieval responses are now xml.
* Jul 16, 2013 1655 mpduff Send a system status event based on the response from the provider.
* Jan 15, 2014 2678 bgonzale Retrieve RetrievalRequestRecords from a Queue for processing.
*
* Jan 30, 2014 2686 dhladky refactor of retrieval.
* </pre>
*
* @author djohnson
@ -74,17 +73,12 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder {
private final IRetrievalDao retrievalDao;
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue;
/**
* Constructor.
*
* @param network
*/
public PerformRetrievalsThenReturnFinder(
ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue,
IRetrievalDao retrievalDao) {
this.retrievalQueue = retrievalQueue;
public PerformRetrievalsThenReturnFinder(IRetrievalDao retrievalDao) {
this.retrievalDao = retrievalDao;
}
@ -92,34 +86,37 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder {
* {@inheritDoc}
*/
@Override
public RetrievalResponseXml findRetrievals() throws Exception {
public RetrievalResponseXml processRequest(RetrievalRequestWrapper rrw) throws Exception {
RetrievalResponseXml retVal = null;
ITimer timer = TimeUtil.getTimer();
timer.start();
try {
timer.start();
RetrievalRequestRecordPK id = retrievalQueue.poll();
// Process through the retrieval
RetrievalRequestRecordPK id = (RetrievalRequestRecordPK) rrw.getPayload();
if (id == null) {
return null;
}
statusHandler.info("Found this RetrievalRequestRecordPK: "
+ id.toString());
RetrievalRequestRecord request = retrievalDao.getById(id);
if (request == null) {
return null;
}
timer.stop();
statusHandler.info("Activation of next retrieval took ["
statusHandler.info("Activation of this retrieval took ["
+ timer.getElapsedTime() + "] ms");
timer.reset();
timer.start();
try {
retVal = process(request);
timer.stop();
statusHandler.info("Retrieval Processing for ["
+ request.getId() + "] took " + timer.getElapsedTime()
+ " ms");
@ -129,9 +126,10 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder {
"Retrieval Processing failed: [" + request.getId()
+ "]", e);
}
} catch (Exception e) {
statusHandler
.error("Unable to look up next retrieval request at this time.",
.error("Unable to look up retrieval request at this time.",
e);
}
@ -141,6 +139,7 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder {
/**
* The actual work gets done here.
*/
@SuppressWarnings("rawtypes")
@VisibleForTesting
RetrievalResponseXml process(RetrievalRequestRecord requestRecord) {
requestRecord.setState(State.FAILED);
@ -196,6 +195,7 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder {
} catch (Exception e) {
statusHandler.handle(Priority.WARN, e.getLocalizedMessage(), e);
}
RetrievalResponseXml retrievalPluginDataObject = new RetrievalResponseXml(
requestRecord.getId(), retrievalAttributePluginDataObjects);
retrievalPluginDataObject

View file

@ -20,15 +20,18 @@ package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
* further licensing information.
**/
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Service;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.domain.api.IDuration;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
@ -45,6 +48,7 @@ import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
* Aug 09, 2012 1022 djohnson Use {@link ExecutorService} for retrieval.
* Mar 04, 2013 1647 djohnson RetrievalTasks are now scheduled via constructor parameter.
* Mar 27, 2013 1802 bphillip Scheduling of retrieval tasks now occurs after camel/spring have been initialized
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -57,35 +61,52 @@ public class RetrievalHandler implements RegistryInitializedListener {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(RetrievalHandler.class);
private final ScheduledExecutorService executorService;
private final List<RetrievalTask> retrievalTasks;
private final ScheduledExecutorService scheduledExecutorService;
private IRetrievalDao retrievalDao;
private IDuration retrievalTaskFrequency;
private IDuration subnotifyTaskFrequency;
private SubscriptionNotifyTask subNotifyTask;
public RetrievalHandler(ScheduledExecutorService executorService,
IRetrievalDao retrievalDao, List<RetrievalTask> retrievalTasks,
SubscriptionNotifyTask subNotifyTask,
IDuration retrievalTaskFrequency, IDuration subnotifyTaskFrequency) {
this.executorService = executorService;
this.retrievalTasks = retrievalTasks;
private Map<String, RetrievalTaskFactory> taskFactories;
public RetrievalHandler(ScheduledExecutorService scheduledExecutorService,
IRetrievalDao retrievalDao, SubscriptionNotifyTask subNotifyTask,
IDuration subnotifyTaskFrequency,
Map<String, RetrievalTaskFactory> taskFactories) {
this.scheduledExecutorService = scheduledExecutorService;
this.retrievalDao = retrievalDao;
this.retrievalTaskFrequency = retrievalTaskFrequency;
this.subnotifyTaskFrequency = subnotifyTaskFrequency;
this.subNotifyTask = subNotifyTask;
this.taskFactories = taskFactories;
}
public void notify(List<String> subscriptions) {
statusHandler.debug("Notifying that subscriptions are available.");
/**
* Pull a SubscriptionRetrievalRequestWrapper off of the retrieval queue
* Will be two types 1.) SBN retrievals will come in the form of XML
* containing the data itself 2.) OPSNET retrievals will be the
* RetrievalRequestRecordPK object
*
* @param SubscriptionRetrievalRequestWrapper
* wrapper as byte array
*/
public void notify(byte[] bytes) {
SubscriptionRetrievalRequestWrapper srrw = null;
for (RetrievalTask retrievalTask : retrievalTasks) {
executorService.execute(retrievalTask);
try {
srrw = SerializationUtil.transformFromThrift(
SubscriptionRetrievalRequestWrapper.class, bytes);
if (srrw != null) {
RetrievalTask task = getTasker(srrw);
task.run();
}
} catch (SerializationException e) {
statusHandler.handle(Priority.ERROR,
"Can't deserialize RetrievalRequestWrapper!", e);
}
}
@ -93,13 +114,19 @@ public class RetrievalHandler implements RegistryInitializedListener {
public void executeAfterRegistryInit() {
// set all Running state retrievals to pending
retrievalDao.resetRunningRetrievalsToPending();
for (RetrievalTask retrievalTask : retrievalTasks) {
executorService.scheduleWithFixedDelay(retrievalTask, 30000,
retrievalTaskFrequency.getMillis(), TimeUnit.MILLISECONDS);
}
executorService.scheduleWithFixedDelay(subNotifyTask, 30000,
// run the sub notifier every 30 sec for notifications
scheduledExecutorService.scheduleWithFixedDelay(subNotifyTask, 30000,
subnotifyTaskFrequency.getMillis(), TimeUnit.MILLISECONDS);
}
/**
* Get a RetrievalTask for the given network
* @param network
* @return RetrievalTask
*/
private RetrievalTask getTasker(SubscriptionRetrievalRequestWrapper wrapper) {
return taskFactories.get(wrapper.getNetwork().name()).create(wrapper);
}
}

View file

@ -0,0 +1,59 @@
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
/**
* Wrapper for retrieval primary keys
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
* @author dhladky
* @version 1.0
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.NONE)
@DynamicSerialize
public class RetrievalRequestWrapper {
@XmlAttribute
@DynamicSerializeElement
private Object payload;
/**
* Constructor.
*/
public RetrievalRequestWrapper() {
}
/**
* Constructor
*
* @param payload
*/
public RetrievalRequestWrapper(Object payload) {
this.payload = payload;
}
public void setPayload(Object payload) {
this.payload = payload;
}
public Object getPayload() {
return payload;
}
}

View file

@ -26,7 +26,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
/**
* Inner class to process individual retrievals.
* Process subscription retrievals.
*
* <pre>
*
@ -43,6 +43,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
* Mar 05, 2013 1647 djohnson Change no retrievals found message to debug.
* Aug 09, 2013 1822 bgonzale Added parameters to processRetrievedPluginDataObjects.
* Oct 01, 2013 2267 bgonzale Removed request parameter and IRetrievalDao field.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -61,57 +62,66 @@ public class RetrievalTask implements Runnable {
private final IRetrievalResponseCompleter retrievalCompleter;
private final IRetrievalsFinder retrievalDataFinder;
private final SubscriptionRetrievalRequestWrapper retrievalRequestWrapper;
public RetrievalTask(Network network,
IRetrievalsFinder retrievalDataFinder,
IRetrievalPluginDataObjectsProcessor retrievedDataProcessor,
IRetrievalResponseCompleter retrievalCompleter) {
IRetrievalResponseCompleter retrievalCompleter,
SubscriptionRetrievalRequestWrapper retrievalRequestWrapper) {
this.network = network;
this.retrievalDataFinder = retrievalDataFinder;
this.retrievedDataProcessor = retrievedDataProcessor;
this.retrievalCompleter = retrievalCompleter;
this.retrievalRequestWrapper = retrievalRequestWrapper;
}
@Override
public void run() {
try {
while (true) {
if (retrievalRequestWrapper.getRetrievalRequestWrappers() != null) {
for (RetrievalRequestWrapper retrieval : retrievalRequestWrapper.getRetrievalRequestWrappers()) {
// process individual requests for this subscription
boolean success = false;
RetrievalRequestRecord request = null;
// process request
boolean success = false;
RetrievalRequestRecord request = null;
try {
try {
// send this retrieval to be processed
RetrievalResponseXml retrievalResponse = retrievalDataFinder
.processRequest(retrieval);
RetrievalResponseXml retrievalPluginDataObject = retrievalDataFinder
.findRetrievals();
// This forces the return from the while loop once there are
// no more retrievals to process
if (retrievalPluginDataObject == null) {
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.debug("No " + network
+ " retrievals found.");
if (retrievalResponse == null) {
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.debug("No " + network
+ " retrievals found.");
}
continue;
}
return;
success = retrievalResponse.isSuccess();
request = retrievedDataProcessor
.processRetrievedPluginDataObjects(retrievalResponse);
} catch (Exception e) {
statusHandler.error(network
+ " retrieval processing error", e);
}
success = retrievalPluginDataObject.isSuccess();
request = retrievedDataProcessor
.processRetrievedPluginDataObjects(retrievalPluginDataObject);
} catch (Exception e) {
statusHandler.error(
network + " retrieval processing error", e);
}
if (request != null) {
retrievalCompleter.completeRetrieval(request,
new RetrievalResponseStatus(success));
if (request != null) {
retrievalCompleter.completeRetrieval(request,
new RetrievalResponseStatus(success));
}
}
}
} catch (Throwable e) {
// so thread can't die
statusHandler.error("Error caught in " + network
+ " retrieval thread", e);
}
}
}
}

View file

@ -0,0 +1,67 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
import com.raytheon.uf.common.datadelivery.registry.Network;
/**
* Factory to create RetrievalTasks for a specific Subscription Retrieval Request.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 04, 2014 2686 dhladky Initial creation
*
* </pre>
*
* @author dhladky
* @version 1.0
*/
public class RetrievalTaskFactory {
private final Network network;
private final IRetrievalPluginDataObjectsProcessor retrievedDataProcessor;
private final IRetrievalResponseCompleter retrievalCompleter;
private final IRetrievalsFinder retrievalDataFinder;
public RetrievalTaskFactory(Network network,
IRetrievalsFinder retrievalDataFinder,
IRetrievalPluginDataObjectsProcessor retrievedDataProcessor,
IRetrievalResponseCompleter retrievalCompleter) {
this.network = network;
this.retrievalDataFinder = retrievalDataFinder;
this.retrievedDataProcessor = retrievedDataProcessor;
this.retrievalCompleter = retrievalCompleter;
}
/**
* RetrievalTask creator, Factory method
* @param retrievalRequest
* @return
*/
public RetrievalTask create(SubscriptionRetrievalRequestWrapper retrievalRequest) {
return new RetrievalTask(network, retrievalDataFinder, retrievedDataProcessor, retrievalCompleter, retrievalRequest);
}
}

View file

@ -0,0 +1,98 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElements;
import javax.xml.bind.annotation.XmlRootElement;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
/**
* Wrapper for objects placed on common retrieval queue
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 28, 2014 dhladky Initial creation
*
* </pre>
*
* @author dhladky
* @version 1.0
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.NONE)
@DynamicSerialize
public class SubscriptionRetrievalRequestWrapper {
@XmlElements({ @XmlElement(name="retrievalRequestWrapper") })
@DynamicSerializeElement
private List<RetrievalRequestWrapper> retrievalRequestWrappers;
@XmlAttribute
@DynamicSerializeElement
private Network network;
/**
* Constructor.
*/
public SubscriptionRetrievalRequestWrapper() {
}
/**
* Constructor
*
* @param network
* @param retrievalRequestWrappers
*/
public SubscriptionRetrievalRequestWrapper(Network network, List<RetrievalRequestWrapper> retrievalRequestWrappers) {
this.setNetwork(network);
this.retrievalRequestWrappers = retrievalRequestWrappers;
}
public Network getNetwork() {
return network;
}
public void setNetwork(Network network) {
this.network = network;
}
public void setRetrievalRequestWrappers(List<RetrievalRequestWrapper> retrievalRequestWrappers) {
this.retrievalRequestWrappers = retrievalRequestWrappers;
}
public List<RetrievalRequestWrapper> getRetrievalRequestWrappers() {
return retrievalRequestWrappers;
}
}

View file

@ -24,16 +24,21 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.Parameter;
import com.raytheon.uf.common.dataplugin.grid.GridRecord;
import com.raytheon.uf.common.dataplugin.level.Level;
import com.raytheon.uf.common.gridcoverage.GridCoverage;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.DataTime;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.database.dao.CoreDao;
import com.raytheon.uf.edex.database.dao.DaoConfig;
import com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalRequestWrapper;
import com.raytheon.uf.edex.datadelivery.retrieval.handlers.SubscriptionRetrievalRequestWrapper;
/**
*
@ -48,6 +53,7 @@ import com.raytheon.uf.edex.database.dao.DaoConfig;
* Nov 19, 2012 bsteffen Initial javadoc
* Dec 10, 2012 1259 bsteffen Switch Data Delivery from LatLon to referenced envelopes.
* Dec 11, 2013 2625 mpduff Remove creation of DataURI.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -134,4 +140,33 @@ public class RetrievalGeneratorUtilities {
return dups;
}
/**
*
* Drops Retrievals by subscription into a common queue for processing
*
* @param destinationUri
* @param network
* @param payload
* @throws Exception
*/
public static void sendToRetrieval(String destinationUri, Network network,
Object[] payload) throws Exception{
if (payload != null) {
List<RetrievalRequestWrapper> wrappers = new ArrayList<RetrievalRequestWrapper>(
payload.length);
for (Object o : payload) {
RetrievalRequestWrapper rrw = new RetrievalRequestWrapper(o);
wrappers.add(rrw);
}
SubscriptionRetrievalRequestWrapper srrw = new SubscriptionRetrievalRequestWrapper(
network, wrappers);
byte[] bytes = SerializationUtil.transformToThrift(srrw);
EDEXUtil.getMessageProducer().sendAsync(destinationUri, bytes);
}
}
}

View file

@ -53,6 +53,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.mapping.PluginRouteList;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 26, 2012 1367 dhladky Common plugin route persistence
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -162,8 +163,8 @@ public final class RetrievalPersistUtil {
PluginRouteList prl = null;
try {
prl = (PluginRouteList) getJaxbManager()
.unmarshalFromXmlFile(file);
prl = getJaxbManager()
.unmarshalFromXmlFile(PluginRouteList.class, file);
} catch (Exception e) {
statusHandler.error(
"[Data Delivery] Configuration for plugin routes failed to load: File: "

View file

@ -4,6 +4,9 @@ Bundle-Name: DataDelivery Retrieval Plug-in
Bundle-SymbolicName: com.raytheon.uf.edex.plugin.datadelivery.retrieval
Bundle-Version: 1.12.1174.qualifier
Bundle-Vendor: RAYTHEON
Require-Bundle: com.raytheon.edex.common;bundle-version="1.11.7"
Require-Bundle: com.raytheon.edex.common;bundle-version="1.11.7",
com.raytheon.uf.edex.datadelivery.retrieval;bundle-version="1.0.0",
com.raytheon.uf.common.datadelivery.registry;bundle-version="1.0.0",
com.raytheon.uf.common.status;bundle-version="1.12.1174"
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Bundle-ActivationPolicy: lazy

View file

@ -5,7 +5,7 @@
<bean id="dataDeliveryRetrievalDecoder"
class="com.raytheon.uf.edex.plugin.datadelivery.retrieval.SbnDataDeliveryRetrievalDecoder">
<constructor-arg ref="sbnRetrievalQueue" />
<constructor-arg value="notifyRetrieval" />
</bean>
<camelContext id="dataDeliveryRetrieval-camel"

View file

@ -19,10 +19,13 @@
**/
package com.raytheon.uf.edex.plugin.datadelivery.retrieval;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.raytheon.edex.esb.Headers;
import com.raytheon.edex.plugin.AbstractDecoder;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.edex.datadelivery.retrieval.util.RetrievalGeneratorUtilities;
/**
* Decodes data delivery retrievals from the SBN feed.
@ -34,6 +37,7 @@ import com.raytheon.edex.plugin.AbstractDecoder;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Mar 19, 2013 1648 djohnson Initial creation
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -42,11 +46,16 @@ import com.raytheon.edex.plugin.AbstractDecoder;
*/
public class SbnDataDeliveryRetrievalDecoder extends AbstractDecoder {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(SbnDataDeliveryRetrievalDecoder.class);
private final ConcurrentLinkedQueue<String> sbnRetrievalQueue;
public SbnDataDeliveryRetrievalDecoder(ConcurrentLinkedQueue<String> queue) {
this.sbnRetrievalQueue = queue;
private String destinationUri;
private Network network = Network.SBN;
public SbnDataDeliveryRetrievalDecoder(String destinationUri) {
this.destinationUri = destinationUri;
}
/**
@ -58,7 +67,14 @@ public class SbnDataDeliveryRetrievalDecoder extends AbstractDecoder {
* the headers
*/
public void process(byte[] data, Headers headers) {
this.sbnRetrievalQueue.add(new String(data));
// drops to common retrieval queue for processing/persistence
String xml = new String(data);
try {
Object[] payload = new Object[]{xml};
RetrievalGeneratorUtilities.sendToRetrieval(destinationUri, network, payload);
} catch (Exception e) {
statusHandler.handle(Priority.ERROR, "Couldn't send SBN data to Retrieval Queue", e);
}
}
}

View file

@ -71,6 +71,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
* Jan 30, 2013 1543 djohnson Initial creation
* Jul 10, 2013 2106 djohnson Inject providerHandler.
* Jan 15, 2014 2678 bgonzale Added Queue.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -133,12 +134,8 @@ public class SubscriptionRetrievalAgentTest {
SubscriptionRetrievalAgent agent = new SubscriptionRetrievalAgent(
route, "someUri", new Object(), 1, null, bandwidthDao,
retrievalDao, DataDeliveryHandlers.getProviderHandler(),
retrievalQueue) {
@Override
void wakeRetrievalTasks() throws EdexException {
// Do nothing
}
retrievalDao, DataDeliveryHandlers.getProviderHandler()) {
};
agent.processAllocation(subscriptionRetrieval);

View file

@ -67,6 +67,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
* Nov 04, 2013 2506 bgonzale Fixed IRetreivalDao mock initialization.
* Test deserialization of data with leading and trailing
* content on the xml.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -77,10 +78,7 @@ public class DeserializeRetrievedDataFromIngestTest {
private final File directory = TestUtil
.setupTestClassDir(DeserializeRetrievedDataFromIngestTest.class);
private final ConcurrentLinkedQueue<String> retrievalQueue = new ConcurrentLinkedQueue<String>();
private final DeserializeRetrievedDataFromIngest service = new DeserializeRetrievedDataFromIngest(
retrievalQueue);
private final DeserializeRetrievedDataFromIngest service = new DeserializeRetrievedDataFromIngest();
private final IRetrievalDao mockDao = mock(IRetrievalDao.class);
@ -103,7 +101,7 @@ public class DeserializeRetrievedDataFromIngestTest {
addRetrievalToQueue();
final RetrievalResponseXml restored = service.findRetrievals();
final RetrievalResponseXml restored = service.processRequest(null);
// Just make sure the payload is present
assertThat(restored.getRetrievalAttributePluginDataObjects().get(0)
@ -117,15 +115,15 @@ public class DeserializeRetrievedDataFromIngestTest {
addRetrievalToQueue();
service.findRetrievals();
service.processRequest(null);
assertThat(retrievalQueue, is(empty()));
//assertThat(retrievalQueue, is(empty()));
}
@Test
public void returnsNullWhenNothingInTheQueue() throws Exception {
final RetrievalResponseXml restored = service.findRetrievals();
final RetrievalResponseXml restored = service.processRequest(null);
assertNull(restored);
}
@ -136,7 +134,7 @@ public class DeserializeRetrievedDataFromIngestTest {
addSimulatedSBNRetrievalToQueue();
final RetrievalResponseXml restored = service.findRetrievals();
final RetrievalResponseXml restored = service.processRequest(null);
// check for the payload
assertThat(restored.getRetrievalAttributePluginDataObjects().get(0)
@ -158,7 +156,7 @@ public class DeserializeRetrievedDataFromIngestTest {
final List<File> files = FileUtil.listFiles(directory,
FilenameFilters.ACCEPT_FILES, false);
retrievalQueue.add(FileUtil.file2String(files.get(0)));
//retrievalQueue.add(FileUtil.file2String(files.get(0)));
}
private void addSimulatedSBNRetrievalToQueue()
@ -178,7 +176,7 @@ public class DeserializeRetrievedDataFromIngestTest {
final List<File> files = FileUtil.listFiles(directory,
FilenameFilters.ACCEPT_FILES, false);
retrievalQueue.add(FileUtil.file2String(files.get(0)));
//retrievalQueue.add(FileUtil.file2String(files.get(0)));
}
private static class WmoHeaderWithLeadingAndTrailingContent extends

View file

@ -53,6 +53,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
* ------------ ---------- ----------- --------------------------
* Feb 06, 2013 1543 djohnson Initial creation
* Jan 15, 2014 2678 bgonzale Added Queue.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
*
* </pre>
*
@ -135,8 +136,8 @@ public class PerformRetrievalPluginDataObjectsFinderTest {
}
private void processRetrieval(RetrievalRequestRecord retrieval) {
final PerformRetrievalsThenReturnFinder pluginDataObjectsFinder = new PerformRetrievalsThenReturnFinder(
retrievalQueue, MOCK_DAO);
pluginDataObjectsFinder.process(retrieval);
//final PerformRetrievalsThenReturnFinder pluginDataObjectsFinder = new PerformRetrievalsThenReturnFinder(
// retrievalQueue, MOCK_DAO);
//pluginDataObjectsFinder.process(retrieval);
}
}

View file

@ -1,105 +0,0 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import com.raytheon.uf.common.time.domain.Durations;
import com.raytheon.uf.common.time.domain.api.IDuration;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
/**
* Test {@link RetrievalHandler}.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jul 06, 2012 740 djohnson Initial creation
* Aug 09. 2012 1022 djohnson Changes to RetrievalHandler.
* Nov 19, 2012 1166 djohnson Clean up JAXB representation of registry objects.
* Jan 30, 2013 1543 djohnson RetrievalTask now requires a Network.
* Feb 05, 2013 1580 mpduff EventBus refactor.
* Feb 07, 2013 1543 djohnson Move test to its proper test class, as per peer review comments.
* Mar 04, 2013 1647 djohnson RetrievalTasks are now scheduled via constructor parameter.
*
* </pre>
*
* @author djohnson
* @version 1.0
*/
public class RetrievalHandlerTest {
private static final IDuration RETRIEVAL_TASK_FREQUENCY = Durations.of(5,
TimeUnit.MINUTES);
private static final IDuration SUBNOTIFY_TASK_FREQUENCY = Durations.of(1,
TimeUnit.MINUTES);
private final ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
private final IRetrievalDao mockDao = mock(IRetrievalDao.class);
private final RetrievalTask retrievalTask = mock(RetrievalTask.class);
private final SubscriptionNotifyTask subNotifyTask = mock(SubscriptionNotifyTask.class);
private final RetrievalHandler handler = new RetrievalHandler(
executorService, mockDao, Arrays.asList(retrievalTask),
subNotifyTask, RETRIEVAL_TASK_FREQUENCY, SUBNOTIFY_TASK_FREQUENCY);
@Test
public void testAllRunningRetrievalsAreResetToPendingOnConstruction() {
handler.executeAfterRegistryInit();
verify(mockDao).resetRunningRetrievalsToPending();
}
@Test
public void testOnNotifyOfSubscriptionsARetrievalTaskIsExecuted() {
handler.notify(Collections.<String> emptyList());
verify(executorService).execute(retrievalTask);
}
@Test
public void testRetrievalTaskIsScheduledPerConstructorParameter() {
handler.executeAfterRegistryInit();
verify(executorService).scheduleWithFixedDelay(retrievalTask, 30000,
RETRIEVAL_TASK_FREQUENCY.getMillis(), TimeUnit.MILLISECONDS);
}
@Test
public void testSubscriptionNotifyTaskIsScheduledPerConstructorParameter() {
handler.executeAfterRegistryInit();
verify(executorService).scheduleWithFixedDelay(subNotifyTask, 30000,
SUBNOTIFY_TASK_FREQUENCY.getMillis(), TimeUnit.MILLISECONDS);
}
}

View file

@ -1,344 +0,0 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.annotation.Transactional;
import com.google.common.eventbus.Subscribe;
import com.raytheon.uf.common.datadelivery.event.retrieval.DataRetrievalEvent;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.Provider.ServiceType;
import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval;
import com.raytheon.uf.common.datadelivery.retrieval.xml.RetrievalAttribute;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.event.EventBus;
import com.raytheon.uf.common.localization.PathManagerFactoryTest;
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.common.util.SpringFiles;
import com.raytheon.uf.common.util.TestUtil;
import com.raytheon.uf.common.util.file.FilenameFilters;
import com.raytheon.uf.edex.database.DataAccessLayerException;
import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory;
import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter;
import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter.TranslationException;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.State;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse;
/**
* Test {@link RetrievalTask}.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 30, 2013 1543 djohnson Initial creation
* Feb 07, 2013 1543 djohnson Add test to simulate SBN retrieval task behavior.
* Feb 12, 2013 1543 djohnson Retrieval responses are now sent further down the chain.
* Feb 15, 2013 1543 djohnson Class renames.
* Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor.
* Mar 19, 2013 1794 djohnson RetrievalTasks integrate at a queue.
* Apr 29, 2013 1910 djohnson Unregister from EventBus after each test.
* Aug 09, 2013 1822 bgonzale Added parameters to processRetrievedPluginDataObjects.
* Oct 01, 2013 2267 bgonzale Pass request parameter instead of components of request.
* Nov 04, 2013 2506 bgonzale removed IRetrievalDao parameter.
* Jan 15, 2014 2678 bgonzale Added Queue.
*
* </pre>
*
* @author djohnson
* @version 1.0
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { SpringFiles.UNIT_TEST_DB_BEANS_XML,
SpringFiles.RETRIEVAL_DATADELIVERY_DAOS_XML })
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
public class RetrievalTaskTest {
/**
* Places the plugin data object into a collection for inspection.
*/
public class PlaceInCollectionProcessor implements
IRetrievalPluginDataObjectsProcessor {
public final List<PluginDataObject> pluginDataObjects = new ArrayList<PluginDataObject>();
/**
* {@inheritDoc}
*
* @return RetrievalRequestRecord
*/
@Override
public RetrievalRequestRecord processRetrievedPluginDataObjects(
RetrievalResponseXml retrievalPluginDataObjects)
throws SerializationException, TranslationException {
final List<RetrievalResponseWrapper> retrievalAttributePluginDataObjects = retrievalPluginDataObjects
.getRetrievalAttributePluginDataObjects();
final RetrievalRequestRecord requestRecord = dao
.getById(retrievalPluginDataObjects.getRequestRecord());
final Retrieval retrieval = requestRecord.getRetrievalObj();
final ServiceType serviceType = retrieval.getServiceType();
final RetrievalAdapter serviceRetrievalAdapter = ServiceTypeFactory
.retrieveServiceRetrievalAdapter(serviceType);
final Iterator<RetrievalAttribute> attributesIter = retrieval
.getAttributes().iterator();
for (RetrievalResponseWrapper pluginDataObjectEntry : retrievalAttributePluginDataObjects) {
if (!attributesIter.hasNext()) {
throw new RuntimeException(
"Did not find a RetrievalAttribute to match the retrieval response!");
}
// Restore the attribute xml prior to processing the response
final IRetrievalResponse response = pluginDataObjectEntry
.getRetrievalResponse();
response.setAttribute(attributesIter.next());
final Map<String, PluginDataObject[]> processed = serviceRetrievalAdapter
.processResponse(response);
for (PluginDataObject[] pdos : processed.values()) {
pluginDataObjects.addAll(Arrays.asList(pdos));
}
}
return requestRecord;
}
}
private RetrievalRequestRecord opsnetRetrieval;
private RetrievalRequestRecord sbnRetrieval;
@Autowired
@Qualifier(value = "retrievalDao")
private IRetrievalDao dao;
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue = new ConcurrentLinkedQueue<RetrievalRequestRecordPK>();
private final PlaceInCollectionProcessor retrievedDataProcessor = new PlaceInCollectionProcessor();
private final List<DataRetrievalEvent> eventsReceived = new ArrayList<DataRetrievalEvent>();
@Before
public void setUp() throws RegistryHandlerException {
PathManagerFactoryTest.initLocalization();
opsnetRetrieval = RetrievalRequestRecordFixture.INSTANCE.get(1);
sbnRetrieval = RetrievalRequestRecordFixture.INSTANCE.get(2);
opsnetRetrieval.setNetwork(Network.OPSNET);
sbnRetrieval.setNetwork(Network.SBN);
EventBus.register(this);
}
@After
public void tearDown() {
EventBus.unregister(this);
}
@Test
public void processesRetrievalForItsSpecifiedNetwork()
throws DataAccessLayerException {
stageRetrievals();
runRetrievalTask();
verifyCorrectStateForRetrieval(opsnetRetrieval, State.COMPLETED);
}
@Test
public void storesPluginDataObjectsForItsSpecifiedNetwork()
throws DataAccessLayerException, SerializationException {
stageRetrievals();
runRetrievalTask();
assertThat(retrievedDataProcessor.pluginDataObjects,
hasSize(opsnetRetrieval.getRetrievalObj().getAttributes()
.size()));
}
@Ignore("dataRetrievalEvent is no longer sent separately from storage, perhaps restore it later?")
public void dataRetrievalEventIsSentForItsSpecifiedNetwork()
throws Exception {
stageRetrievals();
runRetrievalTask();
final int numberOfRetrievalAttributes = opsnetRetrieval
.getRetrievalObj().getAttributes().size();
assertThat(eventsReceived, hasSize(numberOfRetrievalAttributes));
// TODO: Is there a way to distinguish between the events sent by the
// separate retrieval attributes, e.g. to make sure each attribute sent
// an event and not one attribute sent two?
}
// TODO: Add tests for one retrieval failing and another succeeding, make
// sure correct events are sent and correct number of plugin data objects
// generated
@Test
public void doesNotProcessRetrievalForAnotherNetwork()
throws DataAccessLayerException {
stageRetrievals();
runRetrievalTask();
verifyCorrectStateForRetrieval(sbnRetrieval, State.PENDING);
}
@Test
public void retrievalTaskCanStoreDataToDirectoryThatAnotherTaskProcesses()
throws Exception {
dao.create(RetrievalRequestRecordFixture.INSTANCE.get());
IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder(
retrievalQueue, dao);
final ConcurrentLinkedQueue<String> retrievalQueue = new ConcurrentLinkedQueue<String>();
final File testDirectory = TestUtil
.setupTestClassDir(RetrievalTaskTest.class);
IRetrievalPluginDataObjectsProcessor serializeToDirectory = new SerializeRetrievedDataToDirectory(
testDirectory, new AlwaysSameWmoHeader("SMYG10 LYBM 280000"),
dao);
RetrievalTask downloadTask = new RetrievalTask(Network.OPSNET,
retrievalDataFinder, serializeToDirectory,
mock(IRetrievalResponseCompleter.class));
RetrievalTask readDownloadsTask = new RetrievalTask(Network.OPSNET,
new DeserializeRetrievedDataFromIngest(retrievalQueue),
retrievedDataProcessor, new RetrievalResponseCompleter(
mock(SubscriptionNotifyTask.class), dao));
downloadTask.run();
final List<RetrievalRequestRecord> all = dao.getAll();
for (RetrievalRequestRecord request : all) {
assertThat(request.getState(), is(State.RUNNING));
}
for (File file : FileUtil.listFiles(testDirectory,
FilenameFilters.ACCEPT_FILES, false)) {
retrievalQueue.add(FileUtil.file2String(file));
}
readDownloadsTask.run();
final List<RetrievalRequestRecord> allRetrievals = dao.getAll();
assertThat(allRetrievals, hasSize(1));
assertThat(retrievedDataProcessor.pluginDataObjects, hasSize(2));
for (RetrievalRequestRecord request : allRetrievals) {
assertThat(request.getState(), is(State.COMPLETED));
}
}
/**
* Stage the retrievals in the database.
*/
@Transactional
private void stageRetrievals() {
dao.create(opsnetRetrieval);
retrievalQueue.add(opsnetRetrieval.getId());
dao.create(sbnRetrieval);
retrievalQueue.add(sbnRetrieval.getId());
}
/**
* Run the actual retrieval task.
*/
private void runRetrievalTask() {
// Create required strategies for finding, processing, and completing
// retrievals
final IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder(
retrievalQueue, dao);
final IRetrievalResponseCompleter retrievalCompleter = new RetrievalResponseCompleter(
mock(SubscriptionNotifyTask.class), dao);
new RetrievalTask(Network.OPSNET, retrievalDataFinder,
retrievedDataProcessor, retrievalCompleter).run();
}
/**
* Verify the retrieval record is in the expected state.
*
* @param retrieval
* the retrieval
* @param state
* the expected state
* @throws DataAccessLayerException
*/
private void verifyCorrectStateForRetrieval(
RetrievalRequestRecord retrieval, State state)
throws DataAccessLayerException {
RetrievalRequestRecord recordInDb = dao
.getRequests(retrieval.getId().getSubscriptionName())
.iterator().next();
assertThat(recordInDb.getState(), is(equalTo(state)));
}
/**
* This method will be invoked by the EventBus when a data retrieval event
* is sent.
*
* @param event
* the event
*/
@Subscribe
public void receivedDataDeliveryEvent(DataRetrievalEvent event) {
eventsReceived.add(event);
}
}