Issue #2636 Changed EdexBandwidthManager startup scheduling to use the

in-memory bandwidth manager to setup initial scheduling because it
is much faster.

Amend: updated log statement.
       fix for rescheduling of already scheduled BandwidthAllocations.

Change-Id: I5c6d6543bce8222cb746fde2951caf6439db5fa3

Former-commit-id: 704e466f28 [formerly f1a4a7b24290b6e85380668563793ac529fc5b31]
Former-commit-id: 9c50fb29fd
This commit is contained in:
Brad Gonzales 2014-02-06 16:44:55 -06:00
parent a26a150a69
commit 9529c1d191
5 changed files with 104 additions and 29 deletions

View file

@ -141,7 +141,9 @@ import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
* Jan 25, 2014 2636 mpduff Don't do an initial adhoc query for a new subscription.
* Jan 24, 2013 2709 bgonzale Before scheduling adhoc, check if in active period window.
* Jan 29, 2014 2636 mpduff Scheduling refactor.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
* Feb 06, 2014 2636 bgonzale fix overwrite of unscheduled subscription list. fix scheduling
* of already scheduled BandwidthAllocations.
*
* </pre>
*
@ -496,6 +498,8 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
if (!fullSchedule) {
if (!end.equals(this.previousRetrievalEndMap.get(network))) {
start = this.previousRetrievalEndMap.get(network);
} else {
return unscheduled;
}
}
@ -516,25 +520,28 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
for (Subscription subscription : subMap.get(network)) {
statusHandler.info("Scheduling subscription"
+ subscription.getName());
List<BandwidthAllocation> unscheduledForThisSub = new ArrayList<BandwidthAllocation>();
final DataType dataSetType = subscription.getDataSetType();
switch (dataSetType) {
case GRID:
unscheduled = handleGridded(subscription, start, end);
unscheduledForThisSub = handleGridded(subscription,
start, end);
break;
case POINT:
unscheduled = handlePoint(subscription, start, end);
unscheduledForThisSub = handlePoint(subscription,
start, end);
break;
default:
throw new IllegalArgumentException(
"The BandwidthManager doesn't know how to treat subscriptions with data type ["
+ dataSetType + "]!");
}
unscheduleSubscriptionsForAllocations(unscheduledForThisSub);
unscheduled.addAll(unscheduledForThisSub);
}
}
}
unscheduleSubscriptionsForAllocations(unscheduled);
return unscheduled;
}

View file

@ -44,6 +44,7 @@ import com.google.common.eventbus.Subscribe;
import com.raytheon.edex.site.SiteUtil;
import com.raytheon.uf.common.datadelivery.bandwidth.IBandwidthRequest;
import com.raytheon.uf.common.datadelivery.bandwidth.IBandwidthRequest.RequestType;
import com.raytheon.uf.common.datadelivery.bandwidth.ProposeScheduleResponse;
import com.raytheon.uf.common.datadelivery.registry.AdhocSubscription;
import com.raytheon.uf.common.datadelivery.registry.Coverage;
import com.raytheon.uf.common.datadelivery.registry.DataDeliveryRegistryObjectTypes;
@ -119,7 +120,10 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Jan 20, 2013 2398 dhladky Fixed rescheduling beyond active period/expired window.
* Jan 24, 2013 2709 bgonzale Changed parameter to shouldScheduleForTime to a Calendar.
* Jan 29, 2014 2636 mpduff Scheduling refactor.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
* Jan 30, 2014 2686 dhladky refactor of retrieval.
* Feb 06, 2014 2636 bgonzale Added initializeScheduling method that uses the in-memory
* bandwidth manager to perform the scheduling initialization
* because of efficiency.
*
* </pre>
*
@ -185,20 +189,61 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
// schedule maintenance tasks
scheduler = Executors.newScheduledThreadPool(1);
// TODO: Uncomment the last line in this comment block when fully
// switched over to Java 1.7 and remove the finally block in shutdown,
// that is also marked as TODO
// This will allow the bandwidth manager to be garbage collected without
// waiting for all of the delayed tasks to expire, currently they are
// manually removed in the shutdown method by casting to the
// implementation and clearing the queue
// scheduler.setRemoveOnCancelPolicy(true);
scheduler.scheduleAtFixedRate(watchForConfigFileChanges, 1, 1,
TimeUnit.MINUTES);
scheduler.scheduleAtFixedRate(new MaintenanceTask(), 5, 5,
TimeUnit.MINUTES);
}
@Override
public List<String> initializeScheduling(
Map<Network, List<Subscription>> subMap)
throws SerializationException {
List<String> unscheduledNames = new ArrayList<String>(0);
try {
for (Network key : subMap.keySet()) {
List<Subscription<T, C>> subscriptions = new ArrayList<Subscription<T, C>>();
// this loop is here only because of the generics mess
for (Subscription s : subMap.get(key)) {
subscriptions.add(s);
}
ProposeScheduleResponse response = proposeScheduleSubscriptions(subscriptions);
Set<String> unscheduled = response
.getUnscheduledSubscriptions();
if (!unscheduled.isEmpty()) {
// if proposed was unable to schedule some subscriptions it
// will schedule nothing. schedule any that can be scheduled
// here.
List<Subscription<T, C>> subsToSchedule = new ArrayList<Subscription<T, C>>();
for (Subscription<T, C> s : subscriptions) {
if (!unscheduled.contains(s.getName())) {
subsToSchedule.add(s);
}
}
unscheduledNames
.addAll(scheduleSubscriptions(subsToSchedule));
} else {
unscheduledNames.addAll(unscheduled);
}
}
} finally {
// TODO: Uncomment the last line in this comment block when fully
// switched over to Java 1.7 and remove the finally block in
// shutdown,
// that is also marked as TODO
// This will allow the bandwidth manager to be garbage collected
// without
// waiting for all of the delayed tasks to expire, currently they
// are
// manually removed in the shutdown method by casting to the
// implementation and clearing the queue
// scheduler.setRemoveOnCancelPolicy(true);
scheduler.scheduleAtFixedRate(watchForConfigFileChanges, 1, 1,
TimeUnit.MINUTES);
scheduler.scheduleAtFixedRate(new MaintenanceTask(), 5, 5,
TimeUnit.MINUTES);
}
return unscheduledNames;
}
/**
* {@inheritDoc}
*/

View file

@ -48,6 +48,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.ISubscriptionAggre
* Jan 06, 2014 2636 mpduff Update javadoc
* Jan 08, 2014 2615 bgonzale Added scheduleAdoc method.
* Jan 29, 2014 2636 mpduff Scheduling refactor.
* Feb 06, 2014 2636 bgonzale added initializeScheduling method.
* </pre>
*
* @author djohnson
@ -126,4 +127,18 @@ public interface IBandwidthManager<T extends Time, C extends Coverage> {
* @return the initializer
*/
BandwidthInitializer getInitializer();
/**
* Called after a BandwidthManager has been created to initialize scheduling
* with the given subscriptions in preparation for operation.
*
* @param subMap
* map of subscriptions to initialize scheduling with
* @throws SerializationException
*
* @Returns a list of the names of the subscriptions that were not
* scheduled.
*/
List<String> initializeScheduling(Map<Network, List<Subscription>> subMap)
throws SerializationException;
}

View file

@ -19,11 +19,14 @@
**/
package com.raytheon.uf.edex.datadelivery.bandwidth;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.raytheon.uf.common.datadelivery.bandwidth.ProposeScheduleResponse;
import com.raytheon.uf.common.datadelivery.registry.Coverage;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.datadelivery.registry.Time;
import com.raytheon.uf.common.serialization.SerializationException;
@ -54,6 +57,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
* Jul 09, 2013 2106 djohnson Add shutdownInternal().
* Oct 2, 2013 1797 dhladky Generics
* Dec 04, 2013 2566 bgonzale use bandwidthmanager method to retrieve spring files.
* Feb 06, 2014 2636 bgonzale added initializeScheduling method.
*
* </pre>
*
@ -166,4 +170,11 @@ class InMemoryBandwidthManager<T extends Time, C extends Coverage> extends Bandw
// Nothing to do for in-memory version
}
@Override
public List<String> initializeScheduling(
Map<Network, List<Subscription>> subMap) {
// Nothing to do for in-memory version
return new ArrayList<String>(0);
}
}

View file

@ -1,9 +1,7 @@
package com.raytheon.uf.edex.datadelivery.bandwidth.hibernate;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.raytheon.edex.site.SiteUtil;
import com.raytheon.uf.common.datadelivery.registry.Network;
@ -12,7 +10,6 @@ import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit;
import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.BandwidthInitializer;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
@ -38,6 +35,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
* Nov 19, 2013 2545 bgonzale Removed programmatic customization for central, client, and dev(monolithic)
* registries since the injected FindSubscription handler will be configured now.
* Jan 29, 2014 2636 mpduff Scheduling refactor.
* Feb 06, 2014 2636 bgonzale Use scheduling initialization method after registry init.
* </pre>
*
* @author djohnson
@ -100,18 +98,17 @@ public class HibernateBandwidthInitializer implements BandwidthInitializer {
*/
@Override
public void executeAfterRegistryInit() {
Set<Subscription> activeSubscriptions = new HashSet<Subscription>();
Map<Network, Set<Subscription>> subMap = null;
try {
subMap = findSubscriptionsStrategy.findSubscriptionsToSchedule();
Map<Network, List<Subscription>> subMap = findSubscriptionsStrategy
.findSubscriptionsToSchedule();
List<BandwidthAllocation> unscheduled = instance.schedule(subMap,
true);
List<String> unscheduled = instance
.initializeScheduling(subMap);
for (BandwidthAllocation allocation : unscheduled) {
for (String subscription : unscheduled) {
statusHandler.handle(Priority.PROBLEM,
"The following bandwidth allocation is in an unscheduled state:\n "
+ allocation);
"The following subscription was not initially scheduled: "
+ subscription);
}
} catch (Exception e) {
statusHandler.error(