Merge "Issue #2678 end retrieval double clutching Change-Id: I34b68b94f5cbb921a3ea3cfeaefb9155adeab0dc" into omaha_14.2.1
Former-commit-id: 555358125bfb925a2c8ed0d393911b4d045d18b1
This commit is contained in:
commit
f59d1433db
4 changed files with 199 additions and 69 deletions
|
@ -1,5 +1,8 @@
|
|||
package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import com.raytheon.uf.common.datadelivery.registry.Network;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
|
@ -23,6 +26,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
|
|||
* Nov 09, 2012 1286 djohnson Add ability to kill the threads when BandwidthManager instance is replaced.
|
||||
* Mar 05, 2013 1647 djohnson Sleep one minute between checks.
|
||||
* Jan 30, 2014 2686 dhladky refactor of retrieval.
|
||||
* Feb 10, 2014 2678 dhladky Prevent duplicate allocations.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -102,28 +106,38 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
|
|||
* @throws EdexException
|
||||
*/
|
||||
public void doRun() throws EdexException {
|
||||
statusHandler.info(network+ ": Checking for bandwidth allocations to process...");
|
||||
BandwidthAllocation reservation = retrievalManager.nextAllocation(
|
||||
network, getAgentType());
|
||||
|
||||
if (reservation == RetrievalManager.POISON_PILL) {
|
||||
statusHandler.info(network
|
||||
+ ": Checking for bandwidth allocations to process...");
|
||||
List<BandwidthAllocation> allocationReservations = retrievalManager
|
||||
.getRecentAllocations(network, getAgentType());
|
||||
|
||||
if (allocationReservations != null) {
|
||||
|
||||
List<ALLOCATION_TYPE> allocations = new ArrayList<ALLOCATION_TYPE>(
|
||||
allocationReservations.size());
|
||||
|
||||
for (BandwidthAllocation bandwidthAlloc : allocationReservations) {
|
||||
if (bandwidthAlloc == RetrievalManager.POISON_PILL) {
|
||||
statusHandler
|
||||
.info("Received kill request, this thread is shutting down...");
|
||||
dead = true;
|
||||
return;
|
||||
}
|
||||
|
||||
if (reservation != null) {
|
||||
ALLOCATION_TYPE allocation = getAllocationTypeClass().cast(
|
||||
reservation);
|
||||
statusHandler.info(network+": Processing allocation id ["
|
||||
// cast to type class
|
||||
ALLOCATION_TYPE allocation = (ALLOCATION_TYPE) getAllocationTypeClass()
|
||||
.cast(bandwidthAlloc);
|
||||
allocations.add(allocation);
|
||||
statusHandler.info(network + ": Processing allocation["
|
||||
+ allocation.getId() + "]");
|
||||
}
|
||||
|
||||
processAllocations(allocations);
|
||||
|
||||
processAllocation(allocation);
|
||||
} else {
|
||||
synchronized (notifier) {
|
||||
try {
|
||||
statusHandler.info(network+": None found, sleeping for ["
|
||||
statusHandler.info(network + ": None found, sleeping for ["
|
||||
+ SLEEP_TIME + "]");
|
||||
|
||||
notifier.wait(SLEEP_TIME);
|
||||
|
@ -152,12 +166,12 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
|
|||
/**
|
||||
* Process the {@link BandwidthAllocation} retrieved.
|
||||
*
|
||||
* @param allocation
|
||||
* the allocation
|
||||
* @param allocations
|
||||
* the allocations
|
||||
* @throws EdexException
|
||||
* on error processing the allocation
|
||||
*/
|
||||
abstract void processAllocation(ALLOCATION_TYPE allocation)
|
||||
abstract void processAllocations(List<ALLOCATION_TYPE> allocations)
|
||||
throws EdexException;
|
||||
/**
|
||||
* Get the network
|
||||
|
|
|
@ -40,6 +40,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent;
|
|||
* Jul 09, 2013 2106 djohnson Only needs to unregister from the EventBus when used in an EDEX instance, so handled in EdexBandwidthManager.
|
||||
* Oct 03, 2013 2267 bgonzale Added check for no retrieval plan matching in the proposed retrieval plans.
|
||||
* Jan 30, 2014 2686 dhladky refactor of retrieval.
|
||||
* Feb 10, 2014 2678 dhladky Prevent duplicate allocations.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -194,6 +195,32 @@ public class RetrievalManager {
|
|||
return null;
|
||||
}
|
||||
|
||||
/***
|
||||
* Method used in practice because we need to search for expired allocations.
|
||||
* @param network
|
||||
* @param agentType
|
||||
* @return
|
||||
*/
|
||||
public List<BandwidthAllocation> getRecentAllocations(Network network, String agentType) {
|
||||
|
||||
List<BandwidthAllocation> allocations = null;
|
||||
|
||||
if (shutdown) {
|
||||
allocations = new ArrayList<BandwidthAllocation>(1);
|
||||
allocations.add(POISON_PILL);
|
||||
return allocations;
|
||||
}
|
||||
|
||||
RetrievalPlan plan = getRetrievalPlans().get(network);
|
||||
if (plan != null) {
|
||||
synchronized (plan) {
|
||||
return plan.getRecentAllocations(agentType);
|
||||
}
|
||||
}
|
||||
|
||||
return allocations;
|
||||
}
|
||||
|
||||
public final RetrievalPlan getPlan(Network network) {
|
||||
return getRetrievalPlans().get(network);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -50,6 +51,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
|
|||
* BandwidthReservations. Add constrained bucket addition method.
|
||||
* Added debug logging.
|
||||
* Jan 08, 2014 2615 bgonzale Log registry bandwidth calculation errors.
|
||||
* Feb 10, 2014 2678 dhladky Prevent duplicate allocations.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -389,6 +391,40 @@ public class RetrievalPlan {
|
|||
return reservation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the scheduled allocations from recent buckets
|
||||
*
|
||||
* @param agentType
|
||||
*
|
||||
* @return reservations
|
||||
*/
|
||||
public List<BandwidthAllocation> getRecentAllocations(String agentType) {
|
||||
List<BandwidthAllocation> reservations = null;
|
||||
|
||||
synchronized (bucketsLock) {
|
||||
|
||||
// Get the portion of the Map that is before the
|
||||
// current time (DO NOT want to return future reservations)
|
||||
final List<BandwidthBucket> buckets = bucketsDao
|
||||
.getWhereStartTimeIsLessThanOrEqualTo(
|
||||
TimeUtil.currentTimeMillis(), network);
|
||||
|
||||
// Iterate over the buckets and find all
|
||||
// BandwidthAllocation that are in the READY state
|
||||
for (BandwidthBucket bucket : buckets) {
|
||||
BandwidthAllocation allocationReservation = associator.getNextReservation(bucket, agentType);
|
||||
if (allocationReservation != null) {
|
||||
if (reservations == null) {
|
||||
reservations = new ArrayList<BandwidthAllocation>();
|
||||
}
|
||||
reservations.add(allocationReservation);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return reservations;
|
||||
}
|
||||
|
||||
public void updateRequestMapping(long requestId,
|
||||
Set<BandwidthBucket> buckets) {
|
||||
Set<Long> bucketIds = new TreeSet<Long>();
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
@ -6,6 +7,8 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import com.raytheon.uf.common.datadelivery.registry.Network;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Provider;
|
||||
|
@ -55,6 +58,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.util.RetrievalGeneratorUtilit
|
|||
* RetrievalTasks (PerformRetrievalsThenReturnFinder).
|
||||
* Added constructor that sets the retrievalQueue to null.
|
||||
* Jan 30, 2014 2686 dhladky refactor of retrieval.
|
||||
* Feb 10, 2014 2678 dhladky Prevent duplicate allocations.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -86,22 +90,66 @@ public class SubscriptionRetrievalAgent extends
|
|||
}
|
||||
|
||||
@Override
|
||||
void processAllocation(SubscriptionRetrieval retrieval)
|
||||
void processAllocations(List<SubscriptionRetrieval> subRetrievals)
|
||||
throws EdexException {
|
||||
Subscription<?, ?> sub;
|
||||
try {
|
||||
sub = bandwidthDao.getSubscriptionRetrievalAttributes(retrieval)
|
||||
.getSubscription();
|
||||
} catch (SerializationException e) {
|
||||
throw new EdexException("Unable to deserialize the subscription.",
|
||||
e);
|
||||
}
|
||||
final String originalSubName = sub.getName();
|
||||
|
||||
SubscriptionBundle bundle = new SubscriptionBundle();
|
||||
ConcurrentHashMap<Subscription<?, ?>, SubscriptionRetrieval> retrievalsMap = new ConcurrentHashMap<Subscription<?, ?>, SubscriptionRetrieval>();
|
||||
|
||||
// Get subs from allocations and search for duplicates
|
||||
for (SubscriptionRetrieval subRetrieval : subRetrievals) {
|
||||
|
||||
Subscription<?, ?> sub = null;
|
||||
|
||||
try {
|
||||
sub = bandwidthDao.getSubscriptionRetrievalAttributes(
|
||||
subRetrieval).getSubscription();
|
||||
} catch (SerializationException e) {
|
||||
throw new EdexException(
|
||||
"Unable to deserialize the subscription.", e);
|
||||
}
|
||||
|
||||
// We only allow one subscription retrieval per DSM update.
|
||||
// Remove any duplicate subscription allocations/retrievals, cancel them.
|
||||
if (!retrievalsMap.containsKey(sub)) {
|
||||
retrievalsMap.put(sub, subRetrieval);
|
||||
} else {
|
||||
// Check for most recent startTime, that's the one we want for
|
||||
// retrieval.
|
||||
SubscriptionRetrieval currentRetrieval = retrievalsMap.get(sub);
|
||||
if (subRetrieval.getStartTime().getTime()
|
||||
.after(currentRetrieval.getStartTime().getTime())) {
|
||||
// Replace it in the map, set previous to canceled.
|
||||
currentRetrieval.setStatus(RetrievalStatus.CANCELLED);
|
||||
bandwidthDao.update(currentRetrieval);
|
||||
retrievalsMap.replace(sub, subRetrieval);
|
||||
statusHandler
|
||||
.info("More recent, setting previous allocation to Cancelled ["
|
||||
+ currentRetrieval.getIdentifier()
|
||||
+ "] "
|
||||
+ sub.getName());
|
||||
} else {
|
||||
// Not more recent, cancel
|
||||
subRetrieval.setStatus(RetrievalStatus.CANCELLED);
|
||||
bandwidthDao.update(subRetrieval);
|
||||
statusHandler
|
||||
.info("Older, setting to Cancelled ["
|
||||
+ currentRetrieval.getIdentifier() + "] "
|
||||
+ sub.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (Entry<Subscription<?, ?>, SubscriptionRetrieval> entry: retrievalsMap.entrySet()) {
|
||||
|
||||
SubscriptionRetrieval retrieval = entry.getValue();
|
||||
Subscription<?, ?> sub = entry.getKey();
|
||||
final String originalSubName = sub.getName();
|
||||
|
||||
Provider provider = getProvider(sub.getProvider());
|
||||
if (provider == null) {
|
||||
statusHandler.error("provider was null, skipping subscription ["
|
||||
statusHandler
|
||||
.error("provider was null, skipping subscription ["
|
||||
+ originalSubName + "]");
|
||||
return;
|
||||
}
|
||||
|
@ -117,10 +165,12 @@ public class SubscriptionRetrievalAgent extends
|
|||
// update database
|
||||
bandwidthDao.update(retrieval);
|
||||
|
||||
// generateRetrieval will pipeline the RetrievalRecord Objects created to the DB.
|
||||
// The PK objects returned are sent to the RetrievalQueue for processing.
|
||||
List<RetrievalRequestRecordPK> retrievals = generateRetrieval(bundle,
|
||||
retrieval.getIdentifier());
|
||||
// generateRetrieval will pipeline the RetrievalRecord Objects
|
||||
// created to the DB.
|
||||
// The PK objects returned are sent to the RetrievalQueue for
|
||||
// processing.
|
||||
List<RetrievalRequestRecordPK> retrievals = generateRetrieval(
|
||||
bundle, retrieval.getIdentifier());
|
||||
|
||||
if (!CollectionUtil.isNullOrEmpty(retrievals)) {
|
||||
try {
|
||||
|
@ -134,13 +184,16 @@ public class SubscriptionRetrievalAgent extends
|
|||
statusHandler.info("Sent " + retrievals.size()
|
||||
+ " retrievals to queue. " + network.toString());
|
||||
} else {
|
||||
// Normally this is the job of the SubscriptionNotifyTask, but if no
|
||||
// Normally this is the job of the SubscriptionNotifyTask, but
|
||||
// if no
|
||||
// retrievals were generated we have to send it manually
|
||||
RetrievalManagerNotifyEvent retrievalManagerNotifyEvent = new RetrievalManagerNotifyEvent();
|
||||
retrievalManagerNotifyEvent.setId(Long.toString(retrieval.getId()));
|
||||
retrievalManagerNotifyEvent.setId(Long.toString(retrieval
|
||||
.getId()));
|
||||
EventBus.publish(retrievalManagerNotifyEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getAgentType() {
|
||||
|
|
Loading…
Add table
Reference in a new issue