Merge branch 'omaha_14.2.1' (14.2.1-11) into development

Change-Id: I5634c5762d47463b1e636271462800d36100b8a1

Former-commit-id: 751b37616e [formerly 042468f761] [formerly 7af0cfaf38] [formerly 7ebbc0cf78 [formerly 7af0cfaf38 [formerly 3f0cf9200a42adb2bd7dbd70e50642e940d6b822]]]
Former-commit-id: 7ebbc0cf78
Former-commit-id: 67499353bc5b2802a0bbc17ca5f119b6ddd6836d [formerly e8ea7b80e6]
Former-commit-id: f5e1fb569a
This commit is contained in:
Richard Peter 2014-03-12 09:47:16 -05:00
commit abf39442ba
7 changed files with 224 additions and 80 deletions

View file

@ -128,7 +128,7 @@
<memory-setting>
<command-line-args>
<first-arg>-perspective</first-arg>
<second-arg>National Centers</second-arg>
<second-arg>NCP</second-arg>
</command-line-args>
<ini-substitutions>
@ -238,7 +238,7 @@
<memory-setting>
<command-line-args>
<first-arg>-perspective</first-arg>
<second-arg>National Centers</second-arg>
<second-arg>NCP</second-arg>
</command-line-args>
<ini-substitutions>

View file

@ -62,12 +62,14 @@ import com.vividsolutions.jts.geom.Polygon;
* <pre>
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jul 30, 2010 mschenke Initial creation
* Oct 31, 2012 DR 15287 D. Friedman Fix overlap calculation
* Nov 06, 2012 DR 15157 D. Friedman Allow configured inclusion percentage
* Oct 10, 2013 2104 mschenke Fixed broken percentage calculation
* Date Ticket# Engineer Description
* ------------- -------- ----------- --------------------------
* Jul 30, 2010 mschenke Initial creation
* Oct 31, 2012 15287 D. Friedman Fix overlap calculation
* Nov 06, 2012 15157 D. Friedman Allow configured inclusion percentage
* Oct 10, 2013 2104 mschenke Fixed broken percentage calculation
* Mar 11, 2014 2896 bsteffen Limit the number of divisions.
*
*
* </pre>
*
@ -322,11 +324,22 @@ public class SatBestResResourceData extends AbstractRequestableResourceData {
double threshold = targetGeometry.getEnvelope().getSpan(0)
/ targetGeometry.getGridRange().getSpan(0);
int xDiv = (int) (envWidth / 100);
int yDiv = (int) (envHeight / 100);
if (xDiv * yDiv > 1024 * 1024) {
/* Don't wasste too much time/memory, preserve aspect ratio. */
if (xDiv > yDiv) {
yDiv = 1024 * yDiv / xDiv;
xDiv = 1024;
} else {
xDiv = 1024 * xDiv / yDiv;
yDiv = 1024;
}
}
Geometry intersection = EnvelopeIntersection
.createEnvelopeIntersection(gridGeometry.getEnvelope(),
targetGeometry.getEnvelope(), threshold,
(int) (envWidth / 100.0),
(int) (envHeight / 100.0));
targetGeometry.getEnvelope(), threshold, xDiv,
yDiv);
if (area == null) {
area = intersection;
} else {

View file

@ -1,5 +1,8 @@
package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval;
import java.util.ArrayList;
import java.util.List;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
@ -23,6 +26,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
* Nov 09, 2012 1286 djohnson Add ability to kill the threads when BandwidthManager instance is replaced.
* Mar 05, 2013 1647 djohnson Sleep one minute between checks.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
* Feb 10, 2014 2678 dhladky Prevent duplicate allocations.
*
* </pre>
*
@ -102,28 +106,38 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
* @throws EdexException
*/
public void doRun() throws EdexException {
statusHandler.info(network+ ": Checking for bandwidth allocations to process...");
BandwidthAllocation reservation = retrievalManager.nextAllocation(
network, getAgentType());
if (reservation == RetrievalManager.POISON_PILL) {
statusHandler
.info("Received kill request, this thread is shutting down...");
dead = true;
return;
}
statusHandler.info(network
+ ": Checking for bandwidth allocations to process...");
List<BandwidthAllocation> allocationReservations = retrievalManager
.getRecentAllocations(network, getAgentType());
if (reservation != null) {
ALLOCATION_TYPE allocation = getAllocationTypeClass().cast(
reservation);
statusHandler.info(network+": Processing allocation id ["
+ allocation.getId() + "]");
if (allocationReservations != null) {
List<ALLOCATION_TYPE> allocations = new ArrayList<ALLOCATION_TYPE>(
allocationReservations.size());
for (BandwidthAllocation bandwidthAlloc : allocationReservations) {
if (bandwidthAlloc == RetrievalManager.POISON_PILL) {
statusHandler
.info("Received kill request, this thread is shutting down...");
dead = true;
return;
}
// cast to type class
ALLOCATION_TYPE allocation = (ALLOCATION_TYPE) getAllocationTypeClass()
.cast(bandwidthAlloc);
allocations.add(allocation);
statusHandler.info(network + ": Processing allocation["
+ allocation.getId() + "]");
}
processAllocations(allocations);
processAllocation(allocation);
} else {
synchronized (notifier) {
try {
statusHandler.info(network+": None found, sleeping for ["
statusHandler.info(network + ": None found, sleeping for ["
+ SLEEP_TIME + "]");
notifier.wait(SLEEP_TIME);
@ -152,12 +166,12 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
/**
* Process the {@link BandwidthAllocation} retrieved.
*
* @param allocation
* the allocation
* @param allocations
* the allocations
* @throws EdexException
* on error processing the allocation
*/
abstract void processAllocation(ALLOCATION_TYPE allocation)
abstract void processAllocations(List<ALLOCATION_TYPE> allocations)
throws EdexException;
/**
* Get the network
@ -166,4 +180,4 @@ public abstract class RetrievalAgent<ALLOCATION_TYPE extends BandwidthAllocation
public Network getNetwork() {
return network;
}
}
}

View file

@ -40,6 +40,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent;
* Jul 09, 2013 2106 djohnson Only needs to unregister from the EventBus when used in an EDEX instance, so handled in EdexBandwidthManager.
* Oct 03, 2013 2267 bgonzale Added check for no retrieval plan matching in the proposed retrieval plans.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
* Feb 10, 2014 2678 dhladky Prevent duplicate allocations.
*
* </pre>
*
@ -53,7 +54,7 @@ public class RetrievalManager {
// Package-private on purpose so agents have visibility
static final BandwidthAllocation POISON_PILL = new BandwidthAllocation();
private final IBandwidthDao bandwidthDao;
// A Map of the Paths to retrievalPlans
@ -193,6 +194,32 @@ public class RetrievalManager {
}
return null;
}
/***
* Method used in practice because we need to search for expired allocations.
* @param network
* @param agentType
* @return
*/
public List<BandwidthAllocation> getRecentAllocations(Network network, String agentType) {
List<BandwidthAllocation> allocations = null;
if (shutdown) {
allocations = new ArrayList<BandwidthAllocation>(1);
allocations.add(POISON_PILL);
return allocations;
}
RetrievalPlan plan = getRetrievalPlans().get(network);
if (plan != null) {
synchronized (plan) {
return plan.getRecentAllocations(agentType);
}
}
return allocations;
}
public final RetrievalPlan getPlan(Network network) {
return getRetrievalPlans().get(network);
@ -288,4 +315,4 @@ public class RetrievalManager {
retrievalPlan.init();
}
}
}
}

View file

@ -1,5 +1,6 @@
package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
@ -50,6 +51,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* BandwidthReservations. Add constrained bucket addition method.
* Added debug logging.
* Jan 08, 2014 2615 bgonzale Log registry bandwidth calculation errors.
* Feb 10, 2014 2678 dhladky Prevent duplicate allocations.
*
* </pre>
*
@ -388,6 +390,40 @@ public class RetrievalPlan {
return reservation;
}
/**
* Get the scheduled allocations from recent buckets
*
* @param agentType
*
* @return reservations
*/
public List<BandwidthAllocation> getRecentAllocations(String agentType) {
List<BandwidthAllocation> reservations = null;
synchronized (bucketsLock) {
// Get the portion of the Map that is before the
// current time (DO NOT want to return future reservations)
final List<BandwidthBucket> buckets = bucketsDao
.getWhereStartTimeIsLessThanOrEqualTo(
TimeUtil.currentTimeMillis(), network);
// Iterate over the buckets and find all
// BandwidthAllocation that are in the READY state
for (BandwidthBucket bucket : buckets) {
BandwidthAllocation allocationReservation = associator.getNextReservation(bucket, agentType);
if (allocationReservation != null) {
if (reservations == null) {
reservations = new ArrayList<BandwidthAllocation>();
}
reservations.add(allocationReservation);
}
}
}
return reservations;
}
public void updateRequestMapping(long requestId,
Set<BandwidthBucket> buckets) {
@ -706,4 +742,4 @@ public class RetrievalPlan {
this.associator.copyState(fromPlan.associator);
}
}
}

View file

@ -1,3 +1,4 @@
/**
*
*/
@ -6,6 +7,8 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.retrieval;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.Provider;
@ -55,6 +58,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.util.RetrievalGeneratorUtilit
* RetrievalTasks (PerformRetrievalsThenReturnFinder).
* Added constructor that sets the retrievalQueue to null.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
* Feb 10, 2014 2678 dhladky Prevent duplicate allocations.
*
* </pre>
*
@ -86,59 +90,108 @@ public class SubscriptionRetrievalAgent extends
}
@Override
void processAllocation(SubscriptionRetrieval retrieval)
void processAllocations(List<SubscriptionRetrieval> subRetrievals)
throws EdexException {
Subscription<?, ?> sub;
try {
sub = bandwidthDao.getSubscriptionRetrievalAttributes(retrieval)
.getSubscription();
} catch (SerializationException e) {
throw new EdexException("Unable to deserialize the subscription.",
e);
}
final String originalSubName = sub.getName();
SubscriptionBundle bundle = new SubscriptionBundle();
Provider provider = getProvider(sub.getProvider());
if (provider == null) {
statusHandler.error("provider was null, skipping subscription ["
+ originalSubName + "]");
return;
}
bundle.setBundleId(sub.getSubscriptionId());
bundle.setPriority(retrieval.getPriority());
bundle.setProvider(provider);
bundle.setConnection(provider.getConnection());
bundle.setSubscription(sub);
ConcurrentHashMap<Subscription<?, ?>, SubscriptionRetrieval> retrievalsMap = new ConcurrentHashMap<Subscription<?, ?>, SubscriptionRetrieval>();
retrieval.setActualStart(TimeUtil.newGmtCalendar());
retrieval.setStatus(RetrievalStatus.RETRIEVAL);
// Get subs from allocations and search for duplicates
for (SubscriptionRetrieval subRetrieval : subRetrievals) {
// update database
bandwidthDao.update(retrieval);
Subscription<?, ?> sub = null;
// generateRetrieval will pipeline the RetrievalRecord Objects created to the DB.
// The PK objects returned are sent to the RetrievalQueue for processing.
List<RetrievalRequestRecordPK> retrievals = generateRetrieval(bundle,
retrieval.getIdentifier());
if (!CollectionUtil.isNullOrEmpty(retrievals)) {
try {
Object[] payload = retrievals.toArray();
RetrievalGeneratorUtilities.sendToRetrieval(destinationUri,
network, payload);
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM,
"Couldn't send RetrievalRecords to Queue!", e);
sub = bandwidthDao.getSubscriptionRetrievalAttributes(
subRetrieval).getSubscription();
} catch (SerializationException e) {
throw new EdexException(
"Unable to deserialize the subscription.", e);
}
// We only allow one subscription retrieval per DSM update.
// Remove any duplicate subscription allocations/retrievals, cancel them.
if (!retrievalsMap.containsKey(sub)) {
retrievalsMap.put(sub, subRetrieval);
} else {
// Check for most recent startTime, that's the one we want for
// retrieval.
SubscriptionRetrieval currentRetrieval = retrievalsMap.get(sub);
if (subRetrieval.getStartTime().getTime()
.after(currentRetrieval.getStartTime().getTime())) {
// Replace it in the map, set previous to canceled.
currentRetrieval.setStatus(RetrievalStatus.CANCELLED);
bandwidthDao.update(currentRetrieval);
retrievalsMap.replace(sub, subRetrieval);
statusHandler
.info("More recent, setting previous allocation to Cancelled ["
+ currentRetrieval.getIdentifier()
+ "] "
+ sub.getName());
} else {
// Not more recent, cancel
subRetrieval.setStatus(RetrievalStatus.CANCELLED);
bandwidthDao.update(subRetrieval);
statusHandler
.info("Older, setting to Cancelled ["
+ currentRetrieval.getIdentifier() + "] "
+ sub.getName());
}
}
}
for (Entry<Subscription<?, ?>, SubscriptionRetrieval> entry: retrievalsMap.entrySet()) {
SubscriptionRetrieval retrieval = entry.getValue();
Subscription<?, ?> sub = entry.getKey();
final String originalSubName = sub.getName();
Provider provider = getProvider(sub.getProvider());
if (provider == null) {
statusHandler
.error("provider was null, skipping subscription ["
+ originalSubName + "]");
return;
}
bundle.setBundleId(sub.getSubscriptionId());
bundle.setPriority(retrieval.getPriority());
bundle.setProvider(provider);
bundle.setConnection(provider.getConnection());
bundle.setSubscription(sub);
retrieval.setActualStart(TimeUtil.newGmtCalendar());
retrieval.setStatus(RetrievalStatus.RETRIEVAL);
// update database
bandwidthDao.update(retrieval);
// generateRetrieval will pipeline the RetrievalRecord Objects
// created to the DB.
// The PK objects returned are sent to the RetrievalQueue for
// processing.
List<RetrievalRequestRecordPK> retrievals = generateRetrieval(
bundle, retrieval.getIdentifier());
if (!CollectionUtil.isNullOrEmpty(retrievals)) {
try {
Object[] payload = retrievals.toArray();
RetrievalGeneratorUtilities.sendToRetrieval(destinationUri,
network, payload);
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM,
"Couldn't send RetrievalRecords to Queue!", e);
}
statusHandler.info("Sent " + retrievals.size()
+ " retrievals to queue. " + network.toString());
} else {
// Normally this is the job of the SubscriptionNotifyTask, but
// if no
// retrievals were generated we have to send it manually
RetrievalManagerNotifyEvent retrievalManagerNotifyEvent = new RetrievalManagerNotifyEvent();
retrievalManagerNotifyEvent.setId(Long.toString(retrieval
.getId()));
EventBus.publish(retrievalManagerNotifyEvent);
}
statusHandler.info("Sent " + retrievals.size()
+ " retrievals to queue. " + network.toString());
} else {
// Normally this is the job of the SubscriptionNotifyTask, but if no
// retrievals were generated we have to send it manually
RetrievalManagerNotifyEvent retrievalManagerNotifyEvent = new RetrievalManagerNotifyEvent();
retrievalManagerNotifyEvent.setId(Long.toString(retrieval.getId()));
EventBus.publish(retrievalManagerNotifyEvent);
}
}
@ -266,4 +319,4 @@ public class SubscriptionRetrievalAgent extends
}
}
}
}

View file

@ -15,6 +15,7 @@
<!-- Fed from plugin notification -->
<from uri="jms-durable:grid-staticdata-generate" />
<doTry>
<bean ref="serializationUtil" method="transformFromThrift" />
<bean ref="staticDataGenerator" method="processNotification"/>
<to uri="direct-vm:stageNotification"/>
<doCatch>