Issue #2636 fixes for the scheduling gaps in the middle of subscription schedules.

Amend: Added max checks to prevent negative bucket sizes in RetrievalPlan remove methods.  Wrapped debug logging in is debug on checks.

Change-Id: I5ab6d3c103e82bf2b1aa0eaf167f0993e5778c90

Former-commit-id: 3db5d79b914436b8d0feed600fc586d7d3884706
This commit is contained in:
Brad Gonzales 2013-12-17 11:18:34 -06:00
parent d233a5161a
commit cc75442fa3
10 changed files with 141 additions and 48 deletions

View file

@ -35,7 +35,8 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Sep 20, 2013 2397 bgonzale Initial creation.
* Nov 25, 2545 2545 mpduff Added bucketTimeInMinutes.
* Nov 25, 2013 2545 mpduff Added bucketTimeInMinutes.
* Dec 17, 2013 2636 bgonzale Refactored bucket fill in edex.
*
* </pre>
*
@ -173,29 +174,4 @@ public class BandwidthBucketDescription implements
this.bucketTimeMinutes = bucketTimeMinutes;
}
/**
* Get any data that is leftover after filling the bucket.
*
* @return The amount of data overage
*/
public long getLeftovers() {
long leftover = 0;
if (this.usedBytes > this.bucketSize) {
leftover = usedBytes - bucketSize;
usedBytes = bucketSize;
}
return leftover;
}
/**
* Add any leftovers from the previous buckets.
*
* @param leftovers
* data overage from previous buckets
*/
public void addLeftovers(long leftovers) {
this.usedBytes += leftovers;
}
}

View file

@ -62,6 +62,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan;
* Sep 20, 2013 2397 bgonzale Add Map of Bucket Descriptions to BandwidthGraphData.
* Nov 27, 2013 2545 mpduff Get data by network
* Dec 11, 2013 2566 bgonzale handle case when there are no reservations.
* Dec 17, 2013 2636 bgonzale Refactored bucket fill in edex.
*
* </pre>
*
@ -209,9 +210,6 @@ class BandwidthGraphDataAdapter {
BandwidthBucketDescription desc = new BandwidthBucketDescription(
bucket.getNetwork(), bucket.getBucketSize(),
bucket.getCurrentSize(), bucket.getBucketStartTime());
desc.addLeftovers(leftovers);
leftovers = desc.getLeftovers();
descriptions.add(desc);
}

View file

@ -131,6 +131,7 @@ import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
* Nov 19, 2013 2545 bgonzale changed getBandwidthGraphData to protected.
* Dec 04, 2013 2566 bgonzale added method to retrieve and parse spring files for a mode.
* Dec 11, 2013 2566 bgonzale fix spring resource resolution.
* Dec 17, 2013 2636 bgonzale Changed logging to differentiate the output.
*
* </pre>
*
@ -256,10 +257,10 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
.newArrayListWithCapacity(numberOfRetrievalTimes);
for (Calendar retrievalTime : retrievalTimes) {
statusHandler.info("schedule() - Scheduling subscription ["
statusHandler.info("Scheduling subscription ["
+ subscription.getName()
+ String.format(
"] baseReferenceTime [%1$tY%1$tm%1$td%1$tH%1$tM",
"] retrievalTime [%1$tY%1$tm%1$td%1$tH%1$tM",
retrievalTime) + "]");
// Add the current subscription to the ones BandwidthManager already
@ -295,7 +296,7 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
.getBandwidthSubscriptions(dao.getProvider(),
dao.getDataSetName(), retrievalTime);
statusHandler.info("schedule() - Scheduling subscription ["
statusHandler.info("Scheduling subscription ["
+ dao.getName()
+ String.format(
"] baseReferenceTime [%1$tY%1$tm%1$td%1$tH%1$tM",

View file

@ -68,6 +68,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Jul 11, 2013 2106 djohnson Use BandwidthSubscription instead of Subscription.
* Jul 18, 2013 1653 mpduff Implemented method.
* Oct 2, 2013 1797 dhladky generics
* Dec 17, 2013 2636 bgonzale Added method to get a BandwidthAllocation.
*
* </pre>
*
@ -668,4 +669,14 @@ class InMemoryBandwidthDao<T extends Time, C extends Coverage> implements IBandw
return null;
}
@Override
public BandwidthAllocation getBandwidthAllocation(long id) {
for (BandwidthAllocation current : bandwidthAllocations) {
if (current.getId() == id) {
return current;
}
}
return null;
}
}

View file

@ -47,6 +47,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jun 18, 2013 2106 djohnson Extracted from {@link RetrievalPlan}.
* Dec 17, 2013 2636 bgonzale Throw exception if attempt to overfill the bucket.
*
* </pre>
*
@ -104,7 +105,7 @@ public class BandwidthBucket implements Comparable<BandwidthBucket>,
}
public long getAvailableBandwidth() {
return Math.max(0, bucketSize - currentSize);
return bucketSize - currentSize;
}
public long getBucketSize() {
@ -119,8 +120,13 @@ public class BandwidthBucket implements Comparable<BandwidthBucket>,
return currentSize;
}
public void setCurrentSize(long currentSize) {
this.currentSize = currentSize;
public void setCurrentSize(long size) {
if (size > this.bucketSize) {
throw new IllegalArgumentException("New data size, " + size
+ ", is greater than available bucket size "
+ this.bucketSize);
}
this.currentSize = size;
}
/**

View file

@ -51,6 +51,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus;
* Jun 13, 2013 2095 djohnson Implement ability to store a collection of subscriptions.
* Jun 24, 2013 2106 djohnson Add more methods.
* Jul 18, 2013 1653 mpduff Added getSubscriptionStatusSummary.
* Dec 17, 2013 2636 bgonzale Added method to get a BandwidthAllocation.
*
* </pre>
*
@ -487,4 +488,11 @@ public interface IBandwidthDao<T extends Time, C extends Coverage> {
* @return the SubscriptionStatusSummary
*/
SubscriptionStatusSummary getSubscriptionStatusSummary(Subscription<T, C> sub);
/**
* Get the BandwidthAllocation identified by the given id.
*
* @param id
*/
BandwidthAllocation getBandwidthAllocation(long id);
}

View file

@ -66,6 +66,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Jul 18, 2013 1653 mpduff Added getSubscriptionStatusSummary.
* Aug 28, 2013 2290 mpduff Check for no subscriptions.
* Oct 2, 2013 1797 dhladky Generics
* Dec 17, 2013 2636 bgonzale Added method to get a BandwidthAllocation.
*
* </pre>
*
@ -583,4 +584,9 @@ public class HibernateBandwidthDao<T extends Time, C extends Coverage> implement
return summary;
}
@Override
public BandwidthAllocation getBandwidthAllocation(long id) {
return bandwidthAllocationDao.getById(id);
}
}

View file

@ -41,6 +41,8 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jun 25, 2013 2106 djohnson Extracted from {@link BandwidthBucket} and {@link RetrievalPlan}.
* Dec 17, 2013 2636 bgonzale Prevent stale BandwidthAllocation updates by retrieving
* them from the dao before updating.
*
* </pre>
*
@ -137,10 +139,16 @@ public class InMemoryBandwidthBucketAllocationAssociator implements
for (BandwidthAllocation o : allocations.get(bucket.getIdentifier())) {
if (RetrievalStatus.READY.equals(o.getStatus())
&& o.getAgentType().equals(agentType)) {
allocation = o;
allocation.setStatus(RetrievalStatus.PROCESSING);
// Persist this change to the database
bandwidthDao.createOrUpdate(allocation);
allocation = bandwidthDao
.getBandwidthAllocation(o.getId());
if (allocation == null) {
// allocation was removed from persistence, sync the
// mapping
allocations.remove(o.getId(), o);
} else {
allocation.setStatus(RetrievalStatus.PROCESSING);
bandwidthDao.createOrUpdate(allocation);
}
break;
}
}

View file

@ -30,6 +30,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Oct 26, 2012 1286 djohnson Return list of unscheduled allocations.
* Jan 25, 2013 1528 djohnson Lower priority requests should not be able to unschedule higher priority requests.
* Jun 25, 2013 2106 djohnson Access bandwidth bucket contents through RetrievalPlan.
* Dec 17, 2013 2636 bgonzale When adding to buckets, call the constrained method.
* </pre>
*
* @version 1.0
@ -143,7 +144,7 @@ public class PriorityRetrievalScheduler implements IRetrievalScheduler {
if (o instanceof BandwidthAllocation) {
BandwidthAllocation obj = (BandwidthAllocation) o;
obj.setStatus(RetrievalStatus.SCHEDULED);
plan.addToBucket(key, obj);
plan.addToBucketWithSizeConstraint(key, obj);
} else {
plan.addToBucket(key, (BandwidthReservation) o);
}

View file

@ -46,6 +46,9 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Nov 16, 2013 1736 dhladky Alter size of available bandwidth by subtracting that used by registry.
* Dec 05, 2013 2545 mpduff BandwidthReservation now stored in bytes.
* Dec 13, 2013 2545 mpduff Prevent negative values in bandwidth bucket sizes.
* Dec 17, 2013 2636 bgonzale Check for removed buckets when removing BandwidthAllocations or
* BandwidthReservations. Add constrained bucket addition method.
* Added debug logging.
*
* </pre>
*
@ -407,10 +410,16 @@ public class RetrievalPlan {
return;
}
for (Long bucketId : bucketIds) {
BandwidthBucket bucket = getBucket(bucketId);
bucket.setCurrentSize(Math.max(0, bucket.getCurrentSize()
- allocation.getEstimatedSizeInBytes()));
associator.removeFromBucket(bucket, allocation);
// get bucket without checks. sometimes the
// first bucket may have been removed.
BandwidthBucket bucket = getBucketNoChecks(bucketId);
if (bucket != null) {
bucket.setCurrentSize(Math.max(
0,
bucket.getCurrentSize()
- allocation.getEstimatedSizeInBytes()));
associator.removeFromBucket(bucket, allocation);
}
}
}
}
@ -434,10 +443,14 @@ public class RetrievalPlan {
return;
}
for (Long bucketId : bucketIds) {
BandwidthBucket bucket = getBucket(bucketId);
bucket.setCurrentSize(Math.max(0, bucket.getCurrentSize()
- reservation.getSize()));
associator.removeFromBucket(bucket, reservation);
// get bucket without checks. sometimes the
// first bucket may have been removed.
BandwidthBucket bucket = getBucketNoChecks(bucketId);
if (bucket != null) {
bucket.setCurrentSize(Math.max(0,
bucket.getCurrentSize() - reservation.getSize()));
associator.removeFromBucket(bucket, reservation);
}
}
}
}
@ -508,6 +521,17 @@ public class RetrievalPlan {
return bucket;
}
/**
* Return the bucket for the specified id.
*
* @param bucketId
* the bucketId
* @return the bucket; null if not found
*/
private BandwidthBucket getBucketNoChecks(long bucketId) {
return bucketsDao.getByStartTime(bucketId, network);
}
/**
* Return the buckets in the specified window, both boundaries are
* inclusive. Buckets will be in order of their start time.
@ -524,6 +548,43 @@ public class RetrievalPlan {
return bucketsDao.getBucketsInWindow(earliestTime, latestTime, network);
}
/**
* Adds the {@link BandwidthAllocation} to the specified bucket.
*
* @param bucket
* the bucket
* @param allocation
* the allocation
* @throws NullPointerException
* if the bucket start time is invalid
*/
public void addToBucketWithSizeConstraint(BandwidthBucket bucket,
BandwidthAllocation allocation) {
long bucketStartTime = bucket.getBucketStartTime();
synchronized (bucketsLock) {
BandwidthBucket actualBucket = getBucket(bucketStartTime);
long bucketSize = actualBucket.getBucketSize();
long totalSize = actualBucket.getCurrentSize()
+ allocation.getEstimatedSizeInBytes();
// constrain size by size of bucket. Reservations will have filled
// out the rest of the allocation in subsequent buckets.
totalSize = totalSize > bucketSize ? bucketSize
: totalSize;
actualBucket.setCurrentSize(totalSize);
associator.addToBucket(actualBucket, allocation);
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.debug("Adding (constrained) to bucket "
+ actualBucket.getBucketStartTime() + " with size "
+ actualBucket.getBucketSize() / 1000
+ "k an Allocation "
+ allocation.getEstimatedSizeInBytes() / 1000
+ "k. Remaining in bucket "
+ actualBucket.getAvailableBandwidth() / 1000 + "k");
}
}
}
/**
* Adds the {@link BandwidthAllocation} to the specified bucket.
*
@ -543,6 +604,15 @@ public class RetrievalPlan {
actualBucket.setCurrentSize(actualBucket.getCurrentSize()
+ allocation.getEstimatedSizeInBytes());
associator.addToBucket(actualBucket, allocation);
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.debug("Adding to bucket "
+ actualBucket.getBucketStartTime() + " with size "
+ actualBucket.getBucketSize() / 1000
+ "k an Allocation "
+ allocation.getEstimatedSizeInBytes() / 1000
+ "k. Remaining in bucket "
+ actualBucket.getAvailableBandwidth() / 1000 + "k");
}
}
}
@ -565,6 +635,14 @@ public class RetrievalPlan {
actualBucket.setCurrentSize(actualBucket.getCurrentSize()
+ reservation.getSize());
associator.addToBucket(actualBucket, reservation);
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.debug("Adding to bucket "
+ actualBucket.getBucketStartTime() + " with size "
+ actualBucket.getBucketSize() / 1000
+ "k a Reservation " + reservation.getSize() / 1000
+ "k. Remaining in bucket "
+ actualBucket.getAvailableBandwidth() / 1000 + "k");
}
}
}