From 10884ca1371d580e319811565f47fa1c9e047eb6 Mon Sep 17 00:00:00 2001 From: Dustin Johnson Date: Wed, 10 Jul 2013 15:36:00 -0500 Subject: [PATCH] Issue #2106 Improve BandwidthManager performance, proposed changes are checked much quicker Change-Id: I3a61de1904d7256de04a97f40f38bf18d78b1438 Former-commit-id: 0c6d48d985e553e3a9fee95302a543fc2b73836e [formerly b36200d583e6ea74182386e3ba5038e7645b218c] [formerly ec2ddd13b1cb3a99bf99176b1bc7f7311272735b [formerly 59eafdd07754e3711e9c3babd946dfd18c161404]] Former-commit-id: ec2ddd13b1cb3a99bf99176b1bc7f7311272735b Former-commit-id: 580bc2cadf44a91e877ce4fbfc36dc838c1f5c8b --- .../bandwidth-datadelivery-ncf-edex-impl.xml | 1 + .../ncf/NcfBandwidthManagerCreator.java | 19 +- .../bandwidth-datadelivery-edex-impl.xml | 43 +- .../bandwidth-datadelivery-eventbus.xml | 3 + .../bandwidth-datadelivery-inmemory-impl.xml | 8 - .../bandwidth-datadelivery-wfo-edex-impl.xml | 2 + .../res/spring/bandwidth-datadelivery.xml | 155 ++-- ....uf.edex.datadelivery.bandwidth.properties | 1 - .../bandwidth/BandwidthManager.java | 580 +-------------- .../EdexBandwidthContextFactory.java | 40 +- .../bandwidth/EdexBandwidthManager.java | 670 ++++++++++++++++++ .../bandwidth/IBandwidthManager.java | 41 +- .../bandwidth/InMemoryBandwidthDao.java | 289 ++++---- .../bandwidth/InMemoryBandwidthManager.java | 14 +- .../bandwidth/WfoBandwidthManagerCreator.java | 18 +- .../BandwidthAsyncEventBusFactory.java | 48 +- .../notification/BandwidthEventBusConfig.java | 48 +- .../BandwidthEventBusFactory.java | 6 +- .../EdexBandwidthEventBusHandler.java | 15 +- .../bandwidth/processing/Processor.java | 90 --- .../bandwidth/retrieval/RetrievalManager.java | 3 +- .../retrieval/SubscriptionRetrievalAgent.java | 13 +- .../SubscriptionBundleSeparator.java | 122 ---- .../util/FindActiveSubscriptionsForRoute.java | 23 +- .../retrieval/RetrievalGenerationHandler.java | 165 ----- .../AbstractBandwidthManagerIntTest.java | 6 +- .../bandwidth/BandwidthManagerIntTest.java | 115 +-- ...ntegrationTestBandwidthContextFactory.java | 18 +- .../IntegrationTestWfoBandwidthManager.java | 10 +- ...grationTestWfoBandwidthManagerCreator.java | 10 +- .../WfoNcfBandwidthManagerIntTest.java | 2 + .../IntegrationTestNcfBandwidthManager.java | 10 +- ...grationTestNcfBandwidthManagerCreator.java | 13 +- ...idth-datadelivery-integrationtest-impl.xml | 12 +- ....uf.edex.datadelivery.bandwidth.properties | 3 - .../handler/RegistryObjectHandlersUtil.java | 7 +- .../raytheon/uf/common/util/SpringFiles.java | 3 + .../BandwidthSyncEventBusFactory.java | 14 +- .../SubscriptionRetrievalAgentTest.java | 3 +- .../FindActiveSubscriptionsForRouteTest.java | 12 +- 40 files changed, 1108 insertions(+), 1547 deletions(-) create mode 100644 edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthManager.java delete mode 100644 edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/processing/Processor.java delete mode 100644 edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/separator/SubscriptionBundleSeparator.java delete mode 100644 edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/RetrievalGenerationHandler.java diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/res/spring/bandwidth-datadelivery-ncf-edex-impl.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/res/spring/bandwidth-datadelivery-ncf-edex-impl.xml index 3fbb7c8021..5a17a62ee5 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/res/spring/bandwidth-datadelivery-ncf-edex-impl.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/res/spring/bandwidth-datadelivery-ncf-edex-impl.xml @@ -16,6 +16,7 @@ + diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/src/com/raytheon/uf/edex/datadelivery/bandwidth/ncf/NcfBandwidthManagerCreator.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/src/com/raytheon/uf/edex/datadelivery/bandwidth/ncf/NcfBandwidthManagerCreator.java index 2489823fcc..645b4eadbd 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/src/com/raytheon/uf/edex/datadelivery/bandwidth/ncf/NcfBandwidthManagerCreator.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth.ncf/src/com/raytheon/uf/edex/datadelivery/bandwidth/ncf/NcfBandwidthManagerCreator.java @@ -24,10 +24,13 @@ import java.util.Set; import com.raytheon.uf.common.datadelivery.bandwidth.ProposeScheduleResponse; import com.raytheon.uf.common.datadelivery.registry.Subscription; +import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler; +import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler; import com.raytheon.uf.common.serialization.SerializationException; import com.raytheon.uf.common.util.JarUtil; import com.raytheon.uf.edex.datadelivery.bandwidth.BandwidthManager; import com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthContextFactory.IEdexBandwidthManagerCreator; +import com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthManager; import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit; @@ -47,6 +50,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil; * Feb 27, 2013 1644 djohnson Schedule SBN subscriptions. * Mar 11, 2013 1645 djohnson Add missing Spring file. * May 15, 2013 2000 djohnson Include daos. + * Jul 10, 2013 2106 djohnson Dependency inject registry handlers. * * * @@ -58,7 +62,7 @@ public class NcfBandwidthManagerCreator implements IEdexBandwidthManagerCreator /** * NCF {@link BandwidthManager} implementation. */ - static class NcfBandwidthManager extends BandwidthManager { + static class NcfBandwidthManager extends EdexBandwidthManager { private static final String[] NCF_BANDWIDTH_MANAGER_FILES = new String[] { JarUtil.getResResourcePath("/spring/bandwidth-datadelivery-ncf-edex-impl.xml"), @@ -79,8 +83,11 @@ public class NcfBandwidthManagerCreator implements IEdexBandwidthManagerCreator */ public NcfBandwidthManager(IBandwidthDbInit dbInit, IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, - BandwidthDaoUtil bandwidthDaoUtil) { - super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil); + BandwidthDaoUtil bandwidthDaoUtil, + IDataSetMetaDataHandler dataSetMetaDataHandler, + ISubscriptionHandler subscriptionHandler) { + super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil, + dataSetMetaDataHandler, subscriptionHandler); } @Override @@ -113,9 +120,11 @@ public class NcfBandwidthManagerCreator implements IEdexBandwidthManagerCreator @Override public IBandwidthManager getBandwidthManager(IBandwidthDbInit dbInit, IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, - BandwidthDaoUtil bandwidthDaoUtil) { + BandwidthDaoUtil bandwidthDaoUtil, + IDataSetMetaDataHandler dataSetMetaDataHandler, + ISubscriptionHandler subscriptionHandler) { return new NcfBandwidthManager(dbInit, bandwidthDao, retrievalManager, - bandwidthDaoUtil); + bandwidthDaoUtil, dataSetMetaDataHandler, subscriptionHandler); } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-edex-impl.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-edex-impl.xml index 431547f993..0d9490c928 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-edex-impl.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-edex-impl.xml @@ -2,8 +2,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/tx - http://www.springframework.org/schema/tx/spring-tx.xsd - http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + http://www.springframework.org/schema/tx/spring-tx.xsd"> @@ -21,6 +20,14 @@ + + + + + + @@ -29,7 +36,8 @@ - + @@ -38,6 +46,7 @@ + @@ -47,6 +56,9 @@ + + + - - - - - - - - - - - - - - - java.lang.Throwable - - - - - - \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-eventbus.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-eventbus.xml index c22e810348..dfb2364066 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-eventbus.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-eventbus.xml @@ -2,6 +2,9 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> + + diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-inmemory-impl.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-inmemory-impl.xml index d6230f90fa..165616c36e 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-inmemory-impl.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-inmemory-impl.xml @@ -8,12 +8,4 @@ - - - - - - - \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-wfo-edex-impl.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-wfo-edex-impl.xml index d6f48cd905..60851496c3 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-wfo-edex-impl.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery-wfo-edex-impl.xml @@ -16,6 +16,7 @@ + + diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery.xml index 2eb46a8028..cdce5e7807 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/res/spring/bandwidth-datadelivery.xml @@ -2,114 +2,89 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> - - - - classpath:/com.raytheon.uf.edex.datadelivery.bandwidth.properties - - - - - - + + + + classpath:/com.raytheon.uf.edex.datadelivery.bandwidth.properties + + + - + - - + + - - + - + - - - - - - - - + + + + + + + + - - - - - - - - - + + + + + + + + + - - - - + + + + - - - + + + - - - + + + - - - - - + + - - + + + + + - - - - - - - - - - - - - - - - - diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/resources/com.raytheon.uf.edex.datadelivery.bandwidth.properties b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/resources/com.raytheon.uf.edex.datadelivery.bandwidth.properties index 09a953ff33..d3caf0bc86 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/resources/com.raytheon.uf.edex.datadelivery.bandwidth.properties +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/resources/com.raytheon.uf.edex.datadelivery.bandwidth.properties @@ -1,6 +1,5 @@ bandwidth.dataSetMetaDataPoolSize=2 bandwidth.retrievalPoolSize=4 -bandwidth.subscriptionPoolSize=2 bandwidth.dataSetAvailabilityCalculator.delay=100 bandwidth.subscription.latency=120 bandwidth.default.retrieval.priority=3 \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManager.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManager.java index e8b4d7ef95..81965515cc 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManager.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManager.java @@ -1,6 +1,5 @@ package com.raytheon.uf.edex.datadelivery.bandwidth; -import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; @@ -11,19 +10,12 @@ import java.util.List; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.common.eventbus.AllowConcurrentEvents; -import com.google.common.eventbus.Subscribe; import com.raytheon.edex.util.Util; import com.raytheon.uf.common.auth.exception.AuthorizationException; import com.raytheon.uf.common.auth.user.IUser; @@ -33,22 +25,12 @@ import com.raytheon.uf.common.datadelivery.bandwidth.IProposeScheduleResponse; import com.raytheon.uf.common.datadelivery.bandwidth.ProposeScheduleResponse; import com.raytheon.uf.common.datadelivery.bandwidth.data.BandwidthGraphData; import com.raytheon.uf.common.datadelivery.registry.AdhocSubscription; -import com.raytheon.uf.common.datadelivery.registry.DataDeliveryRegistryObjectTypes; -import com.raytheon.uf.common.datadelivery.registry.DataSetMetaData; import com.raytheon.uf.common.datadelivery.registry.DataType; -import com.raytheon.uf.common.datadelivery.registry.GriddedDataSetMetaData; import com.raytheon.uf.common.datadelivery.registry.Network; -import com.raytheon.uf.common.datadelivery.registry.PointDataSetMetaData; import com.raytheon.uf.common.datadelivery.registry.PointTime; import com.raytheon.uf.common.datadelivery.registry.SiteSubscription; import com.raytheon.uf.common.datadelivery.registry.Subscription; import com.raytheon.uf.common.datadelivery.registry.Time; -import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers; -import com.raytheon.uf.common.event.EventBus; -import com.raytheon.uf.common.registry.event.InsertRegistryEvent; -import com.raytheon.uf.common.registry.event.RemoveRegistryEvent; -import com.raytheon.uf.common.registry.handler.IRegistryObjectHandler; -import com.raytheon.uf.common.registry.handler.RegistryHandlerException; import com.raytheon.uf.common.serialization.SerializationException; import com.raytheon.uf.common.status.IPerformanceStatusHandler; import com.raytheon.uf.common.status.IUFStatusHandler; @@ -59,8 +41,6 @@ import com.raytheon.uf.common.time.util.IPerformanceTimer; import com.raytheon.uf.common.time.util.ITimer; import com.raytheon.uf.common.time.util.TimeUtil; import com.raytheon.uf.common.util.CollectionUtil; -import com.raytheon.uf.common.util.FileUtil; -import com.raytheon.uf.common.util.IFileModifiedWatcher; import com.raytheon.uf.common.util.LogUtil; import com.raytheon.uf.common.util.algorithm.AlgorithmUtil; import com.raytheon.uf.common.util.algorithm.AlgorithmUtil.IBinarySearchResponse; @@ -76,14 +56,12 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval; import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.BandwidthInitializer; import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.ISubscriptionAggregator; -import com.raytheon.uf.edex.datadelivery.bandwidth.notification.BandwidthEventBus; import com.raytheon.uf.edex.datadelivery.bandwidth.processing.SimpleSubscriptionAggregator; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.BandwidthMap; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.BandwidthRoute; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus; -import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled; import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil; import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; @@ -122,6 +100,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; * Jun 18, 2013 2120 dhladky Add times to pointtime array * Jun 20, 2013 1802 djohnson Check several times for the metadata for now. * Jun 24, 2013 2106 djohnson Access BandwidthBucket contents through RetrievalPlan. + * Jul 10, 2013 2106 djohnson Move EDEX instance specific code into its own class. * * * @author dhladky @@ -131,21 +110,17 @@ public abstract class BandwidthManager extends AbstractPrivilegedRequestHandler implements IBandwidthManager { - private static final IUFStatusHandler statusHandler = UFStatus + protected static final IUFStatusHandler statusHandler = UFStatus .getHandler(BandwidthManager.class); - private static final Pattern RAP_PATTERN = Pattern - .compile(".*rap_f\\d\\d$"); + // Requires package access so it can be accessed from the maintenance task + final IBandwidthDao bandwidthDao; private ISubscriptionAggregator aggregator; private BandwidthInitializer initializer; - private final ScheduledExecutorService scheduler; - - private final IBandwidthDao bandwidthDao; - - private final BandwidthDaoUtil bandwidthDaoUtil; + protected final BandwidthDaoUtil bandwidthDaoUtil; private final IBandwidthDbInit dbInit; @@ -157,21 +132,6 @@ public abstract class BandwidthManager extends @VisibleForTesting final RetrievalManager retrievalManager; - @VisibleForTesting - final Runnable watchForConfigFileChanges = new Runnable() { - - private final IFileModifiedWatcher fileModifiedWatcher = FileUtil - .getFileModifiedWatcher(EdexBandwidthContextFactory - .getBandwidthMapConfig()); - - @Override - public void run() { - if (fileModifiedWatcher.hasBeenModified()) { - bandwidthMapConfigurationUpdated(); - } - } - }; - public BandwidthManager(IBandwidthDbInit dbInit, IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, BandwidthDaoUtil bandwidthDaoUtil) { @@ -179,348 +139,6 @@ public abstract class BandwidthManager extends this.bandwidthDao = bandwidthDao; this.retrievalManager = retrievalManager; this.bandwidthDaoUtil = bandwidthDaoUtil; - - // schedule maintenance tasks - scheduler = Executors.newScheduledThreadPool(1); - scheduler.scheduleAtFixedRate(watchForConfigFileChanges, 1, 1, - TimeUnit.MINUTES); - scheduler.scheduleAtFixedRate(new MaintanenceTask(), 1, 5, - TimeUnit.MINUTES); - } - - /** - * {@inheritDoc} - */ - @Override - @Subscribe - @AllowConcurrentEvents - public void registryEventListener(InsertRegistryEvent re) { - final String objectType = re.getObjectType(); - final String id = re.getId(); - - if (DataDeliveryRegistryObjectTypes.DATASETMETADATA.equals(objectType)) { - - DataSetMetaData dsmd = getDataSetMetaData(id); - - if (dsmd != null) { - // Repost the Object to the BandwidthEventBus to free - // the notification thread. - - // TODO: A hack to prevent rap_f and rap datasets being - // Identified as the - // same dataset... - Matcher matcher = RAP_PATTERN.matcher(dsmd.getUrl()); - if (matcher.matches()) { - statusHandler - .info("Found rap_f dataset - updating dataset name from [" - + dsmd.getDataSetName() + "] to [rap_f]"); - dsmd.setDataSetName("rap_f"); - } - // TODO: End of hack.. - - BandwidthEventBus.publish(dsmd); - } else { - statusHandler.error("No DataSetMetaData found for id [" + id - + "]"); - } - - } else if (DataDeliveryRegistryObjectTypes.SITE_SUBSCRIPTION - .equals(objectType) - || DataDeliveryRegistryObjectTypes.SHARED_SUBSCRIPTION - .equals(objectType)) { - - Subscription subscription = getSubscription(id); - - if (subscription != null) { - - // Make sure the subscriptionId is set to the - // RegistryObjectId - subscription.setId(id); - // Repost the Object to the BandwidthEventBus to free - // the notification thread. - BandwidthEventBus.publish(subscription); - - } else { - statusHandler - .error("No Subscription found for id [" + id + "]"); - } - } - } - - private static DataSetMetaData getDataSetMetaData(String id) { - return getRegistryObjectById( - DataDeliveryHandlers.getDataSetMetaDataHandler(), id); - } - - private static Subscription getSubscription(String id) { - return getRegistryObjectById( - DataDeliveryHandlers.getSubscriptionHandler(), id); - } - - private static T getRegistryObjectById( - IRegistryObjectHandler handler, String id) { - try { - return handler.getById(id); - } catch (RegistryHandlerException e) { - statusHandler.error("Error attempting to retrieve RegistryObject[" - + id + "] from Registry.", e); - return null; - } - } - - /** - * Process a {@link GriddedDataSetMetaData} that was received from the event - * bus. - * - * @param dataSetMetaData - * the metadadata - */ - @Subscribe - public void updateGriddedDataSetMetaData( - GriddedDataSetMetaData dataSetMetaData) throws ParseException { - // Daily/Hourly/Monthly datasets - if (dataSetMetaData.getCycle() == GriddedDataSetMetaData.NO_CYCLE) { - updateDataSetMetaDataWithoutCycle(dataSetMetaData); - } - // Regular cycle containing datasets - else { - updateDataSetMetaDataWithCycle(dataSetMetaData); - } - } - - /** - * Process a {@link PointDataSetMetaData} that was received from the event - * bus. - * - * @param dataSetMetaData - * the metadadata - */ - @Subscribe - public void updatePointDataSetMetaData(PointDataSetMetaData dataSetMetaData) { - // TODO: Change PointDataSetMetaData to only be able to use PointTime - // objects - final PointTime time = (PointTime) dataSetMetaData.getTime(); - final String providerName = dataSetMetaData.getProviderName(); - final String dataSetName = dataSetMetaData.getDataSetName(); - final Date pointTimeStart = time.getStartDate(); - final Date pointTimeEnd = time.getEndDate(); - - final SortedSet allowedRefreshIntervals = PointTime - .getAllowedRefreshIntervals(); - final long maxAllowedRefreshIntervalInMillis = TimeUtil.MILLIS_PER_MINUTE - * allowedRefreshIntervals.last(); - final long minAllowedRefreshIntervalInMillis = TimeUtil.MILLIS_PER_MINUTE - * allowedRefreshIntervals.first(); - - // Find any retrievals ranging from those with the minimum refresh - // interval to the maximum refresh interval - final Date startDate = new Date(pointTimeStart.getTime() - + minAllowedRefreshIntervalInMillis); - final Date endDate = new Date(pointTimeEnd.getTime() - + maxAllowedRefreshIntervalInMillis); - - final SortedSet subscriptionRetrievals = bandwidthDao - .getSubscriptionRetrievals(providerName, dataSetName, - RetrievalStatus.SCHEDULED, startDate, endDate); - - if (!CollectionUtil.isNullOrEmpty(subscriptionRetrievals)) { - for (SubscriptionRetrieval retrieval : subscriptionRetrievals) { - // Now check and make sure that at least one of the times falls - // in their retrieval range, their latency is the retrieval - // interval - final int retrievalInterval = retrieval - .getSubscriptionLatency(); - - // This is the latest time on the data we care about, once the - // retrieval is signaled to go it retrieves everything up to - // its start time - final Date latestRetrievalDataTime = retrieval.getStartTime() - .getTime(); - // This is the earliest possible time this retrieval cares about - final Date earliestRetrievalDataTime = new Date( - latestRetrievalDataTime.getTime() - - (TimeUtil.MILLIS_PER_MINUTE * retrievalInterval)); - - // If the end is before any times we care about or the start is - // after the latest times we care about, skip it - if (pointTimeEnd.before(earliestRetrievalDataTime) - || pointTimeStart.after(latestRetrievalDataTime)) { - continue; - } - - try { - // Update the retrieval times on the subscription object - // which goes through the retrieval process - final Subscription subscription = retrieval - .getSubscription(); - subscription.setUrl(dataSetMetaData.getUrl()); - subscription.setProvider(dataSetMetaData.getProviderName()); - - if (subscription.getTime() instanceof PointTime) { - final PointTime subTime = (PointTime) subscription - .getTime(); - subTime.setRequestStartAsDate(earliestRetrievalDataTime); - subTime.setRequestEndAsDate(latestRetrievalDataTime); - subTime.setTimes(time.getTimes()); - // Now update the retrieval to be ready - retrieval.setStatus(RetrievalStatus.READY); - bandwidthDaoUtil.update(retrieval); - } else { - throw new IllegalArgumentException("Subscription time not PointType! " + subscription.getName()); - } - - } catch (SerializationException e) { - statusHandler.handle(Priority.PROBLEM, - e.getLocalizedMessage(), e); - } - } - } - } - - /** - * Handles updates for datasets that do not contain cycles. - * - * @param dataSetMetaData - * the dataset metadata - * @throws ParseException - * on parsing errors - */ - private void updateDataSetMetaDataWithoutCycle( - GriddedDataSetMetaData dataSetMetaData) throws ParseException { - bandwidthDao.newBandwidthDataSetUpdate(dataSetMetaData); - - // Looking for active subscriptions to the dataset. - try { - List subscriptions = DataDeliveryHandlers - .getSubscriptionHandler().getActiveByDataSetAndProvider( - dataSetMetaData.getDataSetName(), - dataSetMetaData.getProviderName()); - - if (subscriptions.isEmpty()) { - return; - } - - statusHandler - .info(String - .format("Found [%s] subscriptions that will have an " - + "adhoc subscription generated and scheduled for url [%s].", - subscriptions.size(), - dataSetMetaData.getUrl())); - - // Create an adhoc for each one, and schedule it - for (Subscription subscription : subscriptions) { - Subscription sub = updateSubscriptionWithDataSetMetaData( - subscription, dataSetMetaData); - - if (sub instanceof SiteSubscription) { - schedule(new AdhocSubscription((SiteSubscription) sub)); - } else { - statusHandler - .warn("Unable to create adhoc queries for shared subscriptions at this point. This functionality should be added in the future..."); - } - } - } catch (RegistryHandlerException e) { - statusHandler.handle(Priority.PROBLEM, - "Failed to lookup subscriptions.", e); - } - } - - /** - * Handles updates for datasets that contain cycles. - * - * @param dataSetMetaData - * the dataset metadata - * @throws ParseException - * on parsing errors - */ - private void updateDataSetMetaDataWithCycle( - GriddedDataSetMetaData dataSetMetaData) throws ParseException { - BandwidthDataSetUpdate dataset = bandwidthDao - .newBandwidthDataSetUpdate(dataSetMetaData); - - // Looking for active subscriptions to the dataset. - List subscriptions = bandwidthDao - .getSubscriptionRetrievals(dataset.getProviderName(), - dataset.getDataSetName(), dataset.getDataSetBaseTime()); - - if (!subscriptions.isEmpty()) { - // Loop through the scheduled SubscriptionRetrievals and mark - // the scheduled retrievals as ready for retrieval - for (SubscriptionRetrieval retrieval : subscriptions) { - // TODO: Evaluate the state changes for receiving multiple - // dataset update messages. This seems to be happening - // quite a bit. - - if (RetrievalStatus.SCHEDULED.equals(retrieval.getStatus())) { - // Need to update the Subscription Object in the - // SubscriptionRetrieval with the current DataSetMetaData - // URL and time Object - Subscription sub; - try { - sub = updateSubscriptionWithDataSetMetaData( - retrieval.getSubscription(), dataSetMetaData); - - // Update the SubscriptionRetrieval record with the new - // data... - retrieval.setSubscription(sub); - } catch (SerializationException e) { - statusHandler - .handle(Priority.PROBLEM, - "Unable to serialize the subscription for the retrieval, skipping...", - e); - continue; - } - - retrieval.setStatus(RetrievalStatus.READY); - - bandwidthDaoUtil.update(retrieval); - - statusHandler - .info(String.format("Updated retrieval [%s] for " - + "subscription [%s] to use " - + "url [%s] and " - + "base reference time [%s]", retrieval - .getIdentifier(), sub.getName(), - dataSetMetaData.getUrl(), BandwidthUtil - .format(sub.getTime() - .getStartDate()))); - } - } - - // Notify RetrievalAgentManager of updated RetrievalRequests. - retrievalManager.wakeAgents(); - } else { - if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { - statusHandler - .debug("No Subscriptions scheduled for BandwidthDataSetUpdate [" - + dataset.getIdentifier() - + "] base time [" - + BandwidthUtil.format(dataset - .getDataSetBaseTime()) + "]"); - } - } - } - - /** - * Updates a {@link Subscription) to reflect important attributes of the - * specified {@link DataSetMetaData}. - * - * @param sub - * the subscription - * @param dataSetMetaData - * the datasetmetadata update - * @return the subscription - */ - private static Subscription updateSubscriptionWithDataSetMetaData( - Subscription sub, DataSetMetaData dataSetMetaData) { - final Time dsmdTime = dataSetMetaData.getTime(); - final Time subTime = sub.getTime(); - dsmdTime.setSelectedTimeIndices(subTime.getSelectedTimeIndices()); - dsmdTime.setCycleTimes(subTime.getCycleTimes()); - sub.setTime(dsmdTime); - sub.setUrl(dataSetMetaData.getUrl()); - - return sub; } private List schedule(Subscription subscription, @@ -611,7 +229,7 @@ public abstract class BandwidthManager extends return unscheduled; } - private List schedule(BandwidthSubscription dao) { + protected List schedule(BandwidthSubscription dao) { Calendar retrievalTime = dao.getBaseReferenceTime(); // Retrieve all the current subscriptions by provider, dataset name and @@ -742,33 +360,6 @@ public abstract class BandwidthManager extends return unscheduled; } - /** - * {@inheritDoc} - */ - @Override - @Subscribe - @AllowConcurrentEvents - public void subscriptionRemoved(RemoveRegistryEvent event) { - String objectType = event.getObjectType(); - if (objectType != null) { - if (DataDeliveryRegistryObjectTypes.SITE_SUBSCRIPTION - .equals(objectType) - || DataDeliveryRegistryObjectTypes.SHARED_SUBSCRIPTION - .equals(objectType)) { - statusHandler - .info("Recieved Subscription removal notification for Subscription [" - + event.getId() + "]"); - // Need to locate and remove all BandwidthReservations for the - // given subscription.. - List l = bandwidthDao - .getBandwidthSubscriptionByRegistryId(event.getId()); - if (!l.isEmpty()) { - remove(l); - } - } - } - } - /** * {@inheritDoc} */ @@ -1016,7 +607,7 @@ public abstract class BandwidthManager extends * The subscriptionDao's to remove. * @return */ - private List remove( + protected List remove( List bandwidthSubscriptions) { List unscheduled = new ArrayList(); @@ -1028,100 +619,6 @@ public abstract class BandwidthManager extends return unscheduled; } - /** - * {@inheritDoc} - */ - @Override - @Subscribe - public void subscriptionFulfilled( - SubscriptionRetrievalFulfilled subscriptionRetrievalFulfilled) { - - statusHandler.info("subscriptionFullfilled() :: " - + subscriptionRetrievalFulfilled); - - SubscriptionRetrieval sr = subscriptionRetrievalFulfilled - .getSubscriptionRetrieval(); - - List subscriptionRetrievals = bandwidthDao - .querySubscriptionRetrievals(sr.getBandwidthSubscription()); - - // Look to see if all the SubscriptionRetrieval's for a subscription are - // completed. - boolean complete = true; - for (SubscriptionRetrieval subscription : subscriptionRetrievals) { - if (!RetrievalStatus.FULFILLED.equals(subscription.getStatus())) { - complete = false; - break; - } - } - - if (complete) { - // Remove the completed SubscriptionRetrieval Objects from the - // plan.. - RetrievalPlan plan = retrievalManager.getPlan(sr.getNetwork()); - plan.remove(sr); - - // Schedule the next iteration of the subscription - BandwidthSubscription dao = sr.getBandwidthSubscription(); - Subscription subscription = null; - try { - subscription = dao.getSubscription(); - } catch (SerializationException e) { - statusHandler.error( - "Failed to extract Subscription from BandwidthSubscription [" - + dao.getIdentifier() + "]", e); - // No sense in continuing - return; - } - - // AdhocSubscriptions are one and done, so don't reschedule. - if (subscription instanceof AdhocSubscription) { - return; - } - - Calendar next = BandwidthUtil.copy(dao.getBaseReferenceTime()); - // See how far into the future the plan goes.. - int days = retrievalManager.getPlan(dao.getRoute()).getPlanDays(); - - for (int day = 1; day <= days; day++) { - - next.add(Calendar.DAY_OF_YEAR, 1); - // Since subscriptions are based on cycles in a day, add one day - // to the - // completed BandwidthSubscription to get the next days - // retrieval. - - // Now check if that BandwidthSubscription has already been - // scheduled. - BandwidthSubscription a = bandwidthDao - .getBandwidthSubscription(dao.getRegistryId(), next); - if (a == null) { - // Create the new BandwidthSubscription record with the next - // time.. - try { - a = bandwidthDao.newBandwidthSubscription(subscription, - next); - } catch (SerializationException e) { - - statusHandler.error( - "Failed to create new BandwidthSubscription from Subscription [" - + subscription.getId() - + "] baseReferenceTime [" - + BandwidthUtil.format(next) + "]", e); - } - - schedule(a); - } else { - statusHandler - .info("Subscription [" - + subscription.getName() - + "] has already been scheduled for baseReferenceTime [" - + BandwidthUtil.format(next) + "]"); - } - } - } - } - /** * {@inheritDoc} */ @@ -1618,29 +1115,6 @@ public abstract class BandwidthManager extends initializer.init(this, dbInit, retrievalManager); } - /** - * Private inner work thread used to keep the RetrievalPlans up to date. - * - */ - private class MaintanenceTask implements Runnable { - - @Override - public void run() { - for (RetrievalPlan plan : retrievalManager.getRetrievalPlans() - .values()) { - plan.resize(); - Calendar newEnd = plan.getPlanEnd(); - - // Find DEFERRED Allocations and load them into the plan... - List deferred = bandwidthDao.getDeferred( - plan.getNetwork(), newEnd); - if (!deferred.isEmpty()) { - retrievalManager.schedule(deferred); - } - } - } - } - /** * {@inheritDoc} */ @@ -1725,35 +1199,14 @@ public abstract class BandwidthManager extends */ @VisibleForTesting void shutdown() { - unregisterFromEventBus(); - unregisterFromBandwidthEventBus(); - try { retrievalManager.shutdown(); } catch (Exception e) { statusHandler.handle(Priority.WARN, "Unable to shutdown the retrievalManager.", e); + } finally { + shutdownInternal(); } - try { - scheduler.shutdownNow(); - } catch (Exception e) { - statusHandler.handle(Priority.WARN, - "Unable to shutdown the scheduler.", e); - } - } - - /** - * Unregister from the {@link EventBus}. - */ - private void unregisterFromEventBus() { - EventBus.unregister(this); - } - - /** - * Unregister from the {@link BandwidthEventBus}. - */ - private void unregisterFromBandwidthEventBus() { - BandwidthEventBus.register(this); } /** @@ -1911,18 +1364,7 @@ public abstract class BandwidthManager extends } /** - * Signals the bandwidth map localization file is updated, perform a - * reinitialize operation. + * Provide implementation specific shutdown. */ - private void bandwidthMapConfigurationUpdated() { - IBandwidthRequest request = new IBandwidthRequest(); - request.setRequestType(RequestType.REINITIALIZE); - - try { - handleRequest(request); - } catch (Exception e) { - statusHandler.handle(Priority.PROBLEM, - "Error while reinitializing the bandwidth manager.", e); - } - } + protected abstract void shutdownInternal(); } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthContextFactory.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthContextFactory.java index 703921b7c2..698cf49e57 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthContextFactory.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthContextFactory.java @@ -21,6 +21,8 @@ package com.raytheon.uf.edex.datadelivery.bandwidth; import java.io.File; +import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler; +import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler; import com.raytheon.uf.common.localization.IPathManager; import com.raytheon.uf.common.localization.LocalizationContext; import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel; @@ -47,6 +49,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil; * ------------ ---------- ----------- -------------------------- * Oct 24, 2012 1286 djohnson Initial creation * Feb 20, 2013 1543 djohnson Add IEdexBandwidthManagerCreator. + * Jul 10, 2013 2106 djohnson Dependency inject registry handlers. * * * @@ -68,14 +71,18 @@ public class EdexBandwidthContextFactory implements BandwidthContextFactory { * @param bandwidthDao * @param retrievalManager * @param bandwidthDaoUtil + * @param dataSetMetaDataHandler + * @param subscriptionHandler * @return the bandwidth manager */ IBandwidthManager getBandwidthManager(IBandwidthDbInit dbInit, IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, - BandwidthDaoUtil bandwidthDaoUtil); + BandwidthDaoUtil bandwidthDaoUtil, + IDataSetMetaDataHandler dataSetMetaDataHandler, + ISubscriptionHandler subscriptionHandler); } - private static BandwidthManager instance; + private static EdexBandwidthManager instance; private final IBandwidthDao bandwidthDao; @@ -87,23 +94,36 @@ public class EdexBandwidthContextFactory implements BandwidthContextFactory { private final IBandwidthDbInit dbInit; + private final IDataSetMetaDataHandler dataSetMetaDataHandler; + + private final ISubscriptionHandler subscriptionHandler; + /** * Intentionally package-private constructor, as it is created from Spring * which is able to reflectively instantiate. * * @param bandwidthDao - * @param findSubscriptionStrategy + * @param bandwidthBucketDao + * @param bandwidthInitializer + * @param bandwidthManagerCreator + * @param dbInit + * @param dataSetMetaDataHandler + * @param subscriptionHandler */ EdexBandwidthContextFactory(IBandwidthDao bandwidthDao, IBandwidthBucketDao bandwidthBucketDao, BandwidthInitializer bandwidthInitializer, IEdexBandwidthManagerCreator bandwidthManagerCreator, - IBandwidthDbInit dbInit) { + IBandwidthDbInit dbInit, + IDataSetMetaDataHandler dataSetMetaDataHandler, + ISubscriptionHandler subscriptionHandler) { this.bandwidthDao = bandwidthDao; this.bandwidthBucketDao = bandwidthBucketDao; this.bandwidthInitializer = bandwidthInitializer; this.bandwidthManagerCreator = bandwidthManagerCreator; this.dbInit = dbInit; + this.dataSetMetaDataHandler = dataSetMetaDataHandler; + this.subscriptionHandler = subscriptionHandler; } /** @@ -114,13 +134,9 @@ public class EdexBandwidthContextFactory implements BandwidthContextFactory { * @param instance * the {@link BandwidthManager} instance */ - EdexBandwidthContextFactory(BandwidthManager instance) { + EdexBandwidthContextFactory(EdexBandwidthManager instance) { + this(null, null, null, null, null, null, null); EdexBandwidthContextFactory.instance = instance; - this.bandwidthDao = null; - this.bandwidthBucketDao = null; - this.bandwidthInitializer = null; - this.bandwidthManagerCreator = null; - this.dbInit = null; } /** @@ -130,7 +146,7 @@ public class EdexBandwidthContextFactory implements BandwidthContextFactory { * * @return the instance */ - static BandwidthManager getInstance() { + static EdexBandwidthManager getInstance() { return instance; } @@ -210,6 +226,6 @@ public class EdexBandwidthContextFactory implements BandwidthContextFactory { IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, BandwidthDaoUtil bandwidthDaoUtil) { return bandwidthManagerCreator.getBandwidthManager(dbInit, - bandwidthDao, retrievalManager, bandwidthDaoUtil); + bandwidthDao, retrievalManager, bandwidthDaoUtil, dataSetMetaDataHandler, subscriptionHandler); } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthManager.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthManager.java new file mode 100644 index 0000000000..d268b9f6f0 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/EdexBandwidthManager.java @@ -0,0 +1,670 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.uf.edex.datadelivery.bandwidth; + +import java.text.ParseException; +import java.util.Calendar; +import java.util.Date; +import java.util.List; +import java.util.SortedSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.Subscribe; +import com.raytheon.uf.common.datadelivery.bandwidth.IBandwidthRequest; +import com.raytheon.uf.common.datadelivery.bandwidth.IBandwidthRequest.RequestType; +import com.raytheon.uf.common.datadelivery.registry.AdhocSubscription; +import com.raytheon.uf.common.datadelivery.registry.DataDeliveryRegistryObjectTypes; +import com.raytheon.uf.common.datadelivery.registry.DataSetMetaData; +import com.raytheon.uf.common.datadelivery.registry.GriddedDataSetMetaData; +import com.raytheon.uf.common.datadelivery.registry.PointDataSetMetaData; +import com.raytheon.uf.common.datadelivery.registry.PointTime; +import com.raytheon.uf.common.datadelivery.registry.SiteSubscription; +import com.raytheon.uf.common.datadelivery.registry.Subscription; +import com.raytheon.uf.common.datadelivery.registry.Time; +import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler; +import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler; +import com.raytheon.uf.common.event.EventBus; +import com.raytheon.uf.common.registry.event.InsertRegistryEvent; +import com.raytheon.uf.common.registry.event.RemoveRegistryEvent; +import com.raytheon.uf.common.registry.handler.IRegistryObjectHandler; +import com.raytheon.uf.common.registry.handler.RegistryHandlerException; +import com.raytheon.uf.common.serialization.SerializationException; +import com.raytheon.uf.common.status.UFStatus.Priority; +import com.raytheon.uf.common.time.util.TimeUtil; +import com.raytheon.uf.common.util.CollectionUtil; +import com.raytheon.uf.common.util.FileUtil; +import com.raytheon.uf.common.util.IFileModifiedWatcher; +import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation; +import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthDataSetUpdate; +import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthSubscription; +import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao; +import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit; +import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval; +import com.raytheon.uf.edex.datadelivery.bandwidth.notification.BandwidthEventBus; +import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager; +import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan; +import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus; +import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled; +import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil; +import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; + +/** + * Implementation of {@link BandwidthManager} that isolates EDEX specific + * functionality. This keeps things out of the {@link InMemoryBandwidthManager} + * that could interfere with garbage collection/threading concerns. + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Jul 10, 2013 2106       djohnson     Extracted from {@link BandwidthManager}.
+ * 
+ * 
+ * + * @author djohnson + * @version 1.0 + */ +public abstract class EdexBandwidthManager extends BandwidthManager { + + private static final Pattern RAP_PATTERN = Pattern + .compile(".*rap_f\\d\\d$"); + + private final IDataSetMetaDataHandler dataSetMetaDataHandler; + + private final ISubscriptionHandler subscriptionHandler; + + private final ScheduledExecutorService scheduler; + + @VisibleForTesting + final Runnable watchForConfigFileChanges = new Runnable() { + + private final IFileModifiedWatcher fileModifiedWatcher = FileUtil + .getFileModifiedWatcher(EdexBandwidthContextFactory + .getBandwidthMapConfig()); + + @Override + public void run() { + if (fileModifiedWatcher.hasBeenModified()) { + bandwidthMapConfigurationUpdated(); + } + } + }; + + /** + * @param dbInit + * @param bandwidthDao + * @param retrievalManager + * @param bandwidthDaoUtil + */ + public EdexBandwidthManager(IBandwidthDbInit dbInit, + IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, + BandwidthDaoUtil bandwidthDaoUtil, + IDataSetMetaDataHandler dataSetMetaDataHandler, + ISubscriptionHandler subscriptionHandler) { + super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil); + + this.dataSetMetaDataHandler = dataSetMetaDataHandler; + this.subscriptionHandler = subscriptionHandler; + + // schedule maintenance tasks + scheduler = Executors.newScheduledThreadPool(1); + // TODO: Uncomment the last line in this comment block when fully + // switched over to Java 1.7 and remove the finally block in shutdown, + // that is also marked as TODO + // This will allow the bandwidth manager to be garbage collected without + // waiting for all of the delayed tasks to expire, currently they are + // manually removed in the shutdown method by casting to the + // implementation and clearing the queue + // scheduler.setRemoveOnCancelPolicy(true); + scheduler.scheduleAtFixedRate(watchForConfigFileChanges, 1, 1, + TimeUnit.MINUTES); + scheduler.scheduleAtFixedRate(new MaintanenceTask(), 1, 5, + TimeUnit.MINUTES); + } + + /** + * {@inheritDoc} + */ + @Override + protected void shutdownInternal() { + unregisterFromEventBus(); + unregisterFromBandwidthEventBus(); + + try { + scheduler.shutdownNow(); + } catch (Exception e) { + statusHandler.handle(Priority.WARN, + "Unable to shutdown the scheduler.", e); + } finally { + // TODO: Remove this finally block when fully switched over to Java + // 1.7. See TODO comment in the constructor. + if (scheduler != null + && scheduler instanceof ScheduledThreadPoolExecutor) { + ((ScheduledThreadPoolExecutor) scheduler).getQueue().clear(); + } + } + EventBus.unregister(retrievalManager); + } + + /** + * Unregister from the {@link EventBus}. + */ + private void unregisterFromEventBus() { + EventBus.unregister(this); + } + + /** + * Unregister from the {@link BandwidthEventBus}. + */ + private void unregisterFromBandwidthEventBus() { + BandwidthEventBus.unregister(this); + } + + /** + * The callback method for BandwidthEventBus to use to notify + * BandwidthManager that retrievalManager has completed the retrievals for a + * Subscription. The updated BandwidthSubscription Object is placed on the + * BandwidthEventBus. + * + * @param subscription + * The completed subscription. + */ + @Subscribe + public void subscriptionFulfilled( + SubscriptionRetrievalFulfilled subscriptionRetrievalFulfilled) { + + statusHandler.info("subscriptionFullfilled() :: " + + subscriptionRetrievalFulfilled); + + SubscriptionRetrieval sr = subscriptionRetrievalFulfilled + .getSubscriptionRetrieval(); + + List subscriptionRetrievals = bandwidthDao + .querySubscriptionRetrievals(sr.getBandwidthSubscription()); + + // Look to see if all the SubscriptionRetrieval's for a subscription are + // completed. + boolean complete = true; + for (SubscriptionRetrieval subscription : subscriptionRetrievals) { + if (!RetrievalStatus.FULFILLED.equals(subscription.getStatus())) { + complete = false; + break; + } + } + + if (complete) { + // Remove the completed SubscriptionRetrieval Objects from the + // plan.. + RetrievalPlan plan = retrievalManager.getPlan(sr.getNetwork()); + plan.remove(sr); + + // Schedule the next iteration of the subscription + BandwidthSubscription dao = sr.getBandwidthSubscription(); + Subscription subscription = null; + try { + subscription = dao.getSubscription(); + } catch (SerializationException e) { + statusHandler.error( + "Failed to extract Subscription from BandwidthSubscription [" + + dao.getIdentifier() + "]", e); + // No sense in continuing + return; + } + + // AdhocSubscriptions are one and done, so don't reschedule. + if (subscription instanceof AdhocSubscription) { + return; + } + + Calendar next = BandwidthUtil.copy(dao.getBaseReferenceTime()); + // See how far into the future the plan goes.. + int days = retrievalManager.getPlan(dao.getRoute()).getPlanDays(); + + for (int day = 1; day <= days; day++) { + + next.add(Calendar.DAY_OF_YEAR, 1); + // Since subscriptions are based on cycles in a day, add one day + // to the + // completed BandwidthSubscription to get the next days + // retrieval. + + // Now check if that BandwidthSubscription has already been + // scheduled. + BandwidthSubscription a = bandwidthDao + .getBandwidthSubscription(dao.getRegistryId(), next); + if (a == null) { + // Create the new BandwidthSubscription record with the next + // time.. + try { + a = bandwidthDao.newBandwidthSubscription(subscription, + next); + } catch (SerializationException e) { + + statusHandler.error( + "Failed to create new BandwidthSubscription from Subscription [" + + subscription.getId() + + "] baseReferenceTime [" + + BandwidthUtil.format(next) + "]", e); + } + + schedule(a); + } else { + statusHandler + .info("Subscription [" + + subscription.getName() + + "] has already been scheduled for baseReferenceTime [" + + BandwidthUtil.format(next) + "]"); + } + } + } + } + + /** + * When a Subscription is removed from the Registry, a RemoveRegistryEvent + * is generated and forwarded to this method to remove the necessary + * BandwidthReservations (and perhaps redefine others). + * + * @param event + */ + @Subscribe + @AllowConcurrentEvents + public void subscriptionRemoved(RemoveRegistryEvent event) { + String objectType = event.getObjectType(); + if (objectType != null) { + if (DataDeliveryRegistryObjectTypes.SITE_SUBSCRIPTION + .equals(objectType) + || DataDeliveryRegistryObjectTypes.SHARED_SUBSCRIPTION + .equals(objectType)) { + statusHandler + .info("Recieved Subscription removal notification for Subscription [" + + event.getId() + "]"); + // Need to locate and remove all BandwidthReservations for the + // given subscription.. + List l = bandwidthDao + .getBandwidthSubscriptionByRegistryId(event.getId()); + if (!l.isEmpty()) { + remove(l); + } + } + } + } + + /** + * Create a hook into the EDEX Notification sub-system to receive the the + * necessary InsertRegistryEvents to drive Bandwidth Management. + * + * @param re + * The InsertRegistryEvent Object to evaluate. + */ + @Subscribe + @AllowConcurrentEvents + public void registryEventListener(InsertRegistryEvent re) { + final String objectType = re.getObjectType(); + final String id = re.getId(); + + if (DataDeliveryRegistryObjectTypes.DATASETMETADATA.equals(objectType)) { + + DataSetMetaData dsmd = getDataSetMetaData(id); + + if (dsmd != null) { + // Repost the Object to the BandwidthEventBus to free + // the notification thread. + + // TODO: A hack to prevent rap_f and rap datasets being + // Identified as the + // same dataset... + Matcher matcher = RAP_PATTERN.matcher(dsmd.getUrl()); + if (matcher.matches()) { + statusHandler + .info("Found rap_f dataset - updating dataset name from [" + + dsmd.getDataSetName() + "] to [rap_f]"); + dsmd.setDataSetName("rap_f"); + } + // TODO: End of hack.. + + BandwidthEventBus.publish(dsmd); + } else { + statusHandler.error("No DataSetMetaData found for id [" + id + + "]"); + } + + } + } + + private DataSetMetaData getDataSetMetaData(String id) { + return getRegistryObjectById(dataSetMetaDataHandler, id); + } + + private static T getRegistryObjectById( + IRegistryObjectHandler handler, String id) { + try { + return handler.getById(id); + } catch (RegistryHandlerException e) { + statusHandler.error("Error attempting to retrieve RegistryObject[" + + id + "] from Registry.", e); + return null; + } + } + + /** + * Process a {@link GriddedDataSetMetaData} that was received from the event + * bus. + * + * @param dataSetMetaData + * the metadadata + */ + @Subscribe + public void updateGriddedDataSetMetaData( + GriddedDataSetMetaData dataSetMetaData) throws ParseException { + // Daily/Hourly/Monthly datasets + if (dataSetMetaData.getCycle() == GriddedDataSetMetaData.NO_CYCLE) { + updateDataSetMetaDataWithoutCycle(dataSetMetaData); + } + // Regular cycle containing datasets + else { + updateDataSetMetaDataWithCycle(dataSetMetaData); + } + } + + /** + * Process a {@link PointDataSetMetaData} that was received from the event + * bus. + * + * @param dataSetMetaData + * the metadadata + */ + @Subscribe + public void updatePointDataSetMetaData(PointDataSetMetaData dataSetMetaData) { + // TODO: Change PointDataSetMetaData to only be able to use PointTime + // objects + final PointTime time = (PointTime) dataSetMetaData.getTime(); + final String providerName = dataSetMetaData.getProviderName(); + final String dataSetName = dataSetMetaData.getDataSetName(); + final Date pointTimeStart = time.getStartDate(); + final Date pointTimeEnd = time.getEndDate(); + + final SortedSet allowedRefreshIntervals = PointTime + .getAllowedRefreshIntervals(); + final long maxAllowedRefreshIntervalInMillis = TimeUtil.MILLIS_PER_MINUTE + * allowedRefreshIntervals.last(); + final long minAllowedRefreshIntervalInMillis = TimeUtil.MILLIS_PER_MINUTE + * allowedRefreshIntervals.first(); + + // Find any retrievals ranging from those with the minimum refresh + // interval to the maximum refresh interval + final Date startDate = new Date(pointTimeStart.getTime() + + minAllowedRefreshIntervalInMillis); + final Date endDate = new Date(pointTimeEnd.getTime() + + maxAllowedRefreshIntervalInMillis); + + final SortedSet subscriptionRetrievals = bandwidthDao + .getSubscriptionRetrievals(providerName, dataSetName, + RetrievalStatus.SCHEDULED, startDate, endDate); + + if (!CollectionUtil.isNullOrEmpty(subscriptionRetrievals)) { + for (SubscriptionRetrieval retrieval : subscriptionRetrievals) { + // Now check and make sure that at least one of the times falls + // in their retrieval range, their latency is the retrieval + // interval + final int retrievalInterval = retrieval + .getSubscriptionLatency(); + + // This is the latest time on the data we care about, once the + // retrieval is signaled to go it retrieves everything up to + // its start time + final Date latestRetrievalDataTime = retrieval.getStartTime() + .getTime(); + // This is the earliest possible time this retrieval cares about + final Date earliestRetrievalDataTime = new Date( + latestRetrievalDataTime.getTime() + - (TimeUtil.MILLIS_PER_MINUTE * retrievalInterval)); + + // If the end is before any times we care about or the start is + // after the latest times we care about, skip it + if (pointTimeEnd.before(earliestRetrievalDataTime) + || pointTimeStart.after(latestRetrievalDataTime)) { + continue; + } + + try { + // Update the retrieval times on the subscription object + // which goes through the retrieval process + final Subscription subscription = retrieval + .getSubscription(); + subscription.setUrl(dataSetMetaData.getUrl()); + subscription.setProvider(dataSetMetaData.getProviderName()); + + if (subscription.getTime() instanceof PointTime) { + final PointTime subTime = (PointTime) subscription + .getTime(); + subTime.setRequestStartAsDate(earliestRetrievalDataTime); + subTime.setRequestEndAsDate(latestRetrievalDataTime); + subTime.setTimes(time.getTimes()); + // Now update the retrieval to be ready + retrieval.setStatus(RetrievalStatus.READY); + bandwidthDaoUtil.update(retrieval); + } else { + throw new IllegalArgumentException( + "Subscription time not PointType! " + + subscription.getName()); + } + + } catch (SerializationException e) { + statusHandler.handle(Priority.PROBLEM, + e.getLocalizedMessage(), e); + } + } + } + } + + /** + * Handles updates for datasets that do not contain cycles. + * + * @param dataSetMetaData + * the dataset metadata + * @throws ParseException + * on parsing errors + */ + private void updateDataSetMetaDataWithoutCycle( + GriddedDataSetMetaData dataSetMetaData) throws ParseException { + bandwidthDao.newBandwidthDataSetUpdate(dataSetMetaData); + + // Looking for active subscriptions to the dataset. + try { + List subscriptions = subscriptionHandler + .getActiveByDataSetAndProvider( + dataSetMetaData.getDataSetName(), + dataSetMetaData.getProviderName()); + + if (subscriptions.isEmpty()) { + return; + } + + statusHandler + .info(String + .format("Found [%s] subscriptions that will have an " + + "adhoc subscription generated and scheduled for url [%s].", + subscriptions.size(), + dataSetMetaData.getUrl())); + + // Create an adhoc for each one, and schedule it + for (Subscription subscription : subscriptions) { + Subscription sub = updateSubscriptionWithDataSetMetaData( + subscription, dataSetMetaData); + + if (sub instanceof SiteSubscription) { + schedule(new AdhocSubscription((SiteSubscription) sub)); + } else { + statusHandler + .warn("Unable to create adhoc queries for shared subscriptions at this point. This functionality should be added in the future..."); + } + } + } catch (RegistryHandlerException e) { + statusHandler.handle(Priority.PROBLEM, + "Failed to lookup subscriptions.", e); + } + } + + /** + * Handles updates for datasets that contain cycles. + * + * @param dataSetMetaData + * the dataset metadata + * @throws ParseException + * on parsing errors + */ + private void updateDataSetMetaDataWithCycle( + GriddedDataSetMetaData dataSetMetaData) throws ParseException { + BandwidthDataSetUpdate dataset = bandwidthDao + .newBandwidthDataSetUpdate(dataSetMetaData); + + // Looking for active subscriptions to the dataset. + List subscriptions = bandwidthDao + .getSubscriptionRetrievals(dataset.getProviderName(), + dataset.getDataSetName(), dataset.getDataSetBaseTime()); + + if (!subscriptions.isEmpty()) { + // Loop through the scheduled SubscriptionRetrievals and mark + // the scheduled retrievals as ready for retrieval + for (SubscriptionRetrieval retrieval : subscriptions) { + // TODO: Evaluate the state changes for receiving multiple + // dataset update messages. This seems to be happening + // quite a bit. + + if (RetrievalStatus.SCHEDULED.equals(retrieval.getStatus())) { + // Need to update the Subscription Object in the + // SubscriptionRetrieval with the current DataSetMetaData + // URL and time Object + Subscription sub; + try { + sub = updateSubscriptionWithDataSetMetaData( + retrieval.getSubscription(), dataSetMetaData); + + // Update the SubscriptionRetrieval record with the new + // data... + retrieval.setSubscription(sub); + } catch (SerializationException e) { + statusHandler + .handle(Priority.PROBLEM, + "Unable to serialize the subscription for the retrieval, skipping...", + e); + continue; + } + + retrieval.setStatus(RetrievalStatus.READY); + + bandwidthDaoUtil.update(retrieval); + + statusHandler + .info(String.format("Updated retrieval [%s] for " + + "subscription [%s] to use " + + "url [%s] and " + + "base reference time [%s]", retrieval + .getIdentifier(), sub.getName(), + dataSetMetaData.getUrl(), BandwidthUtil + .format(sub.getTime() + .getStartDate()))); + } + } + + // Notify RetrievalAgentManager of updated RetrievalRequests. + retrievalManager.wakeAgents(); + } else { + if (statusHandler.isPriorityEnabled(Priority.DEBUG)) { + statusHandler + .debug("No Subscriptions scheduled for BandwidthDataSetUpdate [" + + dataset.getIdentifier() + + "] base time [" + + BandwidthUtil.format(dataset + .getDataSetBaseTime()) + "]"); + } + } + } + + /** + * Updates a {@link Subscription) to reflect important attributes of the + * specified {@link DataSetMetaData}. + * + * @param sub + * the subscription + * @param dataSetMetaData + * the datasetmetadata update + * @return the subscription + */ + private static Subscription updateSubscriptionWithDataSetMetaData( + Subscription sub, DataSetMetaData dataSetMetaData) { + final Time dsmdTime = dataSetMetaData.getTime(); + final Time subTime = sub.getTime(); + dsmdTime.setSelectedTimeIndices(subTime.getSelectedTimeIndices()); + dsmdTime.setCycleTimes(subTime.getCycleTimes()); + sub.setTime(dsmdTime); + sub.setUrl(dataSetMetaData.getUrl()); + + return sub; + } + + /** + * Signals the bandwidth map localization file is updated, perform a + * reinitialize operation. + */ + private void bandwidthMapConfigurationUpdated() { + IBandwidthRequest request = new IBandwidthRequest(); + request.setRequestType(RequestType.REINITIALIZE); + + try { + handleRequest(request); + } catch (Exception e) { + statusHandler.handle(Priority.PROBLEM, + "Error while reinitializing the bandwidth manager.", e); + } + } + + /** + * Private inner work thread used to keep the RetrievalPlans up to date. + */ + private class MaintanenceTask implements Runnable { + + @Override + public void run() { + for (RetrievalPlan plan : retrievalManager.getRetrievalPlans() + .values()) { + plan.resize(); + Calendar newEnd = plan.getPlanEnd(); + + // Find DEFERRED Allocations and load them into the plan... + List deferred = bandwidthDao.getDeferred( + plan.getNetwork(), newEnd); + if (!deferred.isEmpty()) { + retrievalManager.schedule(deferred); + } + } + } + } + +} diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/IBandwidthManager.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/IBandwidthManager.java index 7d749abe4b..da8347506f 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/IBandwidthManager.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/IBandwidthManager.java @@ -21,17 +21,12 @@ package com.raytheon.uf.edex.datadelivery.bandwidth; import java.util.List; -import com.google.common.eventbus.AllowConcurrentEvents; -import com.google.common.eventbus.Subscribe; import com.raytheon.uf.common.datadelivery.registry.AdhocSubscription; import com.raytheon.uf.common.datadelivery.registry.Subscription; -import com.raytheon.uf.common.registry.event.InsertRegistryEvent; -import com.raytheon.uf.common.registry.event.RemoveRegistryEvent; import com.raytheon.uf.common.serialization.SerializationException; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation; import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.BandwidthInitializer; import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.ISubscriptionAggregator; -import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled; /** * Defines the interface of a BandwidthManager. @@ -43,6 +38,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetriev * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Oct 30, 2012 1286 djohnson Initial creation + * Jul 10, 2013 2106 djohnson Remove EDEX instance specific methods. * * * @@ -52,27 +48,6 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetriev public interface IBandwidthManager { - /** - * Create a hook into the EDEX Notification sub-system to receive the the - * necessary InsertRegistryEvents to drive Bandwidth Management. - * - * @param re - * The InsertRegistryEvent Object to evaluate. - */ - @Subscribe - void registryEventListener(InsertRegistryEvent re); - - /** - * When a Subscription is removed from the Registry, a RemoveRegistryEvent - * is generated and forwarded to this method to remove the necessary - * BandwidthReservations (and perhaps redefine others). - * - * @param event - */ - @Subscribe - @AllowConcurrentEvents - void subscriptionRemoved(RemoveRegistryEvent event); - /** * Schedule all cycles of a Subscription. * @@ -99,7 +74,6 @@ public interface IBandwidthManager { * @return * @throws SerializationException */ - @Subscribe List subscriptionUpdated(Subscription subscription) throws SerializationException; @@ -110,19 +84,6 @@ public interface IBandwidthManager { */ List adhocSubscription(AdhocSubscription adhoc); - /** - * The callback method for BandwidthEventBus to use to notify - * BandwidthManager that retrievalManager has completed the retrievals for a - * Subscription. The updated BandwidthSubscription Object is placed on the - * BandwidthEventBus. - * - * @param subscription - * The completed subscription. - */ - @Subscribe - void subscriptionFulfilled( - SubscriptionRetrievalFulfilled subscriptionRetrievalFulfilled); - void setAggregator(ISubscriptionAggregator aggregator); ISubscriptionAggregator getAggregator(); diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthDao.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthDao.java index 860d51ddf0..05491226bc 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthDao.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthDao.java @@ -31,6 +31,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.raytheon.uf.common.datadelivery.registry.DataSetMetaData; import com.raytheon.uf.common.datadelivery.registry.Network; @@ -40,7 +41,6 @@ import com.raytheon.uf.common.serialization.SerializationException; import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus.Priority; -import com.raytheon.uf.common.util.IDeepCopyable; import com.raytheon.uf.common.util.ReflectionUtil; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthDataSetUpdate; @@ -64,6 +64,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; * Dec 12, 2012 1286 djohnson Use concurrent lists to avoid concurrent modification exceptions. * Jun 03, 2013 2038 djohnson Add method to get subscription retrievals by provider, dataset, and status. * Jun 13, 2013 2095 djohnson Implement ability to store a collection of subscriptions. + * Jul 09, 2013 2106 djohnson Rather than copy all elements and remove unnecessary, just copy the ones that apply. * * * @@ -90,19 +91,16 @@ class InMemoryBandwidthDao implements IBandwidthDao { */ @Override public List getBandwidthAllocations(Long subscriptionId) { - List allocations = clone(bandwidthAllocations); + List allocations = new ArrayList(); - for (Iterator iter = allocations.iterator(); iter - .hasNext();) { - BandwidthAllocation current = iter.next(); + for (BandwidthAllocation current : bandwidthAllocations) { if ((current instanceof SubscriptionRetrieval) && ((SubscriptionRetrieval) current) .getBandwidthSubscription().getId() == subscriptionId) { - continue; + allocations.add(current.copy()); } - - iter.remove(); } + return allocations; } @@ -111,32 +109,15 @@ class InMemoryBandwidthDao implements IBandwidthDao { */ @Override public List getBandwidthAllocations(Network network) { - List results = clone(bandwidthAllocations); + List allocations = new ArrayList(); - for (Iterator iter = results.iterator(); iter - .hasNext();) { - BandwidthAllocation current = iter.next(); - if (network.equals(current.getNetwork())) { - continue; + for (BandwidthAllocation current : bandwidthAllocations) { + if (current.getNetwork() == network) { + allocations.add(current.copy()); } - - iter.remove(); - } - return results; - } - - /** - * @param sourceList - * @return - */ - private static > ArrayList clone( - ConcurrentLinkedQueue sourceList) { - ArrayList results = new ArrayList(sourceList.size()); - for (T instance : sourceList) { - results.add(instance.copy()); } - return results; + return allocations; } /** @@ -145,17 +126,15 @@ class InMemoryBandwidthDao implements IBandwidthDao { @Override public List getBandwidthAllocationsInState( RetrievalStatus state) { - List results = clone(bandwidthAllocations); - for (Iterator iter = results.iterator(); iter - .hasNext();) { - BandwidthAllocation current = iter.next(); - if (state.equals(current.getStatus())) { - continue; - } + List allocations = new ArrayList(); - iter.remove(); + for (BandwidthAllocation current : bandwidthAllocations) { + if (state.equals(current.getStatus())) { + allocations.add(current.copy()); + } } - return results; + + return allocations; } /** @@ -164,17 +143,13 @@ class InMemoryBandwidthDao implements IBandwidthDao { @Override public List getBandwidthDataSetUpdate( String providerName, String dataSetName) { - ArrayList results = clone(bandwidthDataSetUpdates); + List results = new ArrayList(); - for (Iterator iter = results.iterator(); iter - .hasNext();) { - BandwidthDataSetUpdate current = iter.next(); + for (BandwidthDataSetUpdate current : bandwidthDataSetUpdates) { if (providerName.equals(current.getProviderName()) && dataSetName.equals(current.getDataSetName())) { - continue; + results.add(current.copy()); } - - iter.remove(); } return results; @@ -208,19 +183,18 @@ class InMemoryBandwidthDao implements IBandwidthDao { @Override public List getDeferred(Network network, Calendar endTime) { - List results = getBandwidthAllocations(network); - for (Iterator iter = results.iterator(); iter - .hasNext();) { - BandwidthAllocation current = iter.next(); - if (RetrievalStatus.DEFERRED.equals(current.getStatus()) - && !current.getEndTime().after(endTime)) { - continue; - } - iter.remove(); + List allocations = new ArrayList(); + + for (BandwidthAllocation current : bandwidthAllocations) { + if (network == current.getNetwork() + && RetrievalStatus.DEFERRED.equals(current.getStatus()) + && !current.getEndTime().after(endTime)) { + allocations.add(current.copy()); + } } - return results; + return allocations; } /** @@ -228,10 +202,9 @@ class InMemoryBandwidthDao implements IBandwidthDao { */ @Override public BandwidthSubscription getBandwidthSubscription(long identifier) { - ArrayList bandwidthSubscriptions = clone(this.bandwidthSubscriptions); for (BandwidthSubscription dao : bandwidthSubscriptions) { if (dao.getIdentifier() == identifier) { - return dao; + return dao.copy(); } } return null; @@ -268,15 +241,12 @@ class InMemoryBandwidthDao implements IBandwidthDao { @Override public List getBandwidthSubscriptionByRegistryId( String registryId) { - final ArrayList results = clone(bandwidthSubscriptions); - for (Iterator iter = results.iterator(); iter - .hasNext();) { - final BandwidthSubscription current = iter.next(); - if (registryId.equals(current.getRegistryId())) { - continue; - } + final List results = Lists.newArrayList(); - iter.remove(); + for (BandwidthSubscription current : bandwidthSubscriptions) { + if (registryId.equals(current.getRegistryId())) { + results.add(current.copy()); + } } return results; } @@ -286,11 +256,10 @@ class InMemoryBandwidthDao implements IBandwidthDao { */ @Override public SubscriptionRetrieval getSubscriptionRetrieval(long identifier) { - ArrayList clone = clone(bandwidthAllocations); - for (BandwidthAllocation current : clone) { + for (BandwidthAllocation current : bandwidthAllocations) { if (current.getId() == identifier && current instanceof SubscriptionRetrieval) { - return (SubscriptionRetrieval) current; + return ((SubscriptionRetrieval) current).copy(); } } return null; @@ -302,21 +271,30 @@ class InMemoryBandwidthDao implements IBandwidthDao { @Override public List getSubscriptionRetrievals( String provider, String dataSetName, Calendar baseReferenceTime) { - List results = new ArrayList( - getSubscriptionRetrievals(provider, dataSetName)); - List subscriptionsMatching = getBandwidthSubscriptions( - provider, dataSetName, baseReferenceTime); + List results = Lists.newArrayList(); - OUTER: for (Iterator iter = results.iterator(); iter - .hasNext();) { - SubscriptionRetrieval current = iter.next(); - for (BandwidthSubscription subscription : subscriptionsMatching) { - if (current.getBandwidthSubscription().getId() == subscription - .getIdentifier()) { - continue OUTER; + for (BandwidthAllocation current : bandwidthAllocations) { + if (current instanceof SubscriptionRetrieval) { + Subscription subscription; + try { + final SubscriptionRetrieval subscriptionRetrieval = (SubscriptionRetrieval) current; + subscription = subscriptionRetrieval.getSubscription(); + if (provider.equals(subscription.getProvider()) + && dataSetName + .equals(subscription.getDataSetName()) + && baseReferenceTime.getTimeInMillis() == subscriptionRetrieval + .getBandwidthSubscription() + .getBaseReferenceTime().getTimeInMillis()) { + results.add(subscriptionRetrieval.copy()); + } + } catch (SerializationException e) { + statusHandler + .handle(Priority.PROBLEM, + "Unable to deserialize the retrieval's subscription, skipping it...", + e); } + } - iter.remove(); } return results; @@ -328,30 +306,24 @@ class InMemoryBandwidthDao implements IBandwidthDao { @Override public List getSubscriptionRetrievals( String provider, String dataSetName) { - ArrayList clone = clone(bandwidthAllocations); - List results = new ArrayList( - bandwidthAllocations.size()); + List results = Lists.newArrayList(); - for (Iterator iter = clone.iterator(); iter - .hasNext();) { - BandwidthAllocation current = iter.next(); + for (BandwidthAllocation current : bandwidthAllocations) { if (current instanceof SubscriptionRetrieval) { Subscription subscription; try { - subscription = ((SubscriptionRetrieval) current) - .getSubscription(); + final SubscriptionRetrieval subscriptionRetrieval = (SubscriptionRetrieval) current; + subscription = subscriptionRetrieval.getSubscription(); if (provider.equals(subscription.getProvider()) && dataSetName .equals(subscription.getDataSetName())) { - results.add((SubscriptionRetrieval) current); + results.add(subscriptionRetrieval.copy()); } } catch (SerializationException e) { statusHandler .handle(Priority.PROBLEM, - "Unable to deserialize the retrieval's subscription, removing it...", + "Unable to deserialize the retrieval's subscription, skipping it...", e); - iter.remove(); - continue; } } @@ -365,7 +337,11 @@ class InMemoryBandwidthDao implements IBandwidthDao { */ @Override public List getBandwidthSubscriptions() { - return clone(bandwidthSubscriptions); + List results = Lists.newArrayList(); + for (BandwidthSubscription subscription : bandwidthSubscriptions) { + results.add(subscription.copy()); + } + return results; } /** @@ -374,18 +350,16 @@ class InMemoryBandwidthDao implements IBandwidthDao { @Override public List getBandwidthSubscriptions( String provider, String dataSetName, Calendar baseReferenceTime) { - List bandwidthSubscriptions = getBandwidthSubscriptions(); + List bandwidthSubscriptions = Lists + .newArrayList(); - for (Iterator iter = bandwidthSubscriptions - .iterator(); iter.hasNext();) { - BandwidthSubscription current = iter.next(); + for (BandwidthSubscription current : this.bandwidthSubscriptions) { if (provider.equals(current.getProvider()) && dataSetName.equals(current.getDataSetName()) && baseReferenceTime.getTimeInMillis() == current .getBaseReferenceTime().getTimeInMillis()) { - continue; + bandwidthSubscriptions.add(current.copy()); } - iter.remove(); } return bandwidthSubscriptions; @@ -428,17 +402,13 @@ class InMemoryBandwidthDao implements IBandwidthDao { @Override public List querySubscriptionRetrievals( long subscriptionId) { - ArrayList clone = clone(bandwidthAllocations); - List results = new ArrayList( - bandwidthAllocations.size()); + List results = new ArrayList(); - for (Iterator iter = clone.iterator(); iter - .hasNext();) { - BandwidthAllocation current = iter.next(); + for (BandwidthAllocation current : bandwidthAllocations) { if (current instanceof SubscriptionRetrieval) { - if (((SubscriptionRetrieval) current) - .getBandwidthSubscription().getId() == subscriptionId) { - results.add((SubscriptionRetrieval) current); + final SubscriptionRetrieval subscriptionRetrieval = (SubscriptionRetrieval) current; + if (subscriptionRetrieval.getBandwidthSubscription().getId() == subscriptionId) { + results.add(subscriptionRetrieval.copy()); } } } @@ -567,14 +537,27 @@ class InMemoryBandwidthDao implements IBandwidthDao { public SortedSet getSubscriptionRetrievals( String provider, String dataSetName, RetrievalStatus status) { - final List subscriptionRetrievals = getSubscriptionRetrievals( - provider, dataSetName); + List results = Lists.newArrayList(); + + for (BandwidthAllocation current : bandwidthAllocations) { + if (current instanceof SubscriptionRetrieval) { + Subscription subscription; + try { + final SubscriptionRetrieval subscriptionRetrieval = (SubscriptionRetrieval) current; + subscription = subscriptionRetrieval.getSubscription(); + if (provider.equals(subscription.getProvider()) + && dataSetName + .equals(subscription.getDataSetName()) + && status.equals(subscriptionRetrieval.getStatus())) { + results.add(subscriptionRetrieval.copy()); + } + } catch (SerializationException e) { + statusHandler + .handle(Priority.PROBLEM, + "Unable to deserialize the retrieval's subscription, skipping it...", + e); + } - for (Iterator iter = subscriptionRetrievals - .iterator(); iter.hasNext();) { - SubscriptionRetrieval subRetrieval = iter.next(); - if (!status.equals(subRetrieval.getStatus())) { - iter.remove(); } } @@ -587,7 +570,7 @@ class InMemoryBandwidthDao implements IBandwidthDao { } }); - treeSet.addAll(subscriptionRetrievals); + treeSet.addAll(results); return treeSet; } @@ -600,19 +583,50 @@ class InMemoryBandwidthDao implements IBandwidthDao { String provider, String dataSetName, RetrievalStatus status, Date earliestDate, Date latestDate) { - SortedSet results = getSubscriptionRetrievals( - provider, dataSetName, status); + List results = Lists.newArrayList(); + + for (BandwidthAllocation current : bandwidthAllocations) { + if (current instanceof SubscriptionRetrieval) { + Subscription subscription; + try { + final SubscriptionRetrieval subscriptionRetrieval = (SubscriptionRetrieval) current; + subscription = subscriptionRetrieval.getSubscription(); + + final Date subRetrievalStartTime = subscriptionRetrieval + .getStartTime().getTime(); + final boolean withinTimeLimits = !(earliestDate + .after(subRetrievalStartTime) || latestDate + .before(subRetrievalStartTime)); + + if (provider.equals(subscription.getProvider()) + && dataSetName + .equals(subscription.getDataSetName()) + && status.equals(subscriptionRetrieval.getStatus()) + && withinTimeLimits) { + results.add(subscriptionRetrieval.copy()); + } + } catch (SerializationException e) { + statusHandler + .handle(Priority.PROBLEM, + "Unable to deserialize the retrieval's subscription, skipping it...", + e); + } - for (Iterator iter = results.iterator(); iter - .hasNext();) { - SubscriptionRetrieval subRetrieval = iter.next(); - if (earliestDate.after(subRetrieval.getStartTime().getTime()) - || latestDate.before(subRetrieval.getStartTime().getTime())) { - iter.remove(); } } - return results; + final TreeSet treeSet = Sets + .newTreeSet(new Comparator() { + @Override + public int compare(SubscriptionRetrieval o1, + SubscriptionRetrieval o2) { + return o1.getStartTime().compareTo(o2.getStartTime()); + } + }); + + treeSet.addAll(results); + + return treeSet; } /** @@ -620,15 +634,12 @@ class InMemoryBandwidthDao implements IBandwidthDao { */ @Override public List getSubscriptionRetrievals() { - ArrayList clone = clone(bandwidthAllocations); List results = new ArrayList( bandwidthAllocations.size()); - for (Iterator iter = clone.iterator(); iter - .hasNext();) { - BandwidthAllocation current = iter.next(); + for (BandwidthAllocation current : bandwidthAllocations) { if (current instanceof SubscriptionRetrieval) { - results.add((SubscriptionRetrieval) current); + results.add(((SubscriptionRetrieval) current).copy()); } } return results; @@ -640,15 +651,17 @@ class InMemoryBandwidthDao implements IBandwidthDao { @Override public List getBandwidthAllocationsForNetworkAndBucketStartTime( Network network, long bucketStartTime) { - final List bandwidthAllocations = getBandwidthAllocations(network); - for (Iterator iter = bandwidthAllocations - .iterator(); iter.hasNext();) { - final BandwidthAllocation allocation = iter.next(); - if (allocation.getBandwidthBucket() != bucketStartTime) { - iter.remove(); + + List allocations = new ArrayList(); + + for (BandwidthAllocation current : bandwidthAllocations) { + if (current.getNetwork() == network + && current.getBandwidthBucket() == bucketStartTime) { + allocations.add(current.copy()); } } - return bandwidthAllocations; + + return allocations; } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthManager.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthManager.java index 152dddaed9..09fc66e02f 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthManager.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/InMemoryBandwidthManager.java @@ -50,6 +50,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil; * Feb 27, 2013 1644 djohnson Schedule SBN subscriptions. * Apr 16, 2013 1906 djohnson Implements RegistryInitializedListener. * Jun 25, 2013 2106 djohnson init() now takes a {@link RetrievalManager} as well. + * Jul 09, 2013 2106 djohnson Add shutdownInternal(). * * * @@ -61,8 +62,9 @@ class InMemoryBandwidthManager extends BandwidthManager { private static final IUFStatusHandler statusHandler = UFStatus .getHandler(InMemoryBandwidthManager.class); - // TODO DPJ: The NCF and WFO bandwidth managers probably each need an - // in-memory version + // NOTE: NEVER add the bandwidth-datadelivery-eventbus.xml file to this + // array, in-memory versions should not coordinate with the event bus in any + // fashion public static final String[] IN_MEMORY_BANDWIDTH_MANAGER_FILES = new String[] { JarUtil.getResResourcePath("/spring/bandwidth-datadelivery-inmemory-impl.xml"), JarUtil.getResResourcePath("/spring/bandwidth-datadelivery.xml"), @@ -145,4 +147,12 @@ class InMemoryBandwidthManager extends BandwidthManager { return scheduleSubscriptions(subscriptions); } + /** + * {@inheritDoc} + */ + @Override + protected void shutdownInternal() { + // Nothing to do for in-memory version + } + } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/WfoBandwidthManagerCreator.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/WfoBandwidthManagerCreator.java index a9734a9d79..30131448a5 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/WfoBandwidthManagerCreator.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/WfoBandwidthManagerCreator.java @@ -25,6 +25,8 @@ import java.util.Set; import com.raytheon.uf.common.datadelivery.bandwidth.IBandwidthService; import com.raytheon.uf.common.datadelivery.bandwidth.IProposeScheduleResponse; import com.raytheon.uf.common.datadelivery.registry.Subscription; +import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler; +import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler; import com.raytheon.uf.common.serialization.SerializationException; import com.raytheon.uf.common.util.JarUtil; import com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthContextFactory.IEdexBandwidthManagerCreator; @@ -46,6 +48,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil; * Feb 27, 2013 1644 djohnson Schedule SBN subscriptions by routing to the NCF bandwidth manager. * Mar 11, 2013 1645 djohnson Add missing Spring file. * May 15, 2013 2000 djohnson Include daos. + * Jul 10, 2013 2106 djohnson Dependency inject registry handlers. * * * @@ -57,7 +60,7 @@ public class WfoBandwidthManagerCreator implements IEdexBandwidthManagerCreator /** * WFO {@link BandwidthManager} implementation. */ - static class WfoBandwidthManager extends BandwidthManager { + static class WfoBandwidthManager extends EdexBandwidthManager { private static final String[] WFO_BANDWIDTH_MANAGER_FILES = new String[] { JarUtil.getResResourcePath("/spring/bandwidth-datadelivery-wfo-edex-impl.xml"), @@ -81,8 +84,11 @@ public class WfoBandwidthManagerCreator implements IEdexBandwidthManagerCreator */ public WfoBandwidthManager(IBandwidthDbInit dbInit, IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, - BandwidthDaoUtil bandwidthDaoUtil) { - super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil); + BandwidthDaoUtil bandwidthDaoUtil, + IDataSetMetaDataHandler dataSetMetaDataHandler, + ISubscriptionHandler subscriptionHandler) { + super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil, + dataSetMetaDataHandler, subscriptionHandler); } @Override @@ -131,9 +137,11 @@ public class WfoBandwidthManagerCreator implements IEdexBandwidthManagerCreator @Override public IBandwidthManager getBandwidthManager(IBandwidthDbInit dbInit, IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, - BandwidthDaoUtil bandwidthDaoUtil) { + BandwidthDaoUtil bandwidthDaoUtil, + IDataSetMetaDataHandler dataSetMetaDataHandler, + ISubscriptionHandler subscriptionHandler) { return new WfoBandwidthManager(dbInit, bandwidthDao, retrievalManager, - bandwidthDaoUtil); + bandwidthDaoUtil, dataSetMetaDataHandler, subscriptionHandler); } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthAsyncEventBusFactory.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthAsyncEventBusFactory.java index 7765c57730..dcf2334077 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthAsyncEventBusFactory.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthAsyncEventBusFactory.java @@ -25,9 +25,7 @@ import java.util.concurrent.Executors; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; -import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.UFStatus; -import com.raytheon.uf.edex.core.EDEXUtil; /** * Creates asynchronous Google event buses. @@ -41,6 +39,7 @@ import com.raytheon.uf.edex.core.EDEXUtil; * Dec 11, 2012 1286 djohnson Initial creation * Feb 06, 2013 1543 djohnson Changes to correspond with EventBus changes. * May 28, 2013 1650 djohnson Changes to match functionality in general event bus handling. + * Jul 09, 2013 2106 djohnson No Spring required to get thread pool sizes, remove subscriptionBus. * * * @@ -48,31 +47,25 @@ import com.raytheon.uf.edex.core.EDEXUtil; * @version 1.0 */ public class BandwidthAsyncEventBusFactory implements BandwidthEventBusFactory { - private static final IUFStatusHandler statusHandler = UFStatus - .getHandler(BandwidthAsyncEventBusFactory.class); - private final AsyncEventBus dataSetBus; - private final AsyncEventBus subscriptionBus; - private final AsyncEventBus retrievalBus; public BandwidthAsyncEventBusFactory() { - BandwidthEventBusConfig config = (BandwidthEventBusConfig) EDEXUtil - .getESBComponent("BandwidthEventBusConfig"); - // If no bean was defined, use the defaults defined in the - // class. - if (config == null) { - statusHandler - .info("No BandwidthEventBusConfig defined. Using defaults."); - config = new BandwidthEventBusConfig(); - } - dataSetBus = new AsyncEventBus(Executors.newFixedThreadPool(config - .getDataSetMetaDataPoolSize())); - subscriptionBus = new AsyncEventBus(Executors.newFixedThreadPool(config - .getSubscriptionPoolSize())); - retrievalBus = new AsyncEventBus(Executors.newFixedThreadPool(config - .getRetrievalPoolSize())); + BandwidthEventBusConfig config = new BandwidthEventBusConfig(); + + final int dataSetMetaDataPoolSize = config.getDataSetMetaDataPoolSize(); + final int retrievalPoolSize = config.getRetrievalPoolSize(); + + UFStatus.getHandler(BandwidthAsyncEventBusFactory.class) + .info(String + .format("Creating event bus with dataSetMetaDataPoolSize [%s] retrievalPoolSize [%s].", + dataSetMetaDataPoolSize, retrievalPoolSize)); + + dataSetBus = new AsyncEventBus( + Executors.newFixedThreadPool(dataSetMetaDataPoolSize)); + retrievalBus = new AsyncEventBus( + Executors.newFixedThreadPool(retrievalPoolSize)); } /** @@ -83,14 +76,6 @@ public class BandwidthAsyncEventBusFactory implements BandwidthEventBusFactory { return dataSetBus; } - /** - * {@inheritDoc} - */ - @Override - public EventBus getSubscriptionBus() { - return subscriptionBus; - } - /** * {@inheritDoc} */ @@ -104,8 +89,7 @@ public class BandwidthAsyncEventBusFactory implements BandwidthEventBusFactory { */ @Override public List getEventBuses() { - return Arrays. asList(dataSetBus, retrievalBus, - subscriptionBus); + return Arrays. asList(dataSetBus, retrievalBus); } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthEventBusConfig.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthEventBusConfig.java index 1e85c6098b..27bddeddcc 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthEventBusConfig.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthEventBusConfig.java @@ -10,6 +10,7 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.notification; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Jul 3, 2012 0726 jspinks Initial creation + * Jul 09, 2013 2106 djohnson No Spring required to get thread pool sizes, remove subscriptionBus. * * * @@ -19,11 +20,11 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.notification; public class BandwidthEventBusConfig { // Set reasonable default values - private int dataSetMetaDataPoolSize = 2; + private static final int dataSetMetaDataPoolSize = Integer.getInteger( + "bandwidth.dataSetMetaDataPoolSize", 2); - private int retrievalPoolSize = 3; - - private int subscriptionPoolSize = 2; + private static final int retrievalPoolSize = Integer.getInteger( + "bandwidth.retrievalPoolSize", 3); /** * Get attribute dataSetMetaDataPoolSize. @@ -42,43 +43,4 @@ public class BandwidthEventBusConfig { public int getRetrievalPoolSize() { return retrievalPoolSize; } - - /** - * Get attribute subscriptionPoolSize. - * - * @return The value of attribute subscriptionPoolSize. - */ - public int getSubscriptionPoolSize() { - return subscriptionPoolSize; - } - - /** - * Set the dataSetMetaDataPoolSize. - * - * @param dataSetMetaDataPoolSize - * The value to set attribute dataSetMetaDataPoolSize to. - */ - public void setDataSetMetaDataPoolSize(int dataSetMetaDataPoolSize) { - this.dataSetMetaDataPoolSize = dataSetMetaDataPoolSize; - } - - /** - * Set the retrievalPoolSize. - * - * @param retrievalPoolSize - * The value to set attribute retrievalPoolSize to. - */ - public void setRetrievalPoolSize(int retrievalPoolSize) { - this.retrievalPoolSize = retrievalPoolSize; - } - - /** - * Set the subscriptionPoolSize. - * - * @param subscriptionPoolSize - * The value to set attribute subscriptionPoolSize to. - */ - public void setSubscriptionPoolSize(int subscriptionPoolSize) { - this.subscriptionPoolSize = subscriptionPoolSize; - } } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthEventBusFactory.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthEventBusFactory.java index 14c248751c..5cbfe13aa9 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthEventBusFactory.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthEventBusFactory.java @@ -33,6 +33,7 @@ import com.raytheon.uf.edex.event.GoogleEventBusFactory; * ------------ ---------- ----------- -------------------------- * Dec 11, 2012 djohnson Initial creation * May 28, 2013 1650 djohnson Returns the event buses required by extending GoogleEventBusFactory. + * Jul 09, 2013 2106 djohnson Remove subscriptionBus. * * * @@ -47,11 +48,6 @@ interface BandwidthEventBusFactory extends GoogleEventBusFactory { */ EventBus getDataSetBus(); - /** - * Get the subscription bus. - */ - EventBus getSubscriptionBus(); - /** * Get the retrieval bus. */ diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/EdexBandwidthEventBusHandler.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/EdexBandwidthEventBusHandler.java index a0d314f1c8..3148bb868a 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/EdexBandwidthEventBusHandler.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/notification/EdexBandwidthEventBusHandler.java @@ -20,9 +20,6 @@ package com.raytheon.uf.edex.datadelivery.bandwidth.notification; import com.raytheon.uf.common.datadelivery.registry.DataSetMetaData; -import com.raytheon.uf.common.datadelivery.registry.Subscription; -import com.raytheon.uf.common.registry.event.RemoveRegistryEvent; -import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled; import com.raytheon.uf.edex.event.BaseEdexEventBusHandler; @@ -36,6 +33,7 @@ import com.raytheon.uf.edex.event.BaseEdexEventBusHandler; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * May 28, 2013 1650 djohnson Extracted from {@link BandwidthEventBus}. + * Jul 09, 2013 2106 djohnson Remove subscriptionBus. * * * @@ -48,8 +46,6 @@ public class EdexBandwidthEventBusHandler extends private final com.google.common.eventbus.EventBus dataSetBus; - private final com.google.common.eventbus.EventBus subscriptionBus; - private final com.google.common.eventbus.EventBus retrievalBus; /** @@ -68,7 +64,6 @@ public class EdexBandwidthEventBusHandler extends EdexBandwidthEventBusHandler(BandwidthEventBusFactory eventBusFactory) { super(eventBusFactory); this.dataSetBus = eventBusFactory.getDataSetBus(); - this.subscriptionBus = eventBusFactory.getSubscriptionBus(); this.retrievalBus = eventBusFactory.getRetrievalBus(); } @@ -78,16 +73,10 @@ public class EdexBandwidthEventBusHandler extends @Override protected void publishInternal(Object object) { - if (object instanceof SubscriptionRetrieval) { + if (object instanceof SubscriptionRetrievalFulfilled) { retrievalBus.post(object); - } else if (object instanceof SubscriptionRetrievalFulfilled) { - subscriptionBus.post(object); } else if (object instanceof DataSetMetaData) { dataSetBus.post(object); - } else if (object instanceof Subscription) { - subscriptionBus.post(object); - } else if (object instanceof RemoveRegistryEvent) { - subscriptionBus.post(object); } else { throw new IllegalArgumentException("Object type [" + object.getClass().getName() diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/processing/Processor.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/processing/Processor.java deleted file mode 100644 index e619043747..0000000000 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/processing/Processor.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.raytheon.uf.edex.datadelivery.bandwidth.processing; - -import java.util.ArrayList; - -import com.raytheon.uf.common.datadelivery.registry.Provider; -import com.raytheon.uf.common.datadelivery.registry.Subscription; -import com.raytheon.uf.common.datadelivery.registry.SubscriptionBundle; -import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers; -import com.raytheon.uf.common.registry.handler.RegistryHandlerException; -import com.raytheon.uf.common.status.IUFStatusHandler; -import com.raytheon.uf.common.status.UFStatus; -import com.raytheon.uf.common.status.UFStatus.Priority; -import com.raytheon.uf.edex.datadelivery.bandwidth.interfaces.IProcessSubscription; - -/** - * Process Available Subscriptions for bundling - * - *
- * 
- * SOFTWARE HISTORY
- * Date         Ticket#    Engineer    Description
- * ------------ ---------- ----------- --------------------------
- * Feb 07, 2012            dhladky     Initial creation
- * Jun 21, 2012 736        djohnson    Change OPERATION_STATUS to OperationStatus.
- * Aug 20, 2012 0743       djohnson    Finish making registry type-safe.
- * Oct 05, 2012 1241       djohnson    Replace RegistryManager calls with registry handler calls.
- * 
- * 
- * - * @author dhladky - * @version 1.0 - */ - -public class Processor implements IProcessSubscription { - - private static final IUFStatusHandler statusHandler = UFStatus - .getHandler(Processor.class); - - public Processor() { - - } - - @Override - public ArrayList process( - ArrayList subscriptions) { - // either a new subscription/adhoc or all subscriptions for a given - // dataset - - ArrayList bundles = null; - - // TODO: When we start aggregating the bundles will matter - if (subscriptions != null) { - bundles = new ArrayList(subscriptions.size()); - - for (Subscription sub : subscriptions) { - statusHandler.info("Processing Subscription NAME: " - + sub.getDescription() + " DATASET: " - + sub.getDataSetName()); - SubscriptionBundle bundle = new SubscriptionBundle(); - Provider provider = getProvider(sub.getProvider()); - - bundle.setBundleId(sub.getSubscriptionId()); - bundle.setPriority(1); - bundle.setProvider(provider); - bundle.setConnection(provider.getConnection()); - bundle.setSubscription(sub); - // when aggregated set source subscriptions to bundle - - bundles.add(bundle); - } - } else { - statusHandler.info("No mature subscriptions available."); - } - - return bundles; - } - - public Provider getProvider(String providerName) { - - try { - return DataDeliveryHandlers.getProviderHandler().getByName( - providerName); - } catch (RegistryHandlerException e) { - statusHandler.handle(Priority.PROBLEM, - "Unable to look up the provider by its name.", e); - return null; - } - } - -} diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalManager.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalManager.java index 9bdb2eaf6a..c8692f74e4 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalManager.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalManager.java @@ -9,7 +9,6 @@ import java.util.TreeMap; import com.google.common.eventbus.Subscribe; import com.raytheon.uf.common.datadelivery.registry.Network; -import com.raytheon.uf.common.event.EventBus; import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.time.util.TimeUtil; @@ -38,6 +37,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent; * 3/13/2013 1802 bphillip Moved event bus registration from post-construct to spring static method call * Jun 13, 2013 2095 djohnson Can schedule any subclass of BandwidthAllocation. * Jun 25, 2013 2106 djohnson Copy state from another instance, add ability to check for proposed bandwidth throughput changes. + * Jul 09, 2013 2106 djohnson Only needs to unregister from the EventBus when used in an EDEX instance, so handled in EdexBandwidthManager. * * * @@ -218,7 +218,6 @@ public class RetrievalManager { * Shutdown the retrieval manager. */ public void shutdown() { - EventBus.unregister(this); // From this point forward, only return a poison pill for this retrieval // manager, which will cause threads attempting to receive bandwidth // allocations to die diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgent.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgent.java index 806206ba1d..0558462ef4 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgent.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgent.java @@ -13,7 +13,7 @@ import com.raytheon.uf.common.datadelivery.registry.Provider; import com.raytheon.uf.common.datadelivery.registry.ProviderType; import com.raytheon.uf.common.datadelivery.registry.Subscription; import com.raytheon.uf.common.datadelivery.registry.SubscriptionBundle; -import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers; +import com.raytheon.uf.common.datadelivery.registry.handlers.IProviderHandler; import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval; import com.raytheon.uf.common.event.EventBus; import com.raytheon.uf.common.registry.handler.RegistryHandlerException; @@ -49,6 +49,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord; * Jan 30, 2013 1543 djohnson Should not implement IRetrievalHandler. * Feb 05, 2013 1580 mpduff EventBus refactor. * Jun 24, 2013 2106 djohnson Set actual start time when sending to retrieval rather than overwrite scheduled start. + * Jul 09, 2013 2106 djohnson Dependency inject registry handlers. * * * @@ -68,14 +69,17 @@ public class SubscriptionRetrievalAgent extends private final IRetrievalDao retrievalDao; + private final IProviderHandler providerHandler; + public SubscriptionRetrievalAgent(Network network, String destinationUri, final Object notifier, int defaultPriority, RetrievalManager retrievalManager, IBandwidthDao bandwidthDao, - IRetrievalDao retrievalDao) { + IRetrievalDao retrievalDao, IProviderHandler providerHandler) { super(network, destinationUri, notifier, retrievalManager); this.defaultPriority = defaultPriority; this.bandwidthDao = bandwidthDao; this.retrievalDao = retrievalDao; + this.providerHandler = providerHandler; } @Override @@ -246,10 +250,9 @@ public class SubscriptionRetrievalAgent extends return retrievalsGenerated; } - private static Provider getProvider(String providerName) { + private Provider getProvider(String providerName) { try { - return DataDeliveryHandlers.getProviderHandler().getByName( - providerName); + return providerHandler.getByName(providerName); } catch (RegistryHandlerException e) { statusHandler.handle(Priority.PROBLEM, "Unable to retrieve provider by name.", e); diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/separator/SubscriptionBundleSeparator.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/separator/SubscriptionBundleSeparator.java deleted file mode 100644 index 68f1c193de..0000000000 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/separator/SubscriptionBundleSeparator.java +++ /dev/null @@ -1,122 +0,0 @@ -package com.raytheon.uf.edex.datadelivery.bandwidth.separator; - -/** - * This software was developed and / or modified by Raytheon Company, - * pursuant to Contract DG133W-05-CQ-1067 with the US Government. - * - * U.S. EXPORT CONTROLLED TECHNICAL DATA - * This software product contains export-restricted data whose - * export/transfer/disclosure is restricted by U.S. law. Dissemination - * to non-U.S. persons whether in the United States or abroad requires - * an export license or other authorization. - * - * Contractor Name: Raytheon Company - * Contractor Address: 6825 Pine Street, Suite 340 - * Mail Stop B8 - * Omaha, NE 68106 - * 402.291.0100 - * - * See the AWIPS II Master Rights File ("Master Rights File.pdf") for - * further licensing information. - **/ - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import com.raytheon.edex.esb.Headers; -import com.raytheon.uf.common.datadelivery.registry.SubscriptionBundle; -import com.raytheon.uf.common.status.IUFStatusHandler; -import com.raytheon.uf.common.status.UFStatus; -import com.raytheon.uf.common.status.UFStatus.Priority; - -/** - * Separate Subscription bundles in Queue - * - *
- * 
- * SOFTWARE HISTORY
- * Date         Ticket#    Engineer    Description
- * ------------ ---------- ----------- --------------------------
- * Feb 07, 2012            dhladky     Initial creation
- * 
- * 
- * - * @author dhladky - * @version 1.0 - */ - -public class SubscriptionBundleSeparator implements - Iterator { - - private static final transient IUFStatusHandler statusHandler = UFStatus - .getHandler(SubscriptionBundleSeparator.class); - - private List reports = null; - - private int currentCount = -1; - - public static SubscriptionBundleSeparator separate( - ArrayList data, Headers headers) - throws Exception { - SubscriptionBundleSeparator sbs = new SubscriptionBundleSeparator(); - sbs.setData(data, headers); - return sbs; - } - - @Override - public boolean hasNext() { - - boolean answer = ((reports != null) && (reports.size() > 0) && (currentCount < reports - .size())); - - if (!answer) { - reports.clear(); - reports = null; - } - - return answer; - } - - @Override - public SubscriptionBundle next() { - return reports.get(currentCount++); - } - - public void setData(ArrayList sbs, Headers headers) { - - try { - if (sbs != null) { - reports = new ArrayList(); - separate(sbs); - } - - } catch (Exception e) { - statusHandler.handle(Priority.ERROR, e.getLocalizedMessage(), e); - } - - if ((reports != null) && (reports.size() > 0)) { - currentCount = 0; - } else { - System.err.println("No bundles found in data."); - } - } - - private void separate(ArrayList sbs) { - - for (int i = 0; i < sbs.size(); i++) { - try { - reports.add(sbs.get(i)); - } catch (Exception e) { - statusHandler.handle(Priority.PROBLEM, e.getLocalizedMessage(), - e); - } - } - } - - @Override - public void remove() { - - } - -} diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/util/FindActiveSubscriptionsForRoute.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/util/FindActiveSubscriptionsForRoute.java index 90ad8eee01..4e202f9fbb 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/util/FindActiveSubscriptionsForRoute.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/util/FindActiveSubscriptionsForRoute.java @@ -25,7 +25,7 @@ import java.util.Set; import com.google.common.collect.Sets; import com.raytheon.uf.common.datadelivery.registry.Network; import com.raytheon.uf.common.datadelivery.registry.Subscription; -import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers; +import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler; import com.raytheon.uf.common.registry.handler.RegistryHandlerException; import com.raytheon.uf.edex.datadelivery.bandwidth.hibernate.IFindSubscriptionsForScheduling; @@ -39,6 +39,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.hibernate.IFindSubscriptionsF * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Feb 18, 2013 1543 djohnson Initial creation + * Jul 09, 2013 2106 djohnson Dependency inject registry handlers. * * * @@ -49,25 +50,35 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.hibernate.IFindSubscriptionsF public class FindActiveSubscriptionsForRoute implements IFindSubscriptionsForScheduling { + private final ISubscriptionHandler subscriptionHandler; + private final Network[] routes; + /** * Find active subscriptions for a specific route. * + * @param subscriptionHandler + * the subscription handler * @param route * the route */ - public FindActiveSubscriptionsForRoute(Network route) { - this(new Network[] { route }); + public FindActiveSubscriptionsForRoute( + ISubscriptionHandler subscriptionHandler, Network route) { + this(subscriptionHandler, new Network[] { route }); } /** * Find active subscriptions for specific routes. * + * @param subscriptionHandler + * the subscription handler * @param routes * the routes */ - public FindActiveSubscriptionsForRoute(Network... routes) { + public FindActiveSubscriptionsForRoute( + ISubscriptionHandler subscriptionHandler, Network... routes) { + this.subscriptionHandler = subscriptionHandler; this.routes = routes; } @@ -77,8 +88,8 @@ public class FindActiveSubscriptionsForRoute implements @Override public Set findSubscriptionsToSchedule() throws RegistryHandlerException { - final List activeForRoutes = DataDeliveryHandlers - .getSubscriptionHandler().getActiveForRoutes(routes); + final List activeForRoutes = subscriptionHandler + .getActiveForRoutes(routes); return Sets.newHashSet(activeForRoutes); } diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/RetrievalGenerationHandler.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/RetrievalGenerationHandler.java deleted file mode 100644 index 3501ff1145..0000000000 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/src/com/raytheon/uf/edex/datadelivery/retrieval/RetrievalGenerationHandler.java +++ /dev/null @@ -1,165 +0,0 @@ -package com.raytheon.uf.edex.datadelivery.retrieval; - -/** - * This software was developed and / or modified by Raytheon Company, - * pursuant to Contract DG133W-05-CQ-1067 with the US Government. - * - * U.S. EXPORT CONTROLLED TECHNICAL DATA - * This software product contains export-restricted data whose - * export/transfer/disclosure is restricted by U.S. law. Dissemination - * to non-U.S. persons whether in the United States or abroad requires - * an export license or other authorization. - * - * Contractor Name: Raytheon Company - * Contractor Address: 6825 Pine Street, Suite 340 - * Mail Stop B8 - * Omaha, NE 68106 - * 402.291.0100 - * - * See the AWIPS II Master Rights File ("Master Rights File.pdf") for - * further licensing information. - **/ - -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Date; -import java.util.List; - -import com.raytheon.uf.common.datadelivery.registry.ProviderType; -import com.raytheon.uf.common.datadelivery.registry.SubscriptionBundle; -import com.raytheon.uf.common.datadelivery.retrieval.xml.Retrieval; -import com.raytheon.uf.common.serialization.SerializationUtil; -import com.raytheon.uf.common.status.IUFStatusHandler; -import com.raytheon.uf.common.status.UFStatus; -import com.raytheon.uf.common.util.CollectionUtil; -import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao; -import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord; -import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.State; - -/** - * Handle Retrieval creation - * - *
- * 
- * SOFTWARE HISTORY
- * Date         Ticket#    Engineer    Description
- * ------------ ---------- ----------- --------------------------
- * Apr 09, 2012            dhladky      Initial creation
- * Jul 25, 2012 955        djohnson     Use {@link ServiceTypeFactory}.
- * Oct 10, 2012 0726       djohnson     Pass -1 for subscription retrieval id, since not bandwidth managed.
- * 
- * 
- * - * @author dhladky - * @version 1.0 - */ -public class RetrievalGenerationHandler implements IGenerateRetrieval { - private static final IUFStatusHandler statusHandler = UFStatus - .getHandler(RetrievalGenerationHandler.class); - - private final IRetrievalDao retrievalDao; - - public RetrievalGenerationHandler(IRetrievalDao retrievalDao) { - this.retrievalDao = retrievalDao; - } - - @Override - public List generateRetrieval(List bundles) { - - if (bundles != null) { - ArrayList names = new ArrayList(bundles.size()); - - for (SubscriptionBundle bundle : bundles) { - statusHandler.info("Bundle: " + bundle.getBundleId() - + " Create Retrieval Messages...."); - - // process the bundle into a retrieval - RetrievalGenerator rg = ServiceTypeFactory - .retrieveServiceFactory(bundle.getProvider()) - .getRetrievalGenerator(); - - final String subscriptionName = bundle.getSubscription() - .getName(); - statusHandler.info("Subcription: " + subscriptionName - + " Being Processed for Retrieval..."); - - List retrievals = rg.buildRetrieval(bundle); - - if (!CollectionUtil.isNullOrEmpty(retrievals)) { - String owner = bundle.getSubscription().getOwner(); - String provider = bundle.getSubscription().getProvider(); - int priority = 3; - Integer bundlePriority = bundle.getPriority(); - if (bundlePriority != null) { - priority = bundlePriority.intValue(); - } - Date insertTime = Calendar.getInstance().getTime(); - - List requestRecords = new ArrayList( - retrievals.size()); - long cumultTime1 = 0; - int index = 0; - final ProviderType providerType = bundle.getProvider() - .getProviderType(bundle.getDataType()); - final String plugin = providerType.getPlugin(); - for (Retrieval retrieval : retrievals) { - - RetrievalRequestRecord rec = new RetrievalRequestRecord( - subscriptionName, index++, -1L); - rec.setOwner(owner); - rec.setPlugin(plugin); - rec.setProvider(provider); - rec.setSubscriptionType(retrieval.getSubscriptionType()); - rec.setNetwork(retrieval.getNetwork()); - rec.setPriority(priority); - rec.setInsertTime(insertTime); - - try { - long t1 = System.currentTimeMillis(); - rec.setRetrieval(SerializationUtil - .transformToThrift(retrieval)); - long t2 = System.currentTimeMillis(); - cumultTime1 += t2 - t1; - rec.setState(State.PENDING); - } catch (Exception e) { - statusHandler.error("Subcription: " - + subscriptionName - + " Failed to serialize request [" - + retrieval + "]", e); - rec.setRetrieval(new byte[0]); - rec.setState(State.FAILED); - } - - requestRecords.add(rec); - } - - statusHandler.info("Cumulative time to serialize " - + requestRecords.size() + " requests: thrift [" - + cumultTime1 + "] ms"); - - try { - long t1 = System.currentTimeMillis(); - retrievalDao.persistAll(requestRecords); - statusHandler.info("Time to persist requests to db [" - + (System.currentTimeMillis() - t1) + "] ms"); - names.add(subscriptionName); - } catch (Exception e) { - statusHandler.warn("Subscription: " + subscriptionName - + " Failed to store to retrievals."); - // this should send notification - } - } else { - statusHandler.warn("Subscription: " + subscriptionName - + " Did not generate any retrieval messages"); - // should this send notification - } - } - return names; - } else { - statusHandler.info("NO VALID SUBCSRIPTIONS NEEDING RETRIEVAL...."); - } - - return null; - - } -} diff --git a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/AbstractBandwidthManagerIntTest.java b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/AbstractBandwidthManagerIntTest.java index a68e470c85..5ec0b4aba1 100644 --- a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/AbstractBandwidthManagerIntTest.java +++ b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/AbstractBandwidthManagerIntTest.java @@ -74,6 +74,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; * Mar 28, 2013 1841 djohnson Subscription is now UserSubscription. * Apr 29, 2013 1910 djohnson Always shutdown bandwidth managers in tests. * Jun 03, 2013 2095 djohnson Move getPointDataSet in from subclass. + * Jul 09, 2013 2106 djohnson Add datadelivery handlers, since they are now dependency injected. * * * @@ -82,7 +83,8 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil; */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { SpringFiles.UNIT_TEST_DB_BEANS_XML, - SpringFiles.EVENTBUS_COMMON_XML, + SpringFiles.EVENTBUS_COMMON_XML, SpringFiles.DATADELIVERY_HANDLERS_XML, + SpringFiles.MEMORY_DATADELIVERY_HANDLERS_XML, SpringFiles.RETRIEVAL_DATADELIVERY_DAOS_XML, SpringFiles.BANDWIDTH_DATADELIVERY_DAOS_XML, SpringFiles.BANDWIDTH_DATADELIVERY_XML, @@ -95,7 +97,7 @@ public abstract class AbstractBandwidthManagerIntTest { protected ApplicationContext context; @Autowired - protected BandwidthManager bandwidthManager; + protected EdexBandwidthManager bandwidthManager; @Autowired protected RetrievalManager retrievalManager; diff --git a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManagerIntTest.java b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManagerIntTest.java index 56a17370df..d39e98e92a 100644 --- a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManagerIntTest.java +++ b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthManagerIntTest.java @@ -38,13 +38,10 @@ import java.util.Arrays; import java.util.Calendar; import java.util.Collections; import java.util.Comparator; -import java.util.ConcurrentModificationException; import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.SortedSet; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import org.junit.Test; @@ -54,7 +51,6 @@ import com.raytheon.uf.common.datadelivery.registry.GriddedDataSetMetaData; import com.raytheon.uf.common.datadelivery.registry.Network; import com.raytheon.uf.common.datadelivery.registry.OpenDapGriddedDataSetMetaData; import com.raytheon.uf.common.datadelivery.registry.OpenDapGriddedDataSetMetaDataFixture; -import com.raytheon.uf.common.datadelivery.registry.ParameterFixture; import com.raytheon.uf.common.datadelivery.registry.PointDataSetMetaData; import com.raytheon.uf.common.datadelivery.registry.PointDataSetMetaDataFixture; import com.raytheon.uf.common.datadelivery.registry.PointTime; @@ -66,7 +62,6 @@ import com.raytheon.uf.common.datadelivery.registry.Time; import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers; import com.raytheon.uf.common.registry.event.RemoveRegistryEvent; import com.raytheon.uf.common.registry.handler.RegistryHandlerException; -import com.raytheon.uf.common.registry.handler.RegistryObjectHandlersUtil; import com.raytheon.uf.common.serialization.SerializationException; import com.raytheon.uf.common.time.util.ImmutableDate; import com.raytheon.uf.common.time.util.TimeUtil; @@ -76,8 +71,6 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthBucket; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthSubscription; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval; -import com.raytheon.uf.edex.datadelivery.bandwidth.notification.BandwidthEventBus; -import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.BandwidthMap; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlanTest; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus; @@ -106,6 +99,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent; * Jun 03, 2013 2038 djohnson Add support for point data based subscriptions. * Jun 03, 2013 2095 djohnson Move getPointDataSet to superclass. * Jun 25, 2013 2106 djohnson Set subscription latency, access bucket allocations through RetrievalPlan. + * Jul 09, 2013 2106 djohnson InMemoryBandwidthManager no longer receives updates from the EventBus. * * * @@ -267,8 +261,6 @@ public class BandwidthManagerIntTest extends AbstractWfoBandwidthManagerIntTest public void testDailyProductSubscriptionReceivesTimeAndUrlFromUpdate() throws RegistryHandlerException, ParseException, SerializationException { - RegistryObjectHandlersUtil.initMemory(); - // Store the original subscription Subscription subscription = SiteSubscriptionFixture.INSTANCE.get(); DataDeliveryHandlers.getSubscriptionHandler().store(subscription); @@ -303,8 +295,6 @@ public class BandwidthManagerIntTest extends AbstractWfoBandwidthManagerIntTest @Test public void testDailyProductSubscriptionIsSetToReadyStatus() throws RegistryHandlerException, ParseException { - RegistryObjectHandlersUtil.initMemory(); - // Store the original subscription Subscription subscription = SiteSubscriptionFixture.INSTANCE.get(); subscription.getTime().setCycleTimes(Collections. emptyList()); @@ -338,7 +328,6 @@ public class BandwidthManagerIntTest extends AbstractWfoBandwidthManagerIntTest @Test public void testSubscriptionLatencyIsPlacedOnSubscriptionDao() throws RegistryHandlerException, ParseException { - RegistryObjectHandlersUtil.initMemory(); // Store the original subscription Subscription subscription = SiteSubscriptionFixture.INSTANCE.get(); @@ -715,108 +704,6 @@ public class BandwidthManagerIntTest extends AbstractWfoBandwidthManagerIntTest bandwidthManager.subscriptionUpdated(subscription); } - /** - * Long-running in-memory bandwidth manager proposed schedule operations - * were causing {@link ConcurrentModificationException}s to occur when - * receiving events from the {@link BandwidthEventBus}. - * - * @throws Exception - * on test failure - */ - @Test - public void testInMemoryBandwidthManagerCanReceiveDataSetMetaDataUpdates() - throws Exception { - - Subscription subscription = createSubscriptionThatFillsUpABucket(); - subscription.getTime().setCycleTimes(Arrays.asList(Integer.valueOf(0))); - - bandwidthManager.schedule(subscription); - BandwidthManager bwProposed = null; - try { - bwProposed = bandwidthManager - .startProposedBandwidthManager(BandwidthMap - .load(EdexBandwidthContextFactory - .getBandwidthMapConfig())); - final BandwidthManager proposed = bwProposed; - - final BlockingQueue queue = new ArrayBlockingQueue( - 1); - - final int invocationCount = 10; - final CountDownLatch waitForAllThreadsReadyLatch = new CountDownLatch( - invocationCount * 2); - final CountDownLatch doneLatch = new CountDownLatch( - invocationCount * 2); - for (int i = 0; i < invocationCount; i++) { - final int current = i; - Thread thread = new Thread() { - @Override - public void run() { - try { - // Wait for all threads to check in, then they all - // start - // working at once - waitForAllThreadsReadyLatch.countDown(); - waitForAllThreadsReadyLatch.await(); - proposed.updateGriddedDataSetMetaData(OpenDapGriddedDataSetMetaDataFixture.INSTANCE - .get(current)); - } catch (Exception e) { - queue.offer(e); - } - doneLatch.countDown(); - } - }; - thread.start(); - } - - for (int i = 0; i < invocationCount; i++) { - final int current = i; - Thread thread = new Thread() { - @Override - public void run() { - try { - final Subscription subscription2 = SiteSubscriptionFixture.INSTANCE - .get(current); - subscription2 - .addParameter(ParameterFixture.INSTANCE - .get(1)); - subscription2 - .addParameter(ParameterFixture.INSTANCE - .get(2)); - subscription2 - .addParameter(ParameterFixture.INSTANCE - .get(3)); - subscription2.getTime().setCycleTimes( - Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, - 10, 11, 12, 13, 14, 15, 16, 17)); - subscription2.setLatencyInMinutes(current); - // Wait for all threads to check in, then they all - // start - // working at once - waitForAllThreadsReadyLatch.countDown(); - waitForAllThreadsReadyLatch.await(); - proposed.schedule(subscription2); - } catch (Exception e) { - queue.offer(e); - } - doneLatch.countDown(); - } - }; - thread.start(); - } - - // Wait for all threads to finish - doneLatch.await(); - - final Exception exception = queue.poll(); - if (exception != null) { - throw exception; - } - } finally { - shutdownBandwidthManager(bwProposed); - } - } - /** * Subscriptions that are deleted should have all of their bandwidth * allocations removed deleted. diff --git a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/IntegrationTestBandwidthContextFactory.java b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/IntegrationTestBandwidthContextFactory.java index 06718a8455..849bd582ac 100644 --- a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/IntegrationTestBandwidthContextFactory.java +++ b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/IntegrationTestBandwidthContextFactory.java @@ -21,6 +21,8 @@ package com.raytheon.uf.edex.datadelivery.bandwidth; import java.io.File; +import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler; +import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthContextFactory; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao; @@ -38,6 +40,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit; * Oct 24, 2012 1286 djohnson Initial creation * Feb 20, 2013 1543 djohnson Pass additional super-class constructor arguments. * Jun 25, 2013 2106 djohnson Add {@link IBandwidthBucketDao}. + * Jul 10, 2013 2106 djohnson Dependency inject registry handlers. * * * @@ -57,14 +60,19 @@ public class IntegrationTestBandwidthContextFactory extends * the creator for the bandwidth manager instance * @param dbInit * the database initializer + * @param dataSetMetaDataHandler + * @param subscriptionHandler */ IntegrationTestBandwidthContextFactory(IBandwidthDao bandwidthDao, IBandwidthBucketDao bandwidthBucketsDao, IEdexBandwidthManagerCreator bandwidthManagerCreator, - IBandwidthDbInit dbInit) { + IBandwidthDbInit dbInit, + IDataSetMetaDataHandler dataSetMetaDataHandler, + ISubscriptionHandler subscriptionHandler) { super(bandwidthDao, bandwidthBucketsDao, new IntegrationTestBandwidthInitializer(), - bandwidthManagerCreator, dbInit); + bandwidthManagerCreator, dbInit, dataSetMetaDataHandler, + subscriptionHandler); } /** @@ -81,9 +89,7 @@ public class IntegrationTestBandwidthContextFactory extends * @return the file */ public static File getIntegrationTestBandwidthMapConfigFile() { - return new IntegrationTestBandwidthContextFactory((IBandwidthDao) null, - (IBandwidthBucketDao) null, - (IEdexBandwidthManagerCreator) null, (IBandwidthDbInit) null) - .getBandwidthMapConfigFile(); + return new IntegrationTestBandwidthContextFactory(null, null, null, + null, null, null).getBandwidthMapConfigFile(); } } diff --git a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/IntegrationTestWfoBandwidthManager.java b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/IntegrationTestWfoBandwidthManager.java index 1fd6807ac9..8ed21e3869 100644 --- a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/IntegrationTestWfoBandwidthManager.java +++ b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/IntegrationTestWfoBandwidthManager.java @@ -19,6 +19,8 @@ **/ package com.raytheon.uf.edex.datadelivery.bandwidth; +import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler; +import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler; import com.raytheon.uf.common.util.JarUtil; import com.raytheon.uf.common.util.SpringFiles; import com.raytheon.uf.edex.datadelivery.bandwidth.WfoBandwidthManagerCreator.WfoBandwidthManager; @@ -40,6 +42,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil; * Oct 30, 2012 1286 djohnson Initial creation * Feb 27, 2013 1644 djohnson Extends WFO bandwidth manager. * May 15, 2013 2000 djohnson Include daos. + * Jul 10, 2013 2106 djohnson Dependency inject registry handlers. * * * @@ -66,8 +69,11 @@ public class IntegrationTestWfoBandwidthManager extends WfoBandwidthManager { */ public IntegrationTestWfoBandwidthManager(IBandwidthDbInit dbInit, IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, - BandwidthDaoUtil bandwidthDaoUtil) { - super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil); + BandwidthDaoUtil bandwidthDaoUtil, + IDataSetMetaDataHandler dataSetMetaDataHandler, + ISubscriptionHandler subscriptionHandler) { + super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil, + dataSetMetaDataHandler, subscriptionHandler); } /** diff --git a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/IntegrationTestWfoBandwidthManagerCreator.java b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/IntegrationTestWfoBandwidthManagerCreator.java index f7f2833095..425ffe01ad 100644 --- a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/IntegrationTestWfoBandwidthManagerCreator.java +++ b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/IntegrationTestWfoBandwidthManagerCreator.java @@ -19,6 +19,8 @@ **/ package com.raytheon.uf.edex.datadelivery.bandwidth; +import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler; +import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler; import com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthContextFactory.IEdexBandwidthManagerCreator; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit; @@ -35,6 +37,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Feb 20, 2013 1543 djohnson Initial creation + * Jul 10, 2013 2106 djohnson Dependency inject registry handlers. * * * @@ -50,8 +53,11 @@ public class IntegrationTestWfoBandwidthManagerCreator implements @Override public IBandwidthManager getBandwidthManager(IBandwidthDbInit dbInit, IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, - BandwidthDaoUtil bandwidthDaoUtil) { + BandwidthDaoUtil bandwidthDaoUtil, + IDataSetMetaDataHandler dataSetMetaDataHandler, + ISubscriptionHandler subscriptionHandler) { return new IntegrationTestWfoBandwidthManager(dbInit, bandwidthDao, - retrievalManager, bandwidthDaoUtil); + retrievalManager, bandwidthDaoUtil, dataSetMetaDataHandler, + subscriptionHandler); } } diff --git a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/WfoNcfBandwidthManagerIntTest.java b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/WfoNcfBandwidthManagerIntTest.java index d05db9d28c..240fefe26d 100644 --- a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/WfoNcfBandwidthManagerIntTest.java +++ b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/WfoNcfBandwidthManagerIntTest.java @@ -57,6 +57,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Feb 27, 2013 1644 djohnson Initial creation + * Jul 10, 2013 2106 djohnson Dependency inject registry handlers. * * * @@ -78,6 +79,7 @@ public class WfoNcfBandwidthManagerIntTest extends ApplicationContext ncfBandwidthManagerCtx = new ClassPathXmlApplicationContext( new String[] { SpringFiles.UNIT_TEST_DB_BEANS_XML, + SpringFiles.MEMORY_DATADELIVERY_HANDLERS_XML, SpringFiles.RETRIEVAL_DATADELIVERY_DAOS_XML, SpringFiles.BANDWIDTH_DATADELIVERY_DAOS_XML, SpringFiles.BANDWIDTH_DATADELIVERY_XML, diff --git a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/ncf/IntegrationTestNcfBandwidthManager.java b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/ncf/IntegrationTestNcfBandwidthManager.java index 402331ec3a..ebfca7e8f3 100644 --- a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/ncf/IntegrationTestNcfBandwidthManager.java +++ b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/ncf/IntegrationTestNcfBandwidthManager.java @@ -19,6 +19,8 @@ **/ package com.raytheon.uf.edex.datadelivery.bandwidth.ncf; +import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler; +import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler; import com.raytheon.uf.common.util.JarUtil; import com.raytheon.uf.common.util.SpringFiles; import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager; @@ -40,6 +42,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil; * ------------ ---------- ----------- -------------------------- * Feb 18, 2013 1543 djohnson Initial creation * Feb 27, 2013 1644 djohnson Extend NCF bandwidth manager. + * Jul 10, 2013 2106 djohnson Dependency inject registry handlers. * * * @@ -66,8 +69,11 @@ public class IntegrationTestNcfBandwidthManager extends NcfBandwidthManager { */ public IntegrationTestNcfBandwidthManager(IBandwidthDbInit dbInit, IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, - BandwidthDaoUtil bandwidthDaoUtil) { - super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil); + BandwidthDaoUtil bandwidthDaoUtil, + IDataSetMetaDataHandler dataSetMetaDataHandler, + ISubscriptionHandler subscriptionHandler) { + super(dbInit, bandwidthDao, retrievalManager, bandwidthDaoUtil, + dataSetMetaDataHandler, subscriptionHandler); } /** diff --git a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/ncf/IntegrationTestNcfBandwidthManagerCreator.java b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/ncf/IntegrationTestNcfBandwidthManagerCreator.java index 4ffcf343dc..cff0c38ed5 100644 --- a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/ncf/IntegrationTestNcfBandwidthManagerCreator.java +++ b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/ncf/IntegrationTestNcfBandwidthManagerCreator.java @@ -19,9 +19,10 @@ **/ package com.raytheon.uf.edex.datadelivery.bandwidth.ncf; -import com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthContextFactory; -import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager; +import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHandler; +import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler; import com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthContextFactory.IEdexBandwidthManagerCreator; +import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager; @@ -37,6 +38,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Feb 20, 2013 1543 djohnson Initial creation + * Jul 10, 2013 2106 djohnson Dependency inject registry handlers. * * * @@ -52,8 +54,11 @@ public class IntegrationTestNcfBandwidthManagerCreator implements @Override public IBandwidthManager getBandwidthManager(IBandwidthDbInit dbInit, IBandwidthDao bandwidthDao, RetrievalManager retrievalManager, - BandwidthDaoUtil bandwidthDaoUtil) { + BandwidthDaoUtil bandwidthDaoUtil, + IDataSetMetaDataHandler dataSetMetaDataHandler, + ISubscriptionHandler subscriptionHandler) { return new IntegrationTestNcfBandwidthManager(dbInit, bandwidthDao, - retrievalManager, bandwidthDaoUtil); + retrievalManager, bandwidthDaoUtil, dataSetMetaDataHandler, + subscriptionHandler); } } diff --git a/tests/resources/bandwidth/bandwidth-datadelivery-integrationtest-impl.xml b/tests/resources/bandwidth/bandwidth-datadelivery-integrationtest-impl.xml index d3cdb8128d..d88a6d8ce6 100644 --- a/tests/resources/bandwidth/bandwidth-datadelivery-integrationtest-impl.xml +++ b/tests/resources/bandwidth/bandwidth-datadelivery-integrationtest-impl.xml @@ -17,6 +17,9 @@
+ + + - - - - - - - - * @@ -48,8 +49,6 @@ public class RegistryObjectHandlersUtil { private static final String MOCK_DATADELIVERY_HANDLERS_XML = "/datadelivery/mock-datadelivery-handlers.xml"; - private static final String MEMORY_DATADELIVERY_HANDLERS_XML = "/datadelivery/memory-datadelivery-handlers.xml"; - /** * Initializes the handlers with the set of production implementations, * which interact with the registry proper. @@ -69,7 +68,7 @@ public class RegistryObjectHandlersUtil { * Initializes the handlers with a set of in-memory implementations. */ public static void initMemory() { - initHandlersFromSpringFile(MEMORY_DATADELIVERY_HANDLERS_XML); + initHandlersFromSpringFile(SpringFiles.MEMORY_DATADELIVERY_HANDLERS_XML); } /** diff --git a/tests/unit/com/raytheon/uf/common/util/SpringFiles.java b/tests/unit/com/raytheon/uf/common/util/SpringFiles.java index efd8b8ee64..7e9e4d466a 100644 --- a/tests/unit/com/raytheon/uf/common/util/SpringFiles.java +++ b/tests/unit/com/raytheon/uf/common/util/SpringFiles.java @@ -35,6 +35,7 @@ import org.junit.Ignore; * May 02, 2013 1910 djohnson Add validator plugins spring file. * May 28, 2013 1650 djohnson Add event bus spring files. * Jun 24, 2013 2106 djohnson Remove spring file. + * Jul 10, 2013 2106 djohnson Add MEMORY_DATADELIVERY_HANDLERS_XML. * * * @@ -92,6 +93,8 @@ public class SpringFiles { public static final String EVENTBUS_COMMON_XML = "/spring/eventbus-common.xml"; + public static final String MEMORY_DATADELIVERY_HANDLERS_XML = "/datadelivery/memory-datadelivery-handlers.xml"; + public static final String UNIT_TEST_DB_BEANS_XML = "/unit-test-db-beans.xml"; public static final String UNIT_TEST_LOCALIZATION_BEANS_XML = "/unit-test-localization-beans.xml"; diff --git a/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthSyncEventBusFactory.java b/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthSyncEventBusFactory.java index 0f83702f7c..a6c87b3376 100644 --- a/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthSyncEventBusFactory.java +++ b/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/notification/BandwidthSyncEventBusFactory.java @@ -35,6 +35,7 @@ import com.google.common.eventbus.EventBus; * ------------ ---------- ----------- -------------------------- * Feb 06, 2013 1543 djohnson Initial creation * May 28, 2013 1650 djohnson Add getEventBuses. + * Jul 10, 2013 2106 djohnson Remove subscriptionBus. * * * @@ -46,18 +47,8 @@ public class BandwidthSyncEventBusFactory implements BandwidthEventBusFactory { private final EventBus dataSetBus = new EventBus(); - private final EventBus subscriptionBus = new EventBus(); - private final EventBus retrievalBus = new EventBus(); - /** - * {@inheritDoc} - */ - @Override - public EventBus getSubscriptionBus() { - return subscriptionBus; - } - /** * {@inheritDoc} */ @@ -79,7 +70,6 @@ public class BandwidthSyncEventBusFactory implements BandwidthEventBusFactory { */ @Override public List getEventBuses() { - return Arrays. asList(dataSetBus, retrievalBus, - subscriptionBus); + return Arrays. asList(dataSetBus, retrievalBus); } } diff --git a/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgentTest.java b/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgentTest.java index 9c85d573a0..d83af267c3 100644 --- a/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgentTest.java +++ b/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/SubscriptionRetrievalAgentTest.java @@ -66,6 +66,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.Sta * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Jan 30, 2013 1543 djohnson Initial creation + * Jul 10, 2013 2106 djohnson Inject providerHandler. * * * @@ -119,7 +120,7 @@ public class SubscriptionRetrievalAgentTest { SubscriptionRetrievalAgent agent = new SubscriptionRetrievalAgent( route, "someUri", new Object(), 1, null, bandwidthDao, - retrievalDao) { + retrievalDao, DataDeliveryHandlers.getProviderHandler()) { @Override void wakeRetrievalTasks() throws EdexException { // Do nothing diff --git a/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/util/FindActiveSubscriptionsForRouteTest.java b/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/util/FindActiveSubscriptionsForRouteTest.java index 1972173e07..45a6577430 100644 --- a/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/util/FindActiveSubscriptionsForRouteTest.java +++ b/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/util/FindActiveSubscriptionsForRouteTest.java @@ -29,9 +29,9 @@ import org.junit.BeforeClass; import org.junit.Test; import com.raytheon.uf.common.datadelivery.registry.Network; +import com.raytheon.uf.common.datadelivery.registry.SiteSubscription; import com.raytheon.uf.common.datadelivery.registry.Subscription; import com.raytheon.uf.common.datadelivery.registry.SubscriptionBuilder; -import com.raytheon.uf.common.datadelivery.registry.SiteSubscription; import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers; import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler; import com.raytheon.uf.common.registry.handler.RegistryHandlerException; @@ -48,6 +48,7 @@ import com.raytheon.uf.common.registry.handler.RegistryObjectHandlersUtil; * ------------ ---------- ----------- -------------------------- * Feb 19, 2013 1543 djohnson Initial creation * Mar 28, 2013 1841 djohnson Subscription is now UserSubscription. + * Jul 10, 2013 2106 djohnson Inject subscriptionHandler. * * * @@ -56,12 +57,12 @@ import com.raytheon.uf.common.registry.handler.RegistryObjectHandlersUtil; */ public class FindActiveSubscriptionsForRouteTest { + private static ISubscriptionHandler subscriptionHandler; @BeforeClass public static void classSetUp() throws RegistryHandlerException { RegistryObjectHandlersUtil.initMemory(); - final ISubscriptionHandler subscriptionHandler = DataDeliveryHandlers - .getSubscriptionHandler(); + subscriptionHandler = DataDeliveryHandlers.getSubscriptionHandler(); // Two OPSNET subscriptions final SiteSubscription opsnetSub1 = new SubscriptionBuilder() @@ -86,7 +87,7 @@ public class FindActiveSubscriptionsForRouteTest { public void findsSubscriptionForSingleRoute() throws RegistryHandlerException { final Set subscriptions = new FindActiveSubscriptionsForRoute( - Network.SBN).findSubscriptionsToSchedule(); + subscriptionHandler, Network.SBN).findSubscriptionsToSchedule(); assertThat(subscriptions, hasSize(2)); for (Subscription subscription : subscriptions) { assertThat(subscription.getRoute(), is(Network.SBN)); @@ -97,7 +98,8 @@ public class FindActiveSubscriptionsForRouteTest { public void findsSubscriptionsForMultipleRoutes() throws RegistryHandlerException { final Set subscriptions = new FindActiveSubscriptionsForRoute( - Network.OPSNET, Network.SBN).findSubscriptionsToSchedule(); + subscriptionHandler, Network.OPSNET, Network.SBN) + .findSubscriptionsToSchedule(); assertThat(subscriptions, hasSize(4)); }