Merge "Issue #2678 Fix for multiple threads grabbing a retrieval for processing." into omaha_14.2.1
Former-commit-id:b90000df3e
[formerly89163b6e3b
[formerlyd682bcee93
] [formerlyb90000df3e
[formerly 81e57beaa231d696b7d40d6afbda365a27f6cae8]]] Former-commit-id:89163b6e3b
[formerlyd682bcee93
] Former-commit-id:89163b6e3b
Former-commit-id:cf41524e58
This commit is contained in:
commit
a02b1f3de0
12 changed files with 61 additions and 20 deletions
|
@ -25,6 +25,7 @@ import java.io.Serializable;
|
|||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
import javax.xml.bind.annotation.XmlSeeAlso;
|
||||
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
|
||||
|
||||
|
@ -45,14 +46,15 @@ import com.vividsolutions.jts.geom.Coordinate;
|
|||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jan 17, 2011 191 dhladky Initial creation
|
||||
* Dec 10, 2012 1259 bsteffen Switch Data Delivery from LatLon to referenced envelopes.
|
||||
*
|
||||
* Dec 10, 2012 1259 bsteffen Switch Data Delivery from LatLon to referenced envelopes.
|
||||
* Jan 15, 2014 2678 bgonzale Added XmlRootElement annotation.
|
||||
* </pre>
|
||||
*
|
||||
* @author dhladky
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
@XmlRootElement
|
||||
@XmlAccessorType(XmlAccessType.NONE)
|
||||
@DynamicSerialize
|
||||
@XmlSeeAlso({ GriddedCoverage.class, LatLonGridCoverage.class })
|
||||
|
|
|
@ -33,6 +33,7 @@
|
|||
<constructor-arg ref="bandwidthDao" />
|
||||
<constructor-arg ref="retrievalDao" />
|
||||
<constructor-arg ref="ProviderHandler" />
|
||||
<constructor-arg ref="retrievalQueue" />
|
||||
</bean>
|
||||
|
||||
<bean id="sbnSubscriptionRetrievalAgentPrototype"
|
||||
|
@ -46,6 +47,7 @@
|
|||
<constructor-arg ref="bandwidthDao" />
|
||||
<constructor-arg ref="retrievalDao" />
|
||||
<constructor-arg ref="ProviderHandler" />
|
||||
<constructor-arg ref="retrievalQueue" />
|
||||
</bean>
|
||||
|
||||
<util:map id="retrievalAgents">
|
||||
|
|
|
@ -47,6 +47,7 @@
|
|||
<constructor-arg ref="bandwidthDao" />
|
||||
<constructor-arg ref="retrievalDao" />
|
||||
<constructor-arg ref="ProviderHandler" />
|
||||
<constructor-arg ref="retrievalQueue" />
|
||||
</bean>
|
||||
|
||||
<util:map id="retrievalAgents">
|
||||
|
|
|
@ -47,6 +47,7 @@
|
|||
<constructor-arg ref="bandwidthDao" />
|
||||
<constructor-arg ref="retrievalDao" />
|
||||
<constructor-arg ref="ProviderHandler" />
|
||||
<constructor-arg ref="retrievalQueue" />
|
||||
</bean>
|
||||
|
||||
<bean id="sbnSubscriptionRetrievalAgentPrototype"
|
||||
|
@ -60,6 +61,9 @@
|
|||
<constructor-arg ref="bandwidthDao" />
|
||||
<constructor-arg ref="retrievalDao" />
|
||||
<constructor-arg ref="ProviderHandler" />
|
||||
<constructor-arg name="retrievalQueue" >
|
||||
<null/>
|
||||
</constructor-arg>
|
||||
</bean>
|
||||
|
||||
<util:map id="retrievalAgents">
|
||||
|
|
|
@ -6,6 +6,7 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Network;
|
||||
|
@ -34,6 +35,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent;
|
|||
import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
|
||||
|
||||
/**
|
||||
* Class used to process SubscriptionRetrieval BandwidthAllocations.
|
||||
|
@ -51,6 +53,8 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
|
|||
* Jun 24, 2013 2106 djohnson Set actual start time when sending to retrieval rather than overwrite scheduled start.
|
||||
* Jul 09, 2013 2106 djohnson Dependency inject registry handlers.
|
||||
* Jul 11, 2013 2106 djohnson Use SubscriptionPriority enum.
|
||||
* Jan 15, 2014 2678 bgonzale Use Queue for passing RetrievalRequestRecords to the
|
||||
* RetrievalTasks (PerformRetrievalsThenReturnFinder).
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -72,15 +76,19 @@ public class SubscriptionRetrievalAgent extends
|
|||
|
||||
private final IProviderHandler providerHandler;
|
||||
|
||||
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue;
|
||||
|
||||
public SubscriptionRetrievalAgent(Network network, String destinationUri,
|
||||
final Object notifier, int defaultPriority,
|
||||
RetrievalManager retrievalManager, IBandwidthDao bandwidthDao,
|
||||
IRetrievalDao retrievalDao, IProviderHandler providerHandler) {
|
||||
IRetrievalDao retrievalDao, IProviderHandler providerHandler,
|
||||
ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue) {
|
||||
super(network, destinationUri, notifier, retrievalManager);
|
||||
this.defaultPriority = defaultPriority;
|
||||
this.bandwidthDao = bandwidthDao;
|
||||
this.retrievalDao = retrievalDao;
|
||||
this.providerHandler = providerHandler;
|
||||
this.retrievalQueue = retrievalQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -134,9 +142,6 @@ public class SubscriptionRetrievalAgent extends
|
|||
@VisibleForTesting
|
||||
void wakeRetrievalTasks() throws EdexException {
|
||||
EDEXUtil.getMessageProducer().sendAsync(destinationUri, null);
|
||||
EDEXUtil.getMessageProducer().sendAsync(destinationUri, null);
|
||||
EDEXUtil.getMessageProducer().sendAsync(destinationUri, null);
|
||||
EDEXUtil.getMessageProducer().sendAsync(destinationUri, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -211,6 +216,9 @@ public class SubscriptionRetrievalAgent extends
|
|||
rec.setRetrieval(SerializationUtil
|
||||
.transformToThrift(retrieval));
|
||||
rec.setState(RetrievalRequestRecord.State.PENDING);
|
||||
if (retrievalQueue != null) {
|
||||
retrievalQueue.add(rec.getId());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
statusHandler.error("Subscription: " + subscriptionName
|
||||
+ " Failed to serialize request [" + retrieval
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
<constructor-arg>
|
||||
<bean
|
||||
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.PerformRetrievalsThenReturnFinder">
|
||||
<constructor-arg value="SBN" />
|
||||
<constructor-arg ref="retrievalQueue" />
|
||||
<constructor-arg ref="retrievalDao" />
|
||||
</bean>
|
||||
</constructor-arg>
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
<constructor-arg>
|
||||
<bean
|
||||
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.PerformRetrievalsThenReturnFinder">
|
||||
<constructor-arg value="OPSNET" />
|
||||
<constructor-arg ref="retrievalQueue" />
|
||||
<constructor-arg ref="retrievalDao" />
|
||||
</bean>
|
||||
</constructor-arg>
|
||||
|
|
|
@ -76,6 +76,8 @@
|
|||
</bean>
|
||||
</constructor-arg>
|
||||
</bean>
|
||||
|
||||
<bean id="retrievalQueue" class="java.util.concurrent.ConcurrentLinkedQueue" />
|
||||
|
||||
<camelContext id="dataDeliveryNotify-camel"
|
||||
xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="errorHandler">
|
||||
|
|
|
@ -21,11 +21,11 @@ package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.raytheon.uf.common.datadelivery.event.status.DataDeliverySystemStatusDefinition;
|
||||
import com.raytheon.uf.common.datadelivery.event.status.SystemStatusEvent;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Network;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Provider.ServiceType;
|
||||
import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval;
|
||||
import com.raytheon.uf.common.datadelivery.retrieval.xml.RetrievalAttribute;
|
||||
|
@ -40,6 +40,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter;
|
|||
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.State;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalRequestBuilder;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse;
|
||||
|
||||
|
@ -58,6 +59,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
|
|||
* Feb 12, 2013 1543 djohnson Retrieval responses are now passed further down the chain.
|
||||
* Feb 15, 2013 1543 djohnson Retrieval responses are now xml.
|
||||
* Jul 16, 2013 1655 mpduff Send a system status event based on the response from the provider.
|
||||
* Jan 15, 2014 2678 bgonzale Retrieve RetrievalRequestRecords from a Queue for processing.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -70,18 +72,19 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder {
|
|||
private static final IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(PerformRetrievalsThenReturnFinder.class);
|
||||
|
||||
private final Network network;
|
||||
|
||||
private final IRetrievalDao retrievalDao;
|
||||
|
||||
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param network
|
||||
*/
|
||||
public PerformRetrievalsThenReturnFinder(Network network,
|
||||
public PerformRetrievalsThenReturnFinder(
|
||||
ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue,
|
||||
IRetrievalDao retrievalDao) {
|
||||
this.network = network;
|
||||
this.retrievalQueue = retrievalQueue;
|
||||
this.retrievalDao = retrievalDao;
|
||||
}
|
||||
|
||||
|
@ -95,9 +98,12 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder {
|
|||
ITimer timer = TimeUtil.getTimer();
|
||||
try {
|
||||
timer.start();
|
||||
RetrievalRequestRecord request = retrievalDao
|
||||
.activateNextRetrievalRequest(network);
|
||||
RetrievalRequestRecordPK id = retrievalQueue.poll();
|
||||
if (id == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
RetrievalRequestRecord request = retrievalDao.getById(id);
|
||||
if (request == null) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -56,6 +57,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrievalAttr
|
|||
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.State;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
|
||||
|
||||
/**
|
||||
* Test {@link SubscriptionRetrievalAgent}.
|
||||
|
@ -68,6 +70,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.Sta
|
|||
* ------------ ---------- ----------- --------------------------
|
||||
* Jan 30, 2013 1543 djohnson Initial creation
|
||||
* Jul 10, 2013 2106 djohnson Inject providerHandler.
|
||||
* Jan 15, 2014 2678 bgonzale Added Queue.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -84,6 +87,8 @@ public class SubscriptionRetrievalAgentTest {
|
|||
@Qualifier(value = "retrievalDao")
|
||||
private IRetrievalDao retrievalDao;
|
||||
|
||||
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue = new ConcurrentLinkedQueue<RetrievalRequestRecordPK>();
|
||||
|
||||
@Before
|
||||
public void setUp() throws RegistryHandlerException {
|
||||
PathManagerFactoryTest.initLocalization();
|
||||
|
@ -128,7 +133,8 @@ public class SubscriptionRetrievalAgentTest {
|
|||
|
||||
SubscriptionRetrievalAgent agent = new SubscriptionRetrievalAgent(
|
||||
route, "someUri", new Object(), 1, null, bandwidthDao,
|
||||
retrievalDao, DataDeliveryHandlers.getProviderHandler()) {
|
||||
retrievalDao, DataDeliveryHandlers.getProviderHandler(),
|
||||
retrievalQueue) {
|
||||
@Override
|
||||
void wakeRetrievalTasks() throws EdexException {
|
||||
// Do nothing
|
||||
|
|
|
@ -26,11 +26,11 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.raytheon.uf.common.datadelivery.registry.Network;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Provider.ServiceType;
|
||||
import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval;
|
||||
import com.raytheon.uf.common.datadelivery.retrieval.xml.RetrievalAttribute;
|
||||
|
@ -39,6 +39,7 @@ import com.raytheon.uf.common.serialization.SerializationException;
|
|||
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.State;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse;
|
||||
|
||||
/**
|
||||
|
@ -51,6 +52,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
|
|||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Feb 06, 2013 1543 djohnson Initial creation
|
||||
* Jan 15, 2014 2678 bgonzale Added Queue.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -61,6 +63,8 @@ public class PerformRetrievalPluginDataObjectsFinderTest {
|
|||
|
||||
private static final String EXCEPTION_MESSAGE = "thrown on purpose";
|
||||
|
||||
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue = new ConcurrentLinkedQueue<RetrievalRequestRecordPK>();
|
||||
|
||||
private static final IRetrievalDao MOCK_DAO = mock(IRetrievalDao.class);
|
||||
|
||||
private final Retrieval retrievalThatThrowsException = new Retrieval() {
|
||||
|
@ -132,7 +136,7 @@ public class PerformRetrievalPluginDataObjectsFinderTest {
|
|||
|
||||
private void processRetrieval(RetrievalRequestRecord retrieval) {
|
||||
final PerformRetrievalsThenReturnFinder pluginDataObjectsFinder = new PerformRetrievalsThenReturnFinder(
|
||||
Network.OPSNET, MOCK_DAO);
|
||||
retrievalQueue, MOCK_DAO);
|
||||
pluginDataObjectsFinder.process(retrieval);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,6 +68,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter.Tra
|
|||
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.State;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
|
||||
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse;
|
||||
|
||||
/**
|
||||
|
@ -89,6 +90,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
|
|||
* Aug 09, 2013 1822 bgonzale Added parameters to processRetrievedPluginDataObjects.
|
||||
* Oct 01, 2013 2267 bgonzale Pass request parameter instead of components of request.
|
||||
* Nov 04, 2013 2506 bgonzale removed IRetrievalDao parameter.
|
||||
* Jan 15, 2014 2678 bgonzale Added Queue.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -157,6 +159,8 @@ public class RetrievalTaskTest {
|
|||
@Qualifier(value = "retrievalDao")
|
||||
private IRetrievalDao dao;
|
||||
|
||||
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue = new ConcurrentLinkedQueue<RetrievalRequestRecordPK>();
|
||||
|
||||
private final PlaceInCollectionProcessor retrievedDataProcessor = new PlaceInCollectionProcessor();
|
||||
|
||||
private final List<DataRetrievalEvent> eventsReceived = new ArrayList<DataRetrievalEvent>();
|
||||
|
@ -239,7 +243,7 @@ public class RetrievalTaskTest {
|
|||
dao.create(RetrievalRequestRecordFixture.INSTANCE.get());
|
||||
|
||||
IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder(
|
||||
Network.OPSNET, dao);
|
||||
retrievalQueue, dao);
|
||||
|
||||
final ConcurrentLinkedQueue<String> retrievalQueue = new ConcurrentLinkedQueue<String>();
|
||||
final File testDirectory = TestUtil
|
||||
|
@ -287,7 +291,9 @@ public class RetrievalTaskTest {
|
|||
private void stageRetrievals() {
|
||||
|
||||
dao.create(opsnetRetrieval);
|
||||
retrievalQueue.add(opsnetRetrieval.getId());
|
||||
dao.create(sbnRetrieval);
|
||||
retrievalQueue.add(sbnRetrieval.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -297,7 +303,7 @@ public class RetrievalTaskTest {
|
|||
// Create required strategies for finding, processing, and completing
|
||||
// retrievals
|
||||
final IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder(
|
||||
Network.OPSNET, dao);
|
||||
retrievalQueue, dao);
|
||||
final IRetrievalResponseCompleter retrievalCompleter = new RetrievalResponseCompleter(
|
||||
mock(SubscriptionNotifyTask.class), dao);
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue