Issue #2636 - Change how retrieval plan is updated over time.

Peer review comments

Change-Id: If7308ad0300460ed14bc4b53707b81a6eba51950

Former-commit-id: 89bd047283 [formerly 0144c23a35b6e7134d5c4caecbaa78f6b0c613f5]
Former-commit-id: 911789b910
This commit is contained in:
Mike Duff 2014-02-10 14:41:59 -06:00
parent 3be3380944
commit 5f9d913c6d
11 changed files with 372 additions and 280 deletions

View file

@ -69,6 +69,7 @@ import com.raytheon.uf.common.time.util.TimeUtil;
* Jan 24, 2014 2709 bgonzale Fix setting of active period end. Change active period checks
* to check day of year. removed now unused active period methods.
* Jan 28, 2014 2636 mpduff Changed to use GMT calendar.
* Feb 12, 2014 2636 mpduff Return new instance of calculated start and end.
*
* </pre>
*
@ -500,7 +501,7 @@ public abstract class RecurringSubscription<T extends Time, C extends Coverage>
return TimeUtil.newGmtCalendar(subscriptionStart);
}
return startConstraint;
return TimeUtil.newGmtCalendar(startConstraint.getTime());
}
@Override
@ -516,7 +517,7 @@ public abstract class RecurringSubscription<T extends Time, C extends Coverage>
return TimeUtil.newGmtCalendar(subscriptionEnd);
}
return endConstraint;
return TimeUtil.newGmtCalendar(endConstraint.getTime());
}
/**
@ -1072,6 +1073,7 @@ public abstract class RecurringSubscription<T extends Time, C extends Coverage>
/**
* @return the subscriptionState
*/
@Override
public SubscriptionState getSubscriptionState() {
return subscriptionState;
}
@ -1080,6 +1082,7 @@ public abstract class RecurringSubscription<T extends Time, C extends Coverage>
* @param subscriptionState
* the subscriptionState to set
*/
@Override
public void setSubscriptionState(SubscriptionState subscriptionState) {
this.subscriptionState = subscriptionState;
}

View file

@ -29,9 +29,6 @@
<bean id="bandwidthDbInit" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthDbInit" />
<bean id="bandwidthManagerInitializer" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthInitializer" />
<bean id="bandwidthMapConfigFile" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthMapConfigFile" />
@ -43,7 +40,7 @@
<constructor-arg ref="retrievalManager" />
<constructor-arg ref="bandwidthDaoUtil" />
<property name="aggregator" ref="aggregator" />
<property name="initializer" ref="bandwidthManagerInitializer" />
<property name="initializer" ref="bandwidthInitializer" />
</bean>
<bean id="bandwidthUtil"

View file

@ -146,6 +146,7 @@ import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
* Feb 06, 2014 2636 bgonzale fix overwrite of unscheduled subscription list. fix scheduling
* of already scheduled BandwidthAllocations.
* Feb 11, 2014 2771 bgonzale Added handler for GET_DATADELIVERY_ID request.
* Feb 10, 2014 2636 mpduff Changed how retrieval plan is updated over time.
*
* </pre>
*
@ -180,10 +181,6 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
@VisibleForTesting
final RetrievalManager retrievalManager;
/** Map of Network->previous retrieval plan end time */
private final Map<Network, Calendar> previousRetrievalEndMap = new HashMap<Network, Calendar>(
1);
public BandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao<T, C> bandwidthDao,
RetrievalManager retrievalManager,
@ -192,11 +189,6 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
this.bandwidthDao = bandwidthDao;
this.retrievalManager = retrievalManager;
this.bandwidthDaoUtil = bandwidthDaoUtil;
for (Network network : retrievalManager.getRetrievalPlans().keySet()) {
RetrievalPlan plan = retrievalManager.getRetrievalPlans().get(
network);
previousRetrievalEndMap.put(network, plan.getPlanEnd());
}
}
/**
@ -228,9 +220,9 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
}
private List<BandwidthAllocation> schedule(Subscription<T, C> subscription,
SortedSet<Integer> cycles, Calendar start, Calendar end) {
SortedSet<Integer> cycles) {
SortedSet<Calendar> retrievalTimes = bandwidthDaoUtil
.getRetrievalTimes(subscription, cycles, start, end);
.getRetrievalTimes(subscription, cycles);
return scheduleSubscriptionForRetrievalTimes(subscription,
retrievalTimes);
@ -247,9 +239,9 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
* @return the list of unscheduled subscriptions
*/
private List<BandwidthAllocation> schedule(Subscription<T, C> subscription,
int retrievalInterval, Calendar start, Calendar end) {
int retrievalInterval) {
SortedSet<Calendar> retrievalTimes = bandwidthDaoUtil
.getRetrievalTimes(subscription, retrievalInterval, start, end);
.getRetrievalTimes(subscription, retrievalInterval);
return scheduleSubscriptionForRetrievalTimes(subscription,
retrievalTimes);
@ -278,13 +270,16 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
final int numberOfRetrievalTimes = retrievalTimes.size();
List<BandwidthSubscription> newSubscriptions = Lists
.newArrayListWithCapacity(numberOfRetrievalTimes);
statusHandler.info("Scheduling subscription " + subscription.getName());
for (Calendar retrievalTime : retrievalTimes) {
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.info("Scheduling subscription ["
+ subscription.getName()
+ String.format(
"] retrievalTime [%1$tY%1$tm%1$td%1$tH%1$tM",
retrievalTime) + "]");
}
// Add the current subscription to the ones BandwidthManager already
// knows about.
@ -322,7 +317,7 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
statusHandler.info("Scheduling subscription ["
+ dao.getName()
+ String.format(
"] baseReferenceTime [%1$tY%1$tm%1$td%1$tH%1$tM",
"] baseReferenceTime %1$tY%1$tm%1$td%1$tH%1$tM",
retrievalTime));
return aggregate(new BandwidthSubscriptionContainer(subscription,
@ -344,24 +339,27 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
List<SubscriptionRetrieval> retrievals = getAggregator().aggregate(
bandwidthSubscriptions);
timer.lap("aggregator");
if (CollectionUtil.isNullOrEmpty(retrievals)) {
return new ArrayList<BandwidthAllocation>(0);
}
// Create a separate list of BandwidthReservations to schedule
// as the aggregation process may return all subsumed
// SubscriptionRetrievals
// for the specified Subscription.
/*
* Create a separate list of BandwidthReservations to schedule as the
* aggregation process may return all subsumedSubscriptionRetrievalsfor
* the specified Subscription.
*/
List<SubscriptionRetrieval> reservations = new ArrayList<SubscriptionRetrieval>();
for (SubscriptionRetrieval retrieval : retrievals) {
// New RetrievalRequests will be marked as "PROCESSING"
// we need to make new BandwidthReservations for these
// SubscriptionRetrievals.
// TODO: How to process "rescheduled" RetrievalRequests
// in the case where subscription aggregation has determined
// that an existing subscription has now be subsumed or
// altered to accommodate a new super set of subscriptions...
//
/*
* New RetrievalRequests will be marked as "PROCESSING" we need to
* make new BandwidthReservations for these SubscriptionRetrievals.
*/
/*
* TODO: How to process "rescheduled" RetrievalRequests in the case
* where subscription aggregation has determined that an existing
* subscription has now be subsumed or altered to accommodate a new
* super set of subscriptions...
*/
if ((retrieval.getStatus().equals(RetrievalStatus.RESCHEDULE) || retrieval
.getStatus().equals(RetrievalStatus.PROCESSING))
&& !retrieval.isSubsumed()) {
@ -370,23 +368,13 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
.getBandwidthSubscription();
Calendar retrievalTime = bandwidthSubscription
.getBaseReferenceTime();
Calendar startTime = TimeUtil.newCalendar(retrievalTime);
Calendar startTime = TimeUtil.newGmtCalendar(retrievalTime
.getTime());
int delayMinutes = retrieval.getDataSetAvailablityDelay();
int maxLatency = retrieval.getSubscriptionLatency();
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.debug("Adding availability minutes of ["
+ delayMinutes
+ "] to retrieval start time of "
+ String.format("[%1$tY%1$tm%1$td%1$tH%1$tM]",
retrievalTime));
}
startTime.add(Calendar.MINUTE, delayMinutes);
retrieval.setStartTime(startTime);
Calendar endTime = TimeUtil.newCalendar();
Calendar endTime = TimeUtil.newGmtCalendar();
endTime.setTimeInMillis(startTime.getTimeInMillis());
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
@ -459,94 +447,81 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
return unscheduled;
}
/**
* Schedule a single subscription.
*
* @param sub
* The subscription to schedule
* @param fullSchedule
* true to schedule for the full retrieval plan, false to
* schedule from the last time
* @return List of BandwidthAllocations that sould be unscheduled.
*/
public List<BandwidthAllocation> schedule(Subscription<T, C> sub,
boolean fullSchedule) {
Map<Network, List<Subscription<T, C>>> map = new HashMap<Network, List<Subscription<T, C>>>(
1, 1);
List<Subscription<T, C>> list = new ArrayList<Subscription<T, C>>(1);
list.add(sub);
map.put(sub.getRoute(), list);
return schedule(map, fullSchedule);
}
/**
* {@inheritDoc}
*/
@Override
public List<BandwidthAllocation> schedule(
Map<Network, List<Subscription<T, C>>> subMap, boolean fullSchedule) {
List<BandwidthAllocation> unscheduled = new ArrayList<BandwidthAllocation>();
public List<BandwidthAllocation> schedule(Subscription<T, C> subscription) {
List<BandwidthAllocation> unscheduled = null;
for (Network network : subMap.keySet()) {
RetrievalPlan retrievalPlan = retrievalManager.getPlan(network);
/*
* Determine scheduling window, true for whole plan, false to run
* from last time
*/
Calendar start = retrievalPlan.getPlanStart();
Calendar end = retrievalPlan.getPlanEnd();
if (!fullSchedule) {
if (!end.equals(this.previousRetrievalEndMap.get(network))) {
start = this.previousRetrievalEndMap.get(network);
} else {
return unscheduled;
}
}
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.debug("Check for scheduling window: "
+ start.getTime() + " - " + end.getTime());
}
if (end.getTimeInMillis() - start.getTimeInMillis() >= retrievalPlan
.getBucketMinutes() * 2 * TimeUtil.MILLIS_PER_MINUTE) {
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.debug("Scheduling for window: "
+ start.getTime() + " - " + end.getTime());
}
this.previousRetrievalEndMap.put(network,
TimeUtil.newGmtCalendar(end.getTime()));
for (Subscription subscription : subMap.get(network)) {
statusHandler.info("Scheduling subscription"
+ subscription.getName());
List<BandwidthAllocation> unscheduledForThisSub = new ArrayList<BandwidthAllocation>();
final DataType dataSetType = subscription.getDataSetType();
switch (dataSetType) {
case GRID:
unscheduledForThisSub = handleGridded(subscription,
start, end);
unscheduled = handleGridded(subscription);
break;
case POINT:
unscheduledForThisSub = handlePoint(subscription,
start, end);
unscheduled = handlePoint(subscription);
break;
default:
throw new IllegalArgumentException(
"The BandwidthManager doesn't know how to treat subscriptions with data type ["
+ dataSetType + "]!");
}
unscheduleSubscriptionsForAllocations(unscheduledForThisSub);
unscheduled.addAll(unscheduledForThisSub);
}
}
}
unscheduleSubscriptionsForAllocations(unscheduled);
return unscheduled;
}
/**
* Update the retrieval plan for this subscription.
*
* @param subscription
* The subscription that needs its scheduling updated
*/
private void updateSchedule(Subscription subscription) {
final DataType dataSetType = subscription.getDataSetType();
switch (dataSetType) {
case GRID:
updateGriddedSchedule(subscription);
break;
case POINT:
updatePointSchedule(subscription);
break;
default:
throw new IllegalArgumentException(
"The BandwidthManager doesn't know how to treat subscriptions with data type ["
+ dataSetType + "]!");
}
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager#
* updateSchedule(com.raytheon.uf.common.datadelivery.registry.Network)
*/
@Override
public int updateSchedule(Network network) {
List<Subscription> subsToSchedule = getSubscriptionsToSchedule(network);
if (CollectionUtil.isNullOrEmpty(subsToSchedule)) {
return 0;
}
for (Subscription subscription : subsToSchedule) {
updateSchedule(subscription);
}
return subsToSchedule.size();
}
/**
* Get the subscriptions to schedule for the given network.
*
* @param network
* The network
* @return List of subscriptions for the network
*/
protected abstract List<Subscription> getSubscriptionsToSchedule(
Network network);
/**
* Unschedules all subscriptions the allocations are associated to.
*
@ -667,7 +642,7 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
if (bandwidthSubscriptions.isEmpty()
&& ((RecurringSubscription) subscription).shouldSchedule()
&& !subscription.isUnscheduled()) {
return schedule(subscription, true);
return schedule(subscription);
} else if (subscription.getStatus() == SubscriptionStatus.DEACTIVATED
|| subscription.isUnscheduled()) {
// See if the subscription was deactivated or unscheduled..
@ -677,12 +652,70 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
} else {
// Normal update, unschedule old allocations and create new ones
List<BandwidthAllocation> unscheduled = remove(bandwidthSubscriptions);
unscheduled.addAll(schedule(subscription, true));
unscheduled.addAll(schedule(subscription));
return unscheduled;
}
}
}
/**
* Update this point subscription's schedule.
*
* @param Subscription
* The subscription that needs its schedule updated
*/
private void updatePointSchedule(Subscription sub) {
SortedSet<Calendar> retrievalTimes = bandwidthDaoUtil
.getRetrievalTimes(sub,
((PointTime) sub.getTime()).getInterval());
scheduleUpdates(sub, retrievalTimes);
}
/**
* Update this grid subscription's schedule.
*
* @param Subscription
* The subscription that needs its schedule updated
*/
private void updateGriddedSchedule(Subscription sub) {
final List<Integer> cycles = ((GriddedTime) sub.getTime())
.getCycleTimes();
SortedSet<Calendar> retrievalTimes = bandwidthDaoUtil
.getRetrievalTimes(sub, Sets.newTreeSet(cycles));
scheduleUpdates(sub, retrievalTimes);
}
/**
* Schedule retrievals for this subscription for the provided retrieval
* times.
*
* @param sub
* The subscription
* @param retrievalTimes
* The retrieval times
*/
private void scheduleUpdates(Subscription sub,
SortedSet<Calendar> retrievalTimes) {
List<BandwidthSubscription> currentBandwidthSubscriptions = bandwidthDao
.getBandwidthSubscription(sub);
// Remove any times already associated with a retrieval allocation
for (BandwidthSubscription bs : currentBandwidthSubscriptions) {
retrievalTimes.remove(bs.getBaseReferenceTime());
}
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.info("Scheduling " + sub.getName());
for (Calendar c : retrievalTimes) {
statusHandler.info("Scheduling for " + c.getTime());
}
}
scheduleSubscriptionForRetrievalTimes(sub, retrievalTimes);
}
/**
* Handle scheduling point data type subscriptions.
*
@ -691,9 +724,9 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
* @return the list of unscheduled subscriptions
*/
private List<BandwidthAllocation> handlePoint(
Subscription<T, C> subscription, Calendar start, Calendar end) {
Subscription<T, C> subscription) {
List<BandwidthAllocation> unscheduled = schedule(subscription,
((PointTime) subscription.getTime()).getInterval(), start, end);
((PointTime) subscription.getTime()).getInterval());
unscheduled.addAll(getMostRecent(subscription, false));
return unscheduled;
}
@ -706,7 +739,7 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
* @return the list of unscheduled subscriptions
*/
private List<BandwidthAllocation> handleGridded(
Subscription<T, C> subscription, Calendar start, Calendar end) {
Subscription<T, C> subscription) {
final List<Integer> cycles = ((GriddedTime) subscription.getTime())
.getCycleTimes();
final boolean subscribedToCycles = !CollectionUtil
@ -716,8 +749,7 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
// expected times
List<BandwidthAllocation> unscheduled = Collections.emptyList();
if (subscribedToCycles) {
unscheduled = schedule(subscription, Sets.newTreeSet(cycles),
start, end);
unscheduled = schedule(subscription, Sets.newTreeSet(cycles));
}
return unscheduled;
@ -1196,10 +1228,6 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
try {
proposedBwManager = startProposedBandwidthManager(copyOfCurrentMap);
IBandwidthRequest<T, C> request = new IBandwidthRequest<T, C>();
request.setRequestType(RequestType.SCHEDULE_SUBSCRIPTION);
request.setSubscriptions(subscriptions);
unscheduled = proposedBwManager
.scheduleSubscriptions(subscriptions);
} finally {
@ -1450,7 +1478,7 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
// Now for each subscription, attempt to schedule bandwidth
for (Subscription<T, C> subscription : actualSubscriptions) {
unscheduled.addAll(this.schedule(subscription, true));
unscheduled.addAll(this.schedule(subscription));
}
} else {
// Otherwise we can just copy the entire state of the current system

View file

@ -23,7 +23,6 @@ import static com.raytheon.uf.common.registry.ebxml.encoder.RegistryEncoders.Typ
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
@ -69,6 +68,7 @@ import com.raytheon.uf.common.registry.handler.IRegistryObjectHandler;
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.IPerformanceTimer;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.common.util.FileUtil;
@ -125,7 +125,7 @@ import com.raytheon.uf.edex.datadelivery.util.DataDeliveryIdUtil;
* bandwidth manager to perform the scheduling initialization
* because of efficiency.
* Feb 11, 2014 2771 bgonzale Use Data Delivery ID instead of Site.
*
* Feb 10, 2014 2636 mpduff Pass Network map to be scheduled.
* </pre>
*
* @author djohnson
@ -189,7 +189,7 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
this.findSubscriptionsStrategy = findSubscriptionsStrategy;
// schedule maintenance tasks
scheduler = Executors.newScheduledThreadPool(1);
scheduler = Executors.newSingleThreadScheduledExecutor();
}
@Override
@ -218,9 +218,9 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
subsToSchedule.add(s);
}
}
unscheduledNames
.addAll(scheduleSubscriptions(subsToSchedule));
} else {
unscheduled.addAll(scheduleSubscriptions(subsToSchedule));
unscheduledNames.addAll(unscheduled);
}
}
@ -238,13 +238,12 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
// scheduler.setRemoveOnCancelPolicy(true);
scheduler.scheduleAtFixedRate(watchForConfigFileChanges, 1, 1,
TimeUnit.MINUTES);
scheduler.scheduleAtFixedRate(new MaintenanceTask(), 5, 5,
scheduler.scheduleAtFixedRate(new MaintenanceTask(), 30, 30,
TimeUnit.MINUTES);
}
return unscheduledNames;
}
/**
* {@inheritDoc}
*/
@ -808,42 +807,74 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
}
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.datadelivery.bandwidth.BandwidthManager#
* getSubscriptionsToSchedule
* (com.raytheon.uf.common.datadelivery.registry.Network)
*/
@Override
protected List<Subscription> getSubscriptionsToSchedule(Network network) {
List<Subscription> subList = new ArrayList<Subscription>(0);
try {
Map<Network, List<Subscription>> activeSubs = findSubscriptionsStrategy
.findSubscriptionsToSchedule();
if (activeSubs.get(network) != null) {
subList = activeSubs.get(network);
}
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM,
"Error retrieving subscriptions.", e);
}
return subList;
}
/**
* Private inner work thread used to keep the RetrievalPlans up to date.
*/
private class MaintenanceTask implements Runnable {
@Override
public void run() {
IPerformanceTimer timer = TimeUtil.getPerformanceTimer();
timer.start();
statusHandler.info("MaintenanceTask starting...");
for (RetrievalPlan plan : retrievalManager.getRetrievalPlans()
.values()) {
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.info("MaintenanceTask: " + plan.getNetwork());
statusHandler.info("MaintenanceTask: planStart: "
+ plan.getPlanStart().getTime());
statusHandler.info("MaintenanceTask: planEnd: "
+ plan.getPlanEnd().getTime());
}
plan.resize();
Calendar newEnd = plan.getPlanEnd();
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.info("MaintenanceTask: resized planStart: "
+ plan.getPlanStart().getTime());
statusHandler.info("MaintenanceTask: resized planEnd: "
+ plan.getPlanEnd().getTime());
statusHandler.info("MaintenanceTask: Update schedule");
}
// Find DEFERRED Allocations and load them into the plan...
List<BandwidthAllocation> deferred = bandwidthDao.getDeferred(
plan.getNetwork(), newEnd);
plan.getNetwork(), plan.getPlanEnd());
if (!deferred.isEmpty()) {
retrievalManager.schedule(deferred);
}
}
try {
Map<Network, List<Subscription>> activeSubs = findSubscriptionsStrategy
.findSubscriptionsToSchedule();
for (Network network : activeSubs.keySet()) {
for (Subscription sub : activeSubs.get(network)) {
statusHandler.debug("MaintenanceTask scheduling for "
+ sub.getName());
schedule(sub, false);
int numSubsProcessed = 0;
for (RetrievalPlan plan : retrievalManager.getRetrievalPlans()
.values()) {
numSubsProcessed += updateSchedule(plan.getNetwork());
}
}
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM,
"Error requesting subscriptions from registry.", e);
}
statusHandler.info("MaintenanceTask complete");
timer.stop();
statusHandler.info("MaintenanceTask complete: "
+ timer.getElapsed() + " - " + numSubsProcessed
+ " Subscriptions processed.");
}
}
}

View file

@ -49,6 +49,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.ISubscriptionAggre
* Jan 08, 2014 2615 bgonzale Added scheduleAdoc method.
* Jan 29, 2014 2636 mpduff Scheduling refactor.
* Feb 06, 2014 2636 bgonzale added initializeScheduling method.
* Fev 12, 2014 2636 mpduff Add updateSchedule method.
* </pre>
*
* @author djohnson
@ -64,9 +65,17 @@ public interface IBandwidthManager<T extends Time, C extends Coverage> {
* @return A map of bandwidth allocations that are not scheduled by
* subscription name
*/
List<BandwidthAllocation> schedule(
Map<Network, List<Subscription<T, C>>> subscriptions,
boolean fullSchedule);
List<BandwidthAllocation> schedule(Subscription<T, C> subscription);
/**
* Update the retrieval plan scheduling.
*
* @param Network
* the network to update
*
* @return number of subscriptions processed
*/
int updateSchedule(Network network);
/**
* Schedule AdhocSubscription to run as soon as the RetrievalPlan will

View file

@ -58,13 +58,15 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
* Oct 2, 2013 1797 dhladky Generics
* Dec 04, 2013 2566 bgonzale use bandwidthmanager method to retrieve spring files.
* Feb 06, 2014 2636 bgonzale added initializeScheduling method.
* Feb 12, 2014 2636 mpduff Override getSubscriptionsToSchedule
*
* </pre>
*
* @author djohnson
* @version 1.0
*/
class InMemoryBandwidthManager<T extends Time, C extends Coverage> extends BandwidthManager<T, C> {
class InMemoryBandwidthManager<T extends Time, C extends Coverage> extends
BandwidthManager<T, C> {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(InMemoryBandwidthManager.class);
@ -122,8 +124,8 @@ class InMemoryBandwidthManager<T extends Time, C extends Coverage> extends Bandw
* @param bandwidthDaoUtil
*/
public InMemoryBandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao<T,C> bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil) {
IBandwidthDao<T, C> bandwidthDao,
RetrievalManager retrievalManager, BandwidthDaoUtil bandwidthDaoUtil) {
super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil);
}
@ -149,7 +151,8 @@ class InMemoryBandwidthManager<T extends Time, C extends Coverage> extends Bandw
*/
@Override
protected Set<String> scheduleSbnSubscriptions(
List<Subscription<T,C>> subscriptions) throws SerializationException {
List<Subscription<T, C>> subscriptions)
throws SerializationException {
return scheduleSubscriptions(subscriptions);
}
@ -177,4 +180,9 @@ class InMemoryBandwidthManager<T extends Time, C extends Coverage> extends Bandw
return new ArrayList<String>(0);
}
@Override
protected List<Subscription> getSubscriptionsToSchedule(Network network) {
// Nothing to do for in-memory version
return new ArrayList<Subscription>(0);
}
}

View file

@ -40,6 +40,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao;
* ------------ ---------- ----------- --------------------------
* Jun 25, 2013 2106 djohnson Initial creation
* Dec 3, 2013 1736 dhladky Bandwidth bucket size attenuation.
* Feb 11, 2013 2636 mpduff Changed GET_WHERE_START_TIME_IS_BETWEEN_INCLUSIVE query.
*
* </pre>
*
@ -63,7 +64,7 @@ public class BandwidthBucketDao extends
private static final String GET_BY_LATEST_START_TIME = "from BandwidthBucket bb where bb.network = :network and bb.bucketStartTime = "
+ "(select max(bucketStartTime) from BandwidthBucket bb where bb.network = :network)";
private static final String GET_WHERE_START_TIME_IS_BETWEEN_INCLUSIVE = "from BandwidthBucket bb where bb.network = :network and bb.bucketStartTime between :earliestTime and :latestTime";
private static final String GET_WHERE_START_TIME_IS_BETWEEN_INCLUSIVE = "from BandwidthBucket bb where bb.network = :network and bb.bucketStartTime >= :earliestTime and bb.bucketEndTime <= :latestTime";
/**
* {@inheritDoc}
@ -155,8 +156,7 @@ public class BandwidthBucketDao extends
* {@inheritDoc}
*/
@Override
public BandwidthBucket getBucketContainingTime(long millis,
Network network) {
public BandwidthBucket getBucketContainingTime(long millis, Network network) {
List<BandwidthBucket> buckets = getWhereStartTimeIsLessThanOrEqualTo(
millis, network);

View file

@ -8,6 +8,7 @@ import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.util.StringUtil;
import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit;
import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.BandwidthInitializer;
@ -37,6 +38,7 @@ import com.raytheon.uf.edex.datadelivery.util.DataDeliveryIdUtil;
* Jan 29, 2014 2636 mpduff Scheduling refactor.
* Feb 06, 2014 2636 bgonzale Use scheduling initialization method after registry init.
* Feb 11, 2014 2771 bgonzale Use Data Delivery ID instead of Site.
* Feb 14, 2014 2636 mpduff Clean up logging
* </pre>
*
* @author djohnson
@ -103,13 +105,15 @@ public class HibernateBandwidthInitializer implements BandwidthInitializer {
Map<Network, List<Subscription>> subMap = findSubscriptionsStrategy
.findSubscriptionsToSchedule();
List<String> unscheduled = instance
.initializeScheduling(subMap);
List<String> unscheduled = instance.initializeScheduling(subMap);
if (!unscheduled.isEmpty()) {
StringBuilder sb = new StringBuilder("The following subscriptions could not be scheduled at startup: ");
sb.append(StringUtil.NEWLINE);
for (String subscription : unscheduled) {
statusHandler.handle(Priority.PROBLEM,
"The following subscription was not initially scheduled: "
+ subscription);
sb.append(subscription).append(" ");
}
statusHandler.handle(Priority.INFO, sb.toString());
}
} catch (Exception e) {
statusHandler.error(

View file

@ -1,4 +1,5 @@
package com.raytheon.uf.edex.datadelivery.bandwidth.registry;
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
@ -28,6 +29,7 @@ import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.database.DataAccessLayerException;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthBucket;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao;
/**
* Registry Bandwidth Service.
*
@ -38,6 +40,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 16, 2013 1736 dhladky Initial creation
* Feb 14, 2014 2636 mpduff Logging cleanup.
*
* </pre>
*
@ -63,10 +66,12 @@ public class RegistryBandwidthService {
/**
* Construct an instance
*
* @param bucketDao
* @param network
*/
public RegistryBandwidthService(IBandwidthBucketDao bucketDao, Network network, int bucketSize) {
public RegistryBandwidthService(IBandwidthBucketDao bucketDao,
Network network, int bucketSize) {
this.bucketDao = bucketDao;
this.network = network;
this.bucketSize = bucketSize;
@ -74,6 +79,7 @@ public class RegistryBandwidthService {
/**
* Gives a time averaged bandwidth for the current time.
*
* @return
*/
public Integer getCurrentRegistryBandwidth() {
@ -85,10 +91,6 @@ public class RegistryBandwidthService {
if (rbr != null) {
// convert to kb per/second
return convertBytesToKilobytes(rbr.getBytes());
} else {
statusHandler
.handle(Priority.WARN,
"No active registry bandwidth information for current time.");
}
}
@ -99,6 +101,7 @@ public class RegistryBandwidthService {
/**
* Gives the current full record. Which is the previous record time wise
* because the query is back one full bucketSize in millis.
*
* @return
*/
public RegistryBandwidthRecord getCurrentRegistryBandwidthRecord() {
@ -156,6 +159,7 @@ public class RegistryBandwidthService {
/**
* Retrieve a registry bandwidth record
*
* @param cal
* @return
*/
@ -181,6 +185,7 @@ public class RegistryBandwidthService {
/**
* Retrieve a registry bandwidth record
*
* @param cal
* @return
*/
@ -194,6 +199,7 @@ public class RegistryBandwidthService {
/**
* Get time period key for some other determined time
*
* @param cal
* @return
*/
@ -213,6 +219,7 @@ public class RegistryBandwidthService {
/**
* conversion
*
* @param bytes
* @return
*/
@ -222,6 +229,7 @@ public class RegistryBandwidthService {
/**
* Add or update the record
*
* @param rbr
*/
public void addorUpdateRecord(RegistryBandwidthRecord rbr) {
@ -239,6 +247,7 @@ public class RegistryBandwidthService {
/**
* Records in the DB are kept by millis on a one day cycle
*
* @param current
* @return
*/

View file

@ -31,6 +31,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Jan 25, 2013 1528 djohnson Lower priority requests should not be able to unschedule higher priority requests.
* Jun 25, 2013 2106 djohnson Access bandwidth bucket contents through RetrievalPlan.
* Dec 17, 2013 2636 bgonzale When adding to buckets, call the constrained method.
* Feb 14, 2014 2636 mpduff Clean up logging.
* </pre>
*
* @version 1.0
@ -164,17 +165,17 @@ public class PriorityRetrievalScheduler implements IRetrievalScheduler {
}
private List<BandwidthAllocation> reprioritize(RetrievalPlan plan,
BandwidthAllocation request,
Long startKey, Long endKey) {
BandwidthAllocation request, Long startKey, Long endKey) {
statusHandler.info("Re-prioritizing necessary for BandwidthAllocation["
+ request + "]");
statusHandler
.debug("Re-prioritizing necessary for BandwidthAllocation: "
+ request);
// Look in the window between start and end times to see if there are
// lower priority
// retrievals that can be moved..
SortedSet<BandwidthBucket> window = plan
.getBucketsInWindow(startKey, endKey);
SortedSet<BandwidthBucket> window = plan.getBucketsInWindow(startKey,
endKey);
boolean enoughBandwidth = false;
long total = 0;

View file

@ -46,6 +46,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthDataSetUpdate;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthSubscription;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus;
/**
@ -82,6 +83,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus;
* because the calculate start and end time methods no longer use
* active period.
* Jan 29, 2014 2636 mpduff Scheduling refactor.
* Feb 11, 2014 2636 mpduff Change how retrieval times are calculated.
* </pre>
*
* @author djohnson
@ -120,10 +122,9 @@ public class BandwidthDaoUtil<T extends Time, C extends Coverage> {
* @return
*/
public SortedSet<Calendar> getRetrievalTimes(
Subscription<T, C> subscription, SortedSet<Integer> cycles,
Calendar start, Calendar end) {
Subscription<T, C> subscription, SortedSet<Integer> cycles) {
return getRetrievalTimes(subscription, cycles,
Sets.newTreeSet(Arrays.asList(0)), start, end);
Sets.newTreeSet(Arrays.asList(0)));
}
/**
@ -136,8 +137,7 @@ public class BandwidthDaoUtil<T extends Time, C extends Coverage> {
* @return the retrieval times
*/
public SortedSet<Calendar> getRetrievalTimes(
Subscription<T, C> subscription, int retrievalInterval,
Calendar start, Calendar end) {
Subscription<T, C> subscription, int retrievalInterval) {
// Add all hours of the days
final SortedSet<Integer> hours = Sets.newTreeSet();
for (int i = 0; i < TimeUtil.HOURS_PER_DAY; i++) {
@ -151,7 +151,7 @@ public class BandwidthDaoUtil<T extends Time, C extends Coverage> {
minutes.add(i);
}
return getRetrievalTimes(subscription, hours, minutes, start, end);
return getRetrievalTimes(subscription, hours, minutes);
}
/**
@ -164,74 +164,76 @@ public class BandwidthDaoUtil<T extends Time, C extends Coverage> {
* The set of hours
* @param minutes
* The set of minutes
* @param startTime
* The start time
* @param endTime
* The end time
* @return Set of retrieval times
*/
private SortedSet<Calendar> getRetrievalTimes(
Subscription<T, C> subscription, SortedSet<Integer> hours,
SortedSet<Integer> minutes, Calendar startTime, Calendar endTime) {
SortedSet<Integer> minutes) {
SortedSet<Calendar> subscriptionTimes = new TreeSet<Calendar>();
RetrievalPlan plan = retrievalManager.getPlan(subscription.getRoute());
Calendar planStart = plan.getPlanStart();
Calendar planEnd = plan.getPlanEnd();
// starting time when when subscription is first valid for scheduling
// based on plan start and subscription start.
Calendar subscriptionCalculatedStart = subscription
.calculateStart(startTime);
.calculateStart(planStart);
// end time when when subscription is last valid for scheduling based on
// plan end and subscription end.
Calendar subscriptionCalculatedEnd = subscription.calculateEnd(endTime);
Calendar subscriptionCalculatedEnd = subscription.calculateEnd(planEnd);
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.debug("**** PlanStart: " + startTime.getTime());
statusHandler.debug("**** PlanEnd : " + endTime.getTime());
statusHandler.debug("**** PlanStart: " + planStart.getTime());
statusHandler.debug("**** PlanEnd : " + planEnd.getTime());
statusHandler.debug("**** CalculatedStart: "
+ subscriptionCalculatedStart.getTime());
statusHandler.debug("**** CalculatedEnd : "
+ subscriptionCalculatedEnd.getTime());
}
// drop the start time by 6 hours to account for 4 cycle/day models
subscriptionCalculatedStart = TimeUtil.minCalendarFields(
subscriptionCalculatedStart, Calendar.MINUTE, Calendar.SECOND,
Calendar.MILLISECOND);
subscriptionCalculatedStart.add(Calendar.HOUR_OF_DAY, -6);
Calendar start = (Calendar) subscriptionCalculatedStart.clone();
outerloop: while (!start.after(subscriptionCalculatedEnd)) {
// drop the start time by 6 hours to account for 4 cycle/day models
subscriptionCalculatedStart.add(Calendar.HOUR_OF_DAY, -6);
Calendar start = TimeUtil.newGmtCalendar(subscriptionCalculatedStart
.getTime());
int availabilityOffset = 0;
try {
availabilityOffset = BandwidthUtil.getDataSetAvailablityOffset(
subscription, start);
} catch (RegistryHandlerException e) {
// Error occurred querying the registry. Log and continue on
statusHandler
.handle(Priority.PROBLEM,
"Unable to retrieve data availability offset, using 0 for the offset.",
e);
}
while (!start.after(subscriptionCalculatedEnd)) {
for (Integer cycle : hours) {
start.set(Calendar.HOUR_OF_DAY, cycle);
// start base equal-to-or-after subscriptionStart
if (start.compareTo(subscriptionCalculatedStart) >= 0) {
for (Integer minute : minutes) {
start.set(Calendar.MINUTE, minute);
Calendar retrievalTime = TimeUtil.newGmtCalendar();
retrievalTime.setTimeInMillis(start.getTimeInMillis());
retrievalTime.add(Calendar.MINUTE, availabilityOffset);
// start minutes equal-to-or-after subscriptionStart
if (start.compareTo(subscriptionCalculatedStart) >= 0) {
if (retrievalTime.after(planStart)
&& retrievalTime.before(planEnd)) {
// Check for nonsense
if (start.after(subscriptionCalculatedEnd)) {
break outerloop;
} else {
Calendar time = TimeUtil.newCalendar();
time.setTimeInMillis(start.getTimeInMillis());
/**
/*
* Fine grain check by hour and minute, for
* subscription(start/end),
* activePeriod(start/end)
**/
// Subscription Start and End time first
if (time.after(subscriptionCalculatedEnd)
|| time.before(start)
|| !subscription
.inActivePeriodWindow(time)) {
* subscription(start/end), activePeriod(start/end)
*/
if (!subscription.inActivePeriodWindow(retrievalTime)) {
// don't schedule this retrieval time,
// outside subscription window
continue;
}
subscriptionTimes.add(time);
}
}
subscriptionTimes.add(retrievalTime);
}
}
}