Issue #2106 Speed up Bandwidth Manager subscription scheduling

Change-Id: Iafceeb64a2b56c562ca737ba083ec3225524f7ba

Former-commit-id: 693bfd77ddc216e658f6e1e4a38f48a2df1fe3bf
This commit is contained in:
Dustin Johnson 2013-06-13 10:54:08 -05:00
parent dbe58fa63d
commit 8500856bc1
20 changed files with 762 additions and 196 deletions

View file

@ -174,6 +174,9 @@ public class PointSubsetManagerDlg extends
newTime.setStartDate(new Date());
newTime.setEndDate(new Date());
sub.setTime(newTime);
// Setting arbitrary size until mpduff and dhladky figure out their data
// size estimates
sub.setDataSetSize(50);
Coverage cov = new Coverage();

View file

@ -58,6 +58,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
* Mar 29, 2013 1841 djohnson Renamed to UserSubscription.
* May 15, 2013 1040 mpduff Added addOfficeId.
* May 21, 2013 2020 mpduff Rename UserSubscription to SiteSubscription.
* Jun 13, 2013 2095 djohnson Duplicate 13.5.1 change so bandwidth manager deletes subscriptions correctly.
*
* </pre>
*
@ -103,6 +104,7 @@ public class SiteSubscription extends RecurringSubscription {
public SiteSubscription(SiteSubscription sub) {
super(sub);
this.setOwner(sub.getOwner());
this.setId(RegistryUtil.getRegistryObjectKey(this));
}
@XmlAttribute

View file

@ -17,6 +17,7 @@ import com.raytheon.uf.common.time.domain.api.ITimePoint;
* ------------ ---------- ----------- --------------------------
* Aug 16, 2012 0743 djohnson Initial creation
* Jan 14, 2013 1286 djohnson Use time domain API.
* Jun 14, 2013 2095 djohnson Change access level on instance variables to protected.
*
* </pre>
*
@ -24,11 +25,11 @@ import com.raytheon.uf.common.time.domain.api.ITimePoint;
* @version 1.0
*/
abstract class AbstractTimer implements ITimer {
private ITimePoint start;
protected ITimePoint start;
private ITimePoint stop;
protected ITimePoint stop;
private IDuration elapsedTime = Durations.ZERO;
protected IDuration elapsedTime = Durations.ZERO;
/**
* {@inheritDoc}

View file

@ -0,0 +1,93 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.time.util;
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
import com.raytheon.uf.common.time.domain.api.IDuration;
/**
* Extends {@link ITimer} to add some performance timing characteristics
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jun 14, 2013 2095 djohnson Initial creation
*
* </pre>
*
* @author djohnson
* @version 1.0
*/
public interface IPerformanceTimer extends ITimer {
/**
* Marks a lap on the timer. Each invocation will return the time since
* either the timer was started, or the last time {@link #lap()} was called.
* By default will use a name of "lap-1, lap-2, ...". Lap names can be
* customized by using the {@link #lap(String)} version.
*
* @return the duration of the lap
*/
IDuration lap();
/**
* Marks a lap on the timer. Each invocation will return the time since
* either the timer was started, or the last time {@link #lap()} was called.
*
* @param lapName
* the name to associate with the lap
* @return the duration of the lap
*/
IDuration lap(String lapName);
/**
* Convenience method to get the time of a lap in milliseconds.
*
* @return the duration of the lap in milliseconds
* @see {@link #lap()}
*/
long lapMillis();
/**
* Convenience method to get the time of a lap in milliseconds.
*
* @param lapName
* the name to associate with the lap
* @return the duration of the lap in milliseconds
* @see {@link #lap()}
*/
long lapMillis(String lapName);
/**
* Logs laps to the {@link IPerformanceStatusHandler}.
*
* @param prefix
* the base prefix which should denote the method or some other
* unique identifier
*
* @param statusHandler
* the statusHandler
*/
void logLaps(String prefix, IPerformanceStatusHandler statusHandler);
}

View file

@ -0,0 +1,123 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.time.util;
import java.util.LinkedHashMap;
import java.util.Map.Entry;
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
import com.raytheon.uf.common.time.domain.Durations;
import com.raytheon.uf.common.time.domain.TimePoints;
import com.raytheon.uf.common.time.domain.api.IDuration;
import com.raytheon.uf.common.time.domain.api.ITimePoint;
/**
* Base implementation of {@link IPerformanceTimer}. Intentionally
* package-private, all access should be through the public API method of
* {@link TimeUtil#getPerformanceTimer()}.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jun 14, 2013 2095 djohnson Initial creation
*
* </pre>
*
* @author djohnson
* @version 1.0
*/
// @NotThreadSafe
class PerformanceTimerImpl extends AbstractTimer implements IPerformanceTimer {
private ITimePoint lastLapTime;
private final LinkedHashMap<String, IDuration> laps = new LinkedHashMap<String, IDuration>(
5);
PerformanceTimerImpl() {
}
/**
* {@inheritDoc}
*/
@Override
public IDuration lap() {
return lap("lap-" + (laps.size() + 1));
}
/**
* {@inheritDoc}
*/
@Override
public IDuration lap(String lapName) {
ITimePoint startOrLastLapTime = (lastLapTime == null) ? start
: lastLapTime;
// The new last lap time is the current time
lastLapTime = getCurrentTime();
IDuration currentLap = Durations.between(startOrLastLapTime,
lastLapTime);
laps.put(lapName, currentLap);
return currentLap;
}
/**
* {@inheritDoc}
*/
@Override
public long lapMillis() {
return lap().getMillis();
}
/**
* {@inheritDoc}
*/
@Override
public long lapMillis(String lapName) {
return lap(lapName).getMillis();
}
/**
* {@inheritDoc}
*/
@Override
public void logLaps(String prefix, IPerformanceStatusHandler statusHandler) {
StringBuilder sb = new StringBuilder();
sb.append(prefix);
sb.append(" total [").append(getElapsedTime()).append(" ms]");
for (Entry<String, IDuration> entry : laps.entrySet()) {
sb.append(" ").append(entry.getKey()).append(" [")
.append(entry.getValue().getMillis()).append(" ms]");
}
statusHandler.log(sb.toString());
}
/**
* {@inheritDoc}
*/
@Override
protected ITimePoint getCurrentTime() {
// Always uses the wall clock
return TimePoints.fromMillis(System.currentTimeMillis());
}
}

View file

@ -278,6 +278,17 @@ public final class TimeUtil {
return new TimerImpl();
}
/**
* Retrieve a {@link ITimer} that allows the demarcation of arbitrary start
* and stop times. This version will always use the actual system time. It
* also provides lapping functionality to keep track of multiple durations.
*
* @return a {@link ITimer}
*/
public static IPerformanceTimer getPerformanceTimer() {
return new PerformanceTimerImpl();
}
/**
* Check whether the time represented by a {@link Date} is a new day
* compared to another {@link Date} object.

View file

@ -50,9 +50,12 @@ import com.raytheon.uf.common.registry.event.RemoveRegistryEvent;
import com.raytheon.uf.common.registry.handler.IRegistryObjectHandler;
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.PerformanceStatus;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.IPerformanceTimer;
import com.raytheon.uf.common.time.util.ITimer;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.common.util.CollectionUtil;
@ -115,6 +118,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* May 02, 2013 1910 djohnson Shutdown proposed bandwidth managers in a finally.
* May 20, 2013 1650 djohnson Add in capability to find required dataset size.
* Jun 03, 2013 2038 djohnson Add base functionality to handle point data type subscriptions.
* Jun 13, 2013 2095 djohnson Improve bandwidth manager speed, and add performance logging.
* </pre>
*
* @author dhladky
@ -142,6 +146,11 @@ public abstract class BandwidthManager extends
private final IBandwidthDbInit dbInit;
// Instance variable and not static, because there are multiple child
// implementation classes which should each have a unique prefix
private final IPerformanceStatusHandler performanceHandler = PerformanceStatus
.getHandler(this.getClass().getSimpleName());
@VisibleForTesting
final RetrievalManager retrievalManager;
@ -265,8 +274,7 @@ public abstract class BandwidthManager extends
*/
@Subscribe
public void updateGriddedDataSetMetaData(
GriddedDataSetMetaData dataSetMetaData)
throws ParseException {
GriddedDataSetMetaData dataSetMetaData) throws ParseException {
// Daily/Hourly/Monthly datasets
if (dataSetMetaData.getCycle() == GriddedDataSetMetaData.NO_CYCLE) {
updateDataSetMetaDataWithoutCycle(dataSetMetaData);
@ -542,6 +550,8 @@ public abstract class BandwidthManager extends
*/
private List<BandwidthAllocation> scheduleSubscriptionForRetrievalTimes(
Subscription subscription, SortedSet<Calendar> retrievalTimes) {
IPerformanceTimer timer = TimeUtil.getPerformanceTimer();
timer.start();
if (retrievalTimes.isEmpty()) {
return Collections.emptyList();
@ -549,14 +559,11 @@ public abstract class BandwidthManager extends
List<BandwidthAllocation> unscheduled = Lists.newArrayList();
final int numberOfRetrievalTimes = retrievalTimes.size();
List<BandwidthSubscription> newSubscriptions = Lists
.newArrayListWithCapacity(numberOfRetrievalTimes);
for (Calendar retrievalTime : retrievalTimes) {
// Retrieve all the current subscriptions by provider, dataset name
// and base time.
List<BandwidthSubscription> subscriptions = bandwidthDao
.getBandwidthSubscriptions(subscription.getProvider(),
subscription.getDataSetName(), retrievalTime);
statusHandler.info("schedule() - Scheduling subscription ["
+ subscription.getName()
+ String.format(
@ -566,17 +573,28 @@ public abstract class BandwidthManager extends
// Add the current subscription to the ones BandwidthManager already
// knows about.
try {
subscriptions.add(bandwidthDao.newBandwidthSubscription(
subscription, retrievalTime));
newSubscriptions.add(BandwidthUtil
.getSubscriptionDaoForSubscription(subscription,
retrievalTime));
} catch (SerializationException e) {
statusHandler.error(
"Trapped Exception trying to schedule Subscription["
+ subscription.getName() + "]", e);
return null;
}
unscheduled.addAll(aggregate(subscriptions));
}
timer.lap("createBandwidthSubscriptions");
bandwidthDao.storeBandwidthSubscriptions(newSubscriptions);
timer.lap("storeBandwidthSubscriptions");
unscheduled.addAll(aggregate(newSubscriptions));
timer.lap("aggregate");
timer.stop();
timer.logLaps("scheduleSubscriptionForRetrievalTimes() subscription ["
+ subscription.getName() + "] retrievalTimes ["
+ retrievalTimes.size() + "]", performanceHandler);
return unscheduled;
}
@ -608,15 +626,18 @@ public abstract class BandwidthManager extends
*/
private List<BandwidthAllocation> aggregate(
List<BandwidthSubscription> bandwidthSubscriptions) {
IPerformanceTimer timer = TimeUtil.getPerformanceTimer();
timer.start();
List<SubscriptionRetrieval> retrievals = getAggregator().aggregate(
bandwidthSubscriptions);
timer.lap("aggregator");
// Create a separate list of BandwidthReservations to schedule
// as the aggregation process may return all subsumed
// SubscriptionRetrievals
// for the specified Subscription.
List<BandwidthAllocation> reservations = new ArrayList<BandwidthAllocation>();
List<SubscriptionRetrieval> reservations = new ArrayList<SubscriptionRetrieval>();
for (SubscriptionRetrieval retrieval : retrievals) {
@ -667,6 +688,16 @@ public abstract class BandwidthManager extends
endTime.add(Calendar.MINUTE, maxLatency);
retrieval.setEndTime(endTime);
// Add SubscriptionRetrieval to the list to schedule..
reservations.add(retrieval);
}
}
timer.lap("creating retrievals");
for (SubscriptionRetrieval retrieval : reservations) {
BandwidthSubscription bandwidthSubscription = retrieval
.getBandwidthSubscription();
if (bandwidthSubscription.isCheckForDataSetUpdate()) {
// Check to see if the data subscribed to is available..
// if so, mark the status of the BandwidthReservation as
// READY.
@ -678,19 +709,25 @@ public abstract class BandwidthManager extends
if (!z.isEmpty()) {
retrieval.setStatus(RetrievalStatus.READY);
}
bandwidthDao.store(retrieval);
}
}
timer.lap("checking if ready");
// Add SubscriptionRetrieval to the list to schedule..
reservations.add(retrieval);
}
}
bandwidthDao.store(reservations);
timer.lap("storing retrievals");
if (reservations.isEmpty()) {
return Collections.emptyList();
} else {
// Schedule the Retrievals
return retrievalManager.schedule(reservations);
}
List<BandwidthAllocation> unscheduled = (reservations.isEmpty()) ? Collections
.<BandwidthAllocation> emptyList() : retrievalManager
.schedule(reservations);
timer.lap("scheduling retrievals");
timer.stop();
final int numberOfBandwidthSubscriptions = bandwidthSubscriptions
.size();
timer.logLaps("aggregate() bandwidthSubscriptions ["
+ numberOfBandwidthSubscriptions + "]", performanceHandler);
return unscheduled;
}
/**
@ -1348,6 +1385,7 @@ public abstract class BandwidthManager extends
.getName());
}
}
return unscheduledSubscriptions;
}

View file

@ -21,6 +21,7 @@ package com.raytheon.uf.edex.datadelivery.bandwidth;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
@ -61,6 +62,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Oct 24, 2012 1286 djohnson Initial creation
* Dec 12, 2012 1286 djohnson Use concurrent lists to avoid concurrent modification exceptions.
* Jun 03, 2013 2038 djohnson Add method to get subscription retrievals by provider, dataset, and status.
* Jun 13, 2013 2095 djohnson Implement ability to store a collection of subscriptions.
*
* </pre>
*
@ -481,6 +483,17 @@ class InMemoryBandwidthDao implements IBandwidthDao {
replaceOldOrAddToCollection(bandwidthSubscriptions, subscriptionDao);
}
/**
* {@inheritDoc}
*/
@Override
public void storeBandwidthSubscriptions(
Collection<BandwidthSubscription> newSubscriptions) {
for (BandwidthSubscription newSubscription : newSubscriptions) {
store(newSubscription);
}
}
/**
* @return
*/

View file

@ -39,6 +39,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* ------------ ---------- ----------- --------------------------
* Oct 16, 2012 0726 djohnson Added SW history, added length to subscription.
* Nov 09, 2012 1286 djohnson Add convenience methods for retrieving the subscription.
* Jun 13, 2013 2095 djohnson Add flag for whether or not data set update should be looked for on aggregating.
*
* </pre>
*
@ -104,6 +105,10 @@ public class BandwidthSubscription extends PersistableDataObject<Long> implement
@Column(nullable = false)
private String registryId;
@DynamicSerializeElement
@Column(nullable = false)
private boolean checkForDataSetUpdate;
@DynamicSerializeElement
@Column(nullable = false, length = 100000)
private byte[] subSubscription;
@ -301,6 +306,21 @@ public class BandwidthSubscription extends PersistableDataObject<Long> implement
return registryId;
}
/**
* @return the checkForDataSetUpdate
*/
public boolean isCheckForDataSetUpdate() {
return checkForDataSetUpdate;
}
/**
* @param checkForDataSetUpdate
* the checkForDataSetUpdate to set
*/
public void setCheckForDataSetUpdate(boolean checkForDataSetUpdate) {
this.checkForDataSetUpdate = checkForDataSetUpdate;
}
/**
* Added only to comply with DynamicSerialize, use
* {@link #getSubscription()} instead.

View file

@ -20,6 +20,7 @@
package com.raytheon.uf.edex.datadelivery.bandwidth.dao;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.SortedSet;
@ -45,6 +46,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus;
* ------------ ---------- ----------- --------------------------
* Oct 23, 2012 1286 djohnson Initial creation
* Jun 03, 2013 2038 djohnson Add method to get subscription retrievals by provider, dataset, and status.
* Jun 13, 2013 2095 djohnson Implement ability to store a collection of subscriptions.
*
* </pre>
*
@ -371,6 +373,16 @@ public interface IBandwidthDao {
*/
void store(BandwidthSubscription subscriptionDao);
/**
* Persist a {@link Collection} of {@link BandwidthSubscription}s to the
* database.
*
* @param newSubscriptions
* the subscriptions to persist
*/
void storeBandwidthSubscriptions(
Collection<BandwidthSubscription> newSubscriptions);
/**
* Update a BandwidthAllocation in the database.
*

View file

@ -20,6 +20,7 @@
package com.raytheon.uf.edex.datadelivery.bandwidth.hibernate;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.SortedSet;
@ -55,6 +56,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Feb 11, 2013 1543 djohnson Use Spring transactions.
* Feb 13, 2013 1543 djohnson Converted into a service, created new DAOs as required.
* Jun 03, 2013 2038 djohnson Add method to get subscription retrievals by provider, dataset, and status.
* Jun 13, 2013 2095 djohnson Implement ability to store a collection of subscriptions.
*
* </pre>
*
@ -320,11 +322,19 @@ public class HibernateBandwidthDao implements IBandwidthDao {
* {@inheritDoc}
*/
@Override
@Transactional
public void store(BandwidthSubscription subscriptionDao) {
bandwidthSubscriptionDao.create(subscriptionDao);
}
/**
* {@inheritDoc}
*/
@Override
public void storeBandwidthSubscriptions(
Collection<BandwidthSubscription> newSubscriptions) {
bandwidthSubscriptionDao.persistAll(newSubscriptions);
}
/**
* {@inheritDoc}
*/

View file

@ -8,9 +8,9 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
/**
*
* Interface for Subscription aggregation and RetrievalRequest generation. Each
* implementation of this interface will examine the List of BandwidthSubscription
* Objects provided and evaluate how to combine and/or subset those
* Subscriptions into SubscriptionRetrieval Objects.
* implementation of this interface will examine the List of
* BandwidthSubscription Objects provided and evaluate how to combine and/or
* subset those Subscriptions into SubscriptionRetrieval Objects.
*
* <pre>
*
@ -20,6 +20,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
* ------------ ---------- ----------- --------------------------
* Jul 18, 2012 726 jspinks Initial creation
* Nov 09, 2012 1286 djohnson Renamed to comply with AWIPS standards.
* Jun 13, 2013 2095 djohnson Aggregator only receives the newly created bandwidth subscriptions now.
*
* </pre>
*
@ -32,13 +33,13 @@ public interface ISubscriptionAggregator {
* Generate a List of SubscriptionRetrieval Object for the provided
* BandwidthSubscription Objects.
*
* @param subscriptions
* A List of BandwidthSubscription Objects to examine for retrieval.
* @param newSubscriptions
* A List of BandwidthSubscription Objects which were just added
*
* @return The SubscriptionRetrieval Objects used to fulfill the
* BandwidthSubscription Objects provided.
*/
List<SubscriptionRetrieval> aggregate(List<BandwidthSubscription> subscriptions);
List<SubscriptionRetrieval> aggregate(List<BandwidthSubscription> newSubscriptions);
/**
* This method is called once all the SubscriptionRetrievals for a

View file

@ -8,8 +8,8 @@ import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthSubscription;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.ISubscriptionAggregator;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus;
@ -31,6 +31,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Jul 18, 2012 726 jspinks Initial creation
* Nov 09, 2012 1286 djohnson Rename interface to comply with standards.
* Nov 20, 2012 1286 djohnson Change some logging to debug.
* Jun 13, 2013 2095 djohnson No need to query the database, we are only receiving new bandwidth subscriptions.
*
* </pre>
*
@ -50,7 +51,7 @@ public class SimpleSubscriptionAggregator implements ISubscriptionAggregator {
@Override
public List<SubscriptionRetrieval> aggregate(
List<BandwidthSubscription> subscriptions) {
List<BandwidthSubscription> newSubscriptions) {
List<SubscriptionRetrieval> subscriptionRetrievals = new ArrayList<SubscriptionRetrieval>();
@ -58,14 +59,11 @@ public class SimpleSubscriptionAggregator implements ISubscriptionAggregator {
// necessary retrievals without regards to 'sharing' retrievals across
// subscriptions.
for (BandwidthSubscription subDao : subscriptions) {
for (BandwidthSubscription subDao : newSubscriptions) {
List<SubscriptionRetrieval> t = bandwidthDao
.querySubscriptionRetrievals(subDao.getIdentifier());
// First check to see if the Object already was scheduled
// (i.e. has SubscriptionRetrievals associated with it) if
// not, create a SubscriptionRetrieval for the subscription
if (t.size() == 0) {
try {
SubscriptionRetrieval subscriptionRetrieval = new SubscriptionRetrieval();
@ -97,7 +95,6 @@ public class SimpleSubscriptionAggregator implements ISubscriptionAggregator {
+ subDao.getIdentifier()
+ "]: Subscription will not be scheduled.");
}
}
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler

View file

@ -35,6 +35,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent;
* Feb 14, 2013 1596 djohnson Warn log when unable to find a SubscriptionRetrieval.
* 3/18/2013 1802 bphillip Event bus registration is now a post-construct operation to ensure proxy is registered with bus
* 3/13/2013 1802 bphillip Moved event bus registration from post-construct to spring static method call
* Jun 13, 2013 2095 djohnson Can schedule any subclass of BandwidthAllocation.
*
* </pre>
*
@ -80,8 +81,8 @@ public class RetrievalManager {
* @return the list of {@link BandwidthAllocation}s that were unable to be
* scheduled
*/
public List<BandwidthAllocation> schedule(
List<BandwidthAllocation> bandwidthAllocations) {
public <T extends BandwidthAllocation> List<BandwidthAllocation> schedule(
List<T> bandwidthAllocations) {
List<BandwidthAllocation> unscheduled = new ArrayList<BandwidthAllocation>();
for (BandwidthAllocation bandwidthAllocation : bandwidthAllocations) {

View file

@ -5,6 +5,7 @@ import java.util.Calendar;
import java.util.Date;
import com.raytheon.uf.common.datadelivery.registry.DataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.DataType;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
@ -26,6 +27,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthSubscription;
* Nov 09, 2012 1286 djohnson Separate DAO utility methods from general utility.
* Dec 11, 2012 1403 djohnson No longer valid to run without bandwidth management.
* Feb 14, 2013 1595 djohnson Use subscription rescheduling strategy.
* Jun 13, 2013 2095 djohnson Point subscriptions don't check for dataset updates on aggregation.
*
* </pre>
*
@ -215,12 +217,10 @@ public class BandwidthUtil {
dao.setSubscription(subscription);
dao.setRoute(subscription.getRoute());
dao.setBaseReferenceTime(baseReferenceTime);
// TODO: This is grid specific and only works for gridded times.
// will have to revisit when other data type are introduced.
// perhaps minute of the day?
dao.setCycle(baseReferenceTime.get(Calendar.HOUR_OF_DAY));
dao.setPriority(subscription.getPriority().getPriorityValue());
dao.setRegistryId(subscription.getId());
dao.setCheckForDataSetUpdate(subscription.getDataSetType() != DataType.POINT);
return dao;
}

View file

@ -40,9 +40,12 @@ import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.raytheon.uf.common.datadelivery.registry.DataType;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.PointTime;
import com.raytheon.uf.common.datadelivery.registry.SiteSubscription;
import com.raytheon.uf.common.datadelivery.registry.SiteSubscriptionFixture;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.localization.PathManagerFactoryTest;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.common.time.util.TimeUtilTest;
@ -70,6 +73,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Feb 20, 2013 1543 djohnson Delegate to sub-classes for which route to create subscriptions for.
* Mar 28, 2013 1841 djohnson Subscription is now UserSubscription.
* Apr 29, 2013 1910 djohnson Always shutdown bandwidth managers in tests.
* Jun 03, 2013 2095 djohnson Move getPointDataSet in from subclass.
*
* </pre>
*
@ -228,6 +232,24 @@ public abstract class AbstractBandwidthManagerIntTest {
return subscription;
}
/**
* Get a point data subscription with the given retrieval interval.
*
* @param retrievalInterval
* the retrieval interval
* @return
*/
protected Subscription getPointDataSubscription(int retrievalInterval) {
final PointTime pointTime = new PointTime();
pointTime.setInterval(retrievalInterval);
Subscription subscription = SiteSubscriptionFixture.INSTANCE.get();
subscription.setTime(pointTime);
subscription.setDataSetType(DataType.POINT);
subscription.setLatencyInMinutes(retrievalInterval);
return subscription;
}
/**
* Retrieve the {@link Network} that subscriptions should be created for.
*

View file

@ -50,7 +50,6 @@ import java.util.concurrent.CountDownLatch;
import org.junit.Test;
import com.raytheon.uf.common.datadelivery.registry.DataDeliveryRegistryObjectTypes;
import com.raytheon.uf.common.datadelivery.registry.DataType;
import com.raytheon.uf.common.datadelivery.registry.GriddedDataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.OpenDapGriddedDataSetMetaData;
@ -105,6 +104,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent;
* Mar 28, 2013 1841 djohnson Subscription is now UserSubscription.
* Apr 29, 2013 1910 djohnson Always shutdown bandwidth managers in tests.
* Jun 03, 2013 2038 djohnson Add support for point data based subscriptions.
* Jun 03, 2013 2095 djohnson Move getPointDataSet to superclass.
*
* </pre>
*
@ -1111,22 +1111,4 @@ public class BandwidthManagerIntTest extends AbstractWfoBandwidthManagerIntTest
return Network.OPSNET;
}
/**
* Get a point data subscription with the given retrieval interval.
*
* @param retrievalInterval
* the retrieval interval
* @return
*/
protected Subscription getPointDataSubscription(int retrievalInterval) {
final PointTime pointTime = new PointTime();
pointTime.setInterval(retrievalInterval);
Subscription subscription = SiteSubscriptionFixture.INSTANCE.get();
subscription.setTime(pointTime);
subscription.setDataSetType(DataType.POINT);
subscription.setLatencyInMinutes(retrievalInterval);
return subscription;
}
}

View file

@ -0,0 +1,176 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.time.util;
import static org.junit.Assert.assertEquals;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* Genericized and extracted from {@link TimerImplTest}.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jun 14, 2013 2095 djohnson Initial creation
*
* </pre>
*
* @author djohnson
* @version 1.0
*/
@Ignore
public abstract class AbstractTimerTest<TIMER extends ITimer> {
protected TIMER timer = getTimer();
@Before
public void setUp() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L });
}
@After
public void cleanUp() {
TimeUtilTest.resumeTime();
}
@Test(expected = IllegalStateException.class)
public void testTimerImplCantBeStoppedIfHasntBeenStarted() {
timer.stop();
}
@Test
public void testRestartedTimerWillNotUseIntermediateTime() {
TimeUtilTest.installMockTimes(new long[] { 50L, 75L, 150L, 250L });
// Starts at 50L
timer.start();
// Stops at 75L: 25L elapsed time
timer.stop();
// Restart at 150L: still 25L elapsed time
timer.start();
// Stops again at 250L: 100L more elapsed time
timer.stop();
// 100L + 25L
assertEquals(
"The intermediate time the timer was stopped should not have counted toward the elapsed time!",
125L, timer.getElapsedTime());
}
@Test
public void testTimerImplReturnsElapsedTime() throws InterruptedException {
timer.start();
timer.stop();
assertEquals("Invalid elapsed time returned!", 100L,
timer.getElapsedTime());
}
@Test
public void testResetWillAllowTimerToBeReused() {
// The first difference will be 75-50 = 25L
// The second difference will be 250-150 = 100L
TimeUtilTest.installMockTimes(new long[] { 50L, 75L, 150L, 250L });
timer.start();
timer.stop();
assertEquals("Incorrect elapsed time returned!", 25L,
timer.getElapsedTime());
timer.reset();
timer.start();
timer.stop();
assertEquals("Incorrect elapsed time returned!", 100L,
timer.getElapsedTime());
}
@Test
public void testResetDoesNotUsePreviousElapsedTime() {
TimeUtilTest.installMockTimes(new long[] { 50L, 75L, 150L, 250L });
timer.start();
timer.stop();
assertEquals("Incorrect elapsed time returned!", 25L,
timer.getElapsedTime());
timer.reset();
assertEquals("A reset timer should not have elapsed time!", 0,
timer.getElapsedTime());
}
@Test(expected = IllegalStateException.class)
public void testStartBeingCalledTwiceThrowsException() {
timer.start();
timer.start();
}
@Test
public void testStoppingATimerTwiceDoesNotChangeStopTime() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L, 300L });
timer.start();
timer.stop();
// Elapsed time should still be 100L since the stop time should be stuck
// at 200L
timer.stop();
assertEquals(
"Expected the stop time to not have changed after the second invocation!",
100L, timer.getElapsedTime());
}
@Test
public void testGetElapsedTimeCanBeCalledOnARunningTimer() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L, 300L });
timer.start();
// 200L - 100L
assertEquals("Incorrect amount of time has been elapsed!", 100L,
timer.getElapsedTime());
timer.stop();
// 300L - 100L
assertEquals("Incorrect amount of time has been elapsed!", 200L,
timer.getElapsedTime());
}
/**
* Get the implementation under test.
*
* @return the implementation instance
*/
protected abstract TIMER getTimer();
}

View file

@ -0,0 +1,181 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.time.util;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import org.junit.Test;
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
import com.raytheon.uf.common.time.domain.TimePoints;
import com.raytheon.uf.common.time.domain.api.ITimePoint;
/**
* Test {@link PerformanceTimerImpl}.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jun 14, 2013 2095 djohnson Initial creation
*
* </pre>
*
* @author djohnson
* @version 1.0
*/
public class PerformanceTimerImplTest extends
AbstractTimerTest<PerformanceTimerImpl> {
@Test
public void testLapWillReturnTimeSinceStartWhenFirstLap() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L, 300L });
timer.start();
// 200L - 100L
assertThat(timer.lap().getMillis(), is(100L));
}
@Test
public void testLapWillReturnTimeSinceLastLapWhenNotFirstLap() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L, 500L });
timer.start();
timer.lap();
// 500L - 200L
assertThat(timer.lap().getMillis(), is(300L));
}
@Test
public void testLapMillisWillReturnTimeSinceStartWhenFirstLap() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L, 300L });
timer.start();
// 200L - 100L
assertThat(timer.lapMillis(), is(100L));
}
@Test
public void testLapMillisWillReturnTimeSinceLastLapWhenNotFirstLap() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L, 500L });
timer.start();
timer.lap();
// 500L - 200L
assertThat(timer.lapMillis(), is(300L));
}
@Test
public void testLapMillisByNameWillReturnTimeSinceStartWhenFirstLap() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L, 300L });
timer.start();
// 200L - 100L
assertThat(timer.lapMillis("someName"), is(100L));
}
@Test
public void testLapMillisByNameWillReturnTimeSinceLastLapWhenNotFirstLap() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L, 500L });
timer.start();
timer.lap();
// 500L - 200L
assertThat(timer.lapMillis("someName"), is(300L));
}
@Test
public void testMultipleLapsReturnsCorrectValue() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L, 400L, 800L });
timer.start();
// 200L - 100L
assertThat(timer.lap().getMillis(), is(100L));
// 400L - 200L
assertThat(timer.lap().getMillis(), is(200L));
// 800L - 400L
assertThat(timer.lap().getMillis(), is(400L));
}
@Test
public void testLapsAreLoggedByName() {
IPerformanceStatusHandler log = mock(IPerformanceStatusHandler.class);
TimeUtilTest.installMockTimes(new long[] { 100L, 200L, 400L, 800L });
timer.start();
// 200L - 100L
timer.lap();
// 400L - 200L
timer.lap();
// 800L - 400L
timer.lap();
timer.stop();
timer.logLaps("testLapsAreLoggedByName()", log);
verify(log)
.log("testLapsAreLoggedByName() total [700 ms] lap-1 [100 ms] lap-2 [200 ms] lap-3 [400 ms]");
}
@Test
public void testNoLapsOnlyLogsTotalDuration() {
IPerformanceStatusHandler log = mock(IPerformanceStatusHandler.class);
TimeUtilTest.installMockTimes(new long[] { 100L, 200L });
timer.start();
timer.stop();
timer.logLaps("testLapsAreLoggedByName()", log);
verify(log).log("testLapsAreLoggedByName() total [100 ms]");
}
/**
* {@inheritDoc}
*/
@Override
protected PerformanceTimerImpl getTimer() {
return new PerformanceTimerImpl() {
@Override
protected ITimePoint getCurrentTime() {
// Overridden to return mock times in the test
return TimePoints.fromMillis(TimeUtil.currentTimeMillis());
}
};
}
}

View file

@ -1,10 +1,5 @@
package com.raytheon.uf.common.time.util;
import static org.junit.Assert.assertEquals;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
*
@ -17,136 +12,21 @@ import org.junit.Test;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 16, 2012 0743 djohnson Initial creation
* Jun 14, 2013 2095 djohnson Extracted contents to {@link AbstractTimerTest}.
*
* </pre>
*
* @author djohnson
* @version 1.0
*/
public class TimerImplTest {
public class TimerImplTest extends AbstractTimerTest<TimerImpl> {
@Before
public void setUp() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L });
/**
* {@inheritDoc}
*/
@Override
protected TimerImpl getTimer() {
return new TimerImpl();
}
@After
public void cleanUp() {
TimeUtilTest.resumeTime();
}
@Test(expected = IllegalStateException.class)
public void testTimerImplCantBeStoppedIfHasntBeenStarted() {
TimerImpl timer = new TimerImpl();
timer.stop();
}
@Test
public void testRestartedTimerWillNotUseIntermediateTime() {
TimeUtilTest.installMockTimes(new long[] { 50L, 75L, 150L, 250L });
TimerImpl timer = new TimerImpl();
// Starts at 50L
timer.start();
// Stops at 75L: 25L elapsed time
timer.stop();
// Restart at 150L: still 25L elapsed time
timer.start();
// Stops again at 250L: 100L more elapsed time
timer.stop();
// 100L + 25L
assertEquals(
"The intermediate time the timer was stopped should not have counted toward the elapsed time!",
125L, timer.getElapsedTime());
}
@Test
public void testTimerImplReturnsElapsedTime() throws InterruptedException {
TimerImpl timer = new TimerImpl();
timer.start();
timer.stop();
assertEquals("Invalid elapsed time returned!", 100L,
timer.getElapsedTime());
}
@Test
public void testResetWillAllowTimerToBeReused() {
// The first difference will be 75-50 = 25L
// The second difference will be 250-150 = 100L
TimeUtilTest.installMockTimes(new long[] { 50L, 75L, 150L, 250L });
TimerImpl timer = new TimerImpl();
timer.start();
timer.stop();
assertEquals("Incorrect elapsed time returned!", 25L,
timer.getElapsedTime());
timer.reset();
timer.start();
timer.stop();
assertEquals("Incorrect elapsed time returned!", 100L,
timer.getElapsedTime());
}
@Test
public void testResetDoesNotUsePreviousElapsedTime() {
TimeUtilTest.installMockTimes(new long[] { 50L, 75L, 150L, 250L });
TimerImpl timer = new TimerImpl();
timer.start();
timer.stop();
assertEquals("Incorrect elapsed time returned!", 25L,
timer.getElapsedTime());
timer.reset();
assertEquals("A reset timer should not have elapsed time!", 0,
timer.getElapsedTime());
}
@Test(expected = IllegalStateException.class)
public void testStartBeingCalledTwiceThrowsException() {
TimerImpl timer = new TimerImpl();
timer.start();
timer.start();
}
@Test
public void testStoppingATimerTwiceDoesNotChangeStopTime() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L, 300L });
TimerImpl timer = new TimerImpl();
timer.start();
timer.stop();
// Elapsed time should still be 100L since the stop time should be stuck
// at 200L
timer.stop();
assertEquals(
"Expected the stop time to not have changed after the second invocation!",
100L, timer.getElapsedTime());
}
@Test
public void testGetElapsedTimeCanBeCalledOnARunningLock() {
TimeUtilTest.installMockTimes(new long[] { 100L, 200L, 300L });
TimerImpl timer = new TimerImpl();
timer.start();
// 200L - 100L
assertEquals("Incorrect amount of time has been elapsed!", 100L,
timer.getElapsedTime());
timer.stop();
// 300L - 100L
assertEquals("Incorrect amount of time has been elapsed!", 200L,
timer.getElapsedTime());
}
}