Issue #2636 Changed EdexBandwidthManager startup scheduling to use the
in-memory bandwidth manager to setup initial scheduling because it is much faster. Amend: updated log statement. fix for rescheduling of already scheduled BandwidthAllocations. Change-Id: I5c6d6543bce8222cb746fde2951caf6439db5fa3 Former-commit-id:9c50fb29fd
[formerly9c50fb29fd
[formerly f1a4a7b24290b6e85380668563793ac529fc5b31]] Former-commit-id:704e466f28
Former-commit-id:0b211e8036
This commit is contained in:
parent
ad709c9692
commit
104625b66d
5 changed files with 104 additions and 29 deletions
|
@ -142,6 +142,8 @@ import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
|||
* Jan 24, 2013 2709 bgonzale Before scheduling adhoc, check if in active period window.
|
||||
* Jan 29, 2014 2636 mpduff Scheduling refactor.
|
||||
* Jan 30, 2014 2686 dhladky refactor of retrieval.
|
||||
* Feb 06, 2014 2636 bgonzale fix overwrite of unscheduled subscription list. fix scheduling
|
||||
* of already scheduled BandwidthAllocations.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -496,6 +498,8 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
|
|||
if (!fullSchedule) {
|
||||
if (!end.equals(this.previousRetrievalEndMap.get(network))) {
|
||||
start = this.previousRetrievalEndMap.get(network);
|
||||
} else {
|
||||
return unscheduled;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -516,25 +520,28 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
|
|||
for (Subscription subscription : subMap.get(network)) {
|
||||
statusHandler.info("Scheduling subscription"
|
||||
+ subscription.getName());
|
||||
List<BandwidthAllocation> unscheduledForThisSub = new ArrayList<BandwidthAllocation>();
|
||||
final DataType dataSetType = subscription.getDataSetType();
|
||||
switch (dataSetType) {
|
||||
case GRID:
|
||||
unscheduled = handleGridded(subscription, start, end);
|
||||
unscheduledForThisSub = handleGridded(subscription,
|
||||
start, end);
|
||||
break;
|
||||
case POINT:
|
||||
unscheduled = handlePoint(subscription, start, end);
|
||||
unscheduledForThisSub = handlePoint(subscription,
|
||||
start, end);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException(
|
||||
"The BandwidthManager doesn't know how to treat subscriptions with data type ["
|
||||
+ dataSetType + "]!");
|
||||
}
|
||||
unscheduleSubscriptionsForAllocations(unscheduledForThisSub);
|
||||
unscheduled.addAll(unscheduledForThisSub);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unscheduleSubscriptionsForAllocations(unscheduled);
|
||||
|
||||
return unscheduled;
|
||||
}
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ import com.google.common.eventbus.Subscribe;
|
|||
import com.raytheon.edex.site.SiteUtil;
|
||||
import com.raytheon.uf.common.datadelivery.bandwidth.IBandwidthRequest;
|
||||
import com.raytheon.uf.common.datadelivery.bandwidth.IBandwidthRequest.RequestType;
|
||||
import com.raytheon.uf.common.datadelivery.bandwidth.ProposeScheduleResponse;
|
||||
import com.raytheon.uf.common.datadelivery.registry.AdhocSubscription;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Coverage;
|
||||
import com.raytheon.uf.common.datadelivery.registry.DataDeliveryRegistryObjectTypes;
|
||||
|
@ -120,6 +121,9 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
|
|||
* Jan 24, 2013 2709 bgonzale Changed parameter to shouldScheduleForTime to a Calendar.
|
||||
* Jan 29, 2014 2636 mpduff Scheduling refactor.
|
||||
* Jan 30, 2014 2686 dhladky refactor of retrieval.
|
||||
* Feb 06, 2014 2636 bgonzale Added initializeScheduling method that uses the in-memory
|
||||
* bandwidth manager to perform the scheduling initialization
|
||||
* because of efficiency.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -185,11 +189,49 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
|
|||
|
||||
// schedule maintenance tasks
|
||||
scheduler = Executors.newScheduledThreadPool(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> initializeScheduling(
|
||||
Map<Network, List<Subscription>> subMap)
|
||||
throws SerializationException {
|
||||
List<String> unscheduledNames = new ArrayList<String>(0);
|
||||
|
||||
try {
|
||||
for (Network key : subMap.keySet()) {
|
||||
List<Subscription<T, C>> subscriptions = new ArrayList<Subscription<T, C>>();
|
||||
// this loop is here only because of the generics mess
|
||||
for (Subscription s : subMap.get(key)) {
|
||||
subscriptions.add(s);
|
||||
}
|
||||
ProposeScheduleResponse response = proposeScheduleSubscriptions(subscriptions);
|
||||
Set<String> unscheduled = response
|
||||
.getUnscheduledSubscriptions();
|
||||
if (!unscheduled.isEmpty()) {
|
||||
// if proposed was unable to schedule some subscriptions it
|
||||
// will schedule nothing. schedule any that can be scheduled
|
||||
// here.
|
||||
List<Subscription<T, C>> subsToSchedule = new ArrayList<Subscription<T, C>>();
|
||||
for (Subscription<T, C> s : subscriptions) {
|
||||
if (!unscheduled.contains(s.getName())) {
|
||||
subsToSchedule.add(s);
|
||||
}
|
||||
}
|
||||
unscheduledNames
|
||||
.addAll(scheduleSubscriptions(subsToSchedule));
|
||||
} else {
|
||||
unscheduledNames.addAll(unscheduled);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// TODO: Uncomment the last line in this comment block when fully
|
||||
// switched over to Java 1.7 and remove the finally block in shutdown,
|
||||
// switched over to Java 1.7 and remove the finally block in
|
||||
// shutdown,
|
||||
// that is also marked as TODO
|
||||
// This will allow the bandwidth manager to be garbage collected without
|
||||
// waiting for all of the delayed tasks to expire, currently they are
|
||||
// This will allow the bandwidth manager to be garbage collected
|
||||
// without
|
||||
// waiting for all of the delayed tasks to expire, currently they
|
||||
// are
|
||||
// manually removed in the shutdown method by casting to the
|
||||
// implementation and clearing the queue
|
||||
// scheduler.setRemoveOnCancelPolicy(true);
|
||||
|
@ -198,6 +240,9 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
|
|||
scheduler.scheduleAtFixedRate(new MaintenanceTask(), 5, 5,
|
||||
TimeUnit.MINUTES);
|
||||
}
|
||||
return unscheduledNames;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
|
|
|
@ -48,6 +48,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.ISubscriptionAggre
|
|||
* Jan 06, 2014 2636 mpduff Update javadoc
|
||||
* Jan 08, 2014 2615 bgonzale Added scheduleAdoc method.
|
||||
* Jan 29, 2014 2636 mpduff Scheduling refactor.
|
||||
* Feb 06, 2014 2636 bgonzale added initializeScheduling method.
|
||||
* </pre>
|
||||
*
|
||||
* @author djohnson
|
||||
|
@ -126,4 +127,18 @@ public interface IBandwidthManager<T extends Time, C extends Coverage> {
|
|||
* @return the initializer
|
||||
*/
|
||||
BandwidthInitializer getInitializer();
|
||||
|
||||
/**
|
||||
* Called after a BandwidthManager has been created to initialize scheduling
|
||||
* with the given subscriptions in preparation for operation.
|
||||
*
|
||||
* @param subMap
|
||||
* map of subscriptions to initialize scheduling with
|
||||
* @throws SerializationException
|
||||
*
|
||||
* @Returns a list of the names of the subscriptions that were not
|
||||
* scheduled.
|
||||
*/
|
||||
List<String> initializeScheduling(Map<Network, List<Subscription>> subMap)
|
||||
throws SerializationException;
|
||||
}
|
|
@ -19,11 +19,14 @@
|
|||
**/
|
||||
package com.raytheon.uf.edex.datadelivery.bandwidth;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.raytheon.uf.common.datadelivery.bandwidth.ProposeScheduleResponse;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Coverage;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Network;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Subscription;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Time;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
|
@ -54,6 +57,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
|
|||
* Jul 09, 2013 2106 djohnson Add shutdownInternal().
|
||||
* Oct 2, 2013 1797 dhladky Generics
|
||||
* Dec 04, 2013 2566 bgonzale use bandwidthmanager method to retrieve spring files.
|
||||
* Feb 06, 2014 2636 bgonzale added initializeScheduling method.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -166,4 +170,11 @@ class InMemoryBandwidthManager<T extends Time, C extends Coverage> extends Bandw
|
|||
// Nothing to do for in-memory version
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> initializeScheduling(
|
||||
Map<Network, List<Subscription>> subMap) {
|
||||
// Nothing to do for in-memory version
|
||||
return new ArrayList<String>(0);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
package com.raytheon.uf.edex.datadelivery.bandwidth.hibernate;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.raytheon.edex.site.SiteUtil;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Network;
|
||||
|
@ -12,7 +10,6 @@ import com.raytheon.uf.common.status.IUFStatusHandler;
|
|||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.BandwidthInitializer;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
|
||||
|
@ -38,6 +35,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
|
|||
* Nov 19, 2013 2545 bgonzale Removed programmatic customization for central, client, and dev(monolithic)
|
||||
* registries since the injected FindSubscription handler will be configured now.
|
||||
* Jan 29, 2014 2636 mpduff Scheduling refactor.
|
||||
* Feb 06, 2014 2636 bgonzale Use scheduling initialization method after registry init.
|
||||
* </pre>
|
||||
*
|
||||
* @author djohnson
|
||||
|
@ -100,18 +98,17 @@ public class HibernateBandwidthInitializer implements BandwidthInitializer {
|
|||
*/
|
||||
@Override
|
||||
public void executeAfterRegistryInit() {
|
||||
Set<Subscription> activeSubscriptions = new HashSet<Subscription>();
|
||||
Map<Network, Set<Subscription>> subMap = null;
|
||||
try {
|
||||
subMap = findSubscriptionsStrategy.findSubscriptionsToSchedule();
|
||||
Map<Network, List<Subscription>> subMap = findSubscriptionsStrategy
|
||||
.findSubscriptionsToSchedule();
|
||||
|
||||
List<BandwidthAllocation> unscheduled = instance.schedule(subMap,
|
||||
true);
|
||||
List<String> unscheduled = instance
|
||||
.initializeScheduling(subMap);
|
||||
|
||||
for (BandwidthAllocation allocation : unscheduled) {
|
||||
for (String subscription : unscheduled) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"The following bandwidth allocation is in an unscheduled state:\n "
|
||||
+ allocation);
|
||||
"The following subscription was not initially scheduled: "
|
||||
+ subscription);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
statusHandler.error(
|
||||
|
|
Loading…
Add table
Reference in a new issue