Issue #2106 Improve BandwidthManager performance, proposed changes are checked much quicker

Change-Id: I3a61de1904d7256de04a97f40f38bf18d78b1438

Former-commit-id: b36200d583 [formerly 59eafdd07754e3711e9c3babd946dfd18c161404]
Former-commit-id: ec2ddd13b1
This commit is contained in:
Dustin Johnson 2013-07-10 15:36:00 -05:00
parent b827522f92
commit 580bc2cadf
40 changed files with 1108 additions and 1547 deletions

View file

@ -16,6 +16,7 @@
<constructor-arg ref="retrievalManager" />
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="ProviderHandler" />
</bean>
<util:map id="retrievalAgents">

View file

@ -24,10 +24,13 @@ import java.util.Set;
import com.raytheon.uf.common.datadelivery.bandwidth.ProposeScheduleResponse;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler;
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.util.JarUtil;
import com.raytheon.uf.edex.datadelivery.bandwidth.BandwidthManager;
import com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthContextFactory.IEdexBandwidthManagerCreator;
import com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthManager;
import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit;
@ -47,6 +50,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
* Feb 27, 2013 1644 djohnson Schedule SBN subscriptions.
* Mar 11, 2013 1645 djohnson Add missing Spring file.
* May 15, 2013 2000 djohnson Include daos.
* Jul 10, 2013 2106 djohnson Dependency inject registry handlers.
*
* </pre>
*
@ -58,7 +62,7 @@ public class NcfBandwidthManagerCreator implements IEdexBandwidthManagerCreator
/**
* NCF {@link BandwidthManager} implementation.
*/
static class NcfBandwidthManager extends BandwidthManager {
static class NcfBandwidthManager extends EdexBandwidthManager {
private static final String[] NCF_BANDWIDTH_MANAGER_FILES = new String[] {
JarUtil.getResResourcePath("/spring/bandwidth-datadelivery-ncf-edex-impl.xml"),
@ -79,8 +83,11 @@ public class NcfBandwidthManagerCreator implements IEdexBandwidthManagerCreator
*/
public NcfBandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil) {
super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil);
BandwidthDaoUtil bandwidthDaoUtil,
IDataSetMetaDataHandler dataSetMetaDataHandler,
ISubscriptionHandler subscriptionHandler) {
super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil,
dataSetMetaDataHandler, subscriptionHandler);
}
@Override
@ -113,9 +120,11 @@ public class NcfBandwidthManagerCreator implements IEdexBandwidthManagerCreator
@Override
public IBandwidthManager getBandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil) {
BandwidthDaoUtil bandwidthDaoUtil,
IDataSetMetaDataHandler dataSetMetaDataHandler,
ISubscriptionHandler subscriptionHandler) {
return new NcfBandwidthManager(dbInit, bandwidthDao, retrievalManager,
bandwidthDaoUtil);
bandwidthDaoUtil, dataSetMetaDataHandler, subscriptionHandler);
}
}

View file

@ -2,8 +2,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
http://www.springframework.org/schema/tx/spring-tx.xsd">
<tx:annotation-driven transaction-manager="metadataTxManager"
proxy-target-class="true" />
@ -21,6 +20,14 @@
<property name="dao" ref="bandwidthAllocationDao" />
</bean>
<bean id="retrievalAgentManager"
class="com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalAgentManager"
init-method="start">
<constructor-arg ref="retrievalAgentNotifier" />
<constructor-arg ref="retrievalAgents" />
<constructor-arg ref="retrievalDao" />
</bean>
<!-- Used as the context for creating the BandwidthManager -->
<bean id="bandwidthContextFactory"
class="com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthContextFactory">
@ -29,7 +36,8 @@
<!-- TODO: Switch to use database bandwidth buckets
<constructor-arg ref="hibernateBandwidthBucketDao" /> -->
<constructor-arg>
<bean class="com.raytheon.uf.edex.datadelivery.bandwidth.InMemoryBandwidthBucketDao" />
<bean
class="com.raytheon.uf.edex.datadelivery.bandwidth.InMemoryBandwidthBucketDao" />
</constructor-arg>
<!-- The bandwidth manager initializer -->
<constructor-arg>
@ -38,6 +46,7 @@
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.bandwidth.util.FindActiveSubscriptionsForRoute">
<constructor-arg ref="SubscriptionHandler" />
<constructor-arg ref="subscriptionRoutesToSchedule" />
</bean>
</constructor-arg>
@ -47,6 +56,9 @@
<constructor-arg ref="bandwidthManagerCreator" />
<!-- The db initializer -->
<constructor-arg ref="hibernateBandwidthDbInit" />
<!-- Registry handlers required for EdexBandwidthManager -->
<constructor-arg ref="DataSetMetaDataHandler" />
<constructor-arg ref="SubscriptionHandler" />
</bean>
<bean id="dataSetAvailabilityCalculator"
@ -54,29 +66,4 @@
<constructor-arg ref="ProviderHandler" />
</bean>
<camelContext id="BandwidthManager-context"
xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="errorHandler">
<route id="bandwidthManagerProcessWork">
<from
uri="jms-generic:queue:matureSubscriptions?destinationResolver=#qpidDurableResolver" />
<doTry>
<pipeline>
<bean ref="serializationUtil" method="transformFromThrift" />
<bean ref="BandwidthManagerProcessor" method="process" />
<bean ref="BandwidthManagerRetrieval" method="generateRetrieval" />
<!-- notify retrieval threads, if data delivery made
into cluster this should move to topic post -->
<to uri="direct-vm:notifyRetrieval" />
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
<to
uri="log:BandwidthManagerProcess?level=ERROR" />
</doCatch>
</doTry>
</route>
</camelContext>
</beans>

View file

@ -2,6 +2,9 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
<bean id="bandwidthEventBus"
class="com.raytheon.uf.edex.datadelivery.bandwidth.notification.BandwidthEventBusBean" />
<bean factory-bean="eventBus" factory-method="register">
<constructor-arg ref="retrievalManager" />
</bean>

View file

@ -8,12 +8,4 @@
<bean id="bandwidthContextFactory"
class="com.raytheon.uf.edex.datadelivery.bandwidth.InMemoryBandwidthContextFactory" />
<bean id="retrievalAgents" class="java.util.Collections"
factory-method="emptyMap">
<!-- No retrievals for in-memory -->
</bean>
<bean id="registerDataDeliveryHandlers" class="java.lang.String">
<!-- required for depends-on -->
</bean>
</beans>

View file

@ -16,6 +16,7 @@
<constructor-arg ref="retrievalManager" />
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="ProviderHandler" />
</bean>
<bean id="sbnSubscriptionRetrievalAgentPrototype"
@ -28,6 +29,7 @@
<constructor-arg ref="retrievalManager" />
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalDao" />
<constructor-arg ref="ProviderHandler" />
</bean>
<util:map id="retrievalAgents">

View file

@ -2,114 +2,89 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
<bean id="propertyPlaceholderConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:/com.raytheon.uf.edex.datadelivery.bandwidth.properties
</value>
</list>
</property>
</bean>
<bean id="bandwidthEventBus" class="com.raytheon.uf.edex.datadelivery.bandwidth.notification.BandwidthEventBusBean" />
<bean id="propertyPlaceholderConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:/com.raytheon.uf.edex.datadelivery.bandwidth.properties</value>
</list>
</property>
</bean>
<bean id="bandwidthDao" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthDao" />
<bean id="bandwidthDao" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthDao" />
<bean id="bandwidthBucketDao" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthBucketDao" />
<!-- TODO: Move the associations between a bucket and its allocations into the database -->
<bean id="bandwidthBucketAllocationAssociator" class="com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.InMemoryBandwidthBucketAllocationAssociator">
<!-- TODO: Move the associations between a bucket and its allocations
into the database -->
<bean id="bandwidthBucketAllocationAssociator"
class="com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.InMemoryBandwidthBucketAllocationAssociator">
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="bandwidthBucketDao" />
</bean>
<bean id="bandwidthDbInit" factory-bean="bandwidthContextFactory"
<bean id="bandwidthDbInit" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthDbInit" />
<bean id="bandwidthManagerInitializer" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthInitializer"/>
<bean id="bandwidthManagerInitializer" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthInitializer" />
<bean id="bandwidthMapConfigFile" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthMapConfigFile" />
<bean id="bandwidthMapConfigFile" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthMapConfigFile" />
<bean id="bandwidthManager" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthManager"
depends-on="BandwidthEventBusConfig,bandwidthUtil,registerDataDeliveryHandlers"
init-method="init">
<constructor-arg ref="bandwidthDbInit" />
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalManager" />
<constructor-arg ref="bandwidthDaoUtil" />
<property name="aggregator" ref="aggregator" />
<property name="initializer" ref="bandwidthManagerInitializer" />
</bean>
<bean id="bandwidthManager" factory-bean="bandwidthContextFactory"
factory-method="getBandwidthManager" depends-on="bandwidthUtil"
init-method="init">
<constructor-arg ref="bandwidthDbInit" />
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalManager" />
<constructor-arg ref="bandwidthDaoUtil" />
<property name="aggregator" ref="aggregator" />
<property name="initializer" ref="bandwidthManagerInitializer" />
</bean>
<bean id="bandwidthUtil"
class="com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil"
factory-method="getInstance">
<property name="dataSetAvailabilityCalculator" ref="dataSetAvailabilityCalculator" />
<property name="subscriptionLatencyCalculator">
<bean
class="com.raytheon.uf.edex.datadelivery.bandwidth.util.SubscriptionValueLatencyCalculator" />
</property>
<property name="subscriptionRescheduleStrategy">
<bean
class="com.raytheon.uf.edex.datadelivery.bandwidth.util.AlwaysRescheduleSubscriptions" />
</property>
</bean>
<bean id="bandwidthUtil"
class="com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil"
factory-method="getInstance">
<property name="dataSetAvailabilityCalculator" ref="dataSetAvailabilityCalculator" />
<property name="subscriptionLatencyCalculator">
<bean
class="com.raytheon.uf.edex.datadelivery.bandwidth.util.SubscriptionValueLatencyCalculator" />
</property>
<property name="subscriptionRescheduleStrategy">
<bean
class="com.raytheon.uf.edex.datadelivery.bandwidth.util.AlwaysRescheduleSubscriptions" />
</property>
</bean>
<bean id="bandwidthDaoUtil"
class="com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil">
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalManager" />
</bean>
<bean id="bandwidthDaoUtil"
class="com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil">
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalManager" />
</bean>
<bean id="BandwidthMap"
class="com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.BandwidthMap"
factory-method="load">
<constructor-arg ref="bandwidthMapConfigFile" />
</bean>
<bean id="BandwidthMap"
class="com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.BandwidthMap"
factory-method="load">
<constructor-arg ref="bandwidthMapConfigFile" />
</bean>
<bean id="aggregator"
class="com.raytheon.uf.edex.datadelivery.bandwidth.processing.SimpleSubscriptionAggregator">
<constructor-arg ref="bandwidthDao" />
</bean>
<bean id="aggregator"
class="com.raytheon.uf.edex.datadelivery.bandwidth.processing.SimpleSubscriptionAggregator">
<constructor-arg ref="bandwidthDao" />
</bean>
<bean id="BandwidthEventBusConfig"
class="com.raytheon.uf.edex.datadelivery.bandwidth.notification.BandwidthEventBusConfig">
<property name="dataSetMetaDataPoolSize" value="${bandwidth.dataSetMetaDataPoolSize}" />
<property name="retrievalPoolSize" value="${bandwidth.retrievalPoolSize}" />
<property name="subscriptionPoolSize" value="${bandwidth.subscriptionPoolSize}" />
</bean>
<!-- The shared monitor object between the RetrievalAgentManager and
its Agents -->
<bean id="retrievalAgentNotifier" class="java.lang.Object" />
<!-- The shared monitor object between the RetrievalAgentManager and its
Agents -->
<bean id="retrievalAgentNotifier" class="java.lang.Object" />
<bean id="retrievalManager"
class="com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager">
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalAgentNotifier" />
<property name="retrievalPlans" ref="retrievalPlans" />
</bean>
<bean id="retrievalAgentManager"
class="com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalAgentManager"
init-method="start">
<constructor-arg ref="retrievalAgentNotifier" />
<constructor-arg ref="retrievalAgents" />
<constructor-arg ref="retrievalDao" />
</bean>
<bean id="retrievalManager"
class="com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager">
<constructor-arg ref="bandwidthDao" />
<constructor-arg ref="retrievalAgentNotifier" />
<property name="retrievalPlans" ref="retrievalPlans" />
</bean>
<bean id="BandwidthManagerProcessor"
class="com.raytheon.uf.edex.datadelivery.bandwidth.processing.Processor" />
<bean id="BandwidthManagerRetrieval"
class="com.raytheon.uf.edex.datadelivery.retrieval.RetrievalGenerationHandler">
<constructor-arg ref="retrievalDao" />
</bean>
<bean id="SubscriptionBundleSeparator"
class="com.raytheon.uf.edex.datadelivery.bandwidth.separator.SubscriptionBundleSeparator" />
</beans>

View file

@ -1,6 +1,5 @@
bandwidth.dataSetMetaDataPoolSize=2
bandwidth.retrievalPoolSize=4
bandwidth.subscriptionPoolSize=2
bandwidth.dataSetAvailabilityCalculator.delay=100
bandwidth.subscription.latency=120
bandwidth.default.retrieval.priority=3

View file

@ -1,6 +1,5 @@
package com.raytheon.uf.edex.datadelivery.bandwidth;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
@ -11,19 +10,12 @@ import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.raytheon.edex.util.Util;
import com.raytheon.uf.common.auth.exception.AuthorizationException;
import com.raytheon.uf.common.auth.user.IUser;
@ -33,22 +25,12 @@ import com.raytheon.uf.common.datadelivery.bandwidth.IProposeScheduleResponse;
import com.raytheon.uf.common.datadelivery.bandwidth.ProposeScheduleResponse;
import com.raytheon.uf.common.datadelivery.bandwidth.data.BandwidthGraphData;
import com.raytheon.uf.common.datadelivery.registry.AdhocSubscription;
import com.raytheon.uf.common.datadelivery.registry.DataDeliveryRegistryObjectTypes;
import com.raytheon.uf.common.datadelivery.registry.DataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.DataType;
import com.raytheon.uf.common.datadelivery.registry.GriddedDataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.PointDataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.PointTime;
import com.raytheon.uf.common.datadelivery.registry.SiteSubscription;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.datadelivery.registry.Time;
import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers;
import com.raytheon.uf.common.event.EventBus;
import com.raytheon.uf.common.registry.event.InsertRegistryEvent;
import com.raytheon.uf.common.registry.event.RemoveRegistryEvent;
import com.raytheon.uf.common.registry.handler.IRegistryObjectHandler;
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
import com.raytheon.uf.common.status.IUFStatusHandler;
@ -59,8 +41,6 @@ import com.raytheon.uf.common.time.util.IPerformanceTimer;
import com.raytheon.uf.common.time.util.ITimer;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.common.util.IFileModifiedWatcher;
import com.raytheon.uf.common.util.LogUtil;
import com.raytheon.uf.common.util.algorithm.AlgorithmUtil;
import com.raytheon.uf.common.util.algorithm.AlgorithmUtil.IBinarySearchResponse;
@ -76,14 +56,12 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.BandwidthInitializer;
import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.ISubscriptionAggregator;
import com.raytheon.uf.edex.datadelivery.bandwidth.notification.BandwidthEventBus;
import com.raytheon.uf.edex.datadelivery.bandwidth.processing.SimpleSubscriptionAggregator;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.BandwidthMap;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.BandwidthRoute;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled;
import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
@ -122,6 +100,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Jun 18, 2013 2120 dhladky Add times to pointtime array
* Jun 20, 2013 1802 djohnson Check several times for the metadata for now.
* Jun 24, 2013 2106 djohnson Access BandwidthBucket contents through RetrievalPlan.
* Jul 10, 2013 2106 djohnson Move EDEX instance specific code into its own class.
* </pre>
*
* @author dhladky
@ -131,21 +110,17 @@ public abstract class BandwidthManager extends
AbstractPrivilegedRequestHandler<IBandwidthRequest> implements
IBandwidthManager {
private static final IUFStatusHandler statusHandler = UFStatus
protected static final IUFStatusHandler statusHandler = UFStatus
.getHandler(BandwidthManager.class);
private static final Pattern RAP_PATTERN = Pattern
.compile(".*rap_f\\d\\d$");
// Requires package access so it can be accessed from the maintenance task
final IBandwidthDao bandwidthDao;
private ISubscriptionAggregator aggregator;
private BandwidthInitializer initializer;
private final ScheduledExecutorService scheduler;
private final IBandwidthDao bandwidthDao;
private final BandwidthDaoUtil bandwidthDaoUtil;
protected final BandwidthDaoUtil bandwidthDaoUtil;
private final IBandwidthDbInit dbInit;
@ -157,21 +132,6 @@ public abstract class BandwidthManager extends
@VisibleForTesting
final RetrievalManager retrievalManager;
@VisibleForTesting
final Runnable watchForConfigFileChanges = new Runnable() {
private final IFileModifiedWatcher fileModifiedWatcher = FileUtil
.getFileModifiedWatcher(EdexBandwidthContextFactory
.getBandwidthMapConfig());
@Override
public void run() {
if (fileModifiedWatcher.hasBeenModified()) {
bandwidthMapConfigurationUpdated();
}
}
};
public BandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil) {
@ -179,348 +139,6 @@ public abstract class BandwidthManager extends
this.bandwidthDao = bandwidthDao;
this.retrievalManager = retrievalManager;
this.bandwidthDaoUtil = bandwidthDaoUtil;
// schedule maintenance tasks
scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(watchForConfigFileChanges, 1, 1,
TimeUnit.MINUTES);
scheduler.scheduleAtFixedRate(new MaintanenceTask(), 1, 5,
TimeUnit.MINUTES);
}
/**
* {@inheritDoc}
*/
@Override
@Subscribe
@AllowConcurrentEvents
public void registryEventListener(InsertRegistryEvent re) {
final String objectType = re.getObjectType();
final String id = re.getId();
if (DataDeliveryRegistryObjectTypes.DATASETMETADATA.equals(objectType)) {
DataSetMetaData dsmd = getDataSetMetaData(id);
if (dsmd != null) {
// Repost the Object to the BandwidthEventBus to free
// the notification thread.
// TODO: A hack to prevent rap_f and rap datasets being
// Identified as the
// same dataset...
Matcher matcher = RAP_PATTERN.matcher(dsmd.getUrl());
if (matcher.matches()) {
statusHandler
.info("Found rap_f dataset - updating dataset name from ["
+ dsmd.getDataSetName() + "] to [rap_f]");
dsmd.setDataSetName("rap_f");
}
// TODO: End of hack..
BandwidthEventBus.publish(dsmd);
} else {
statusHandler.error("No DataSetMetaData found for id [" + id
+ "]");
}
} else if (DataDeliveryRegistryObjectTypes.SITE_SUBSCRIPTION
.equals(objectType)
|| DataDeliveryRegistryObjectTypes.SHARED_SUBSCRIPTION
.equals(objectType)) {
Subscription subscription = getSubscription(id);
if (subscription != null) {
// Make sure the subscriptionId is set to the
// RegistryObjectId
subscription.setId(id);
// Repost the Object to the BandwidthEventBus to free
// the notification thread.
BandwidthEventBus.publish(subscription);
} else {
statusHandler
.error("No Subscription found for id [" + id + "]");
}
}
}
private static DataSetMetaData getDataSetMetaData(String id) {
return getRegistryObjectById(
DataDeliveryHandlers.getDataSetMetaDataHandler(), id);
}
private static Subscription getSubscription(String id) {
return getRegistryObjectById(
DataDeliveryHandlers.getSubscriptionHandler(), id);
}
private static <T> T getRegistryObjectById(
IRegistryObjectHandler<T> handler, String id) {
try {
return handler.getById(id);
} catch (RegistryHandlerException e) {
statusHandler.error("Error attempting to retrieve RegistryObject["
+ id + "] from Registry.", e);
return null;
}
}
/**
* Process a {@link GriddedDataSetMetaData} that was received from the event
* bus.
*
* @param dataSetMetaData
* the metadadata
*/
@Subscribe
public void updateGriddedDataSetMetaData(
GriddedDataSetMetaData dataSetMetaData) throws ParseException {
// Daily/Hourly/Monthly datasets
if (dataSetMetaData.getCycle() == GriddedDataSetMetaData.NO_CYCLE) {
updateDataSetMetaDataWithoutCycle(dataSetMetaData);
}
// Regular cycle containing datasets
else {
updateDataSetMetaDataWithCycle(dataSetMetaData);
}
}
/**
* Process a {@link PointDataSetMetaData} that was received from the event
* bus.
*
* @param dataSetMetaData
* the metadadata
*/
@Subscribe
public void updatePointDataSetMetaData(PointDataSetMetaData dataSetMetaData) {
// TODO: Change PointDataSetMetaData to only be able to use PointTime
// objects
final PointTime time = (PointTime) dataSetMetaData.getTime();
final String providerName = dataSetMetaData.getProviderName();
final String dataSetName = dataSetMetaData.getDataSetName();
final Date pointTimeStart = time.getStartDate();
final Date pointTimeEnd = time.getEndDate();
final SortedSet<Integer> allowedRefreshIntervals = PointTime
.getAllowedRefreshIntervals();
final long maxAllowedRefreshIntervalInMillis = TimeUtil.MILLIS_PER_MINUTE
* allowedRefreshIntervals.last();
final long minAllowedRefreshIntervalInMillis = TimeUtil.MILLIS_PER_MINUTE
* allowedRefreshIntervals.first();
// Find any retrievals ranging from those with the minimum refresh
// interval to the maximum refresh interval
final Date startDate = new Date(pointTimeStart.getTime()
+ minAllowedRefreshIntervalInMillis);
final Date endDate = new Date(pointTimeEnd.getTime()
+ maxAllowedRefreshIntervalInMillis);
final SortedSet<SubscriptionRetrieval> subscriptionRetrievals = bandwidthDao
.getSubscriptionRetrievals(providerName, dataSetName,
RetrievalStatus.SCHEDULED, startDate, endDate);
if (!CollectionUtil.isNullOrEmpty(subscriptionRetrievals)) {
for (SubscriptionRetrieval retrieval : subscriptionRetrievals) {
// Now check and make sure that at least one of the times falls
// in their retrieval range, their latency is the retrieval
// interval
final int retrievalInterval = retrieval
.getSubscriptionLatency();
// This is the latest time on the data we care about, once the
// retrieval is signaled to go it retrieves everything up to
// its start time
final Date latestRetrievalDataTime = retrieval.getStartTime()
.getTime();
// This is the earliest possible time this retrieval cares about
final Date earliestRetrievalDataTime = new Date(
latestRetrievalDataTime.getTime()
- (TimeUtil.MILLIS_PER_MINUTE * retrievalInterval));
// If the end is before any times we care about or the start is
// after the latest times we care about, skip it
if (pointTimeEnd.before(earliestRetrievalDataTime)
|| pointTimeStart.after(latestRetrievalDataTime)) {
continue;
}
try {
// Update the retrieval times on the subscription object
// which goes through the retrieval process
final Subscription subscription = retrieval
.getSubscription();
subscription.setUrl(dataSetMetaData.getUrl());
subscription.setProvider(dataSetMetaData.getProviderName());
if (subscription.getTime() instanceof PointTime) {
final PointTime subTime = (PointTime) subscription
.getTime();
subTime.setRequestStartAsDate(earliestRetrievalDataTime);
subTime.setRequestEndAsDate(latestRetrievalDataTime);
subTime.setTimes(time.getTimes());
// Now update the retrieval to be ready
retrieval.setStatus(RetrievalStatus.READY);
bandwidthDaoUtil.update(retrieval);
} else {
throw new IllegalArgumentException("Subscription time not PointType! " + subscription.getName());
}
} catch (SerializationException e) {
statusHandler.handle(Priority.PROBLEM,
e.getLocalizedMessage(), e);
}
}
}
}
/**
* Handles updates for datasets that do not contain cycles.
*
* @param dataSetMetaData
* the dataset metadata
* @throws ParseException
* on parsing errors
*/
private void updateDataSetMetaDataWithoutCycle(
GriddedDataSetMetaData dataSetMetaData) throws ParseException {
bandwidthDao.newBandwidthDataSetUpdate(dataSetMetaData);
// Looking for active subscriptions to the dataset.
try {
List<Subscription> subscriptions = DataDeliveryHandlers
.getSubscriptionHandler().getActiveByDataSetAndProvider(
dataSetMetaData.getDataSetName(),
dataSetMetaData.getProviderName());
if (subscriptions.isEmpty()) {
return;
}
statusHandler
.info(String
.format("Found [%s] subscriptions that will have an "
+ "adhoc subscription generated and scheduled for url [%s].",
subscriptions.size(),
dataSetMetaData.getUrl()));
// Create an adhoc for each one, and schedule it
for (Subscription subscription : subscriptions) {
Subscription sub = updateSubscriptionWithDataSetMetaData(
subscription, dataSetMetaData);
if (sub instanceof SiteSubscription) {
schedule(new AdhocSubscription((SiteSubscription) sub));
} else {
statusHandler
.warn("Unable to create adhoc queries for shared subscriptions at this point. This functionality should be added in the future...");
}
}
} catch (RegistryHandlerException e) {
statusHandler.handle(Priority.PROBLEM,
"Failed to lookup subscriptions.", e);
}
}
/**
* Handles updates for datasets that contain cycles.
*
* @param dataSetMetaData
* the dataset metadata
* @throws ParseException
* on parsing errors
*/
private void updateDataSetMetaDataWithCycle(
GriddedDataSetMetaData dataSetMetaData) throws ParseException {
BandwidthDataSetUpdate dataset = bandwidthDao
.newBandwidthDataSetUpdate(dataSetMetaData);
// Looking for active subscriptions to the dataset.
List<SubscriptionRetrieval> subscriptions = bandwidthDao
.getSubscriptionRetrievals(dataset.getProviderName(),
dataset.getDataSetName(), dataset.getDataSetBaseTime());
if (!subscriptions.isEmpty()) {
// Loop through the scheduled SubscriptionRetrievals and mark
// the scheduled retrievals as ready for retrieval
for (SubscriptionRetrieval retrieval : subscriptions) {
// TODO: Evaluate the state changes for receiving multiple
// dataset update messages. This seems to be happening
// quite a bit.
if (RetrievalStatus.SCHEDULED.equals(retrieval.getStatus())) {
// Need to update the Subscription Object in the
// SubscriptionRetrieval with the current DataSetMetaData
// URL and time Object
Subscription sub;
try {
sub = updateSubscriptionWithDataSetMetaData(
retrieval.getSubscription(), dataSetMetaData);
// Update the SubscriptionRetrieval record with the new
// data...
retrieval.setSubscription(sub);
} catch (SerializationException e) {
statusHandler
.handle(Priority.PROBLEM,
"Unable to serialize the subscription for the retrieval, skipping...",
e);
continue;
}
retrieval.setStatus(RetrievalStatus.READY);
bandwidthDaoUtil.update(retrieval);
statusHandler
.info(String.format("Updated retrieval [%s] for "
+ "subscription [%s] to use "
+ "url [%s] and "
+ "base reference time [%s]", retrieval
.getIdentifier(), sub.getName(),
dataSetMetaData.getUrl(), BandwidthUtil
.format(sub.getTime()
.getStartDate())));
}
}
// Notify RetrievalAgentManager of updated RetrievalRequests.
retrievalManager.wakeAgents();
} else {
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler
.debug("No Subscriptions scheduled for BandwidthDataSetUpdate ["
+ dataset.getIdentifier()
+ "] base time ["
+ BandwidthUtil.format(dataset
.getDataSetBaseTime()) + "]");
}
}
}
/**
* Updates a {@link Subscription) to reflect important attributes of the
* specified {@link DataSetMetaData}.
*
* @param sub
* the subscription
* @param dataSetMetaData
* the datasetmetadata update
* @return the subscription
*/
private static Subscription updateSubscriptionWithDataSetMetaData(
Subscription sub, DataSetMetaData dataSetMetaData) {
final Time dsmdTime = dataSetMetaData.getTime();
final Time subTime = sub.getTime();
dsmdTime.setSelectedTimeIndices(subTime.getSelectedTimeIndices());
dsmdTime.setCycleTimes(subTime.getCycleTimes());
sub.setTime(dsmdTime);
sub.setUrl(dataSetMetaData.getUrl());
return sub;
}
private List<BandwidthAllocation> schedule(Subscription subscription,
@ -611,7 +229,7 @@ public abstract class BandwidthManager extends
return unscheduled;
}
private List<BandwidthAllocation> schedule(BandwidthSubscription dao) {
protected List<BandwidthAllocation> schedule(BandwidthSubscription dao) {
Calendar retrievalTime = dao.getBaseReferenceTime();
// Retrieve all the current subscriptions by provider, dataset name and
@ -742,33 +360,6 @@ public abstract class BandwidthManager extends
return unscheduled;
}
/**
* {@inheritDoc}
*/
@Override
@Subscribe
@AllowConcurrentEvents
public void subscriptionRemoved(RemoveRegistryEvent event) {
String objectType = event.getObjectType();
if (objectType != null) {
if (DataDeliveryRegistryObjectTypes.SITE_SUBSCRIPTION
.equals(objectType)
|| DataDeliveryRegistryObjectTypes.SHARED_SUBSCRIPTION
.equals(objectType)) {
statusHandler
.info("Recieved Subscription removal notification for Subscription ["
+ event.getId() + "]");
// Need to locate and remove all BandwidthReservations for the
// given subscription..
List<BandwidthSubscription> l = bandwidthDao
.getBandwidthSubscriptionByRegistryId(event.getId());
if (!l.isEmpty()) {
remove(l);
}
}
}
}
/**
* {@inheritDoc}
*/
@ -1016,7 +607,7 @@ public abstract class BandwidthManager extends
* The subscriptionDao's to remove.
* @return
*/
private List<BandwidthAllocation> remove(
protected List<BandwidthAllocation> remove(
List<BandwidthSubscription> bandwidthSubscriptions) {
List<BandwidthAllocation> unscheduled = new ArrayList<BandwidthAllocation>();
@ -1028,100 +619,6 @@ public abstract class BandwidthManager extends
return unscheduled;
}
/**
* {@inheritDoc}
*/
@Override
@Subscribe
public void subscriptionFulfilled(
SubscriptionRetrievalFulfilled subscriptionRetrievalFulfilled) {
statusHandler.info("subscriptionFullfilled() :: "
+ subscriptionRetrievalFulfilled);
SubscriptionRetrieval sr = subscriptionRetrievalFulfilled
.getSubscriptionRetrieval();
List<SubscriptionRetrieval> subscriptionRetrievals = bandwidthDao
.querySubscriptionRetrievals(sr.getBandwidthSubscription());
// Look to see if all the SubscriptionRetrieval's for a subscription are
// completed.
boolean complete = true;
for (SubscriptionRetrieval subscription : subscriptionRetrievals) {
if (!RetrievalStatus.FULFILLED.equals(subscription.getStatus())) {
complete = false;
break;
}
}
if (complete) {
// Remove the completed SubscriptionRetrieval Objects from the
// plan..
RetrievalPlan plan = retrievalManager.getPlan(sr.getNetwork());
plan.remove(sr);
// Schedule the next iteration of the subscription
BandwidthSubscription dao = sr.getBandwidthSubscription();
Subscription subscription = null;
try {
subscription = dao.getSubscription();
} catch (SerializationException e) {
statusHandler.error(
"Failed to extract Subscription from BandwidthSubscription ["
+ dao.getIdentifier() + "]", e);
// No sense in continuing
return;
}
// AdhocSubscriptions are one and done, so don't reschedule.
if (subscription instanceof AdhocSubscription) {
return;
}
Calendar next = BandwidthUtil.copy(dao.getBaseReferenceTime());
// See how far into the future the plan goes..
int days = retrievalManager.getPlan(dao.getRoute()).getPlanDays();
for (int day = 1; day <= days; day++) {
next.add(Calendar.DAY_OF_YEAR, 1);
// Since subscriptions are based on cycles in a day, add one day
// to the
// completed BandwidthSubscription to get the next days
// retrieval.
// Now check if that BandwidthSubscription has already been
// scheduled.
BandwidthSubscription a = bandwidthDao
.getBandwidthSubscription(dao.getRegistryId(), next);
if (a == null) {
// Create the new BandwidthSubscription record with the next
// time..
try {
a = bandwidthDao.newBandwidthSubscription(subscription,
next);
} catch (SerializationException e) {
statusHandler.error(
"Failed to create new BandwidthSubscription from Subscription ["
+ subscription.getId()
+ "] baseReferenceTime ["
+ BandwidthUtil.format(next) + "]", e);
}
schedule(a);
} else {
statusHandler
.info("Subscription ["
+ subscription.getName()
+ "] has already been scheduled for baseReferenceTime ["
+ BandwidthUtil.format(next) + "]");
}
}
}
}
/**
* {@inheritDoc}
*/
@ -1618,29 +1115,6 @@ public abstract class BandwidthManager extends
initializer.init(this, dbInit, retrievalManager);
}
/**
* Private inner work thread used to keep the RetrievalPlans up to date.
*
*/
private class MaintanenceTask implements Runnable {
@Override
public void run() {
for (RetrievalPlan plan : retrievalManager.getRetrievalPlans()
.values()) {
plan.resize();
Calendar newEnd = plan.getPlanEnd();
// Find DEFERRED Allocations and load them into the plan...
List<BandwidthAllocation> deferred = bandwidthDao.getDeferred(
plan.getNetwork(), newEnd);
if (!deferred.isEmpty()) {
retrievalManager.schedule(deferred);
}
}
}
}
/**
* {@inheritDoc}
*/
@ -1725,35 +1199,14 @@ public abstract class BandwidthManager extends
*/
@VisibleForTesting
void shutdown() {
unregisterFromEventBus();
unregisterFromBandwidthEventBus();
try {
retrievalManager.shutdown();
} catch (Exception e) {
statusHandler.handle(Priority.WARN,
"Unable to shutdown the retrievalManager.", e);
} finally {
shutdownInternal();
}
try {
scheduler.shutdownNow();
} catch (Exception e) {
statusHandler.handle(Priority.WARN,
"Unable to shutdown the scheduler.", e);
}
}
/**
* Unregister from the {@link EventBus}.
*/
private void unregisterFromEventBus() {
EventBus.unregister(this);
}
/**
* Unregister from the {@link BandwidthEventBus}.
*/
private void unregisterFromBandwidthEventBus() {
BandwidthEventBus.register(this);
}
/**
@ -1911,18 +1364,7 @@ public abstract class BandwidthManager extends
}
/**
* Signals the bandwidth map localization file is updated, perform a
* reinitialize operation.
* Provide implementation specific shutdown.
*/
private void bandwidthMapConfigurationUpdated() {
IBandwidthRequest request = new IBandwidthRequest();
request.setRequestType(RequestType.REINITIALIZE);
try {
handleRequest(request);
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM,
"Error while reinitializing the bandwidth manager.", e);
}
}
protected abstract void shutdownInternal();
}

View file

@ -21,6 +21,8 @@ package com.raytheon.uf.edex.datadelivery.bandwidth;
import java.io.File;
import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler;
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
@ -47,6 +49,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
* ------------ ---------- ----------- --------------------------
* Oct 24, 2012 1286 djohnson Initial creation
* Feb 20, 2013 1543 djohnson Add IEdexBandwidthManagerCreator.
* Jul 10, 2013 2106 djohnson Dependency inject registry handlers.
*
* </pre>
*
@ -68,14 +71,18 @@ public class EdexBandwidthContextFactory implements BandwidthContextFactory {
* @param bandwidthDao
* @param retrievalManager
* @param bandwidthDaoUtil
* @param dataSetMetaDataHandler
* @param subscriptionHandler
* @return the bandwidth manager
*/
IBandwidthManager getBandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil);
BandwidthDaoUtil bandwidthDaoUtil,
IDataSetMetaDataHandler dataSetMetaDataHandler,
ISubscriptionHandler subscriptionHandler);
}
private static BandwidthManager instance;
private static EdexBandwidthManager instance;
private final IBandwidthDao bandwidthDao;
@ -87,23 +94,36 @@ public class EdexBandwidthContextFactory implements BandwidthContextFactory {
private final IBandwidthDbInit dbInit;
private final IDataSetMetaDataHandler dataSetMetaDataHandler;
private final ISubscriptionHandler subscriptionHandler;
/**
* Intentionally package-private constructor, as it is created from Spring
* which is able to reflectively instantiate.
*
* @param bandwidthDao
* @param findSubscriptionStrategy
* @param bandwidthBucketDao
* @param bandwidthInitializer
* @param bandwidthManagerCreator
* @param dbInit
* @param dataSetMetaDataHandler
* @param subscriptionHandler
*/
EdexBandwidthContextFactory(IBandwidthDao bandwidthDao,
IBandwidthBucketDao bandwidthBucketDao,
BandwidthInitializer bandwidthInitializer,
IEdexBandwidthManagerCreator bandwidthManagerCreator,
IBandwidthDbInit dbInit) {
IBandwidthDbInit dbInit,
IDataSetMetaDataHandler dataSetMetaDataHandler,
ISubscriptionHandler subscriptionHandler) {
this.bandwidthDao = bandwidthDao;
this.bandwidthBucketDao = bandwidthBucketDao;
this.bandwidthInitializer = bandwidthInitializer;
this.bandwidthManagerCreator = bandwidthManagerCreator;
this.dbInit = dbInit;
this.dataSetMetaDataHandler = dataSetMetaDataHandler;
this.subscriptionHandler = subscriptionHandler;
}
/**
@ -114,13 +134,9 @@ public class EdexBandwidthContextFactory implements BandwidthContextFactory {
* @param instance
* the {@link BandwidthManager} instance
*/
EdexBandwidthContextFactory(BandwidthManager instance) {
EdexBandwidthContextFactory(EdexBandwidthManager instance) {
this(null, null, null, null, null, null, null);
EdexBandwidthContextFactory.instance = instance;
this.bandwidthDao = null;
this.bandwidthBucketDao = null;
this.bandwidthInitializer = null;
this.bandwidthManagerCreator = null;
this.dbInit = null;
}
/**
@ -130,7 +146,7 @@ public class EdexBandwidthContextFactory implements BandwidthContextFactory {
*
* @return the instance
*/
static BandwidthManager getInstance() {
static EdexBandwidthManager getInstance() {
return instance;
}
@ -210,6 +226,6 @@ public class EdexBandwidthContextFactory implements BandwidthContextFactory {
IBandwidthDao bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil) {
return bandwidthManagerCreator.getBandwidthManager(dbInit,
bandwidthDao, retrievalManager, bandwidthDaoUtil);
bandwidthDao, retrievalManager, bandwidthDaoUtil, dataSetMetaDataHandler, subscriptionHandler);
}
}

View file

@ -0,0 +1,670 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.datadelivery.bandwidth;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.raytheon.uf.common.datadelivery.bandwidth.IBandwidthRequest;
import com.raytheon.uf.common.datadelivery.bandwidth.IBandwidthRequest.RequestType;
import com.raytheon.uf.common.datadelivery.registry.AdhocSubscription;
import com.raytheon.uf.common.datadelivery.registry.DataDeliveryRegistryObjectTypes;
import com.raytheon.uf.common.datadelivery.registry.DataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.GriddedDataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.PointDataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.PointTime;
import com.raytheon.uf.common.datadelivery.registry.SiteSubscription;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.datadelivery.registry.Time;
import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler;
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
import com.raytheon.uf.common.event.EventBus;
import com.raytheon.uf.common.registry.event.InsertRegistryEvent;
import com.raytheon.uf.common.registry.event.RemoveRegistryEvent;
import com.raytheon.uf.common.registry.handler.IRegistryObjectHandler;
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.common.util.IFileModifiedWatcher;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthDataSetUpdate;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthSubscription;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
import com.raytheon.uf.edex.datadelivery.bandwidth.notification.BandwidthEventBus;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled;
import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
/**
* Implementation of {@link BandwidthManager} that isolates EDEX specific
* functionality. This keeps things out of the {@link InMemoryBandwidthManager}
* that could interfere with garbage collection/threading concerns.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jul 10, 2013 2106 djohnson Extracted from {@link BandwidthManager}.
*
* </pre>
*
* @author djohnson
* @version 1.0
*/
public abstract class EdexBandwidthManager extends BandwidthManager {
private static final Pattern RAP_PATTERN = Pattern
.compile(".*rap_f\\d\\d$");
private final IDataSetMetaDataHandler dataSetMetaDataHandler;
private final ISubscriptionHandler subscriptionHandler;
private final ScheduledExecutorService scheduler;
@VisibleForTesting
final Runnable watchForConfigFileChanges = new Runnable() {
private final IFileModifiedWatcher fileModifiedWatcher = FileUtil
.getFileModifiedWatcher(EdexBandwidthContextFactory
.getBandwidthMapConfig());
@Override
public void run() {
if (fileModifiedWatcher.hasBeenModified()) {
bandwidthMapConfigurationUpdated();
}
}
};
/**
* @param dbInit
* @param bandwidthDao
* @param retrievalManager
* @param bandwidthDaoUtil
*/
public EdexBandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil,
IDataSetMetaDataHandler dataSetMetaDataHandler,
ISubscriptionHandler subscriptionHandler) {
super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil);
this.dataSetMetaDataHandler = dataSetMetaDataHandler;
this.subscriptionHandler = subscriptionHandler;
// schedule maintenance tasks
scheduler = Executors.newScheduledThreadPool(1);
// TODO: Uncomment the last line in this comment block when fully
// switched over to Java 1.7 and remove the finally block in shutdown,
// that is also marked as TODO
// This will allow the bandwidth manager to be garbage collected without
// waiting for all of the delayed tasks to expire, currently they are
// manually removed in the shutdown method by casting to the
// implementation and clearing the queue
// scheduler.setRemoveOnCancelPolicy(true);
scheduler.scheduleAtFixedRate(watchForConfigFileChanges, 1, 1,
TimeUnit.MINUTES);
scheduler.scheduleAtFixedRate(new MaintanenceTask(), 1, 5,
TimeUnit.MINUTES);
}
/**
* {@inheritDoc}
*/
@Override
protected void shutdownInternal() {
unregisterFromEventBus();
unregisterFromBandwidthEventBus();
try {
scheduler.shutdownNow();
} catch (Exception e) {
statusHandler.handle(Priority.WARN,
"Unable to shutdown the scheduler.", e);
} finally {
// TODO: Remove this finally block when fully switched over to Java
// 1.7. See TODO comment in the constructor.
if (scheduler != null
&& scheduler instanceof ScheduledThreadPoolExecutor) {
((ScheduledThreadPoolExecutor) scheduler).getQueue().clear();
}
}
EventBus.unregister(retrievalManager);
}
/**
* Unregister from the {@link EventBus}.
*/
private void unregisterFromEventBus() {
EventBus.unregister(this);
}
/**
* Unregister from the {@link BandwidthEventBus}.
*/
private void unregisterFromBandwidthEventBus() {
BandwidthEventBus.unregister(this);
}
/**
* The callback method for BandwidthEventBus to use to notify
* BandwidthManager that retrievalManager has completed the retrievals for a
* Subscription. The updated BandwidthSubscription Object is placed on the
* BandwidthEventBus.
*
* @param subscription
* The completed subscription.
*/
@Subscribe
public void subscriptionFulfilled(
SubscriptionRetrievalFulfilled subscriptionRetrievalFulfilled) {
statusHandler.info("subscriptionFullfilled() :: "
+ subscriptionRetrievalFulfilled);
SubscriptionRetrieval sr = subscriptionRetrievalFulfilled
.getSubscriptionRetrieval();
List<SubscriptionRetrieval> subscriptionRetrievals = bandwidthDao
.querySubscriptionRetrievals(sr.getBandwidthSubscription());
// Look to see if all the SubscriptionRetrieval's for a subscription are
// completed.
boolean complete = true;
for (SubscriptionRetrieval subscription : subscriptionRetrievals) {
if (!RetrievalStatus.FULFILLED.equals(subscription.getStatus())) {
complete = false;
break;
}
}
if (complete) {
// Remove the completed SubscriptionRetrieval Objects from the
// plan..
RetrievalPlan plan = retrievalManager.getPlan(sr.getNetwork());
plan.remove(sr);
// Schedule the next iteration of the subscription
BandwidthSubscription dao = sr.getBandwidthSubscription();
Subscription subscription = null;
try {
subscription = dao.getSubscription();
} catch (SerializationException e) {
statusHandler.error(
"Failed to extract Subscription from BandwidthSubscription ["
+ dao.getIdentifier() + "]", e);
// No sense in continuing
return;
}
// AdhocSubscriptions are one and done, so don't reschedule.
if (subscription instanceof AdhocSubscription) {
return;
}
Calendar next = BandwidthUtil.copy(dao.getBaseReferenceTime());
// See how far into the future the plan goes..
int days = retrievalManager.getPlan(dao.getRoute()).getPlanDays();
for (int day = 1; day <= days; day++) {
next.add(Calendar.DAY_OF_YEAR, 1);
// Since subscriptions are based on cycles in a day, add one day
// to the
// completed BandwidthSubscription to get the next days
// retrieval.
// Now check if that BandwidthSubscription has already been
// scheduled.
BandwidthSubscription a = bandwidthDao
.getBandwidthSubscription(dao.getRegistryId(), next);
if (a == null) {
// Create the new BandwidthSubscription record with the next
// time..
try {
a = bandwidthDao.newBandwidthSubscription(subscription,
next);
} catch (SerializationException e) {
statusHandler.error(
"Failed to create new BandwidthSubscription from Subscription ["
+ subscription.getId()
+ "] baseReferenceTime ["
+ BandwidthUtil.format(next) + "]", e);
}
schedule(a);
} else {
statusHandler
.info("Subscription ["
+ subscription.getName()
+ "] has already been scheduled for baseReferenceTime ["
+ BandwidthUtil.format(next) + "]");
}
}
}
}
/**
* When a Subscription is removed from the Registry, a RemoveRegistryEvent
* is generated and forwarded to this method to remove the necessary
* BandwidthReservations (and perhaps redefine others).
*
* @param event
*/
@Subscribe
@AllowConcurrentEvents
public void subscriptionRemoved(RemoveRegistryEvent event) {
String objectType = event.getObjectType();
if (objectType != null) {
if (DataDeliveryRegistryObjectTypes.SITE_SUBSCRIPTION
.equals(objectType)
|| DataDeliveryRegistryObjectTypes.SHARED_SUBSCRIPTION
.equals(objectType)) {
statusHandler
.info("Recieved Subscription removal notification for Subscription ["
+ event.getId() + "]");
// Need to locate and remove all BandwidthReservations for the
// given subscription..
List<BandwidthSubscription> l = bandwidthDao
.getBandwidthSubscriptionByRegistryId(event.getId());
if (!l.isEmpty()) {
remove(l);
}
}
}
}
/**
* Create a hook into the EDEX Notification sub-system to receive the the
* necessary InsertRegistryEvents to drive Bandwidth Management.
*
* @param re
* The <code>InsertRegistryEvent</code> Object to evaluate.
*/
@Subscribe
@AllowConcurrentEvents
public void registryEventListener(InsertRegistryEvent re) {
final String objectType = re.getObjectType();
final String id = re.getId();
if (DataDeliveryRegistryObjectTypes.DATASETMETADATA.equals(objectType)) {
DataSetMetaData dsmd = getDataSetMetaData(id);
if (dsmd != null) {
// Repost the Object to the BandwidthEventBus to free
// the notification thread.
// TODO: A hack to prevent rap_f and rap datasets being
// Identified as the
// same dataset...
Matcher matcher = RAP_PATTERN.matcher(dsmd.getUrl());
if (matcher.matches()) {
statusHandler
.info("Found rap_f dataset - updating dataset name from ["
+ dsmd.getDataSetName() + "] to [rap_f]");
dsmd.setDataSetName("rap_f");
}
// TODO: End of hack..
BandwidthEventBus.publish(dsmd);
} else {
statusHandler.error("No DataSetMetaData found for id [" + id
+ "]");
}
}
}
private DataSetMetaData getDataSetMetaData(String id) {
return getRegistryObjectById(dataSetMetaDataHandler, id);
}
private static <T> T getRegistryObjectById(
IRegistryObjectHandler<T> handler, String id) {
try {
return handler.getById(id);
} catch (RegistryHandlerException e) {
statusHandler.error("Error attempting to retrieve RegistryObject["
+ id + "] from Registry.", e);
return null;
}
}
/**
* Process a {@link GriddedDataSetMetaData} that was received from the event
* bus.
*
* @param dataSetMetaData
* the metadadata
*/
@Subscribe
public void updateGriddedDataSetMetaData(
GriddedDataSetMetaData dataSetMetaData) throws ParseException {
// Daily/Hourly/Monthly datasets
if (dataSetMetaData.getCycle() == GriddedDataSetMetaData.NO_CYCLE) {
updateDataSetMetaDataWithoutCycle(dataSetMetaData);
}
// Regular cycle containing datasets
else {
updateDataSetMetaDataWithCycle(dataSetMetaData);
}
}
/**
* Process a {@link PointDataSetMetaData} that was received from the event
* bus.
*
* @param dataSetMetaData
* the metadadata
*/
@Subscribe
public void updatePointDataSetMetaData(PointDataSetMetaData dataSetMetaData) {
// TODO: Change PointDataSetMetaData to only be able to use PointTime
// objects
final PointTime time = (PointTime) dataSetMetaData.getTime();
final String providerName = dataSetMetaData.getProviderName();
final String dataSetName = dataSetMetaData.getDataSetName();
final Date pointTimeStart = time.getStartDate();
final Date pointTimeEnd = time.getEndDate();
final SortedSet<Integer> allowedRefreshIntervals = PointTime
.getAllowedRefreshIntervals();
final long maxAllowedRefreshIntervalInMillis = TimeUtil.MILLIS_PER_MINUTE
* allowedRefreshIntervals.last();
final long minAllowedRefreshIntervalInMillis = TimeUtil.MILLIS_PER_MINUTE
* allowedRefreshIntervals.first();
// Find any retrievals ranging from those with the minimum refresh
// interval to the maximum refresh interval
final Date startDate = new Date(pointTimeStart.getTime()
+ minAllowedRefreshIntervalInMillis);
final Date endDate = new Date(pointTimeEnd.getTime()
+ maxAllowedRefreshIntervalInMillis);
final SortedSet<SubscriptionRetrieval> subscriptionRetrievals = bandwidthDao
.getSubscriptionRetrievals(providerName, dataSetName,
RetrievalStatus.SCHEDULED, startDate, endDate);
if (!CollectionUtil.isNullOrEmpty(subscriptionRetrievals)) {
for (SubscriptionRetrieval retrieval : subscriptionRetrievals) {
// Now check and make sure that at least one of the times falls
// in their retrieval range, their latency is the retrieval
// interval
final int retrievalInterval = retrieval
.getSubscriptionLatency();
// This is the latest time on the data we care about, once the
// retrieval is signaled to go it retrieves everything up to
// its start time
final Date latestRetrievalDataTime = retrieval.getStartTime()
.getTime();
// This is the earliest possible time this retrieval cares about
final Date earliestRetrievalDataTime = new Date(
latestRetrievalDataTime.getTime()
- (TimeUtil.MILLIS_PER_MINUTE * retrievalInterval));
// If the end is before any times we care about or the start is
// after the latest times we care about, skip it
if (pointTimeEnd.before(earliestRetrievalDataTime)
|| pointTimeStart.after(latestRetrievalDataTime)) {
continue;
}
try {
// Update the retrieval times on the subscription object
// which goes through the retrieval process
final Subscription subscription = retrieval
.getSubscription();
subscription.setUrl(dataSetMetaData.getUrl());
subscription.setProvider(dataSetMetaData.getProviderName());
if (subscription.getTime() instanceof PointTime) {
final PointTime subTime = (PointTime) subscription
.getTime();
subTime.setRequestStartAsDate(earliestRetrievalDataTime);
subTime.setRequestEndAsDate(latestRetrievalDataTime);
subTime.setTimes(time.getTimes());
// Now update the retrieval to be ready
retrieval.setStatus(RetrievalStatus.READY);
bandwidthDaoUtil.update(retrieval);
} else {
throw new IllegalArgumentException(
"Subscription time not PointType! "
+ subscription.getName());
}
} catch (SerializationException e) {
statusHandler.handle(Priority.PROBLEM,
e.getLocalizedMessage(), e);
}
}
}
}
/**
* Handles updates for datasets that do not contain cycles.
*
* @param dataSetMetaData
* the dataset metadata
* @throws ParseException
* on parsing errors
*/
private void updateDataSetMetaDataWithoutCycle(
GriddedDataSetMetaData dataSetMetaData) throws ParseException {
bandwidthDao.newBandwidthDataSetUpdate(dataSetMetaData);
// Looking for active subscriptions to the dataset.
try {
List<Subscription> subscriptions = subscriptionHandler
.getActiveByDataSetAndProvider(
dataSetMetaData.getDataSetName(),
dataSetMetaData.getProviderName());
if (subscriptions.isEmpty()) {
return;
}
statusHandler
.info(String
.format("Found [%s] subscriptions that will have an "
+ "adhoc subscription generated and scheduled for url [%s].",
subscriptions.size(),
dataSetMetaData.getUrl()));
// Create an adhoc for each one, and schedule it
for (Subscription subscription : subscriptions) {
Subscription sub = updateSubscriptionWithDataSetMetaData(
subscription, dataSetMetaData);
if (sub instanceof SiteSubscription) {
schedule(new AdhocSubscription((SiteSubscription) sub));
} else {
statusHandler
.warn("Unable to create adhoc queries for shared subscriptions at this point. This functionality should be added in the future...");
}
}
} catch (RegistryHandlerException e) {
statusHandler.handle(Priority.PROBLEM,
"Failed to lookup subscriptions.", e);
}
}
/**
* Handles updates for datasets that contain cycles.
*
* @param dataSetMetaData
* the dataset metadata
* @throws ParseException
* on parsing errors
*/
private void updateDataSetMetaDataWithCycle(
GriddedDataSetMetaData dataSetMetaData) throws ParseException {
BandwidthDataSetUpdate dataset = bandwidthDao
.newBandwidthDataSetUpdate(dataSetMetaData);
// Looking for active subscriptions to the dataset.
List<SubscriptionRetrieval> subscriptions = bandwidthDao
.getSubscriptionRetrievals(dataset.getProviderName(),
dataset.getDataSetName(), dataset.getDataSetBaseTime());
if (!subscriptions.isEmpty()) {
// Loop through the scheduled SubscriptionRetrievals and mark
// the scheduled retrievals as ready for retrieval
for (SubscriptionRetrieval retrieval : subscriptions) {
// TODO: Evaluate the state changes for receiving multiple
// dataset update messages. This seems to be happening
// quite a bit.
if (RetrievalStatus.SCHEDULED.equals(retrieval.getStatus())) {
// Need to update the Subscription Object in the
// SubscriptionRetrieval with the current DataSetMetaData
// URL and time Object
Subscription sub;
try {
sub = updateSubscriptionWithDataSetMetaData(
retrieval.getSubscription(), dataSetMetaData);
// Update the SubscriptionRetrieval record with the new
// data...
retrieval.setSubscription(sub);
} catch (SerializationException e) {
statusHandler
.handle(Priority.PROBLEM,
"Unable to serialize the subscription for the retrieval, skipping...",
e);
continue;
}
retrieval.setStatus(RetrievalStatus.READY);
bandwidthDaoUtil.update(retrieval);
statusHandler
.info(String.format("Updated retrieval [%s] for "
+ "subscription [%s] to use "
+ "url [%s] and "
+ "base reference time [%s]", retrieval
.getIdentifier(), sub.getName(),
dataSetMetaData.getUrl(), BandwidthUtil
.format(sub.getTime()
.getStartDate())));
}
}
// Notify RetrievalAgentManager of updated RetrievalRequests.
retrievalManager.wakeAgents();
} else {
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler
.debug("No Subscriptions scheduled for BandwidthDataSetUpdate ["
+ dataset.getIdentifier()
+ "] base time ["
+ BandwidthUtil.format(dataset
.getDataSetBaseTime()) + "]");
}
}
}
/**
* Updates a {@link Subscription) to reflect important attributes of the
* specified {@link DataSetMetaData}.
*
* @param sub
* the subscription
* @param dataSetMetaData
* the datasetmetadata update
* @return the subscription
*/
private static Subscription updateSubscriptionWithDataSetMetaData(
Subscription sub, DataSetMetaData dataSetMetaData) {
final Time dsmdTime = dataSetMetaData.getTime();
final Time subTime = sub.getTime();
dsmdTime.setSelectedTimeIndices(subTime.getSelectedTimeIndices());
dsmdTime.setCycleTimes(subTime.getCycleTimes());
sub.setTime(dsmdTime);
sub.setUrl(dataSetMetaData.getUrl());
return sub;
}
/**
* Signals the bandwidth map localization file is updated, perform a
* reinitialize operation.
*/
private void bandwidthMapConfigurationUpdated() {
IBandwidthRequest request = new IBandwidthRequest();
request.setRequestType(RequestType.REINITIALIZE);
try {
handleRequest(request);
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM,
"Error while reinitializing the bandwidth manager.", e);
}
}
/**
* Private inner work thread used to keep the RetrievalPlans up to date.
*/
private class MaintanenceTask implements Runnable {
@Override
public void run() {
for (RetrievalPlan plan : retrievalManager.getRetrievalPlans()
.values()) {
plan.resize();
Calendar newEnd = plan.getPlanEnd();
// Find DEFERRED Allocations and load them into the plan...
List<BandwidthAllocation> deferred = bandwidthDao.getDeferred(
plan.getNetwork(), newEnd);
if (!deferred.isEmpty()) {
retrievalManager.schedule(deferred);
}
}
}
}
}

View file

@ -21,17 +21,12 @@ package com.raytheon.uf.edex.datadelivery.bandwidth;
import java.util.List;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.raytheon.uf.common.datadelivery.registry.AdhocSubscription;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.registry.event.InsertRegistryEvent;
import com.raytheon.uf.common.registry.event.RemoveRegistryEvent;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.BandwidthInitializer;
import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.ISubscriptionAggregator;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled;
/**
* Defines the interface of a BandwidthManager.
@ -43,6 +38,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetriev
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Oct 30, 2012 1286 djohnson Initial creation
* Jul 10, 2013 2106 djohnson Remove EDEX instance specific methods.
*
* </pre>
*
@ -52,27 +48,6 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetriev
public interface IBandwidthManager {
/**
* Create a hook into the EDEX Notification sub-system to receive the the
* necessary InsertRegistryEvents to drive Bandwidth Management.
*
* @param re
* The <code>InsertRegistryEvent</code> Object to evaluate.
*/
@Subscribe
void registryEventListener(InsertRegistryEvent re);
/**
* When a Subscription is removed from the Registry, a RemoveRegistryEvent
* is generated and forwarded to this method to remove the necessary
* BandwidthReservations (and perhaps redefine others).
*
* @param event
*/
@Subscribe
@AllowConcurrentEvents
void subscriptionRemoved(RemoveRegistryEvent event);
/**
* Schedule all cycles of a Subscription.
*
@ -99,7 +74,6 @@ public interface IBandwidthManager {
* @return
* @throws SerializationException
*/
@Subscribe
List<BandwidthAllocation> subscriptionUpdated(Subscription subscription)
throws SerializationException;
@ -110,19 +84,6 @@ public interface IBandwidthManager {
*/
List<BandwidthAllocation> adhocSubscription(AdhocSubscription adhoc);
/**
* The callback method for BandwidthEventBus to use to notify
* BandwidthManager that retrievalManager has completed the retrievals for a
* Subscription. The updated BandwidthSubscription Object is placed on the
* BandwidthEventBus.
*
* @param subscription
* The completed subscription.
*/
@Subscribe
void subscriptionFulfilled(
SubscriptionRetrievalFulfilled subscriptionRetrievalFulfilled);
void setAggregator(ISubscriptionAggregator aggregator);
ISubscriptionAggregator getAggregator();

View file

@ -31,6 +31,7 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.raytheon.uf.common.datadelivery.registry.DataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.Network;
@ -40,7 +41,6 @@ import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.util.IDeepCopyable;
import com.raytheon.uf.common.util.ReflectionUtil;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthDataSetUpdate;
@ -64,6 +64,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Dec 12, 2012 1286 djohnson Use concurrent lists to avoid concurrent modification exceptions.
* Jun 03, 2013 2038 djohnson Add method to get subscription retrievals by provider, dataset, and status.
* Jun 13, 2013 2095 djohnson Implement ability to store a collection of subscriptions.
* Jul 09, 2013 2106 djohnson Rather than copy all elements and remove unnecessary, just copy the ones that apply.
*
* </pre>
*
@ -90,19 +91,16 @@ class InMemoryBandwidthDao implements IBandwidthDao {
*/
@Override
public List<BandwidthAllocation> getBandwidthAllocations(Long subscriptionId) {
List<BandwidthAllocation> allocations = clone(bandwidthAllocations);
List<BandwidthAllocation> allocations = new ArrayList<BandwidthAllocation>();
for (Iterator<BandwidthAllocation> iter = allocations.iterator(); iter
.hasNext();) {
BandwidthAllocation current = iter.next();
for (BandwidthAllocation current : bandwidthAllocations) {
if ((current instanceof SubscriptionRetrieval)
&& ((SubscriptionRetrieval) current)
.getBandwidthSubscription().getId() == subscriptionId) {
continue;
allocations.add(current.copy());
}
iter.remove();
}
return allocations;
}
@ -111,32 +109,15 @@ class InMemoryBandwidthDao implements IBandwidthDao {
*/
@Override
public List<BandwidthAllocation> getBandwidthAllocations(Network network) {
List<BandwidthAllocation> results = clone(bandwidthAllocations);
List<BandwidthAllocation> allocations = new ArrayList<BandwidthAllocation>();
for (Iterator<BandwidthAllocation> iter = results.iterator(); iter
.hasNext();) {
BandwidthAllocation current = iter.next();
if (network.equals(current.getNetwork())) {
continue;
for (BandwidthAllocation current : bandwidthAllocations) {
if (current.getNetwork() == network) {
allocations.add(current.copy());
}
iter.remove();
}
return results;
}
/**
* @param sourceList
* @return
*/
private static <T extends IDeepCopyable<T>> ArrayList<T> clone(
ConcurrentLinkedQueue<T> sourceList) {
ArrayList<T> results = new ArrayList<T>(sourceList.size());
for (T instance : sourceList) {
results.add(instance.copy());
}
return results;
return allocations;
}
/**
@ -145,17 +126,15 @@ class InMemoryBandwidthDao implements IBandwidthDao {
@Override
public List<BandwidthAllocation> getBandwidthAllocationsInState(
RetrievalStatus state) {
List<BandwidthAllocation> results = clone(bandwidthAllocations);
for (Iterator<BandwidthAllocation> iter = results.iterator(); iter
.hasNext();) {
BandwidthAllocation current = iter.next();
if (state.equals(current.getStatus())) {
continue;
}
List<BandwidthAllocation> allocations = new ArrayList<BandwidthAllocation>();
iter.remove();
for (BandwidthAllocation current : bandwidthAllocations) {
if (state.equals(current.getStatus())) {
allocations.add(current.copy());
}
}
return results;
return allocations;
}
/**
@ -164,17 +143,13 @@ class InMemoryBandwidthDao implements IBandwidthDao {
@Override
public List<BandwidthDataSetUpdate> getBandwidthDataSetUpdate(
String providerName, String dataSetName) {
ArrayList<BandwidthDataSetUpdate> results = clone(bandwidthDataSetUpdates);
List<BandwidthDataSetUpdate> results = new ArrayList<BandwidthDataSetUpdate>();
for (Iterator<BandwidthDataSetUpdate> iter = results.iterator(); iter
.hasNext();) {
BandwidthDataSetUpdate current = iter.next();
for (BandwidthDataSetUpdate current : bandwidthDataSetUpdates) {
if (providerName.equals(current.getProviderName())
&& dataSetName.equals(current.getDataSetName())) {
continue;
results.add(current.copy());
}
iter.remove();
}
return results;
@ -208,19 +183,18 @@ class InMemoryBandwidthDao implements IBandwidthDao {
@Override
public List<BandwidthAllocation> getDeferred(Network network,
Calendar endTime) {
List<BandwidthAllocation> results = getBandwidthAllocations(network);
for (Iterator<BandwidthAllocation> iter = results.iterator(); iter
.hasNext();) {
BandwidthAllocation current = iter.next();
if (RetrievalStatus.DEFERRED.equals(current.getStatus())
&& !current.getEndTime().after(endTime)) {
continue;
}
iter.remove();
List<BandwidthAllocation> allocations = new ArrayList<BandwidthAllocation>();
for (BandwidthAllocation current : bandwidthAllocations) {
if (network == current.getNetwork()
&& RetrievalStatus.DEFERRED.equals(current.getStatus())
&& !current.getEndTime().after(endTime)) {
allocations.add(current.copy());
}
}
return results;
return allocations;
}
/**
@ -228,10 +202,9 @@ class InMemoryBandwidthDao implements IBandwidthDao {
*/
@Override
public BandwidthSubscription getBandwidthSubscription(long identifier) {
ArrayList<BandwidthSubscription> bandwidthSubscriptions = clone(this.bandwidthSubscriptions);
for (BandwidthSubscription dao : bandwidthSubscriptions) {
if (dao.getIdentifier() == identifier) {
return dao;
return dao.copy();
}
}
return null;
@ -268,15 +241,12 @@ class InMemoryBandwidthDao implements IBandwidthDao {
@Override
public List<BandwidthSubscription> getBandwidthSubscriptionByRegistryId(
String registryId) {
final ArrayList<BandwidthSubscription> results = clone(bandwidthSubscriptions);
for (Iterator<BandwidthSubscription> iter = results.iterator(); iter
.hasNext();) {
final BandwidthSubscription current = iter.next();
if (registryId.equals(current.getRegistryId())) {
continue;
}
final List<BandwidthSubscription> results = Lists.newArrayList();
iter.remove();
for (BandwidthSubscription current : bandwidthSubscriptions) {
if (registryId.equals(current.getRegistryId())) {
results.add(current.copy());
}
}
return results;
}
@ -286,11 +256,10 @@ class InMemoryBandwidthDao implements IBandwidthDao {
*/
@Override
public SubscriptionRetrieval getSubscriptionRetrieval(long identifier) {
ArrayList<BandwidthAllocation> clone = clone(bandwidthAllocations);
for (BandwidthAllocation current : clone) {
for (BandwidthAllocation current : bandwidthAllocations) {
if (current.getId() == identifier
&& current instanceof SubscriptionRetrieval) {
return (SubscriptionRetrieval) current;
return ((SubscriptionRetrieval) current).copy();
}
}
return null;
@ -302,21 +271,30 @@ class InMemoryBandwidthDao implements IBandwidthDao {
@Override
public List<SubscriptionRetrieval> getSubscriptionRetrievals(
String provider, String dataSetName, Calendar baseReferenceTime) {
List<SubscriptionRetrieval> results = new ArrayList<SubscriptionRetrieval>(
getSubscriptionRetrievals(provider, dataSetName));
List<BandwidthSubscription> subscriptionsMatching = getBandwidthSubscriptions(
provider, dataSetName, baseReferenceTime);
List<SubscriptionRetrieval> results = Lists.newArrayList();
OUTER: for (Iterator<SubscriptionRetrieval> iter = results.iterator(); iter
.hasNext();) {
SubscriptionRetrieval current = iter.next();
for (BandwidthSubscription subscription : subscriptionsMatching) {
if (current.getBandwidthSubscription().getId() == subscription
.getIdentifier()) {
continue OUTER;
for (BandwidthAllocation current : bandwidthAllocations) {
if (current instanceof SubscriptionRetrieval) {
Subscription subscription;
try {
final SubscriptionRetrieval subscriptionRetrieval = (SubscriptionRetrieval) current;
subscription = subscriptionRetrieval.getSubscription();
if (provider.equals(subscription.getProvider())
&& dataSetName
.equals(subscription.getDataSetName())
&& baseReferenceTime.getTimeInMillis() == subscriptionRetrieval
.getBandwidthSubscription()
.getBaseReferenceTime().getTimeInMillis()) {
results.add(subscriptionRetrieval.copy());
}
} catch (SerializationException e) {
statusHandler
.handle(Priority.PROBLEM,
"Unable to deserialize the retrieval's subscription, skipping it...",
e);
}
}
iter.remove();
}
return results;
@ -328,30 +306,24 @@ class InMemoryBandwidthDao implements IBandwidthDao {
@Override
public List<SubscriptionRetrieval> getSubscriptionRetrievals(
String provider, String dataSetName) {
ArrayList<BandwidthAllocation> clone = clone(bandwidthAllocations);
List<SubscriptionRetrieval> results = new ArrayList<SubscriptionRetrieval>(
bandwidthAllocations.size());
List<SubscriptionRetrieval> results = Lists.newArrayList();
for (Iterator<BandwidthAllocation> iter = clone.iterator(); iter
.hasNext();) {
BandwidthAllocation current = iter.next();
for (BandwidthAllocation current : bandwidthAllocations) {
if (current instanceof SubscriptionRetrieval) {
Subscription subscription;
try {
subscription = ((SubscriptionRetrieval) current)
.getSubscription();
final SubscriptionRetrieval subscriptionRetrieval = (SubscriptionRetrieval) current;
subscription = subscriptionRetrieval.getSubscription();
if (provider.equals(subscription.getProvider())
&& dataSetName
.equals(subscription.getDataSetName())) {
results.add((SubscriptionRetrieval) current);
results.add(subscriptionRetrieval.copy());
}
} catch (SerializationException e) {
statusHandler
.handle(Priority.PROBLEM,
"Unable to deserialize the retrieval's subscription, removing it...",
"Unable to deserialize the retrieval's subscription, skipping it...",
e);
iter.remove();
continue;
}
}
@ -365,7 +337,11 @@ class InMemoryBandwidthDao implements IBandwidthDao {
*/
@Override
public List<BandwidthSubscription> getBandwidthSubscriptions() {
return clone(bandwidthSubscriptions);
List<BandwidthSubscription> results = Lists.newArrayList();
for (BandwidthSubscription subscription : bandwidthSubscriptions) {
results.add(subscription.copy());
}
return results;
}
/**
@ -374,18 +350,16 @@ class InMemoryBandwidthDao implements IBandwidthDao {
@Override
public List<BandwidthSubscription> getBandwidthSubscriptions(
String provider, String dataSetName, Calendar baseReferenceTime) {
List<BandwidthSubscription> bandwidthSubscriptions = getBandwidthSubscriptions();
List<BandwidthSubscription> bandwidthSubscriptions = Lists
.newArrayList();
for (Iterator<BandwidthSubscription> iter = bandwidthSubscriptions
.iterator(); iter.hasNext();) {
BandwidthSubscription current = iter.next();
for (BandwidthSubscription current : this.bandwidthSubscriptions) {
if (provider.equals(current.getProvider())
&& dataSetName.equals(current.getDataSetName())
&& baseReferenceTime.getTimeInMillis() == current
.getBaseReferenceTime().getTimeInMillis()) {
continue;
bandwidthSubscriptions.add(current.copy());
}
iter.remove();
}
return bandwidthSubscriptions;
@ -428,17 +402,13 @@ class InMemoryBandwidthDao implements IBandwidthDao {
@Override
public List<SubscriptionRetrieval> querySubscriptionRetrievals(
long subscriptionId) {
ArrayList<BandwidthAllocation> clone = clone(bandwidthAllocations);
List<SubscriptionRetrieval> results = new ArrayList<SubscriptionRetrieval>(
bandwidthAllocations.size());
List<SubscriptionRetrieval> results = new ArrayList<SubscriptionRetrieval>();
for (Iterator<BandwidthAllocation> iter = clone.iterator(); iter
.hasNext();) {
BandwidthAllocation current = iter.next();
for (BandwidthAllocation current : bandwidthAllocations) {
if (current instanceof SubscriptionRetrieval) {
if (((SubscriptionRetrieval) current)
.getBandwidthSubscription().getId() == subscriptionId) {
results.add((SubscriptionRetrieval) current);
final SubscriptionRetrieval subscriptionRetrieval = (SubscriptionRetrieval) current;
if (subscriptionRetrieval.getBandwidthSubscription().getId() == subscriptionId) {
results.add(subscriptionRetrieval.copy());
}
}
}
@ -567,14 +537,27 @@ class InMemoryBandwidthDao implements IBandwidthDao {
public SortedSet<SubscriptionRetrieval> getSubscriptionRetrievals(
String provider, String dataSetName, RetrievalStatus status) {
final List<SubscriptionRetrieval> subscriptionRetrievals = getSubscriptionRetrievals(
provider, dataSetName);
List<SubscriptionRetrieval> results = Lists.newArrayList();
for (BandwidthAllocation current : bandwidthAllocations) {
if (current instanceof SubscriptionRetrieval) {
Subscription subscription;
try {
final SubscriptionRetrieval subscriptionRetrieval = (SubscriptionRetrieval) current;
subscription = subscriptionRetrieval.getSubscription();
if (provider.equals(subscription.getProvider())
&& dataSetName
.equals(subscription.getDataSetName())
&& status.equals(subscriptionRetrieval.getStatus())) {
results.add(subscriptionRetrieval.copy());
}
} catch (SerializationException e) {
statusHandler
.handle(Priority.PROBLEM,
"Unable to deserialize the retrieval's subscription, skipping it...",
e);
}
for (Iterator<SubscriptionRetrieval> iter = subscriptionRetrievals
.iterator(); iter.hasNext();) {
SubscriptionRetrieval subRetrieval = iter.next();
if (!status.equals(subRetrieval.getStatus())) {
iter.remove();
}
}
@ -587,7 +570,7 @@ class InMemoryBandwidthDao implements IBandwidthDao {
}
});
treeSet.addAll(subscriptionRetrievals);
treeSet.addAll(results);
return treeSet;
}
@ -600,19 +583,50 @@ class InMemoryBandwidthDao implements IBandwidthDao {
String provider, String dataSetName, RetrievalStatus status,
Date earliestDate, Date latestDate) {
SortedSet<SubscriptionRetrieval> results = getSubscriptionRetrievals(
provider, dataSetName, status);
List<SubscriptionRetrieval> results = Lists.newArrayList();
for (BandwidthAllocation current : bandwidthAllocations) {
if (current instanceof SubscriptionRetrieval) {
Subscription subscription;
try {
final SubscriptionRetrieval subscriptionRetrieval = (SubscriptionRetrieval) current;
subscription = subscriptionRetrieval.getSubscription();
final Date subRetrievalStartTime = subscriptionRetrieval
.getStartTime().getTime();
final boolean withinTimeLimits = !(earliestDate
.after(subRetrievalStartTime) || latestDate
.before(subRetrievalStartTime));
if (provider.equals(subscription.getProvider())
&& dataSetName
.equals(subscription.getDataSetName())
&& status.equals(subscriptionRetrieval.getStatus())
&& withinTimeLimits) {
results.add(subscriptionRetrieval.copy());
}
} catch (SerializationException e) {
statusHandler
.handle(Priority.PROBLEM,
"Unable to deserialize the retrieval's subscription, skipping it...",
e);
}
for (Iterator<SubscriptionRetrieval> iter = results.iterator(); iter
.hasNext();) {
SubscriptionRetrieval subRetrieval = iter.next();
if (earliestDate.after(subRetrieval.getStartTime().getTime())
|| latestDate.before(subRetrieval.getStartTime().getTime())) {
iter.remove();
}
}
return results;
final TreeSet<SubscriptionRetrieval> treeSet = Sets
.newTreeSet(new Comparator<SubscriptionRetrieval>() {
@Override
public int compare(SubscriptionRetrieval o1,
SubscriptionRetrieval o2) {
return o1.getStartTime().compareTo(o2.getStartTime());
}
});
treeSet.addAll(results);
return treeSet;
}
/**
@ -620,15 +634,12 @@ class InMemoryBandwidthDao implements IBandwidthDao {
*/
@Override
public List<SubscriptionRetrieval> getSubscriptionRetrievals() {
ArrayList<BandwidthAllocation> clone = clone(bandwidthAllocations);
List<SubscriptionRetrieval> results = new ArrayList<SubscriptionRetrieval>(
bandwidthAllocations.size());
for (Iterator<BandwidthAllocation> iter = clone.iterator(); iter
.hasNext();) {
BandwidthAllocation current = iter.next();
for (BandwidthAllocation current : bandwidthAllocations) {
if (current instanceof SubscriptionRetrieval) {
results.add((SubscriptionRetrieval) current);
results.add(((SubscriptionRetrieval) current).copy());
}
}
return results;
@ -640,15 +651,17 @@ class InMemoryBandwidthDao implements IBandwidthDao {
@Override
public List<BandwidthAllocation> getBandwidthAllocationsForNetworkAndBucketStartTime(
Network network, long bucketStartTime) {
final List<BandwidthAllocation> bandwidthAllocations = getBandwidthAllocations(network);
for (Iterator<BandwidthAllocation> iter = bandwidthAllocations
.iterator(); iter.hasNext();) {
final BandwidthAllocation allocation = iter.next();
if (allocation.getBandwidthBucket() != bucketStartTime) {
iter.remove();
List<BandwidthAllocation> allocations = new ArrayList<BandwidthAllocation>();
for (BandwidthAllocation current : bandwidthAllocations) {
if (current.getNetwork() == network
&& current.getBandwidthBucket() == bucketStartTime) {
allocations.add(current.copy());
}
}
return bandwidthAllocations;
return allocations;
}
}

View file

@ -50,6 +50,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
* Feb 27, 2013 1644 djohnson Schedule SBN subscriptions.
* Apr 16, 2013 1906 djohnson Implements RegistryInitializedListener.
* Jun 25, 2013 2106 djohnson init() now takes a {@link RetrievalManager} as well.
* Jul 09, 2013 2106 djohnson Add shutdownInternal().
*
* </pre>
*
@ -61,8 +62,9 @@ class InMemoryBandwidthManager extends BandwidthManager {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(InMemoryBandwidthManager.class);
// TODO DPJ: The NCF and WFO bandwidth managers probably each need an
// in-memory version
// NOTE: NEVER add the bandwidth-datadelivery-eventbus.xml file to this
// array, in-memory versions should not coordinate with the event bus in any
// fashion
public static final String[] IN_MEMORY_BANDWIDTH_MANAGER_FILES = new String[] {
JarUtil.getResResourcePath("/spring/bandwidth-datadelivery-inmemory-impl.xml"),
JarUtil.getResResourcePath("/spring/bandwidth-datadelivery.xml"),
@ -145,4 +147,12 @@ class InMemoryBandwidthManager extends BandwidthManager {
return scheduleSubscriptions(subscriptions);
}
/**
* {@inheritDoc}
*/
@Override
protected void shutdownInternal() {
// Nothing to do for in-memory version
}
}

View file

@ -25,6 +25,8 @@ import java.util.Set;
import com.raytheon.uf.common.datadelivery.bandwidth.IBandwidthService;
import com.raytheon.uf.common.datadelivery.bandwidth.IProposeScheduleResponse;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler;
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.util.JarUtil;
import com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthContextFactory.IEdexBandwidthManagerCreator;
@ -46,6 +48,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
* Feb 27, 2013 1644 djohnson Schedule SBN subscriptions by routing to the NCF bandwidth manager.
* Mar 11, 2013 1645 djohnson Add missing Spring file.
* May 15, 2013 2000 djohnson Include daos.
* Jul 10, 2013 2106 djohnson Dependency inject registry handlers.
*
* </pre>
*
@ -57,7 +60,7 @@ public class WfoBandwidthManagerCreator implements IEdexBandwidthManagerCreator
/**
* WFO {@link BandwidthManager} implementation.
*/
static class WfoBandwidthManager extends BandwidthManager {
static class WfoBandwidthManager extends EdexBandwidthManager {
private static final String[] WFO_BANDWIDTH_MANAGER_FILES = new String[] {
JarUtil.getResResourcePath("/spring/bandwidth-datadelivery-wfo-edex-impl.xml"),
@ -81,8 +84,11 @@ public class WfoBandwidthManagerCreator implements IEdexBandwidthManagerCreator
*/
public WfoBandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil) {
super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil);
BandwidthDaoUtil bandwidthDaoUtil,
IDataSetMetaDataHandler dataSetMetaDataHandler,
ISubscriptionHandler subscriptionHandler) {
super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil,
dataSetMetaDataHandler, subscriptionHandler);
}
@Override
@ -131,9 +137,11 @@ public class WfoBandwidthManagerCreator implements IEdexBandwidthManagerCreator
@Override
public IBandwidthManager getBandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil) {
BandwidthDaoUtil bandwidthDaoUtil,
IDataSetMetaDataHandler dataSetMetaDataHandler,
ISubscriptionHandler subscriptionHandler) {
return new WfoBandwidthManager(dbInit, bandwidthDao, retrievalManager,
bandwidthDaoUtil);
bandwidthDaoUtil, dataSetMetaDataHandler, subscriptionHandler);
}
}

View file

@ -25,9 +25,7 @@ import java.util.concurrent.Executors;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.edex.core.EDEXUtil;
/**
* Creates asynchronous Google event buses.
@ -41,6 +39,7 @@ import com.raytheon.uf.edex.core.EDEXUtil;
* Dec 11, 2012 1286 djohnson Initial creation
* Feb 06, 2013 1543 djohnson Changes to correspond with EventBus changes.
* May 28, 2013 1650 djohnson Changes to match functionality in general event bus handling.
* Jul 09, 2013 2106 djohnson No Spring required to get thread pool sizes, remove subscriptionBus.
*
* </pre>
*
@ -48,31 +47,25 @@ import com.raytheon.uf.edex.core.EDEXUtil;
* @version 1.0
*/
public class BandwidthAsyncEventBusFactory implements BandwidthEventBusFactory {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(BandwidthAsyncEventBusFactory.class);
private final AsyncEventBus dataSetBus;
private final AsyncEventBus subscriptionBus;
private final AsyncEventBus retrievalBus;
public BandwidthAsyncEventBusFactory() {
BandwidthEventBusConfig config = (BandwidthEventBusConfig) EDEXUtil
.getESBComponent("BandwidthEventBusConfig");
// If no bean was defined, use the defaults defined in the
// class.
if (config == null) {
statusHandler
.info("No BandwidthEventBusConfig defined. Using defaults.");
config = new BandwidthEventBusConfig();
}
dataSetBus = new AsyncEventBus(Executors.newFixedThreadPool(config
.getDataSetMetaDataPoolSize()));
subscriptionBus = new AsyncEventBus(Executors.newFixedThreadPool(config
.getSubscriptionPoolSize()));
retrievalBus = new AsyncEventBus(Executors.newFixedThreadPool(config
.getRetrievalPoolSize()));
BandwidthEventBusConfig config = new BandwidthEventBusConfig();
final int dataSetMetaDataPoolSize = config.getDataSetMetaDataPoolSize();
final int retrievalPoolSize = config.getRetrievalPoolSize();
UFStatus.getHandler(BandwidthAsyncEventBusFactory.class)
.info(String
.format("Creating event bus with dataSetMetaDataPoolSize [%s] retrievalPoolSize [%s].",
dataSetMetaDataPoolSize, retrievalPoolSize));
dataSetBus = new AsyncEventBus(
Executors.newFixedThreadPool(dataSetMetaDataPoolSize));
retrievalBus = new AsyncEventBus(
Executors.newFixedThreadPool(retrievalPoolSize));
}
/**
@ -83,14 +76,6 @@ public class BandwidthAsyncEventBusFactory implements BandwidthEventBusFactory {
return dataSetBus;
}
/**
* {@inheritDoc}
*/
@Override
public EventBus getSubscriptionBus() {
return subscriptionBus;
}
/**
* {@inheritDoc}
*/
@ -104,8 +89,7 @@ public class BandwidthAsyncEventBusFactory implements BandwidthEventBusFactory {
*/
@Override
public List<EventBus> getEventBuses() {
return Arrays.<EventBus> asList(dataSetBus, retrievalBus,
subscriptionBus);
return Arrays.<EventBus> asList(dataSetBus, retrievalBus);
}
}

View file

@ -10,6 +10,7 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.notification;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jul 3, 2012 0726 jspinks Initial creation
* Jul 09, 2013 2106 djohnson No Spring required to get thread pool sizes, remove subscriptionBus.
*
* </pre>
*
@ -19,11 +20,11 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.notification;
public class BandwidthEventBusConfig {
// Set reasonable default values
private int dataSetMetaDataPoolSize = 2;
private static final int dataSetMetaDataPoolSize = Integer.getInteger(
"bandwidth.dataSetMetaDataPoolSize", 2);
private int retrievalPoolSize = 3;
private int subscriptionPoolSize = 2;
private static final int retrievalPoolSize = Integer.getInteger(
"bandwidth.retrievalPoolSize", 3);
/**
* Get attribute dataSetMetaDataPoolSize.
@ -42,43 +43,4 @@ public class BandwidthEventBusConfig {
public int getRetrievalPoolSize() {
return retrievalPoolSize;
}
/**
* Get attribute subscriptionPoolSize.
*
* @return The value of attribute subscriptionPoolSize.
*/
public int getSubscriptionPoolSize() {
return subscriptionPoolSize;
}
/**
* Set the dataSetMetaDataPoolSize.
*
* @param dataSetMetaDataPoolSize
* The value to set attribute dataSetMetaDataPoolSize to.
*/
public void setDataSetMetaDataPoolSize(int dataSetMetaDataPoolSize) {
this.dataSetMetaDataPoolSize = dataSetMetaDataPoolSize;
}
/**
* Set the retrievalPoolSize.
*
* @param retrievalPoolSize
* The value to set attribute retrievalPoolSize to.
*/
public void setRetrievalPoolSize(int retrievalPoolSize) {
this.retrievalPoolSize = retrievalPoolSize;
}
/**
* Set the subscriptionPoolSize.
*
* @param subscriptionPoolSize
* The value to set attribute subscriptionPoolSize to.
*/
public void setSubscriptionPoolSize(int subscriptionPoolSize) {
this.subscriptionPoolSize = subscriptionPoolSize;
}
}

View file

@ -33,6 +33,7 @@ import com.raytheon.uf.edex.event.GoogleEventBusFactory;
* ------------ ---------- ----------- --------------------------
* Dec 11, 2012 djohnson Initial creation
* May 28, 2013 1650 djohnson Returns the event buses required by extending GoogleEventBusFactory.
* Jul 09, 2013 2106 djohnson Remove subscriptionBus.
*
* </pre>
*
@ -47,11 +48,6 @@ interface BandwidthEventBusFactory extends GoogleEventBusFactory {
*/
EventBus getDataSetBus();
/**
* Get the subscription bus.
*/
EventBus getSubscriptionBus();
/**
* Get the retrieval bus.
*/

View file

@ -20,9 +20,6 @@
package com.raytheon.uf.edex.datadelivery.bandwidth.notification;
import com.raytheon.uf.common.datadelivery.registry.DataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.registry.event.RemoveRegistryEvent;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled;
import com.raytheon.uf.edex.event.BaseEdexEventBusHandler;
@ -36,6 +33,7 @@ import com.raytheon.uf.edex.event.BaseEdexEventBusHandler;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* May 28, 2013 1650 djohnson Extracted from {@link BandwidthEventBus}.
* Jul 09, 2013 2106 djohnson Remove subscriptionBus.
*
* </pre>
*
@ -48,8 +46,6 @@ public class EdexBandwidthEventBusHandler extends
private final com.google.common.eventbus.EventBus dataSetBus;
private final com.google.common.eventbus.EventBus subscriptionBus;
private final com.google.common.eventbus.EventBus retrievalBus;
/**
@ -68,7 +64,6 @@ public class EdexBandwidthEventBusHandler extends
EdexBandwidthEventBusHandler(BandwidthEventBusFactory eventBusFactory) {
super(eventBusFactory);
this.dataSetBus = eventBusFactory.getDataSetBus();
this.subscriptionBus = eventBusFactory.getSubscriptionBus();
this.retrievalBus = eventBusFactory.getRetrievalBus();
}
@ -78,16 +73,10 @@ public class EdexBandwidthEventBusHandler extends
@Override
protected void publishInternal(Object object) {
if (object instanceof SubscriptionRetrieval) {
if (object instanceof SubscriptionRetrievalFulfilled) {
retrievalBus.post(object);
} else if (object instanceof SubscriptionRetrievalFulfilled) {
subscriptionBus.post(object);
} else if (object instanceof DataSetMetaData) {
dataSetBus.post(object);
} else if (object instanceof Subscription) {
subscriptionBus.post(object);
} else if (object instanceof RemoveRegistryEvent) {
subscriptionBus.post(object);
} else {
throw new IllegalArgumentException("Object type ["
+ object.getClass().getName()

View file

@ -1,90 +0,0 @@
package com.raytheon.uf.edex.datadelivery.bandwidth.processing;
import java.util.ArrayList;
import com.raytheon.uf.common.datadelivery.registry.Provider;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.datadelivery.registry.SubscriptionBundle;
import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers;
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.IProcessSubscription;
/**
* Process Available Subscriptions for bundling
*
* <pre>
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 07, 2012 dhladky Initial creation
* Jun 21, 2012 736 djohnson Change OPERATION_STATUS to OperationStatus.
* Aug 20, 2012 0743 djohnson Finish making registry type-safe.
* Oct 05, 2012 1241 djohnson Replace RegistryManager calls with registry handler calls.
*
* </pre>
*
* @author dhladky
* @version 1.0
*/
public class Processor implements IProcessSubscription {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(Processor.class);
public Processor() {
}
@Override
public ArrayList<SubscriptionBundle> process(
ArrayList<Subscription> subscriptions) {
// either a new subscription/adhoc or all subscriptions for a given
// dataset
ArrayList<SubscriptionBundle> bundles = null;
// TODO: When we start aggregating the bundles will matter
if (subscriptions != null) {
bundles = new ArrayList<SubscriptionBundle>(subscriptions.size());
for (Subscription sub : subscriptions) {
statusHandler.info("Processing Subscription NAME: "
+ sub.getDescription() + " DATASET: "
+ sub.getDataSetName());
SubscriptionBundle bundle = new SubscriptionBundle();
Provider provider = getProvider(sub.getProvider());
bundle.setBundleId(sub.getSubscriptionId());
bundle.setPriority(1);
bundle.setProvider(provider);
bundle.setConnection(provider.getConnection());
bundle.setSubscription(sub);
// when aggregated set source subscriptions to bundle
bundles.add(bundle);
}
} else {
statusHandler.info("No mature subscriptions available.");
}
return bundles;
}
public Provider getProvider(String providerName) {
try {
return DataDeliveryHandlers.getProviderHandler().getByName(
providerName);
} catch (RegistryHandlerException e) {
statusHandler.handle(Priority.PROBLEM,
"Unable to look up the provider by its name.", e);
return null;
}
}
}

View file

@ -9,7 +9,6 @@ import java.util.TreeMap;
import com.google.common.eventbus.Subscribe;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.event.EventBus;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.time.util.TimeUtil;
@ -38,6 +37,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent;
* 3/13/2013 1802 bphillip Moved event bus registration from post-construct to spring static method call
* Jun 13, 2013 2095 djohnson Can schedule any subclass of BandwidthAllocation.
* Jun 25, 2013 2106 djohnson Copy state from another instance, add ability to check for proposed bandwidth throughput changes.
* Jul 09, 2013 2106 djohnson Only needs to unregister from the EventBus when used in an EDEX instance, so handled in EdexBandwidthManager.
*
* </pre>
*
@ -218,7 +218,6 @@ public class RetrievalManager {
* Shutdown the retrieval manager.
*/
public void shutdown() {
EventBus.unregister(this);
// From this point forward, only return a poison pill for this retrieval
// manager, which will cause threads attempting to receive bandwidth
// allocations to die

View file

@ -13,7 +13,7 @@ import com.raytheon.uf.common.datadelivery.registry.Provider;
import com.raytheon.uf.common.datadelivery.registry.ProviderType;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.datadelivery.registry.SubscriptionBundle;
import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers;
import com.raytheon.uf.common.datadelivery.registry.handlers.IProviderHandler;
import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval;
import com.raytheon.uf.common.event.EventBus;
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
@ -49,6 +49,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
* Jan 30, 2013 1543 djohnson Should not implement IRetrievalHandler.
* Feb 05, 2013 1580 mpduff EventBus refactor.
* Jun 24, 2013 2106 djohnson Set actual start time when sending to retrieval rather than overwrite scheduled start.
* Jul 09, 2013 2106 djohnson Dependency inject registry handlers.
*
* </pre>
*
@ -68,14 +69,17 @@ public class SubscriptionRetrievalAgent extends
private final IRetrievalDao retrievalDao;
private final IProviderHandler providerHandler;
public SubscriptionRetrievalAgent(Network network, String destinationUri,
final Object notifier, int defaultPriority,
RetrievalManager retrievalManager, IBandwidthDao bandwidthDao,
IRetrievalDao retrievalDao) {
IRetrievalDao retrievalDao, IProviderHandler providerHandler) {
super(network, destinationUri, notifier, retrievalManager);
this.defaultPriority = defaultPriority;
this.bandwidthDao = bandwidthDao;
this.retrievalDao = retrievalDao;
this.providerHandler = providerHandler;
}
@Override
@ -246,10 +250,9 @@ public class SubscriptionRetrievalAgent extends
return retrievalsGenerated;
}
private static Provider getProvider(String providerName) {
private Provider getProvider(String providerName) {
try {
return DataDeliveryHandlers.getProviderHandler().getByName(
providerName);
return providerHandler.getByName(providerName);
} catch (RegistryHandlerException e) {
statusHandler.handle(Priority.PROBLEM,
"Unable to retrieve provider by name.", e);

View file

@ -1,122 +0,0 @@
package com.raytheon.uf.edex.datadelivery.bandwidth.separator;
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import com.raytheon.edex.esb.Headers;
import com.raytheon.uf.common.datadelivery.registry.SubscriptionBundle;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
/**
* Separate Subscription bundles in Queue
*
* <pre>
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 07, 2012 dhladky Initial creation
*
* </pre>
*
* @author dhladky
* @version 1.0
*/
public class SubscriptionBundleSeparator implements
Iterator<SubscriptionBundle> {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(SubscriptionBundleSeparator.class);
private List<SubscriptionBundle> reports = null;
private int currentCount = -1;
public static SubscriptionBundleSeparator separate(
ArrayList<SubscriptionBundle> data, Headers headers)
throws Exception {
SubscriptionBundleSeparator sbs = new SubscriptionBundleSeparator();
sbs.setData(data, headers);
return sbs;
}
@Override
public boolean hasNext() {
boolean answer = ((reports != null) && (reports.size() > 0) && (currentCount < reports
.size()));
if (!answer) {
reports.clear();
reports = null;
}
return answer;
}
@Override
public SubscriptionBundle next() {
return reports.get(currentCount++);
}
public void setData(ArrayList<SubscriptionBundle> sbs, Headers headers) {
try {
if (sbs != null) {
reports = new ArrayList<SubscriptionBundle>();
separate(sbs);
}
} catch (Exception e) {
statusHandler.handle(Priority.ERROR, e.getLocalizedMessage(), e);
}
if ((reports != null) && (reports.size() > 0)) {
currentCount = 0;
} else {
System.err.println("No bundles found in data.");
}
}
private void separate(ArrayList<SubscriptionBundle> sbs) {
for (int i = 0; i < sbs.size(); i++) {
try {
reports.add(sbs.get(i));
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM, e.getLocalizedMessage(),
e);
}
}
}
@Override
public void remove() {
}
}

View file

@ -25,7 +25,7 @@ import java.util.Set;
import com.google.common.collect.Sets;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers;
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
import com.raytheon.uf.edex.datadelivery.bandwidth.hibernate.IFindSubscriptionsForScheduling;
@ -39,6 +39,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.hibernate.IFindSubscriptionsF
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 18, 2013 1543 djohnson Initial creation
* Jul 09, 2013 2106 djohnson Dependency inject registry handlers.
*
* </pre>
*
@ -49,25 +50,35 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.hibernate.IFindSubscriptionsF
public class FindActiveSubscriptionsForRoute implements
IFindSubscriptionsForScheduling {
private final ISubscriptionHandler subscriptionHandler;
private final Network[] routes;
/**
* Find active subscriptions for a specific route.
*
* @param subscriptionHandler
* the subscription handler
* @param route
* the route
*/
public FindActiveSubscriptionsForRoute(Network route) {
this(new Network[] { route });
public FindActiveSubscriptionsForRoute(
ISubscriptionHandler subscriptionHandler, Network route) {
this(subscriptionHandler, new Network[] { route });
}
/**
* Find active subscriptions for specific routes.
*
* @param subscriptionHandler
* the subscription handler
* @param routes
* the routes
*/
public FindActiveSubscriptionsForRoute(Network... routes) {
public FindActiveSubscriptionsForRoute(
ISubscriptionHandler subscriptionHandler, Network... routes) {
this.subscriptionHandler = subscriptionHandler;
this.routes = routes;
}
@ -77,8 +88,8 @@ public class FindActiveSubscriptionsForRoute implements
@Override
public Set<Subscription> findSubscriptionsToSchedule()
throws RegistryHandlerException {
final List<Subscription> activeForRoutes = DataDeliveryHandlers
.getSubscriptionHandler().getActiveForRoutes(routes);
final List<Subscription> activeForRoutes = subscriptionHandler
.getActiveForRoutes(routes);
return Sets.newHashSet(activeForRoutes);
}

View file

@ -1,165 +0,0 @@
package com.raytheon.uf.edex.datadelivery.retrieval;
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import com.raytheon.uf.common.datadelivery.registry.ProviderType;
import com.raytheon.uf.common.datadelivery.registry.SubscriptionBundle;
import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.State;
/**
* Handle Retrieval creation
*
* <pre>
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 09, 2012 dhladky Initial creation
* Jul 25, 2012 955 djohnson Use {@link ServiceTypeFactory}.
* Oct 10, 2012 0726 djohnson Pass -1 for subscription retrieval id, since not bandwidth managed.
*
* </pre>
*
* @author dhladky
* @version 1.0
*/
public class RetrievalGenerationHandler implements IGenerateRetrieval {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(RetrievalGenerationHandler.class);
private final IRetrievalDao retrievalDao;
public RetrievalGenerationHandler(IRetrievalDao retrievalDao) {
this.retrievalDao = retrievalDao;
}
@Override
public List<String> generateRetrieval(List<SubscriptionBundle> bundles) {
if (bundles != null) {
ArrayList<String> names = new ArrayList<String>(bundles.size());
for (SubscriptionBundle bundle : bundles) {
statusHandler.info("Bundle: " + bundle.getBundleId()
+ " Create Retrieval Messages....");
// process the bundle into a retrieval
RetrievalGenerator rg = ServiceTypeFactory
.retrieveServiceFactory(bundle.getProvider())
.getRetrievalGenerator();
final String subscriptionName = bundle.getSubscription()
.getName();
statusHandler.info("Subcription: " + subscriptionName
+ " Being Processed for Retrieval...");
List<Retrieval> retrievals = rg.buildRetrieval(bundle);
if (!CollectionUtil.isNullOrEmpty(retrievals)) {
String owner = bundle.getSubscription().getOwner();
String provider = bundle.getSubscription().getProvider();
int priority = 3;
Integer bundlePriority = bundle.getPriority();
if (bundlePriority != null) {
priority = bundlePriority.intValue();
}
Date insertTime = Calendar.getInstance().getTime();
List<RetrievalRequestRecord> requestRecords = new ArrayList<RetrievalRequestRecord>(
retrievals.size());
long cumultTime1 = 0;
int index = 0;
final ProviderType providerType = bundle.getProvider()
.getProviderType(bundle.getDataType());
final String plugin = providerType.getPlugin();
for (Retrieval retrieval : retrievals) {
RetrievalRequestRecord rec = new RetrievalRequestRecord(
subscriptionName, index++, -1L);
rec.setOwner(owner);
rec.setPlugin(plugin);
rec.setProvider(provider);
rec.setSubscriptionType(retrieval.getSubscriptionType());
rec.setNetwork(retrieval.getNetwork());
rec.setPriority(priority);
rec.setInsertTime(insertTime);
try {
long t1 = System.currentTimeMillis();
rec.setRetrieval(SerializationUtil
.transformToThrift(retrieval));
long t2 = System.currentTimeMillis();
cumultTime1 += t2 - t1;
rec.setState(State.PENDING);
} catch (Exception e) {
statusHandler.error("Subcription: "
+ subscriptionName
+ " Failed to serialize request ["
+ retrieval + "]", e);
rec.setRetrieval(new byte[0]);
rec.setState(State.FAILED);
}
requestRecords.add(rec);
}
statusHandler.info("Cumulative time to serialize "
+ requestRecords.size() + " requests: thrift ["
+ cumultTime1 + "] ms");
try {
long t1 = System.currentTimeMillis();
retrievalDao.persistAll(requestRecords);
statusHandler.info("Time to persist requests to db ["
+ (System.currentTimeMillis() - t1) + "] ms");
names.add(subscriptionName);
} catch (Exception e) {
statusHandler.warn("Subscription: " + subscriptionName
+ " Failed to store to retrievals.");
// this should send notification
}
} else {
statusHandler.warn("Subscription: " + subscriptionName
+ " Did not generate any retrieval messages");
// should this send notification
}
}
return names;
} else {
statusHandler.info("NO VALID SUBCSRIPTIONS NEEDING RETRIEVAL....");
}
return null;
}
}

View file

@ -74,6 +74,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Mar 28, 2013 1841 djohnson Subscription is now UserSubscription.
* Apr 29, 2013 1910 djohnson Always shutdown bandwidth managers in tests.
* Jun 03, 2013 2095 djohnson Move getPointDataSet in from subclass.
* Jul 09, 2013 2106 djohnson Add datadelivery handlers, since they are now dependency injected.
*
* </pre>
*
@ -82,7 +83,8 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { SpringFiles.UNIT_TEST_DB_BEANS_XML,
SpringFiles.EVENTBUS_COMMON_XML,
SpringFiles.EVENTBUS_COMMON_XML, SpringFiles.DATADELIVERY_HANDLERS_XML,
SpringFiles.MEMORY_DATADELIVERY_HANDLERS_XML,
SpringFiles.RETRIEVAL_DATADELIVERY_DAOS_XML,
SpringFiles.BANDWIDTH_DATADELIVERY_DAOS_XML,
SpringFiles.BANDWIDTH_DATADELIVERY_XML,
@ -95,7 +97,7 @@ public abstract class AbstractBandwidthManagerIntTest {
protected ApplicationContext context;
@Autowired
protected BandwidthManager bandwidthManager;
protected EdexBandwidthManager bandwidthManager;
@Autowired
protected RetrievalManager retrievalManager;

View file

@ -38,13 +38,10 @@ import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;
@ -54,7 +51,6 @@ import com.raytheon.uf.common.datadelivery.registry.GriddedDataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.OpenDapGriddedDataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.OpenDapGriddedDataSetMetaDataFixture;
import com.raytheon.uf.common.datadelivery.registry.ParameterFixture;
import com.raytheon.uf.common.datadelivery.registry.PointDataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.PointDataSetMetaDataFixture;
import com.raytheon.uf.common.datadelivery.registry.PointTime;
@ -66,7 +62,6 @@ import com.raytheon.uf.common.datadelivery.registry.Time;
import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers;
import com.raytheon.uf.common.registry.event.RemoveRegistryEvent;
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
import com.raytheon.uf.common.registry.handler.RegistryObjectHandlersUtil;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.time.util.ImmutableDate;
import com.raytheon.uf.common.time.util.TimeUtil;
@ -76,8 +71,6 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthBucket;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthSubscription;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
import com.raytheon.uf.edex.datadelivery.bandwidth.notification.BandwidthEventBus;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.BandwidthMap;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlanTest;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus;
@ -106,6 +99,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent;
* Jun 03, 2013 2038 djohnson Add support for point data based subscriptions.
* Jun 03, 2013 2095 djohnson Move getPointDataSet to superclass.
* Jun 25, 2013 2106 djohnson Set subscription latency, access bucket allocations through RetrievalPlan.
* Jul 09, 2013 2106 djohnson InMemoryBandwidthManager no longer receives updates from the EventBus.
*
* </pre>
*
@ -267,8 +261,6 @@ public class BandwidthManagerIntTest extends AbstractWfoBandwidthManagerIntTest
public void testDailyProductSubscriptionReceivesTimeAndUrlFromUpdate()
throws RegistryHandlerException, ParseException,
SerializationException {
RegistryObjectHandlersUtil.initMemory();
// Store the original subscription
Subscription subscription = SiteSubscriptionFixture.INSTANCE.get();
DataDeliveryHandlers.getSubscriptionHandler().store(subscription);
@ -303,8 +295,6 @@ public class BandwidthManagerIntTest extends AbstractWfoBandwidthManagerIntTest
@Test
public void testDailyProductSubscriptionIsSetToReadyStatus()
throws RegistryHandlerException, ParseException {
RegistryObjectHandlersUtil.initMemory();
// Store the original subscription
Subscription subscription = SiteSubscriptionFixture.INSTANCE.get();
subscription.getTime().setCycleTimes(Collections.<Integer> emptyList());
@ -338,7 +328,6 @@ public class BandwidthManagerIntTest extends AbstractWfoBandwidthManagerIntTest
@Test
public void testSubscriptionLatencyIsPlacedOnSubscriptionDao()
throws RegistryHandlerException, ParseException {
RegistryObjectHandlersUtil.initMemory();
// Store the original subscription
Subscription subscription = SiteSubscriptionFixture.INSTANCE.get();
@ -715,108 +704,6 @@ public class BandwidthManagerIntTest extends AbstractWfoBandwidthManagerIntTest
bandwidthManager.subscriptionUpdated(subscription);
}
/**
* Long-running in-memory bandwidth manager proposed schedule operations
* were causing {@link ConcurrentModificationException}s to occur when
* receiving events from the {@link BandwidthEventBus}.
*
* @throws Exception
* on test failure
*/
@Test
public void testInMemoryBandwidthManagerCanReceiveDataSetMetaDataUpdates()
throws Exception {
Subscription subscription = createSubscriptionThatFillsUpABucket();
subscription.getTime().setCycleTimes(Arrays.asList(Integer.valueOf(0)));
bandwidthManager.schedule(subscription);
BandwidthManager bwProposed = null;
try {
bwProposed = bandwidthManager
.startProposedBandwidthManager(BandwidthMap
.load(EdexBandwidthContextFactory
.getBandwidthMapConfig()));
final BandwidthManager proposed = bwProposed;
final BlockingQueue<Exception> queue = new ArrayBlockingQueue<Exception>(
1);
final int invocationCount = 10;
final CountDownLatch waitForAllThreadsReadyLatch = new CountDownLatch(
invocationCount * 2);
final CountDownLatch doneLatch = new CountDownLatch(
invocationCount * 2);
for (int i = 0; i < invocationCount; i++) {
final int current = i;
Thread thread = new Thread() {
@Override
public void run() {
try {
// Wait for all threads to check in, then they all
// start
// working at once
waitForAllThreadsReadyLatch.countDown();
waitForAllThreadsReadyLatch.await();
proposed.updateGriddedDataSetMetaData(OpenDapGriddedDataSetMetaDataFixture.INSTANCE
.get(current));
} catch (Exception e) {
queue.offer(e);
}
doneLatch.countDown();
}
};
thread.start();
}
for (int i = 0; i < invocationCount; i++) {
final int current = i;
Thread thread = new Thread() {
@Override
public void run() {
try {
final Subscription subscription2 = SiteSubscriptionFixture.INSTANCE
.get(current);
subscription2
.addParameter(ParameterFixture.INSTANCE
.get(1));
subscription2
.addParameter(ParameterFixture.INSTANCE
.get(2));
subscription2
.addParameter(ParameterFixture.INSTANCE
.get(3));
subscription2.getTime().setCycleTimes(
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9,
10, 11, 12, 13, 14, 15, 16, 17));
subscription2.setLatencyInMinutes(current);
// Wait for all threads to check in, then they all
// start
// working at once
waitForAllThreadsReadyLatch.countDown();
waitForAllThreadsReadyLatch.await();
proposed.schedule(subscription2);
} catch (Exception e) {
queue.offer(e);
}
doneLatch.countDown();
}
};
thread.start();
}
// Wait for all threads to finish
doneLatch.await();
final Exception exception = queue.poll();
if (exception != null) {
throw exception;
}
} finally {
shutdownBandwidthManager(bwProposed);
}
}
/**
* Subscriptions that are deleted should have all of their bandwidth
* allocations removed deleted.

View file

@ -21,6 +21,8 @@ package com.raytheon.uf.edex.datadelivery.bandwidth;
import java.io.File;
import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler;
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthContextFactory;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
@ -38,6 +40,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit;
* Oct 24, 2012 1286 djohnson Initial creation
* Feb 20, 2013 1543 djohnson Pass additional super-class constructor arguments.
* Jun 25, 2013 2106 djohnson Add {@link IBandwidthBucketDao}.
* Jul 10, 2013 2106 djohnson Dependency inject registry handlers.
*
* </pre>
*
@ -57,14 +60,19 @@ public class IntegrationTestBandwidthContextFactory extends
* the creator for the bandwidth manager instance
* @param dbInit
* the database initializer
* @param dataSetMetaDataHandler
* @param subscriptionHandler
*/
IntegrationTestBandwidthContextFactory(IBandwidthDao bandwidthDao,
IBandwidthBucketDao bandwidthBucketsDao,
IEdexBandwidthManagerCreator bandwidthManagerCreator,
IBandwidthDbInit dbInit) {
IBandwidthDbInit dbInit,
IDataSetMetaDataHandler dataSetMetaDataHandler,
ISubscriptionHandler subscriptionHandler) {
super(bandwidthDao, bandwidthBucketsDao,
new IntegrationTestBandwidthInitializer(),
bandwidthManagerCreator, dbInit);
bandwidthManagerCreator, dbInit, dataSetMetaDataHandler,
subscriptionHandler);
}
/**
@ -81,9 +89,7 @@ public class IntegrationTestBandwidthContextFactory extends
* @return the file
*/
public static File getIntegrationTestBandwidthMapConfigFile() {
return new IntegrationTestBandwidthContextFactory((IBandwidthDao) null,
(IBandwidthBucketDao) null,
(IEdexBandwidthManagerCreator) null, (IBandwidthDbInit) null)
.getBandwidthMapConfigFile();
return new IntegrationTestBandwidthContextFactory(null, null, null,
null, null, null).getBandwidthMapConfigFile();
}
}

View file

@ -19,6 +19,8 @@
**/
package com.raytheon.uf.edex.datadelivery.bandwidth;
import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler;
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
import com.raytheon.uf.common.util.JarUtil;
import com.raytheon.uf.common.util.SpringFiles;
import com.raytheon.uf.edex.datadelivery.bandwidth.WfoBandwidthManagerCreator.WfoBandwidthManager;
@ -40,6 +42,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
* Oct 30, 2012 1286 djohnson Initial creation
* Feb 27, 2013 1644 djohnson Extends WFO bandwidth manager.
* May 15, 2013 2000 djohnson Include daos.
* Jul 10, 2013 2106 djohnson Dependency inject registry handlers.
*
* </pre>
*
@ -66,8 +69,11 @@ public class IntegrationTestWfoBandwidthManager extends WfoBandwidthManager {
*/
public IntegrationTestWfoBandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil) {
super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil);
BandwidthDaoUtil bandwidthDaoUtil,
IDataSetMetaDataHandler dataSetMetaDataHandler,
ISubscriptionHandler subscriptionHandler) {
super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil,
dataSetMetaDataHandler, subscriptionHandler);
}
/**

View file

@ -19,6 +19,8 @@
**/
package com.raytheon.uf.edex.datadelivery.bandwidth;
import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler;
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
import com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthContextFactory.IEdexBandwidthManagerCreator;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit;
@ -35,6 +37,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 20, 2013 1543 djohnson Initial creation
* Jul 10, 2013 2106 djohnson Dependency inject registry handlers.
*
* </pre>
*
@ -50,8 +53,11 @@ public class IntegrationTestWfoBandwidthManagerCreator implements
@Override
public IBandwidthManager getBandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil) {
BandwidthDaoUtil bandwidthDaoUtil,
IDataSetMetaDataHandler dataSetMetaDataHandler,
ISubscriptionHandler subscriptionHandler) {
return new IntegrationTestWfoBandwidthManager(dbInit, bandwidthDao,
retrievalManager, bandwidthDaoUtil);
retrievalManager, bandwidthDaoUtil, dataSetMetaDataHandler,
subscriptionHandler);
}
}

View file

@ -57,6 +57,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 27, 2013 1644 djohnson Initial creation
* Jul 10, 2013 2106 djohnson Dependency inject registry handlers.
*
* </pre>
*
@ -78,6 +79,7 @@ public class WfoNcfBandwidthManagerIntTest extends
ApplicationContext ncfBandwidthManagerCtx = new ClassPathXmlApplicationContext(
new String[] {
SpringFiles.UNIT_TEST_DB_BEANS_XML,
SpringFiles.MEMORY_DATADELIVERY_HANDLERS_XML,
SpringFiles.RETRIEVAL_DATADELIVERY_DAOS_XML,
SpringFiles.BANDWIDTH_DATADELIVERY_DAOS_XML,
SpringFiles.BANDWIDTH_DATADELIVERY_XML,

View file

@ -19,6 +19,8 @@
**/
package com.raytheon.uf.edex.datadelivery.bandwidth.ncf;
import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler;
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
import com.raytheon.uf.common.util.JarUtil;
import com.raytheon.uf.common.util.SpringFiles;
import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager;
@ -40,6 +42,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
* ------------ ---------- ----------- --------------------------
* Feb 18, 2013 1543 djohnson Initial creation
* Feb 27, 2013 1644 djohnson Extend NCF bandwidth manager.
* Jul 10, 2013 2106 djohnson Dependency inject registry handlers.
*
* </pre>
*
@ -66,8 +69,11 @@ public class IntegrationTestNcfBandwidthManager extends NcfBandwidthManager {
*/
public IntegrationTestNcfBandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil) {
super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil);
BandwidthDaoUtil bandwidthDaoUtil,
IDataSetMetaDataHandler dataSetMetaDataHandler,
ISubscriptionHandler subscriptionHandler) {
super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil,
dataSetMetaDataHandler, subscriptionHandler);
}
/**

View file

@ -19,9 +19,10 @@
**/
package com.raytheon.uf.edex.datadelivery.bandwidth.ncf;
import com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthContextFactory;
import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager;
import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler;
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
import com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthContextFactory.IEdexBandwidthManagerCreator;
import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
@ -37,6 +38,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 20, 2013 1543 djohnson Initial creation
* Jul 10, 2013 2106 djohnson Dependency inject registry handlers.
*
* </pre>
*
@ -52,8 +54,11 @@ public class IntegrationTestNcfBandwidthManagerCreator implements
@Override
public IBandwidthManager getBandwidthManager(IBandwidthDbInit dbInit,
IBandwidthDao bandwidthDao, RetrievalManager retrievalManager,
BandwidthDaoUtil bandwidthDaoUtil) {
BandwidthDaoUtil bandwidthDaoUtil,
IDataSetMetaDataHandler dataSetMetaDataHandler,
ISubscriptionHandler subscriptionHandler) {
return new IntegrationTestNcfBandwidthManager(dbInit, bandwidthDao,
retrievalManager, bandwidthDaoUtil);
retrievalManager, bandwidthDaoUtil, dataSetMetaDataHandler,
subscriptionHandler);
}
}

View file

@ -17,6 +17,9 @@
</constructor-arg>
<constructor-arg ref="bandwidthManagerCreator" />
<constructor-arg ref="hibernateBandwidthDbInit" />
<!-- Registry handlers required for EdexBandwidthManager -->
<constructor-arg ref="DataSetMetaDataHandler" />
<constructor-arg ref="SubscriptionHandler" />
</bean>
<bean
@ -24,15 +27,6 @@
<constructor-arg ref="bandwidthManager" />
</bean>
<bean id="registerDataDeliveryHandlers" class="java.lang.String">
<!-- required for depends-on -->
</bean>
<bean id="retrievalAgents" class="java.util.Collections"
factory-method="emptyMap">
<!-- No retrievals for integration test -->
</bean>
<bean id="dataSetAvailabilityCalculator"
class="com.raytheon.uf.edex.datadelivery.bandwidth.util.SimpleAvailablityCalculator">
<property name="delay"

View file

@ -1,7 +1,4 @@
# BandwidthManagement properties for testing
bandwidth.dataSetMetaDataPoolSize=1
bandwidth.retrievalPoolSize=1
bandwidth.subscriptionPoolSize=1
# 0 availability delay to make math simple
bandwidth.dataSetAvailabilityCalculator.delay=0
bandwidth.subscription.latency=0

View file

@ -36,7 +36,8 @@ import com.raytheon.uf.common.util.TestUtil;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Oct 4, 2012 1241 djohnson Initial creation
* Oct 04, 2012 1241 djohnson Initial creation
* Jul 10, 2013 2106 djohnson Spring file path moved to SpringFiles for reuse.
*
* </pre>
*
@ -48,8 +49,6 @@ public class RegistryObjectHandlersUtil {
private static final String MOCK_DATADELIVERY_HANDLERS_XML = "/datadelivery/mock-datadelivery-handlers.xml";
private static final String MEMORY_DATADELIVERY_HANDLERS_XML = "/datadelivery/memory-datadelivery-handlers.xml";
/**
* Initializes the handlers with the set of production implementations,
* which interact with the registry proper.
@ -69,7 +68,7 @@ public class RegistryObjectHandlersUtil {
* Initializes the handlers with a set of in-memory implementations.
*/
public static void initMemory() {
initHandlersFromSpringFile(MEMORY_DATADELIVERY_HANDLERS_XML);
initHandlersFromSpringFile(SpringFiles.MEMORY_DATADELIVERY_HANDLERS_XML);
}
/**

View file

@ -35,6 +35,7 @@ import org.junit.Ignore;
* May 02, 2013 1910 djohnson Add validator plugins spring file.
* May 28, 2013 1650 djohnson Add event bus spring files.
* Jun 24, 2013 2106 djohnson Remove spring file.
* Jul 10, 2013 2106 djohnson Add MEMORY_DATADELIVERY_HANDLERS_XML.
*
* </pre>
*
@ -92,6 +93,8 @@ public class SpringFiles {
public static final String EVENTBUS_COMMON_XML = "/spring/eventbus-common.xml";
public static final String MEMORY_DATADELIVERY_HANDLERS_XML = "/datadelivery/memory-datadelivery-handlers.xml";
public static final String UNIT_TEST_DB_BEANS_XML = "/unit-test-db-beans.xml";
public static final String UNIT_TEST_LOCALIZATION_BEANS_XML = "/unit-test-localization-beans.xml";

View file

@ -35,6 +35,7 @@ import com.google.common.eventbus.EventBus;
* ------------ ---------- ----------- --------------------------
* Feb 06, 2013 1543 djohnson Initial creation
* May 28, 2013 1650 djohnson Add getEventBuses.
* Jul 10, 2013 2106 djohnson Remove subscriptionBus.
*
* </pre>
*
@ -46,18 +47,8 @@ public class BandwidthSyncEventBusFactory implements BandwidthEventBusFactory {
private final EventBus dataSetBus = new EventBus();
private final EventBus subscriptionBus = new EventBus();
private final EventBus retrievalBus = new EventBus();
/**
* {@inheritDoc}
*/
@Override
public EventBus getSubscriptionBus() {
return subscriptionBus;
}
/**
* {@inheritDoc}
*/
@ -79,7 +70,6 @@ public class BandwidthSyncEventBusFactory implements BandwidthEventBusFactory {
*/
@Override
public List<EventBus> getEventBuses() {
return Arrays.<EventBus> asList(dataSetBus, retrievalBus,
subscriptionBus);
return Arrays.<EventBus> asList(dataSetBus, retrievalBus);
}
}

View file

@ -66,6 +66,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.Sta
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 30, 2013 1543 djohnson Initial creation
* Jul 10, 2013 2106 djohnson Inject providerHandler.
*
* </pre>
*
@ -119,7 +120,7 @@ public class SubscriptionRetrievalAgentTest {
SubscriptionRetrievalAgent agent = new SubscriptionRetrievalAgent(
route, "someUri", new Object(), 1, null, bandwidthDao,
retrievalDao) {
retrievalDao, DataDeliveryHandlers.getProviderHandler()) {
@Override
void wakeRetrievalTasks() throws EdexException {
// Do nothing

View file

@ -29,9 +29,9 @@ import org.junit.BeforeClass;
import org.junit.Test;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.SiteSubscription;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.datadelivery.registry.SubscriptionBuilder;
import com.raytheon.uf.common.datadelivery.registry.SiteSubscription;
import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers;
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
import com.raytheon.uf.common.registry.handler.RegistryHandlerException;
@ -48,6 +48,7 @@ import com.raytheon.uf.common.registry.handler.RegistryObjectHandlersUtil;
* ------------ ---------- ----------- --------------------------
* Feb 19, 2013 1543 djohnson Initial creation
* Mar 28, 2013 1841 djohnson Subscription is now UserSubscription.
* Jul 10, 2013 2106 djohnson Inject subscriptionHandler.
*
* </pre>
*
@ -56,12 +57,12 @@ import com.raytheon.uf.common.registry.handler.RegistryObjectHandlersUtil;
*/
public class FindActiveSubscriptionsForRouteTest {
private static ISubscriptionHandler subscriptionHandler;
@BeforeClass
public static void classSetUp() throws RegistryHandlerException {
RegistryObjectHandlersUtil.initMemory();
final ISubscriptionHandler subscriptionHandler = DataDeliveryHandlers
.getSubscriptionHandler();
subscriptionHandler = DataDeliveryHandlers.getSubscriptionHandler();
// Two OPSNET subscriptions
final SiteSubscription opsnetSub1 = new SubscriptionBuilder()
@ -86,7 +87,7 @@ public class FindActiveSubscriptionsForRouteTest {
public void findsSubscriptionForSingleRoute()
throws RegistryHandlerException {
final Set<Subscription> subscriptions = new FindActiveSubscriptionsForRoute(
Network.SBN).findSubscriptionsToSchedule();
subscriptionHandler, Network.SBN).findSubscriptionsToSchedule();
assertThat(subscriptions, hasSize(2));
for (Subscription subscription : subscriptions) {
assertThat(subscription.getRoute(), is(Network.SBN));
@ -97,7 +98,8 @@ public class FindActiveSubscriptionsForRouteTest {
public void findsSubscriptionsForMultipleRoutes()
throws RegistryHandlerException {
final Set<Subscription> subscriptions = new FindActiveSubscriptionsForRoute(
Network.OPSNET, Network.SBN).findSubscriptionsToSchedule();
subscriptionHandler, Network.OPSNET, Network.SBN)
.findSubscriptionsToSchedule();
assertThat(subscriptions, hasSize(4));
}