Merge "Issue #2678 Fix for multiple threads grabbing a retrieval for processing." into omaha_14.2.1

Former-commit-id: 89163b6e3b [formerly d682bcee93] [formerly 89163b6e3b [formerly d682bcee93] [formerly b90000df3e [formerly 81e57beaa231d696b7d40d6afbda365a27f6cae8]]]
Former-commit-id: b90000df3e
Former-commit-id: 193825115d [formerly 4999c205d7]
Former-commit-id: f9fb89dea3
This commit is contained in:
Nate Jensen 2014-01-15 17:36:11 -06:00 committed by Gerrit Code Review
commit 2231328f77
12 changed files with 61 additions and 20 deletions

View file

@ -25,6 +25,7 @@ import java.io.Serializable;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlSeeAlso;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
@ -45,14 +46,15 @@ import com.vividsolutions.jts.geom.Coordinate;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 17, 2011 191 dhladky Initial creation
* Dec 10, 2012 1259 bsteffen Switch Data Delivery from LatLon to referenced envelopes.
*
* Dec 10, 2012 1259 bsteffen Switch Data Delivery from LatLon to referenced envelopes.
* Jan 15, 2014 2678 bgonzale Added XmlRootElement annotation.
* </pre>
*
* @author dhladky
* @version 1.0
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.NONE)
@DynamicSerialize
@XmlSeeAlso({ GriddedCoverage.class, LatLonGridCoverage.class })

View file

@ -33,6 +33,7 @@
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="ProviderHandler" />
<constructor-arg ref="retrievalQueue" />
</bean>
<bean id="sbnSubscriptionRetrievalAgentPrototype"
@ -46,6 +47,7 @@
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="ProviderHandler" />
<constructor-arg ref="retrievalQueue" />
</bean>
<util:map id="retrievalAgents">

View file

@ -47,6 +47,7 @@
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="ProviderHandler" />
<constructor-arg ref="retrievalQueue" />
</bean>
<util:map id="retrievalAgents">

View file

@ -47,6 +47,7 @@
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="ProviderHandler" />
<constructor-arg ref="retrievalQueue" />
</bean>
<bean id="sbnSubscriptionRetrievalAgentPrototype"
@ -60,6 +61,9 @@
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="ProviderHandler" />
<constructor-arg name="retrievalQueue" >
<null/>
</constructor-arg>
</bean>
<util:map id="retrievalAgents">

View file

@ -6,6 +6,7 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.annotations.VisibleForTesting;
import com.raytheon.uf.common.datadelivery.registry.Network;
@ -34,6 +35,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent;
import com.raytheon.uf.edex.datadelivery.retrieval.ServiceTypeFactory;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
/**
* Class used to process SubscriptionRetrieval BandwidthAllocations.
@ -51,6 +53,8 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
* Jun 24, 2013 2106 djohnson Set actual start time when sending to retrieval rather than overwrite scheduled start.
* Jul 09, 2013 2106 djohnson Dependency inject registry handlers.
* Jul 11, 2013 2106 djohnson Use SubscriptionPriority enum.
* Jan 15, 2014 2678 bgonzale Use Queue for passing RetrievalRequestRecords to the
* RetrievalTasks (PerformRetrievalsThenReturnFinder).
*
* </pre>
*
@ -72,15 +76,19 @@ public class SubscriptionRetrievalAgent extends
private final IProviderHandler providerHandler;
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue;
public SubscriptionRetrievalAgent(Network network, String destinationUri,
final Object notifier, int defaultPriority,
RetrievalManager retrievalManager, IBandwidthDao bandwidthDao,
IRetrievalDao retrievalDao, IProviderHandler providerHandler) {
IRetrievalDao retrievalDao, IProviderHandler providerHandler,
ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue) {
super(network, destinationUri, notifier, retrievalManager);
this.defaultPriority = defaultPriority;
this.bandwidthDao = bandwidthDao;
this.retrievalDao = retrievalDao;
this.providerHandler = providerHandler;
this.retrievalQueue = retrievalQueue;
}
@Override
@ -134,9 +142,6 @@ public class SubscriptionRetrievalAgent extends
@VisibleForTesting
void wakeRetrievalTasks() throws EdexException {
EDEXUtil.getMessageProducer().sendAsync(destinationUri, null);
EDEXUtil.getMessageProducer().sendAsync(destinationUri, null);
EDEXUtil.getMessageProducer().sendAsync(destinationUri, null);
EDEXUtil.getMessageProducer().sendAsync(destinationUri, null);
}
@Override
@ -211,6 +216,9 @@ public class SubscriptionRetrievalAgent extends
rec.setRetrieval(SerializationUtil
.transformToThrift(retrieval));
rec.setState(RetrievalRequestRecord.State.PENDING);
if (retrievalQueue != null) {
retrievalQueue.add(rec.getId());
}
} catch (Exception e) {
statusHandler.error("Subscription: " + subscriptionName
+ " Failed to serialize request [" + retrieval

View file

@ -14,7 +14,7 @@
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.PerformRetrievalsThenReturnFinder">
<constructor-arg value="SBN" />
<constructor-arg ref="retrievalQueue" />
<constructor-arg ref="retrievalDao" />
</bean>
</constructor-arg>

View file

@ -14,7 +14,7 @@
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.PerformRetrievalsThenReturnFinder">
<constructor-arg value="OPSNET" />
<constructor-arg ref="retrievalQueue" />
<constructor-arg ref="retrievalDao" />
</bean>
</constructor-arg>

View file

@ -76,6 +76,8 @@
</bean>
</constructor-arg>
</bean>
<bean id="retrievalQueue" class="java.util.concurrent.ConcurrentLinkedQueue" />
<camelContext id="dataDeliveryNotify-camel"
xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="errorHandler">

View file

@ -21,11 +21,11 @@ package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.annotations.VisibleForTesting;
import com.raytheon.uf.common.datadelivery.event.status.DataDeliverySystemStatusDefinition;
import com.raytheon.uf.common.datadelivery.event.status.SystemStatusEvent;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.Provider.ServiceType;
import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval;
import com.raytheon.uf.common.datadelivery.retrieval.xml.RetrievalAttribute;
@ -40,6 +40,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.State;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalRequestBuilder;
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse;
@ -58,6 +59,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
* Feb 12, 2013 1543 djohnson Retrieval responses are now passed further down the chain.
* Feb 15, 2013 1543 djohnson Retrieval responses are now xml.
* Jul 16, 2013 1655 mpduff Send a system status event based on the response from the provider.
* Jan 15, 2014 2678 bgonzale Retrieve RetrievalRequestRecords from a Queue for processing.
*
* </pre>
*
@ -70,18 +72,19 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(PerformRetrievalsThenReturnFinder.class);
private final Network network;
private final IRetrievalDao retrievalDao;
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue;
/**
* Constructor.
*
* @param network
*/
public PerformRetrievalsThenReturnFinder(Network network,
public PerformRetrievalsThenReturnFinder(
ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue,
IRetrievalDao retrievalDao) {
this.network = network;
this.retrievalQueue = retrievalQueue;
this.retrievalDao = retrievalDao;
}
@ -95,9 +98,12 @@ public class PerformRetrievalsThenReturnFinder implements IRetrievalsFinder {
ITimer timer = TimeUtil.getTimer();
try {
timer.start();
RetrievalRequestRecord request = retrievalDao
.activateNextRetrievalRequest(network);
RetrievalRequestRecordPK id = retrievalQueue.poll();
if (id == null) {
return null;
}
RetrievalRequestRecord request = retrievalDao.getById(id);
if (request == null) {
return null;
}

View file

@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.Before;
import org.junit.Test;
@ -56,6 +57,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrievalAttr
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.State;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
/**
* Test {@link SubscriptionRetrievalAgent}.
@ -68,6 +70,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.Sta
* ------------ ---------- ----------- --------------------------
* Jan 30, 2013 1543 djohnson Initial creation
* Jul 10, 2013 2106 djohnson Inject providerHandler.
* Jan 15, 2014 2678 bgonzale Added Queue.
*
* </pre>
*
@ -84,6 +87,8 @@ public class SubscriptionRetrievalAgentTest {
@Qualifier(value = "retrievalDao")
private IRetrievalDao retrievalDao;
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue = new ConcurrentLinkedQueue<RetrievalRequestRecordPK>();
@Before
public void setUp() throws RegistryHandlerException {
PathManagerFactoryTest.initLocalization();
@ -128,7 +133,8 @@ public class SubscriptionRetrievalAgentTest {
SubscriptionRetrievalAgent agent = new SubscriptionRetrievalAgent(
route, "someUri", new Object(), 1, null, bandwidthDao,
retrievalDao, DataDeliveryHandlers.getProviderHandler()) {
retrievalDao, DataDeliveryHandlers.getProviderHandler(),
retrievalQueue) {
@Override
void wakeRetrievalTasks() throws EdexException {
// Do nothing

View file

@ -26,11 +26,11 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.Before;
import org.junit.Test;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.Provider.ServiceType;
import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval;
import com.raytheon.uf.common.datadelivery.retrieval.xml.RetrievalAttribute;
@ -39,6 +39,7 @@ import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.State;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse;
/**
@ -51,6 +52,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 06, 2013 1543 djohnson Initial creation
* Jan 15, 2014 2678 bgonzale Added Queue.
*
* </pre>
*
@ -61,6 +63,8 @@ public class PerformRetrievalPluginDataObjectsFinderTest {
private static final String EXCEPTION_MESSAGE = "thrown on purpose";
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue = new ConcurrentLinkedQueue<RetrievalRequestRecordPK>();
private static final IRetrievalDao MOCK_DAO = mock(IRetrievalDao.class);
private final Retrieval retrievalThatThrowsException = new Retrieval() {
@ -132,7 +136,7 @@ public class PerformRetrievalPluginDataObjectsFinderTest {
private void processRetrieval(RetrievalRequestRecord retrieval) {
final PerformRetrievalsThenReturnFinder pluginDataObjectsFinder = new PerformRetrievalsThenReturnFinder(
Network.OPSNET, MOCK_DAO);
retrievalQueue, MOCK_DAO);
pluginDataObjectsFinder.process(retrieval);
}
}

View file

@ -68,6 +68,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.adapters.RetrievalAdapter.Tra
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.State;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse;
/**
@ -89,6 +90,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
* Aug 09, 2013 1822 bgonzale Added parameters to processRetrievedPluginDataObjects.
* Oct 01, 2013 2267 bgonzale Pass request parameter instead of components of request.
* Nov 04, 2013 2506 bgonzale removed IRetrievalDao parameter.
* Jan 15, 2014 2678 bgonzale Added Queue.
*
* </pre>
*
@ -157,6 +159,8 @@ public class RetrievalTaskTest {
@Qualifier(value = "retrievalDao")
private IRetrievalDao dao;
private final ConcurrentLinkedQueue<RetrievalRequestRecordPK> retrievalQueue = new ConcurrentLinkedQueue<RetrievalRequestRecordPK>();
private final PlaceInCollectionProcessor retrievedDataProcessor = new PlaceInCollectionProcessor();
private final List<DataRetrievalEvent> eventsReceived = new ArrayList<DataRetrievalEvent>();
@ -239,7 +243,7 @@ public class RetrievalTaskTest {
dao.create(RetrievalRequestRecordFixture.INSTANCE.get());
IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder(
Network.OPSNET, dao);
retrievalQueue, dao);
final ConcurrentLinkedQueue<String> retrievalQueue = new ConcurrentLinkedQueue<String>();
final File testDirectory = TestUtil
@ -287,7 +291,9 @@ public class RetrievalTaskTest {
private void stageRetrievals() {
dao.create(opsnetRetrieval);
retrievalQueue.add(opsnetRetrieval.getId());
dao.create(sbnRetrieval);
retrievalQueue.add(sbnRetrieval.getId());
}
/**
@ -297,7 +303,7 @@ public class RetrievalTaskTest {
// Create required strategies for finding, processing, and completing
// retrievals
final IRetrievalsFinder retrievalDataFinder = new PerformRetrievalsThenReturnFinder(
Network.OPSNET, dao);
retrievalQueue, dao);
final IRetrievalResponseCompleter retrievalCompleter = new RetrievalResponseCompleter(
mock(SubscriptionNotifyTask.class), dao);