diff --git a/edexOsgi/com.raytheon.uf.common.datadelivery.registry/src/com/raytheon/uf/common/datadelivery/registry/RecurringSubscription.java b/edexOsgi/com.raytheon.uf.common.datadelivery.registry/src/com/raytheon/uf/common/datadelivery/registry/RecurringSubscription.java index 7e312e1d3e..eb38210751 100644 --- a/edexOsgi/com.raytheon.uf.common.datadelivery.registry/src/com/raytheon/uf/common/datadelivery/registry/RecurringSubscription.java +++ b/edexOsgi/com.raytheon.uf.common.datadelivery.registry/src/com/raytheon/uf/common/datadelivery/registry/RecurringSubscription.java @@ -69,6 +69,7 @@ import com.raytheon.uf.common.time.util.TimeUtil; * Jan 24, 2014 2709 bgonzale Fix setting of active period end. Change active period checks * to check day of year. removed now unused active period methods. * Jan 28, 2014 2636 mpduff Changed to use GMT calendar. + * Feb 12, 2014 2636 mpduff Return new instance of calculated start and end. * * * @@ -500,7 +501,7 @@ public abstract class RecurringSubscription return TimeUtil.newGmtCalendar(subscriptionStart); } - return startConstraint; + return TimeUtil.newGmtCalendar(startConstraint.getTime()); } @Override @@ -516,7 +517,7 @@ public abstract class RecurringSubscription return TimeUtil.newGmtCalendar(subscriptionEnd); } - return endConstraint; + return TimeUtil.newGmtCalendar(endConstraint.getTime()); } /** @@ -1072,6 +1073,7 @@ public abstract class RecurringSubscription /** * @return the subscriptionState */ + @Override public SubscriptionState getSubscriptionState() { return subscriptionState; } @@ -1080,6 +1082,7 @@ public abstract class RecurringSubscription * @param subscriptionState * the subscriptionState to set */ + @Override public void setSubscriptionState(SubscriptionState subscriptionState) { this.subscriptionState = subscriptionState; } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery.xml index 96c2ae68b2..316759038f 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery.xml @@ -29,9 +29,6 @@ - - @@ -43,7 +40,7 @@ - + * @@ -180,10 +181,6 @@ public abstract class BandwidthManager @VisibleForTesting final RetrievalManager retrievalManager; - /** Map of Network->previous retrieval plan end time */ - private final Map previousRetrievalEndMap = new HashMap( - 1); - public BandwidthManager(IBandwidthDbInit dbInit, IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, @@ -192,11 +189,6 @@ public abstract class BandwidthManager this.bandwidthDao = bandwidthDao; this.retrievalManager = retrievalManager; this.bandwidthDaoUtil = bandwidthDaoUtil; - for (Network network : retrievalManager.getRetrievalPlans().keySet()) { - RetrievalPlan plan = retrievalManager.getRetrievalPlans().get( - network); - previousRetrievalEndMap.put(network, plan.getPlanEnd()); - } } /** @@ -228,9 +220,9 @@ public abstract class BandwidthManager } private List schedule(Subscription subscription, - SortedSet cycles, Calendar start, Calendar end) { + SortedSet cycles) { SortedSet retrievalTimes = bandwidthDaoUtil - .getRetrievalTimes(subscription, cycles, start, end); + .getRetrievalTimes(subscription, cycles); return scheduleSubscriptionForRetrievalTimes(subscription, retrievalTimes); @@ -247,9 +239,9 @@ public abstract class BandwidthManager * @return the list of unscheduled subscriptions */ private List schedule(Subscription subscription, - int retrievalInterval, Calendar start, Calendar end) { + int retrievalInterval) { SortedSet retrievalTimes = bandwidthDaoUtil - .getRetrievalTimes(subscription, retrievalInterval, start, end); + .getRetrievalTimes(subscription, retrievalInterval); return scheduleSubscriptionForRetrievalTimes(subscription, retrievalTimes); @@ -278,13 +270,16 @@ public abstract class BandwidthManager final int numberOfRetrievalTimes = retrievalTimes.size(); List newSubscriptions = Lists .newArrayListWithCapacity(numberOfRetrievalTimes); + statusHandler.info("Scheduling subscription " + subscription.getName()); for (Calendar retrievalTime : retrievalTimes) { - statusHandler.info("Scheduling subscription [" - + subscription.getName() - + String.format( - "] retrievalTime [%1$tY%1$tm%1$td%1$tH%1$tM", - retrievalTime) + "]"); + if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { + statusHandler.info("Scheduling subscription [" + + subscription.getName() + + String.format( + "] retrievalTime [%1$tY%1$tm%1$td%1$tH%1$tM", + retrievalTime) + "]"); + } // Add the current subscription to the ones BandwidthManager already // knows about. @@ -322,7 +317,7 @@ public abstract class BandwidthManager statusHandler.info("Scheduling subscription [" + dao.getName() + String.format( - "] baseReferenceTime [%1$tY%1$tm%1$td%1$tH%1$tM", + "] baseReferenceTime %1$tY%1$tm%1$td%1$tH%1$tM", retrievalTime)); return aggregate(new BandwidthSubscriptionContainer(subscription, @@ -344,24 +339,27 @@ public abstract class BandwidthManager List retrievals = getAggregator().aggregate( bandwidthSubscriptions); timer.lap("aggregator"); + if (CollectionUtil.isNullOrEmpty(retrievals)) { + return new ArrayList(0); + } - // Create a separate list of BandwidthReservations to schedule - // as the aggregation process may return all subsumed - // SubscriptionRetrievals - // for the specified Subscription. + /* + * Create a separate list of BandwidthReservations to schedule as the + * aggregation process may return all subsumedSubscriptionRetrievalsfor + * the specified Subscription. + */ List reservations = new ArrayList(); - for (SubscriptionRetrieval retrieval : retrievals) { - - // New RetrievalRequests will be marked as "PROCESSING" - // we need to make new BandwidthReservations for these - // SubscriptionRetrievals. - - // TODO: How to process "rescheduled" RetrievalRequests - // in the case where subscription aggregation has determined - // that an existing subscription has now be subsumed or - // altered to accommodate a new super set of subscriptions... - // + /* + * New RetrievalRequests will be marked as "PROCESSING" we need to + * make new BandwidthReservations for these SubscriptionRetrievals. + */ + /* + * TODO: How to process "rescheduled" RetrievalRequests in the case + * where subscription aggregation has determined that an existing + * subscription has now be subsumed or altered to accommodate a new + * super set of subscriptions... + */ if ((retrieval.getStatus().equals(RetrievalStatus.RESCHEDULE) || retrieval .getStatus().equals(RetrievalStatus.PROCESSING)) && !retrieval.isSubsumed()) { @@ -370,23 +368,13 @@ public abstract class BandwidthManager .getBandwidthSubscription(); Calendar retrievalTime = bandwidthSubscription .getBaseReferenceTime(); - Calendar startTime = TimeUtil.newCalendar(retrievalTime); + Calendar startTime = TimeUtil.newGmtCalendar(retrievalTime + .getTime()); - int delayMinutes = retrieval.getDataSetAvailablityDelay(); int maxLatency = retrieval.getSubscriptionLatency(); - - if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { - statusHandler.debug("Adding availability minutes of [" - + delayMinutes - + "] to retrieval start time of " - + String.format("[%1$tY%1$tm%1$td%1$tH%1$tM]", - retrievalTime)); - } - - startTime.add(Calendar.MINUTE, delayMinutes); retrieval.setStartTime(startTime); - Calendar endTime = TimeUtil.newCalendar(); + Calendar endTime = TimeUtil.newGmtCalendar(); endTime.setTimeInMillis(startTime.getTimeInMillis()); if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { @@ -459,94 +447,81 @@ public abstract class BandwidthManager return unscheduled; } - /** - * Schedule a single subscription. - * - * @param sub - * The subscription to schedule - * @param fullSchedule - * true to schedule for the full retrieval plan, false to - * schedule from the last time - * @return List of BandwidthAllocations that sould be unscheduled. - */ - public List schedule(Subscription sub, - boolean fullSchedule) { - Map>> map = new HashMap>>( - 1, 1); - List> list = new ArrayList>(1); - list.add(sub); - map.put(sub.getRoute(), list); - - return schedule(map, fullSchedule); - } - - /** - * {@inheritDoc} - */ @Override - public List schedule( - Map>> subMap, boolean fullSchedule) { - List unscheduled = new ArrayList(); + public List schedule(Subscription subscription) { + List unscheduled = null; - for (Network network : subMap.keySet()) { - RetrievalPlan retrievalPlan = retrievalManager.getPlan(network); - - /* - * Determine scheduling window, true for whole plan, false to run - * from last time - */ - Calendar start = retrievalPlan.getPlanStart(); - Calendar end = retrievalPlan.getPlanEnd(); - if (!fullSchedule) { - if (!end.equals(this.previousRetrievalEndMap.get(network))) { - start = this.previousRetrievalEndMap.get(network); - } else { - return unscheduled; - } - - } - - if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { - statusHandler.debug("Check for scheduling window: " - + start.getTime() + " - " + end.getTime()); - } - if (end.getTimeInMillis() - start.getTimeInMillis() >= retrievalPlan - .getBucketMinutes() * 2 * TimeUtil.MILLIS_PER_MINUTE) { - if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { - statusHandler.debug("Scheduling for window: " - + start.getTime() + " - " + end.getTime()); - } - this.previousRetrievalEndMap.put(network, - TimeUtil.newGmtCalendar(end.getTime())); - - for (Subscription subscription : subMap.get(network)) { - statusHandler.info("Scheduling subscription" - + subscription.getName()); - List unscheduledForThisSub = new ArrayList(); - final DataType dataSetType = subscription.getDataSetType(); - switch (dataSetType) { - case GRID: - unscheduledForThisSub = handleGridded(subscription, - start, end); - break; - case POINT: - unscheduledForThisSub = handlePoint(subscription, - start, end); - break; - default: - throw new IllegalArgumentException( - "The BandwidthManager doesn't know how to treat subscriptions with data type [" - + dataSetType + "]!"); - } - unscheduleSubscriptionsForAllocations(unscheduledForThisSub); - unscheduled.addAll(unscheduledForThisSub); - } - } + final DataType dataSetType = subscription.getDataSetType(); + switch (dataSetType) { + case GRID: + unscheduled = handleGridded(subscription); + break; + case POINT: + unscheduled = handlePoint(subscription); + break; + default: + throw new IllegalArgumentException( + "The BandwidthManager doesn't know how to treat subscriptions with data type [" + + dataSetType + "]!"); } + unscheduleSubscriptionsForAllocations(unscheduled); + return unscheduled; } + /** + * Update the retrieval plan for this subscription. + * + * @param subscription + * The subscription that needs its scheduling updated + */ + private void updateSchedule(Subscription subscription) { + final DataType dataSetType = subscription.getDataSetType(); + switch (dataSetType) { + case GRID: + updateGriddedSchedule(subscription); + break; + case POINT: + updatePointSchedule(subscription); + break; + default: + throw new IllegalArgumentException( + "The BandwidthManager doesn't know how to treat subscriptions with data type [" + + dataSetType + "]!"); + } + } + + /* + * (non-Javadoc) + * + * @see com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager# + * updateSchedule(com.raytheon.uf.common.datadelivery.registry.Network) + */ + @Override + public int updateSchedule(Network network) { + List subsToSchedule = getSubscriptionsToSchedule(network); + if (CollectionUtil.isNullOrEmpty(subsToSchedule)) { + return 0; + } + + for (Subscription subscription : subsToSchedule) { + updateSchedule(subscription); + } + + return subsToSchedule.size(); + } + + /** + * Get the subscriptions to schedule for the given network. + * + * @param network + * The network + * @return List of subscriptions for the network + */ + protected abstract List getSubscriptionsToSchedule( + Network network); + /** * Unschedules all subscriptions the allocations are associated to. * @@ -667,7 +642,7 @@ public abstract class BandwidthManager if (bandwidthSubscriptions.isEmpty() && ((RecurringSubscription) subscription).shouldSchedule() && !subscription.isUnscheduled()) { - return schedule(subscription, true); + return schedule(subscription); } else if (subscription.getStatus() == SubscriptionStatus.DEACTIVATED || subscription.isUnscheduled()) { // See if the subscription was deactivated or unscheduled.. @@ -677,12 +652,70 @@ public abstract class BandwidthManager } else { // Normal update, unschedule old allocations and create new ones List unscheduled = remove(bandwidthSubscriptions); - unscheduled.addAll(schedule(subscription, true)); + unscheduled.addAll(schedule(subscription)); return unscheduled; } } } + /** + * Update this point subscription's schedule. + * + * @param Subscription + * The subscription that needs its schedule updated + */ + private void updatePointSchedule(Subscription sub) { + SortedSet retrievalTimes = bandwidthDaoUtil + .getRetrievalTimes(sub, + ((PointTime) sub.getTime()).getInterval()); + + scheduleUpdates(sub, retrievalTimes); + } + + /** + * Update this grid subscription's schedule. + * + * @param Subscription + * The subscription that needs its schedule updated + */ + private void updateGriddedSchedule(Subscription sub) { + final List cycles = ((GriddedTime) sub.getTime()) + .getCycleTimes(); + + SortedSet retrievalTimes = bandwidthDaoUtil + .getRetrievalTimes(sub, Sets.newTreeSet(cycles)); + scheduleUpdates(sub, retrievalTimes); + } + + /** + * Schedule retrievals for this subscription for the provided retrieval + * times. + * + * @param sub + * The subscription + * @param retrievalTimes + * The retrieval times + */ + private void scheduleUpdates(Subscription sub, + SortedSet retrievalTimes) { + List currentBandwidthSubscriptions = bandwidthDao + .getBandwidthSubscription(sub); + + // Remove any times already associated with a retrieval allocation + for (BandwidthSubscription bs : currentBandwidthSubscriptions) { + retrievalTimes.remove(bs.getBaseReferenceTime()); + } + + if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { + statusHandler.info("Scheduling " + sub.getName()); + for (Calendar c : retrievalTimes) { + statusHandler.info("Scheduling for " + c.getTime()); + } + } + + scheduleSubscriptionForRetrievalTimes(sub, retrievalTimes); + } + /** * Handle scheduling point data type subscriptions. * @@ -691,9 +724,9 @@ public abstract class BandwidthManager * @return the list of unscheduled subscriptions */ private List handlePoint( - Subscription subscription, Calendar start, Calendar end) { + Subscription subscription) { List unscheduled = schedule(subscription, - ((PointTime) subscription.getTime()).getInterval(), start, end); + ((PointTime) subscription.getTime()).getInterval()); unscheduled.addAll(getMostRecent(subscription, false)); return unscheduled; } @@ -706,7 +739,7 @@ public abstract class BandwidthManager * @return the list of unscheduled subscriptions */ private List handleGridded( - Subscription subscription, Calendar start, Calendar end) { + Subscription subscription) { final List cycles = ((GriddedTime) subscription.getTime()) .getCycleTimes(); final boolean subscribedToCycles = !CollectionUtil @@ -716,8 +749,7 @@ public abstract class BandwidthManager // expected times List unscheduled = Collections.emptyList(); if (subscribedToCycles) { - unscheduled = schedule(subscription, Sets.newTreeSet(cycles), - start, end); + unscheduled = schedule(subscription, Sets.newTreeSet(cycles)); } return unscheduled; @@ -1196,10 +1228,6 @@ public abstract class BandwidthManager try { proposedBwManager = startProposedBandwidthManager(copyOfCurrentMap); - IBandwidthRequest request = new IBandwidthRequest(); - request.setRequestType(RequestType.SCHEDULE_SUBSCRIPTION); - request.setSubscriptions(subscriptions); - unscheduled = proposedBwManager .scheduleSubscriptions(subscriptions); } finally { @@ -1450,7 +1478,7 @@ public abstract class BandwidthManager // Now for each subscription, attempt to schedule bandwidth for (Subscription subscription : actualSubscriptions) { - unscheduled.addAll(this.schedule(subscription, true)); + unscheduled.addAll(this.schedule(subscription)); } } else { // Otherwise we can just copy the entire state of the current system diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthManager.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthManager.java index 19a32d5a7f..4874b41c46 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthManager.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthManager.java @@ -23,7 +23,6 @@ import static com.raytheon.uf.common.registry.ebxml.encoder.RegistryEncoders.Typ import java.text.ParseException; import java.util.ArrayList; -import java.util.Calendar; import java.util.Date; import java.util.HashSet; import java.util.List; @@ -69,6 +68,7 @@ import com.raytheon.uf.common.registry.handler.IRegistryObjectHandler; import com.raytheon.uf.common.registry.handler.RegistryHandlerException; import com.raytheon.uf.common.serialization.SerializationException; import com.raytheon.uf.common.status.UFStatus.Priority; +import com.raytheon.uf.common.time.util.IPerformanceTimer; import com.raytheon.uf.common.time.util.TimeUtil; import com.raytheon.uf.common.util.CollectionUtil; import com.raytheon.uf.common.util.FileUtil; @@ -125,7 +125,7 @@ import com.raytheon.uf.edex.datadelivery.util.DataDeliveryIdUtil; * bandwidth manager to perform the scheduling initialization * because of efficiency. * Feb 11, 2014 2771 bgonzale Use Data Delivery ID instead of Site. - * + * Feb 10, 2014 2636 mpduff Pass Network map to be scheduled. * * * @author djohnson @@ -189,7 +189,7 @@ public abstract class EdexBandwidthManager this.findSubscriptionsStrategy = findSubscriptionsStrategy; // schedule maintenance tasks - scheduler = Executors.newScheduledThreadPool(1); + scheduler = Executors.newSingleThreadScheduledExecutor(); } @Override @@ -218,9 +218,9 @@ public abstract class EdexBandwidthManager subsToSchedule.add(s); } } - unscheduledNames - .addAll(scheduleSubscriptions(subsToSchedule)); - } else { + + unscheduled.addAll(scheduleSubscriptions(subsToSchedule)); + unscheduledNames.addAll(unscheduled); } } @@ -238,13 +238,12 @@ public abstract class EdexBandwidthManager // scheduler.setRemoveOnCancelPolicy(true); scheduler.scheduleAtFixedRate(watchForConfigFileChanges, 1, 1, TimeUnit.MINUTES); - scheduler.scheduleAtFixedRate(new MaintenanceTask(), 5, 5, + scheduler.scheduleAtFixedRate(new MaintenanceTask(), 30, 30, TimeUnit.MINUTES); } return unscheduledNames; } - /** * {@inheritDoc} */ @@ -808,42 +807,74 @@ public abstract class EdexBandwidthManager } } + /* + * (non-Javadoc) + * + * @see com.raytheon.uf.edex.datadelivery.bandwidth.BandwidthManager# + * getSubscriptionsToSchedule + * (com.raytheon.uf.common.datadelivery.registry.Network) + */ + @Override + protected List getSubscriptionsToSchedule(Network network) { + List subList = new ArrayList(0); + try { + Map> activeSubs = findSubscriptionsStrategy + .findSubscriptionsToSchedule(); + if (activeSubs.get(network) != null) { + subList = activeSubs.get(network); + } + } catch (Exception e) { + statusHandler.handle(Priority.PROBLEM, + "Error retrieving subscriptions.", e); + } + + return subList; + } + /** * Private inner work thread used to keep the RetrievalPlans up to date. */ private class MaintenanceTask implements Runnable { @Override public void run() { + IPerformanceTimer timer = TimeUtil.getPerformanceTimer(); + timer.start(); statusHandler.info("MaintenanceTask starting..."); + for (RetrievalPlan plan : retrievalManager.getRetrievalPlans() .values()) { + if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { + statusHandler.info("MaintenanceTask: " + plan.getNetwork()); + statusHandler.info("MaintenanceTask: planStart: " + + plan.getPlanStart().getTime()); + statusHandler.info("MaintenanceTask: planEnd: " + + plan.getPlanEnd().getTime()); + } plan.resize(); - Calendar newEnd = plan.getPlanEnd(); - + if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { + statusHandler.info("MaintenanceTask: resized planStart: " + + plan.getPlanStart().getTime()); + statusHandler.info("MaintenanceTask: resized planEnd: " + + plan.getPlanEnd().getTime()); + statusHandler.info("MaintenanceTask: Update schedule"); + } // Find DEFERRED Allocations and load them into the plan... List deferred = bandwidthDao.getDeferred( - plan.getNetwork(), newEnd); + plan.getNetwork(), plan.getPlanEnd()); if (!deferred.isEmpty()) { retrievalManager.schedule(deferred); } } - try { - Map> activeSubs = findSubscriptionsStrategy - .findSubscriptionsToSchedule(); - - for (Network network : activeSubs.keySet()) { - for (Subscription sub : activeSubs.get(network)) { - statusHandler.debug("MaintenanceTask scheduling for " - + sub.getName()); - schedule(sub, false); - } - } - } catch (Exception e) { - statusHandler.handle(Priority.PROBLEM, - "Error requesting subscriptions from registry.", e); + int numSubsProcessed = 0; + for (RetrievalPlan plan : retrievalManager.getRetrievalPlans() + .values()) { + numSubsProcessed += updateSchedule(plan.getNetwork()); } - statusHandler.info("MaintenanceTask complete"); + timer.stop(); + statusHandler.info("MaintenanceTask complete: " + + timer.getElapsed() + " - " + numSubsProcessed + + " Subscriptions processed."); } } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/IBandwidthManager.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/IBandwidthManager.java index 4260d1bf9a..7c5a7360b5 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/IBandwidthManager.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/IBandwidthManager.java @@ -49,6 +49,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.ISubscriptionAggre * Jan 08, 2014 2615 bgonzale Added scheduleAdoc method. * Jan 29, 2014 2636 mpduff Scheduling refactor. * Feb 06, 2014 2636 bgonzale added initializeScheduling method. + * Fev 12, 2014 2636 mpduff Add updateSchedule method. * * * @author djohnson @@ -64,9 +65,17 @@ public interface IBandwidthManager { * @return A map of bandwidth allocations that are not scheduled by * subscription name */ - List schedule( - Map>> subscriptions, - boolean fullSchedule); + List schedule(Subscription subscription); + + /** + * Update the retrieval plan scheduling. + * + * @param Network + * the network to update + * + * @return number of subscriptions processed + */ + int updateSchedule(Network network); /** * Schedule AdhocSubscription to run as soon as the RetrievalPlan will diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthManager.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthManager.java index a017e9c502..3e42fc482d 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthManager.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthManager.java @@ -58,13 +58,15 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil; * Oct 2, 2013 1797 dhladky Generics * Dec 04, 2013 2566 bgonzale use bandwidthmanager method to retrieve spring files. * Feb 06, 2014 2636 bgonzale added initializeScheduling method. + * Feb 12, 2014 2636 mpduff Override getSubscriptionsToSchedule * * * * @author djohnson * @version 1.0 */ -class InMemoryBandwidthManager extends BandwidthManager { +class InMemoryBandwidthManager extends + BandwidthManager { private static final IUFStatusHandler statusHandler = UFStatus .getHandler(InMemoryBandwidthManager.class); @@ -122,8 +124,8 @@ class InMemoryBandwidthManager extends Bandw * @param bandwidthDaoUtil */ public InMemoryBandwidthManager(IBandwidthDbInit dbInit, - IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, - BandwidthDaoUtil bandwidthDaoUtil) { + IBandwidthDao bandwidthDao, + RetrievalManager retrievalManager, BandwidthDaoUtil bandwidthDaoUtil) { super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil); } @@ -140,7 +142,7 @@ class InMemoryBandwidthManager extends Bandw */ @Override protected ProposeScheduleResponse proposeScheduleSbnSubscription( - List> subscriptions) throws Exception { + List> subscriptions) throws Exception { return proposeScheduleSubscriptions(subscriptions); } @@ -149,7 +151,8 @@ class InMemoryBandwidthManager extends Bandw */ @Override protected Set scheduleSbnSubscriptions( - List> subscriptions) throws SerializationException { + List> subscriptions) + throws SerializationException { return scheduleSubscriptions(subscriptions); } @@ -177,4 +180,9 @@ class InMemoryBandwidthManager extends Bandw return new ArrayList(0); } + @Override + protected List getSubscriptionsToSchedule(Network network) { + // Nothing to do for in-memory version + return new ArrayList(0); + } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/hibernate/BandwidthBucketDao.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/hibernate/BandwidthBucketDao.java index 7a8570acbd..010e89f65c 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/hibernate/BandwidthBucketDao.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/hibernate/BandwidthBucketDao.java @@ -40,6 +40,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao; * ------------ ---------- ----------- -------------------------- * Jun 25, 2013 2106 djohnson Initial creation * Dec 3, 2013 1736 dhladky Bandwidth bucket size attenuation. + * Feb 11, 2013 2636 mpduff Changed GET_WHERE_START_TIME_IS_BETWEEN_INCLUSIVE query. * * * @@ -63,7 +64,7 @@ public class BandwidthBucketDao extends private static final String GET_BY_LATEST_START_TIME = "from BandwidthBucket bb where bb.network = :network and bb.bucketStartTime = " + "(select max(bucketStartTime) from BandwidthBucket bb where bb.network = :network)"; - private static final String GET_WHERE_START_TIME_IS_BETWEEN_INCLUSIVE = "from BandwidthBucket bb where bb.network = :network and bb.bucketStartTime between :earliestTime and :latestTime"; + private static final String GET_WHERE_START_TIME_IS_BETWEEN_INCLUSIVE = "from BandwidthBucket bb where bb.network = :network and bb.bucketStartTime >= :earliestTime and bb.bucketEndTime <= :latestTime"; /** * {@inheritDoc} @@ -155,14 +156,13 @@ public class BandwidthBucketDao extends * {@inheritDoc} */ @Override - public BandwidthBucket getBucketContainingTime(long millis, - Network network) { + public BandwidthBucket getBucketContainingTime(long millis, Network network) { List buckets = getWhereStartTimeIsLessThanOrEqualTo( millis, network); // last bucket. if (!buckets.isEmpty()) { - return buckets.get(buckets.size() -1); + return buckets.get(buckets.size() - 1); } else { return null; } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/hibernate/HibernateBandwidthInitializer.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/hibernate/HibernateBandwidthInitializer.java index 0c3892f1bb..c0c8d6fb46 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/hibernate/HibernateBandwidthInitializer.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/hibernate/HibernateBandwidthInitializer.java @@ -8,6 +8,7 @@ import com.raytheon.uf.common.datadelivery.registry.Subscription; import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus.Priority; +import com.raytheon.uf.common.util.StringUtil; import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit; import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.BandwidthInitializer; @@ -37,6 +38,7 @@ import com.raytheon.uf.edex.datadelivery.util.DataDeliveryIdUtil; * Jan 29, 2014 2636 mpduff Scheduling refactor. * Feb 06, 2014 2636 bgonzale Use scheduling initialization method after registry init. * Feb 11, 2014 2771 bgonzale Use Data Delivery ID instead of Site. + * Feb 14, 2014 2636 mpduff Clean up logging * * * @author djohnson @@ -103,13 +105,15 @@ public class HibernateBandwidthInitializer implements BandwidthInitializer { Map> subMap = findSubscriptionsStrategy .findSubscriptionsToSchedule(); - List unscheduled = instance - .initializeScheduling(subMap); + List unscheduled = instance.initializeScheduling(subMap); - for (String subscription : unscheduled) { - statusHandler.handle(Priority.PROBLEM, - "The following subscription was not initially scheduled: " - + subscription); + if (!unscheduled.isEmpty()) { + StringBuilder sb = new StringBuilder("The following subscriptions could not be scheduled at startup: "); + sb.append(StringUtil.NEWLINE); + for (String subscription : unscheduled) { + sb.append(subscription).append(" "); + } + statusHandler.handle(Priority.INFO, sb.toString()); } } catch (Exception e) { statusHandler.error( diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthService.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthService.java index 5e18c1915a..660500dc0d 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthService.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthService.java @@ -1,4 +1,5 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.registry; + /** * This software was developed and / or modified by Raytheon Company, * pursuant to Contract DG133W-05-CQ-1067 with the US Government. @@ -28,6 +29,7 @@ import com.raytheon.uf.common.time.util.TimeUtil; import com.raytheon.uf.edex.database.DataAccessLayerException; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthBucket; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao; + /** * Registry Bandwidth Service. * @@ -38,6 +40,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Nov 16, 2013 1736 dhladky Initial creation + * Feb 14, 2014 2636 mpduff Logging cleanup. * * * @@ -45,60 +48,60 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao; * @version 1.0 */ public class RegistryBandwidthService { - + private static final IUFStatusHandler statusHandler = UFStatus .getHandler(RegistryBandwidthService.class); - + public static final int BYTES_PER_KILOBYTE = 1024; - + private Network network; - + private IBandwidthBucketDao bucketDao; - + private int bucketSize; - + public RegistryBandwidthService() { - + } - + /** * Construct an instance + * * @param bucketDao * @param network */ - public RegistryBandwidthService(IBandwidthBucketDao bucketDao, Network network, int bucketSize) { + public RegistryBandwidthService(IBandwidthBucketDao bucketDao, + Network network, int bucketSize) { this.bucketDao = bucketDao; this.network = network; this.bucketSize = bucketSize; } - + /** * Gives a time averaged bandwidth for the current time. + * * @return */ public Integer getCurrentRegistryBandwidth() { - + if (network == Network.OPSNET) { - + RegistryBandwidthRecord rbr = getCurrentRegistryBandwidthRecord(); if (rbr != null) { // convert to kb per/second return convertBytesToKilobytes(rbr.getBytes()); - } else { - statusHandler - .handle(Priority.WARN, - "No active registry bandwidth information for current time."); } - } - + } + return 0; - + } - + /** - * Gives the current full record. Which is the previous record time wise + * Gives the current full record. Which is the previous record time wise * because the query is back one full bucketSize in millis. + * * @return */ public RegistryBandwidthRecord getCurrentRegistryBandwidthRecord() { @@ -126,7 +129,7 @@ public class RegistryBandwidthService { return rbr; } - + /** * Gives a time averaged bandwidth utilization for the registry, time passed * in. @@ -149,13 +152,14 @@ public class RegistryBandwidthService { // try current return getCurrentRegistryBandwidth(); } - } - + } + return 0; } - + /** * Retrieve a registry bandwidth record + * * @param cal * @return */ @@ -167,20 +171,21 @@ public class RegistryBandwidthService { if (timePeriodKey != null) { try { - long startMillis = timePeriodKey - bucketSize/2; - long endMillis = timePeriodKey + bucketSize/2; + long startMillis = timePeriodKey - bucketSize / 2; + long endMillis = timePeriodKey + bucketSize / 2; rbr = rbd.queryByTimeRange(startMillis, endMillis); } catch (DataAccessLayerException dale) { statusHandler.handle(Priority.PROBLEM, "Could not lookup Registry Bandwidth Record! ", dale); } } - + return rbr; } - + /** * Retrieve a registry bandwidth record + * * @param cal * @return */ @@ -188,17 +193,18 @@ public class RegistryBandwidthService { Calendar cal = TimeUtil.newGmtCalendar(); cal.setTimeInMillis(millis); - + return getRegistryBandwidthRecord(cal); } - + /** * Get time period key for some other determined time + * * @param cal * @return */ public Long getTimePeriodKey(Calendar cal) { - + long millis = cal.getTimeInMillis(); BandwidthBucket bucket = bucketDao.getBucketContainingTime(millis, network); @@ -210,18 +216,20 @@ public class RegistryBandwidthService { // in the off chance a bucket doesn't exist anywhere near this time. return null; } - + /** * conversion + * * @param bytes * @return */ public static int convertBytesToKilobytes(int bytes) { return bytes / BYTES_PER_KILOBYTE; } - + /** * Add or update the record + * * @param rbr */ public void addorUpdateRecord(RegistryBandwidthRecord rbr) { @@ -239,6 +247,7 @@ public class RegistryBandwidthService { /** * Records in the DB are kept by millis on a one day cycle + * * @param current * @return */ diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/PriorityRetrievalScheduler.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/PriorityRetrievalScheduler.java index ea219e34b4..fea640da4f 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/PriorityRetrievalScheduler.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/PriorityRetrievalScheduler.java @@ -31,6 +31,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; * Jan 25, 2013 1528 djohnson Lower priority requests should not be able to unschedule higher priority requests. * Jun 25, 2013 2106 djohnson Access bandwidth bucket contents through RetrievalPlan. * Dec 17, 2013 2636 bgonzale When adding to buckets, call the constrained method. + * Feb 14, 2014 2636 mpduff Clean up logging. * * * @version 1.0 @@ -58,7 +59,7 @@ public class PriorityRetrievalScheduler implements IRetrievalScheduler { long startTimeMillis = startTime.getTimeInMillis(); long endTimeMillis = endTime.getTimeInMillis(); - + if (startTimeMillis > endTimeMillis) { throw new IllegalArgumentException(String.format( "Invalid start and end times passed for allocation [%s]: " @@ -164,17 +165,17 @@ public class PriorityRetrievalScheduler implements IRetrievalScheduler { } private List reprioritize(RetrievalPlan plan, - BandwidthAllocation request, - Long startKey, Long endKey) { + BandwidthAllocation request, Long startKey, Long endKey) { - statusHandler.info("Re-prioritizing necessary for BandwidthAllocation[" - + request + "]"); + statusHandler + .debug("Re-prioritizing necessary for BandwidthAllocation: " + + request); // Look in the window between start and end times to see if there are // lower priority // retrievals that can be moved.. - SortedSet window = plan - .getBucketsInWindow(startKey, endKey); + SortedSet window = plan.getBucketsInWindow(startKey, + endKey); boolean enoughBandwidth = false; long total = 0; diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/util/BandwidthDaoUtil.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/util/BandwidthDaoUtil.java index 2c452e5b18..20cbbd119c 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/util/BandwidthDaoUtil.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/util/BandwidthDaoUtil.java @@ -46,6 +46,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthDataSetUpdate; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthSubscription; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager; +import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus; /** @@ -82,6 +83,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus; * because the calculate start and end time methods no longer use * active period. * Jan 29, 2014 2636 mpduff Scheduling refactor. + * Feb 11, 2014 2636 mpduff Change how retrieval times are calculated. * * * @author djohnson @@ -120,10 +122,9 @@ public class BandwidthDaoUtil { * @return */ public SortedSet getRetrievalTimes( - Subscription subscription, SortedSet cycles, - Calendar start, Calendar end) { + Subscription subscription, SortedSet cycles) { return getRetrievalTimes(subscription, cycles, - Sets.newTreeSet(Arrays.asList(0)), start, end); + Sets.newTreeSet(Arrays.asList(0))); } /** @@ -136,8 +137,7 @@ public class BandwidthDaoUtil { * @return the retrieval times */ public SortedSet getRetrievalTimes( - Subscription subscription, int retrievalInterval, - Calendar start, Calendar end) { + Subscription subscription, int retrievalInterval) { // Add all hours of the days final SortedSet hours = Sets.newTreeSet(); for (int i = 0; i < TimeUtil.HOURS_PER_DAY; i++) { @@ -151,7 +151,7 @@ public class BandwidthDaoUtil { minutes.add(i); } - return getRetrievalTimes(subscription, hours, minutes, start, end); + return getRetrievalTimes(subscription, hours, minutes); } /** @@ -164,74 +164,76 @@ public class BandwidthDaoUtil { * The set of hours * @param minutes * The set of minutes - * @param startTime - * The start time - * @param endTime - * The end time * @return Set of retrieval times */ private SortedSet getRetrievalTimes( Subscription subscription, SortedSet hours, - SortedSet minutes, Calendar startTime, Calendar endTime) { + SortedSet minutes) { SortedSet subscriptionTimes = new TreeSet(); + RetrievalPlan plan = retrievalManager.getPlan(subscription.getRoute()); + Calendar planStart = plan.getPlanStart(); + Calendar planEnd = plan.getPlanEnd(); + // starting time when when subscription is first valid for scheduling // based on plan start and subscription start. Calendar subscriptionCalculatedStart = subscription - .calculateStart(startTime); + .calculateStart(planStart); // end time when when subscription is last valid for scheduling based on // plan end and subscription end. - Calendar subscriptionCalculatedEnd = subscription.calculateEnd(endTime); + Calendar subscriptionCalculatedEnd = subscription.calculateEnd(planEnd); if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { - statusHandler.debug("**** PlanStart: " + startTime.getTime()); - statusHandler.debug("**** PlanEnd : " + endTime.getTime()); + statusHandler.debug("**** PlanStart: " + planStart.getTime()); + statusHandler.debug("**** PlanEnd : " + planEnd.getTime()); statusHandler.debug("**** CalculatedStart: " + subscriptionCalculatedStart.getTime()); statusHandler.debug("**** CalculatedEnd : " + subscriptionCalculatedEnd.getTime()); } - // drop the start time by 6 hours to account for 4 cycle/day models subscriptionCalculatedStart = TimeUtil.minCalendarFields( subscriptionCalculatedStart, Calendar.MINUTE, Calendar.SECOND, Calendar.MILLISECOND); - subscriptionCalculatedStart.add(Calendar.HOUR_OF_DAY, -6); - Calendar start = (Calendar) subscriptionCalculatedStart.clone(); - outerloop: while (!start.after(subscriptionCalculatedEnd)) { + // drop the start time by 6 hours to account for 4 cycle/day models + subscriptionCalculatedStart.add(Calendar.HOUR_OF_DAY, -6); + Calendar start = TimeUtil.newGmtCalendar(subscriptionCalculatedStart + .getTime()); + + int availabilityOffset = 0; + try { + availabilityOffset = BandwidthUtil.getDataSetAvailablityOffset( + subscription, start); + } catch (RegistryHandlerException e) { + // Error occurred querying the registry. Log and continue on + statusHandler + .handle(Priority.PROBLEM, + "Unable to retrieve data availability offset, using 0 for the offset.", + e); + } + + while (!start.after(subscriptionCalculatedEnd)) { for (Integer cycle : hours) { start.set(Calendar.HOUR_OF_DAY, cycle); - // start base equal-to-or-after subscriptionStart - if (start.compareTo(subscriptionCalculatedStart) >= 0) { - for (Integer minute : minutes) { - start.set(Calendar.MINUTE, minute); + for (Integer minute : minutes) { + start.set(Calendar.MINUTE, minute); + Calendar retrievalTime = TimeUtil.newGmtCalendar(); + retrievalTime.setTimeInMillis(start.getTimeInMillis()); + retrievalTime.add(Calendar.MINUTE, availabilityOffset); - // start minutes equal-to-or-after subscriptionStart - if (start.compareTo(subscriptionCalculatedStart) >= 0) { - // Check for nonsense - if (start.after(subscriptionCalculatedEnd)) { - break outerloop; - } else { - Calendar time = TimeUtil.newCalendar(); - time.setTimeInMillis(start.getTimeInMillis()); - /** - * Fine grain check by hour and minute, for - * subscription(start/end), - * activePeriod(start/end) - **/ - // Subscription Start and End time first - if (time.after(subscriptionCalculatedEnd) - || time.before(start) - || !subscription - .inActivePeriodWindow(time)) { - // don't schedule this retrieval time, - // outside subscription window - continue; - } - - subscriptionTimes.add(time); - } + if (retrievalTime.after(planStart) + && retrievalTime.before(planEnd)) { + // Check for nonsense + /* + * Fine grain check by hour and minute, for + * subscription(start/end), activePeriod(start/end) + */ + if (!subscription.inActivePeriodWindow(retrievalTime)) { + // don't schedule this retrieval time, + // outside subscription window + continue; } + subscriptionTimes.add(retrievalTime); } } }