diff --git a/edexOsgi/com.raytheon.uf.common.datadelivery.bandwidth/src/com/raytheon/uf/common/datadelivery/bandwidth/data/BandwidthBucketDescription.java b/edexOsgi/com.raytheon.uf.common.datadelivery.bandwidth/src/com/raytheon/uf/common/datadelivery/bandwidth/data/BandwidthBucketDescription.java index 0741b0856d..670a2a7d0e 100644 --- a/edexOsgi/com.raytheon.uf.common.datadelivery.bandwidth/src/com/raytheon/uf/common/datadelivery/bandwidth/data/BandwidthBucketDescription.java +++ b/edexOsgi/com.raytheon.uf.common.datadelivery.bandwidth/src/com/raytheon/uf/common/datadelivery/bandwidth/data/BandwidthBucketDescription.java @@ -35,7 +35,8 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Sep 20, 2013 2397 bgonzale Initial creation. - * Nov 25, 2545 2545 mpduff Added bucketTimeInMinutes. + * Nov 25, 2013 2545 mpduff Added bucketTimeInMinutes. + * Dec 17, 2013 2636 bgonzale Refactored bucket fill in edex. * * * @@ -173,29 +174,4 @@ public class BandwidthBucketDescription implements this.bucketTimeMinutes = bucketTimeMinutes; } - /** - * Get any data that is leftover after filling the bucket. - * - * @return The amount of data overage - */ - public long getLeftovers() { - long leftover = 0; - - if (this.usedBytes > this.bucketSize) { - leftover = usedBytes - bucketSize; - usedBytes = bucketSize; - } - - return leftover; - } - - /** - * Add any leftovers from the previous buckets. - * - * @param leftovers - * data overage from previous buckets - */ - public void addLeftovers(long leftovers) { - this.usedBytes += leftovers; - } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthGraphDataAdapter.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthGraphDataAdapter.java index 337f0ee69d..8a9f16f080 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthGraphDataAdapter.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthGraphDataAdapter.java @@ -62,6 +62,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan; * Sep 20, 2013 2397 bgonzale Add Map of Bucket Descriptions to BandwidthGraphData. * Nov 27, 2013 2545 mpduff Get data by network * Dec 11, 2013 2566 bgonzale handle case when there are no reservations. + * Dec 17, 2013 2636 bgonzale Refactored bucket fill in edex. * * * @@ -209,9 +210,6 @@ class BandwidthGraphDataAdapter { BandwidthBucketDescription desc = new BandwidthBucketDescription( bucket.getNetwork(), bucket.getBucketSize(), bucket.getCurrentSize(), bucket.getBucketStartTime()); - desc.addLeftovers(leftovers); - - leftovers = desc.getLeftovers(); descriptions.add(desc); } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManager.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManager.java index ca9ba38967..56a9538ac0 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManager.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManager.java @@ -131,6 +131,7 @@ import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException; * Nov 19, 2013 2545 bgonzale changed getBandwidthGraphData to protected. * Dec 04, 2013 2566 bgonzale added method to retrieve and parse spring files for a mode. * Dec 11, 2013 2566 bgonzale fix spring resource resolution. + * Dec 17, 2013 2636 bgonzale Changed logging to differentiate the output. * * * @@ -256,10 +257,10 @@ public abstract class BandwidthManager .newArrayListWithCapacity(numberOfRetrievalTimes); for (Calendar retrievalTime : retrievalTimes) { - statusHandler.info("schedule() - Scheduling subscription [" + statusHandler.info("Scheduling subscription [" + subscription.getName() + String.format( - "] baseReferenceTime [%1$tY%1$tm%1$td%1$tH%1$tM", + "] retrievalTime [%1$tY%1$tm%1$td%1$tH%1$tM", retrievalTime) + "]"); // Add the current subscription to the ones BandwidthManager already @@ -295,7 +296,7 @@ public abstract class BandwidthManager .getBandwidthSubscriptions(dao.getProvider(), dao.getDataSetName(), retrievalTime); - statusHandler.info("schedule() - Scheduling subscription [" + statusHandler.info("Scheduling subscription [" + dao.getName() + String.format( "] baseReferenceTime [%1$tY%1$tm%1$td%1$tH%1$tM", diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthDao.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthDao.java index e1f07a9652..5fc482bb69 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthDao.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthDao.java @@ -68,6 +68,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; * Jul 11, 2013 2106 djohnson Use BandwidthSubscription instead of Subscription. * Jul 18, 2013 1653 mpduff Implemented method. * Oct 2, 2013 1797 dhladky generics + * Dec 17, 2013 2636 bgonzale Added method to get a BandwidthAllocation. * * * @@ -668,4 +669,14 @@ class InMemoryBandwidthDao implements IBandw return null; } + @Override + public BandwidthAllocation getBandwidthAllocation(long id) { + for (BandwidthAllocation current : bandwidthAllocations) { + if (current.getId() == id) { + return current; + } + } + return null; + } + } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/dao/BandwidthBucket.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/dao/BandwidthBucket.java index ad6d1afee7..8d60aafcff 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/dao/BandwidthBucket.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/dao/BandwidthBucket.java @@ -47,6 +47,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Jun 18, 2013 2106 djohnson Extracted from {@link RetrievalPlan}. + * Dec 17, 2013 2636 bgonzale Throw exception if attempt to overfill the bucket. * * * @@ -104,7 +105,7 @@ public class BandwidthBucket implements Comparable, } public long getAvailableBandwidth() { - return Math.max(0, bucketSize - currentSize); + return bucketSize - currentSize; } public long getBucketSize() { @@ -119,8 +120,13 @@ public class BandwidthBucket implements Comparable, return currentSize; } - public void setCurrentSize(long currentSize) { - this.currentSize = currentSize; + public void setCurrentSize(long size) { + if (size > this.bucketSize) { + throw new IllegalArgumentException("New data size, " + size + + ", is greater than available bucket size " + + this.bucketSize); + } + this.currentSize = size; } /** diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/dao/IBandwidthDao.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/dao/IBandwidthDao.java index b3426f50d9..b34cdb88bc 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/dao/IBandwidthDao.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/dao/IBandwidthDao.java @@ -51,6 +51,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus; * Jun 13, 2013 2095 djohnson Implement ability to store a collection of subscriptions. * Jun 24, 2013 2106 djohnson Add more methods. * Jul 18, 2013 1653 mpduff Added getSubscriptionStatusSummary. + * Dec 17, 2013 2636 bgonzale Added method to get a BandwidthAllocation. * * * @@ -487,4 +488,11 @@ public interface IBandwidthDao { * @return the SubscriptionStatusSummary */ SubscriptionStatusSummary getSubscriptionStatusSummary(Subscription sub); + + /** + * Get the BandwidthAllocation identified by the given id. + * + * @param id + */ + BandwidthAllocation getBandwidthAllocation(long id); } \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/hibernate/HibernateBandwidthDao.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/hibernate/HibernateBandwidthDao.java index ce4adbbce7..6638a93a0d 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/hibernate/HibernateBandwidthDao.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/hibernate/HibernateBandwidthDao.java @@ -66,6 +66,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; * Jul 18, 2013 1653 mpduff Added getSubscriptionStatusSummary. * Aug 28, 2013 2290 mpduff Check for no subscriptions. * Oct 2, 2013 1797 dhladky Generics + * Dec 17, 2013 2636 bgonzale Added method to get a BandwidthAllocation. * * * @@ -583,4 +584,9 @@ public class HibernateBandwidthDao implement return summary; } + + @Override + public BandwidthAllocation getBandwidthAllocation(long id) { + return bandwidthAllocationDao.getById(id); + } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/InMemoryBandwidthBucketAllocationAssociator.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/InMemoryBandwidthBucketAllocationAssociator.java index 3c9e7707f3..27d494d938 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/InMemoryBandwidthBucketAllocationAssociator.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/InMemoryBandwidthBucketAllocationAssociator.java @@ -41,6 +41,8 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Jun 25, 2013 2106 djohnson Extracted from {@link BandwidthBucket} and {@link RetrievalPlan}. + * Dec 17, 2013 2636 bgonzale Prevent stale BandwidthAllocation updates by retrieving + * them from the dao before updating. * * * @@ -137,10 +139,16 @@ public class InMemoryBandwidthBucketAllocationAssociator implements for (BandwidthAllocation o : allocations.get(bucket.getIdentifier())) { if (RetrievalStatus.READY.equals(o.getStatus()) && o.getAgentType().equals(agentType)) { - allocation = o; - allocation.setStatus(RetrievalStatus.PROCESSING); - // Persist this change to the database - bandwidthDao.createOrUpdate(allocation); + allocation = bandwidthDao + .getBandwidthAllocation(o.getId()); + if (allocation == null) { + // allocation was removed from persistence, sync the + // mapping + allocations.remove(o.getId(), o); + } else { + allocation.setStatus(RetrievalStatus.PROCESSING); + bandwidthDao.createOrUpdate(allocation); + } break; } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/PriorityRetrievalScheduler.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/PriorityRetrievalScheduler.java index 6e6e500981..ea219e34b4 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/PriorityRetrievalScheduler.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/PriorityRetrievalScheduler.java @@ -30,6 +30,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; * Oct 26, 2012 1286 djohnson Return list of unscheduled allocations. * Jan 25, 2013 1528 djohnson Lower priority requests should not be able to unschedule higher priority requests. * Jun 25, 2013 2106 djohnson Access bandwidth bucket contents through RetrievalPlan. + * Dec 17, 2013 2636 bgonzale When adding to buckets, call the constrained method. * * * @version 1.0 @@ -143,7 +144,7 @@ public class PriorityRetrievalScheduler implements IRetrievalScheduler { if (o instanceof BandwidthAllocation) { BandwidthAllocation obj = (BandwidthAllocation) o; obj.setStatus(RetrievalStatus.SCHEDULED); - plan.addToBucket(key, obj); + plan.addToBucketWithSizeConstraint(key, obj); } else { plan.addToBucket(key, (BandwidthReservation) o); } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalPlan.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalPlan.java index 7bc2b3f8bd..b4a5d16182 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalPlan.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalPlan.java @@ -46,6 +46,9 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; * Nov 16, 2013 1736 dhladky Alter size of available bandwidth by subtracting that used by registry. * Dec 05, 2013 2545 mpduff BandwidthReservation now stored in bytes. * Dec 13, 2013 2545 mpduff Prevent negative values in bandwidth bucket sizes. + * Dec 17, 2013 2636 bgonzale Check for removed buckets when removing BandwidthAllocations or + * BandwidthReservations. Add constrained bucket addition method. + * Added debug logging. * * * @@ -407,10 +410,16 @@ public class RetrievalPlan { return; } for (Long bucketId : bucketIds) { - BandwidthBucket bucket = getBucket(bucketId); - bucket.setCurrentSize(Math.max(0, bucket.getCurrentSize() - - allocation.getEstimatedSizeInBytes())); - associator.removeFromBucket(bucket, allocation); + // get bucket without checks. sometimes the + // first bucket may have been removed. + BandwidthBucket bucket = getBucketNoChecks(bucketId); + if (bucket != null) { + bucket.setCurrentSize(Math.max( + 0, + bucket.getCurrentSize() + - allocation.getEstimatedSizeInBytes())); + associator.removeFromBucket(bucket, allocation); + } } } } @@ -434,10 +443,14 @@ public class RetrievalPlan { return; } for (Long bucketId : bucketIds) { - BandwidthBucket bucket = getBucket(bucketId); - bucket.setCurrentSize(Math.max(0, bucket.getCurrentSize() - - reservation.getSize())); - associator.removeFromBucket(bucket, reservation); + // get bucket without checks. sometimes the + // first bucket may have been removed. + BandwidthBucket bucket = getBucketNoChecks(bucketId); + if (bucket != null) { + bucket.setCurrentSize(Math.max(0, + bucket.getCurrentSize() - reservation.getSize())); + associator.removeFromBucket(bucket, reservation); + } } } } @@ -508,6 +521,17 @@ public class RetrievalPlan { return bucket; } + /** + * Return the bucket for the specified id. + * + * @param bucketId + * the bucketId + * @return the bucket; null if not found + */ + private BandwidthBucket getBucketNoChecks(long bucketId) { + return bucketsDao.getByStartTime(bucketId, network); + } + /** * Return the buckets in the specified window, both boundaries are * inclusive. Buckets will be in order of their start time. @@ -524,6 +548,43 @@ public class RetrievalPlan { return bucketsDao.getBucketsInWindow(earliestTime, latestTime, network); } + /** + * Adds the {@link BandwidthAllocation} to the specified bucket. + * + * @param bucket + * the bucket + * @param allocation + * the allocation + * @throws NullPointerException + * if the bucket start time is invalid + */ + public void addToBucketWithSizeConstraint(BandwidthBucket bucket, + BandwidthAllocation allocation) { + long bucketStartTime = bucket.getBucketStartTime(); + + synchronized (bucketsLock) { + BandwidthBucket actualBucket = getBucket(bucketStartTime); + long bucketSize = actualBucket.getBucketSize(); + long totalSize = actualBucket.getCurrentSize() + + allocation.getEstimatedSizeInBytes(); + // constrain size by size of bucket. Reservations will have filled + // out the rest of the allocation in subsequent buckets. + totalSize = totalSize > bucketSize ? bucketSize + : totalSize; + actualBucket.setCurrentSize(totalSize); + associator.addToBucket(actualBucket, allocation); + if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { + statusHandler.debug("Adding (constrained) to bucket " + + actualBucket.getBucketStartTime() + " with size " + + actualBucket.getBucketSize() / 1000 + + "k an Allocation " + + allocation.getEstimatedSizeInBytes() / 1000 + + "k. Remaining in bucket " + + actualBucket.getAvailableBandwidth() / 1000 + "k"); + } + } + } + /** * Adds the {@link BandwidthAllocation} to the specified bucket. * @@ -543,6 +604,15 @@ public class RetrievalPlan { actualBucket.setCurrentSize(actualBucket.getCurrentSize() + allocation.getEstimatedSizeInBytes()); associator.addToBucket(actualBucket, allocation); + if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { + statusHandler.debug("Adding to bucket " + + actualBucket.getBucketStartTime() + " with size " + + actualBucket.getBucketSize() / 1000 + + "k an Allocation " + + allocation.getEstimatedSizeInBytes() / 1000 + + "k. Remaining in bucket " + + actualBucket.getAvailableBandwidth() / 1000 + "k"); + } } } @@ -565,6 +635,14 @@ public class RetrievalPlan { actualBucket.setCurrentSize(actualBucket.getCurrentSize() + reservation.getSize()); associator.addToBucket(actualBucket, reservation); + if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { + statusHandler.debug("Adding to bucket " + + actualBucket.getBucketStartTime() + " with size " + + actualBucket.getBucketSize() / 1000 + + "k a Reservation " + reservation.getSize() / 1000 + + "k. Remaining in bucket " + + actualBucket.getAvailableBandwidth() / 1000 + "k"); + } } }