Merge branch 'omaha_14.2.1' (14.2.1-11) into development
Change-Id: I5634c5762d47463b1e636271462800d36100b8a1 Former-commit-id:93611baab0
[formerly751b37616e
] [formerly042468f761
] [formerly042468f761
[formerly7af0cfaf38
]] [formerly93611baab0
[formerly751b37616e
] [formerly042468f761
] [formerly042468f761
[formerly7af0cfaf38
]] [formerly7ebbc0cf78
[formerly042468f761
[formerly7af0cfaf38
] [formerly7ebbc0cf78
[formerly 3f0cf9200a42adb2bd7dbd70e50642e940d6b822]]]]] Former-commit-id:7ebbc0cf78
Former-commit-id:a508adfca0
[formerlyabf39442ba
] [formerlyf5e1fb569a
] [formerly 4cd0eb28bae18c7af3d946f7c44fd29ed4c89f8b [formerly 67499353bc5b2802a0bbc17ca5f119b6ddd6836d] [formerlyf5e1fb569a
[formerlye8ea7b80e6
]]] Former-commit-id: 38807c094d5b1e2eb2514851e2516f425f32c07a [formerly cbe3cfa55d05a9b0aaca0d648dd1b36fd4c563be] [formerly8b425fed86
[formerlye2ae7de485
]] Former-commit-id:8b425fed86
Former-commit-id:493e805cbe
This commit is contained in:
commit
2f1835100d
7 changed files with 224 additions and 80 deletions
|
@ -128,7 +128,7 @@
|
|||
<memory-setting>
|
||||
<command-line-args>
|
||||
<first-arg>-perspective</first-arg>
|
||||
<second-arg>National Centers</second-arg>
|
||||
<second-arg>NCP</second-arg>
|
||||
</command-line-args>
|
||||
|
||||
<ini-substitutions>
|
||||
|
@ -238,7 +238,7 @@
|
|||
<memory-setting>
|
||||
<command-line-args>
|
||||
<first-arg>-perspective</first-arg>
|
||||
<second-arg>National Centers</second-arg>
|
||||
<second-arg>NCP</second-arg>
|
||||
</command-line-args>
|
||||
|
||||
<ini-substitutions>
|
||||
|
|
|
@ -62,12 +62,14 @@ import com.vividsolutions.jts.geom.Polygon;
|
|||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jul 30, 2010 mschenke Initial creation
|
||||
* Oct 31, 2012 DR 15287 D. Friedman Fix overlap calculation
|
||||
* Nov 06, 2012 DR 15157 D. Friedman Allow configured inclusion percentage
|
||||
* Oct 10, 2013 2104 mschenke Fixed broken percentage calculation
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------- -------- ----------- --------------------------
|
||||
* Jul 30, 2010 mschenke Initial creation
|
||||
* Oct 31, 2012 15287 D. Friedman Fix overlap calculation
|
||||
* Nov 06, 2012 15157 D. Friedman Allow configured inclusion percentage
|
||||
* Oct 10, 2013 2104 mschenke Fixed broken percentage calculation
|
||||
* Mar 11, 2014 2896 bsteffen Limit the number of divisions.
|
||||
*
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -322,11 +324,22 @@ public class SatBestResResourceData extends AbstractRequestableResourceData {
|
|||
double threshold = targetGeometry.getEnvelope().getSpan(0)
|
||||
/ targetGeometry.getGridRange().getSpan(0);
|
||||
|
||||
int xDiv = (int) (envWidth / 100);
|
||||
int yDiv = (int) (envHeight / 100);
|
||||
if (xDiv * yDiv > 1024 * 1024) {
|
||||
/* Don't wasste too much time/memory, preserve aspect ratio. */
|
||||
if (xDiv > yDiv) {
|
||||
yDiv = 1024 * yDiv / xDiv;
|
||||
xDiv = 1024;
|
||||
} else {
|
||||
xDiv = 1024 * xDiv / yDiv;
|
||||
yDiv = 1024;
|
||||
}
|
||||
}
|
||||
Geometry intersection = EnvelopeIntersection
|
||||
.createEnvelopeIntersection(gridGeometry.getEnvelope(),
|
||||
targetGeometry.getEnvelope(), threshold,
|
||||
(int) (envWidth / 100.0),
|
||||
(int) (envHeight / 100.0));
|
||||
targetGeometry.getEnvelope(), threshold, xDiv,
|
||||
yDiv);
|
||||
if (area == null) {
|
||||
area = intersection;
|
||||
} else {
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import com.raytheon.uf.common.datadelivery.registry.Network;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
|
@ -23,6 +26,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
|
|||
* Nov 09, 2012 1286 djohnson Add ability to kill the threads when BandwidthManager instance is replaced.
|
||||
* Mar 05, 2013 1647 djohnson Sleep one minute between checks.
|
||||
* Jan 30, 2014 2686 dhladky refactor of retrieval.
|
||||
* Feb 10, 2014 2678 dhladky Prevent duplicate allocations.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -102,28 +106,38 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
|
|||
* @throws EdexException
|
||||
*/
|
||||
public void doRun() throws EdexException {
|
||||
statusHandler.info(network+ ": Checking for bandwidth allocations to process...");
|
||||
BandwidthAllocation reservation = retrievalManager.nextAllocation(
|
||||
network, getAgentType());
|
||||
|
||||
if (reservation == RetrievalManager.POISON_PILL) {
|
||||
statusHandler
|
||||
.info("Received kill request, this thread is shutting down...");
|
||||
dead = true;
|
||||
return;
|
||||
}
|
||||
statusHandler.info(network
|
||||
+ ": Checking for bandwidth allocations to process...");
|
||||
List<BandwidthAllocation> allocationReservations = retrievalManager
|
||||
.getRecentAllocations(network, getAgentType());
|
||||
|
||||
if (reservation != null) {
|
||||
ALLOCATION_TYPE allocation = getAllocationTypeClass().cast(
|
||||
reservation);
|
||||
statusHandler.info(network+": Processing allocation id ["
|
||||
+ allocation.getId() + "]");
|
||||
if (allocationReservations != null) {
|
||||
|
||||
List<ALLOCATION_TYPE> allocations = new ArrayList<ALLOCATION_TYPE>(
|
||||
allocationReservations.size());
|
||||
|
||||
for (BandwidthAllocation bandwidthAlloc : allocationReservations) {
|
||||
if (bandwidthAlloc == RetrievalManager.POISON_PILL) {
|
||||
statusHandler
|
||||
.info("Received kill request, this thread is shutting down...");
|
||||
dead = true;
|
||||
return;
|
||||
}
|
||||
// cast to type class
|
||||
ALLOCATION_TYPE allocation = (ALLOCATION_TYPE) getAllocationTypeClass()
|
||||
.cast(bandwidthAlloc);
|
||||
allocations.add(allocation);
|
||||
statusHandler.info(network + ": Processing allocation["
|
||||
+ allocation.getId() + "]");
|
||||
}
|
||||
|
||||
processAllocations(allocations);
|
||||
|
||||
processAllocation(allocation);
|
||||
} else {
|
||||
synchronized (notifier) {
|
||||
try {
|
||||
statusHandler.info(network+": None found, sleeping for ["
|
||||
statusHandler.info(network + ": None found, sleeping for ["
|
||||
+ SLEEP_TIME + "]");
|
||||
|
||||
notifier.wait(SLEEP_TIME);
|
||||
|
@ -152,12 +166,12 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
|
|||
/**
|
||||
* Process the {@link BandwidthAllocation} retrieved.
|
||||
*
|
||||
* @param allocation
|
||||
* the allocation
|
||||
* @param allocations
|
||||
* the allocations
|
||||
* @throws EdexException
|
||||
* on error processing the allocation
|
||||
*/
|
||||
abstract void processAllocation(ALLOCATION_TYPE allocation)
|
||||
abstract void processAllocations(List<ALLOCATION_TYPE> allocations)
|
||||
throws EdexException;
|
||||
/**
|
||||
* Get the network
|
||||
|
@ -166,4 +180,4 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
|
|||
public Network getNetwork() {
|
||||
return network;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -40,6 +40,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent;
|
|||
* Jul 09, 2013 2106 djohnson Only needs to unregister from the EventBus when used in an EDEX instance, so handled in EdexBandwidthManager.
|
||||
* Oct 03, 2013 2267 bgonzale Added check for no retrieval plan matching in the proposed retrieval plans.
|
||||
* Jan 30, 2014 2686 dhladky refactor of retrieval.
|
||||
* Feb 10, 2014 2678 dhladky Prevent duplicate allocations.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -53,7 +54,7 @@ public class RetrievalManager {
|
|||
|
||||
// Package-private on purpose so agents have visibility
|
||||
static final BandwidthAllocation POISON_PILL = new BandwidthAllocation();
|
||||
|
||||
|
||||
private final IBandwidthDao bandwidthDao;
|
||||
|
||||
// A Map of the Paths to retrievalPlans
|
||||
|
@ -193,6 +194,32 @@ public class RetrievalManager {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/***
|
||||
* Method used in practice because we need to search for expired allocations.
|
||||
* @param network
|
||||
* @param agentType
|
||||
* @return
|
||||
*/
|
||||
public List<BandwidthAllocation> getRecentAllocations(Network network, String agentType) {
|
||||
|
||||
List<BandwidthAllocation> allocations = null;
|
||||
|
||||
if (shutdown) {
|
||||
allocations = new ArrayList<BandwidthAllocation>(1);
|
||||
allocations.add(POISON_PILL);
|
||||
return allocations;
|
||||
}
|
||||
|
||||
RetrievalPlan plan = getRetrievalPlans().get(network);
|
||||
if (plan != null) {
|
||||
synchronized (plan) {
|
||||
return plan.getRecentAllocations(agentType);
|
||||
}
|
||||
}
|
||||
|
||||
return allocations;
|
||||
}
|
||||
|
||||
public final RetrievalPlan getPlan(Network network) {
|
||||
return getRetrievalPlans().get(network);
|
||||
|
@ -288,4 +315,4 @@ public class RetrievalManager {
|
|||
retrievalPlan.init();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,5 +1,6 @@
|
|||
package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -50,6 +51,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
|
|||
* BandwidthReservations. Add constrained bucket addition method.
|
||||
* Added debug logging.
|
||||
* Jan 08, 2014 2615 bgonzale Log registry bandwidth calculation errors.
|
||||
* Feb 10, 2014 2678 dhladky Prevent duplicate allocations.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -388,6 +390,40 @@ public class RetrievalPlan {
|
|||
|
||||
return reservation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the scheduled allocations from recent buckets
|
||||
*
|
||||
* @param agentType
|
||||
*
|
||||
* @return reservations
|
||||
*/
|
||||
public List<BandwidthAllocation> getRecentAllocations(String agentType) {
|
||||
List<BandwidthAllocation> reservations = null;
|
||||
|
||||
synchronized (bucketsLock) {
|
||||
|
||||
// Get the portion of the Map that is before the
|
||||
// current time (DO NOT want to return future reservations)
|
||||
final List<BandwidthBucket> buckets = bucketsDao
|
||||
.getWhereStartTimeIsLessThanOrEqualTo(
|
||||
TimeUtil.currentTimeMillis(), network);
|
||||
|
||||
// Iterate over the buckets and find all
|
||||
// BandwidthAllocation that are in the READY state
|
||||
for (BandwidthBucket bucket : buckets) {
|
||||
BandwidthAllocation allocationReservation = associator.getNextReservation(bucket, agentType);
|
||||
if (allocationReservation != null) {
|
||||
if (reservations == null) {
|
||||
reservations = new ArrayList<BandwidthAllocation>();
|
||||
}
|
||||
reservations.add(allocationReservation);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return reservations;
|
||||
}
|
||||
|
||||
public void updateRequestMapping(long requestId,
|
||||
Set<BandwidthBucket> buckets) {
|
||||
|
@ -706,4 +742,4 @@ public class RetrievalPlan {
|
|||
this.associator.copyState(fromPlan.associator);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -1,3 +1,4 @@
|
|||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
@ -6,6 +7,8 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import com.raytheon.uf.common.datadelivery.registry.Network;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Provider;
|
||||
|
@ -55,6 +58,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.util.RetrievalGeneratorUtilit
|
|||
* RetrievalTasks (PerformRetrievalsThenReturnFinder).
|
||||
* Added constructor that sets the retrievalQueue to null.
|
||||
* Jan 30, 2014 2686 dhladky refactor of retrieval.
|
||||
* Feb 10, 2014 2678 dhladky Prevent duplicate allocations.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -86,59 +90,108 @@ public class SubscriptionRetrievalAgent extends
|
|||
}
|
||||
|
||||
@Override
|
||||
void processAllocation(SubscriptionRetrieval retrieval)
|
||||
void processAllocations(List<SubscriptionRetrieval> subRetrievals)
|
||||
throws EdexException {
|
||||
Subscription<?, ?> sub;
|
||||
try {
|
||||
sub = bandwidthDao.getSubscriptionRetrievalAttributes(retrieval)
|
||||
.getSubscription();
|
||||
} catch (SerializationException e) {
|
||||
throw new EdexException("Unable to deserialize the subscription.",
|
||||
e);
|
||||
}
|
||||
final String originalSubName = sub.getName();
|
||||
|
||||
SubscriptionBundle bundle = new SubscriptionBundle();
|
||||
Provider provider = getProvider(sub.getProvider());
|
||||
if (provider == null) {
|
||||
statusHandler.error("provider was null, skipping subscription ["
|
||||
+ originalSubName + "]");
|
||||
return;
|
||||
}
|
||||
bundle.setBundleId(sub.getSubscriptionId());
|
||||
bundle.setPriority(retrieval.getPriority());
|
||||
bundle.setProvider(provider);
|
||||
bundle.setConnection(provider.getConnection());
|
||||
bundle.setSubscription(sub);
|
||||
ConcurrentHashMap<Subscription<?, ?>, SubscriptionRetrieval> retrievalsMap = new ConcurrentHashMap<Subscription<?, ?>, SubscriptionRetrieval>();
|
||||
|
||||
retrieval.setActualStart(TimeUtil.newGmtCalendar());
|
||||
retrieval.setStatus(RetrievalStatus.RETRIEVAL);
|
||||
// Get subs from allocations and search for duplicates
|
||||
for (SubscriptionRetrieval subRetrieval : subRetrievals) {
|
||||
|
||||
// update database
|
||||
bandwidthDao.update(retrieval);
|
||||
Subscription<?, ?> sub = null;
|
||||
|
||||
// generateRetrieval will pipeline the RetrievalRecord Objects created to the DB.
|
||||
// The PK objects returned are sent to the RetrievalQueue for processing.
|
||||
List<RetrievalRequestRecordPK> retrievals = generateRetrieval(bundle,
|
||||
retrieval.getIdentifier());
|
||||
|
||||
if (!CollectionUtil.isNullOrEmpty(retrievals)) {
|
||||
try {
|
||||
Object[] payload = retrievals.toArray();
|
||||
RetrievalGeneratorUtilities.sendToRetrieval(destinationUri,
|
||||
network, payload);
|
||||
} catch (Exception e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Couldn't send RetrievalRecords to Queue!", e);
|
||||
sub = bandwidthDao.getSubscriptionRetrievalAttributes(
|
||||
subRetrieval).getSubscription();
|
||||
} catch (SerializationException e) {
|
||||
throw new EdexException(
|
||||
"Unable to deserialize the subscription.", e);
|
||||
}
|
||||
|
||||
// We only allow one subscription retrieval per DSM update.
|
||||
// Remove any duplicate subscription allocations/retrievals, cancel them.
|
||||
if (!retrievalsMap.containsKey(sub)) {
|
||||
retrievalsMap.put(sub, subRetrieval);
|
||||
} else {
|
||||
// Check for most recent startTime, that's the one we want for
|
||||
// retrieval.
|
||||
SubscriptionRetrieval currentRetrieval = retrievalsMap.get(sub);
|
||||
if (subRetrieval.getStartTime().getTime()
|
||||
.after(currentRetrieval.getStartTime().getTime())) {
|
||||
// Replace it in the map, set previous to canceled.
|
||||
currentRetrieval.setStatus(RetrievalStatus.CANCELLED);
|
||||
bandwidthDao.update(currentRetrieval);
|
||||
retrievalsMap.replace(sub, subRetrieval);
|
||||
statusHandler
|
||||
.info("More recent, setting previous allocation to Cancelled ["
|
||||
+ currentRetrieval.getIdentifier()
|
||||
+ "] "
|
||||
+ sub.getName());
|
||||
} else {
|
||||
// Not more recent, cancel
|
||||
subRetrieval.setStatus(RetrievalStatus.CANCELLED);
|
||||
bandwidthDao.update(subRetrieval);
|
||||
statusHandler
|
||||
.info("Older, setting to Cancelled ["
|
||||
+ currentRetrieval.getIdentifier() + "] "
|
||||
+ sub.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (Entry<Subscription<?, ?>, SubscriptionRetrieval> entry: retrievalsMap.entrySet()) {
|
||||
|
||||
SubscriptionRetrieval retrieval = entry.getValue();
|
||||
Subscription<?, ?> sub = entry.getKey();
|
||||
final String originalSubName = sub.getName();
|
||||
|
||||
Provider provider = getProvider(sub.getProvider());
|
||||
if (provider == null) {
|
||||
statusHandler
|
||||
.error("provider was null, skipping subscription ["
|
||||
+ originalSubName + "]");
|
||||
return;
|
||||
}
|
||||
bundle.setBundleId(sub.getSubscriptionId());
|
||||
bundle.setPriority(retrieval.getPriority());
|
||||
bundle.setProvider(provider);
|
||||
bundle.setConnection(provider.getConnection());
|
||||
bundle.setSubscription(sub);
|
||||
|
||||
retrieval.setActualStart(TimeUtil.newGmtCalendar());
|
||||
retrieval.setStatus(RetrievalStatus.RETRIEVAL);
|
||||
|
||||
// update database
|
||||
bandwidthDao.update(retrieval);
|
||||
|
||||
// generateRetrieval will pipeline the RetrievalRecord Objects
|
||||
// created to the DB.
|
||||
// The PK objects returned are sent to the RetrievalQueue for
|
||||
// processing.
|
||||
List<RetrievalRequestRecordPK> retrievals = generateRetrieval(
|
||||
bundle, retrieval.getIdentifier());
|
||||
|
||||
if (!CollectionUtil.isNullOrEmpty(retrievals)) {
|
||||
try {
|
||||
Object[] payload = retrievals.toArray();
|
||||
RetrievalGeneratorUtilities.sendToRetrieval(destinationUri,
|
||||
network, payload);
|
||||
} catch (Exception e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Couldn't send RetrievalRecords to Queue!", e);
|
||||
}
|
||||
statusHandler.info("Sent " + retrievals.size()
|
||||
+ " retrievals to queue. " + network.toString());
|
||||
} else {
|
||||
// Normally this is the job of the SubscriptionNotifyTask, but
|
||||
// if no
|
||||
// retrievals were generated we have to send it manually
|
||||
RetrievalManagerNotifyEvent retrievalManagerNotifyEvent = new RetrievalManagerNotifyEvent();
|
||||
retrievalManagerNotifyEvent.setId(Long.toString(retrieval
|
||||
.getId()));
|
||||
EventBus.publish(retrievalManagerNotifyEvent);
|
||||
}
|
||||
statusHandler.info("Sent " + retrievals.size()
|
||||
+ " retrievals to queue. " + network.toString());
|
||||
} else {
|
||||
// Normally this is the job of the SubscriptionNotifyTask, but if no
|
||||
// retrievals were generated we have to send it manually
|
||||
RetrievalManagerNotifyEvent retrievalManagerNotifyEvent = new RetrievalManagerNotifyEvent();
|
||||
retrievalManagerNotifyEvent.setId(Long.toString(retrieval.getId()));
|
||||
EventBus.publish(retrievalManagerNotifyEvent);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -266,4 +319,4 @@ public class SubscriptionRetrievalAgent extends
|
|||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -15,6 +15,7 @@
|
|||
<!-- Fed from plugin notification -->
|
||||
<from uri="jms-durable:grid-staticdata-generate" />
|
||||
<doTry>
|
||||
<bean ref="serializationUtil" method="transformFromThrift" />
|
||||
<bean ref="staticDataGenerator" method="processNotification"/>
|
||||
<to uri="direct-vm:stageNotification"/>
|
||||
<doCatch>
|
||||
|
|
Loading…
Add table
Reference in a new issue