Issue #1910 EventBus register/unregister fixes and test memory management

Change-Id: I840b2ff12db8f24d3f5e186dff4e8a513a86f48e

Former-commit-id: 33b21c4faf [formerly 85ca4f57ec] [formerly 5101c1585d] [formerly 33b21c4faf [formerly 85ca4f57ec] [formerly 5101c1585d] [formerly f8bba72501 [formerly 5101c1585d [formerly eaf92d9fedd4a429bab3b19b9e61e044dde103eb]]]]
Former-commit-id: f8bba72501
Former-commit-id: dae0da95e9 [formerly 15658cb411] [formerly 549e7f64a3a83b47ebb715d61c377aef80f21a2a [formerly e51bd3ddcd]]
Former-commit-id: 71b1ab7d73254ef40037833f63c8bdab70e2f66b [formerly c7c996d695]
Former-commit-id: c48cef6b50
This commit is contained in:
Dustin Johnson 2013-04-29 09:26:45 -05:00
parent b609e4cd01
commit 4d5fd42a53
7 changed files with 187 additions and 84 deletions

View file

@ -2,6 +2,10 @@ package com.raytheon.uf.common.event;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
/** /**
* The EventBus. * The EventBus.
* *
@ -15,6 +19,7 @@ import java.util.ServiceLoader;
* add unregister. * add unregister.
* Dec 11, 2012 1407 djohnson Separate the creation of the Google EventBus from the wrapper class. * Dec 11, 2012 1407 djohnson Separate the creation of the Google EventBus from the wrapper class.
* Feb 05, 2013 1580 mpduff Moved to common, use IEventBusHandler. * Feb 05, 2013 1580 mpduff Moved to common, use IEventBusHandler.
* Apr 29, 2013 1910 djohnson Watch for NPEs and errors unregistering.
* *
* </pre> * </pre>
* *
@ -29,6 +34,11 @@ public final class EventBus {
.iterator().next(); .iterator().next();
} }
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(EventBus.class);
private static final String NULL_SUBSCRIBER = "Ignoring a null subscriber.";
private EventBus() { private EventBus() {
} }
@ -40,7 +50,12 @@ public final class EventBus {
* The subscriber to register * The subscriber to register
*/ */
public static void register(Object subscriber) { public static void register(Object subscriber) {
handler.register(subscriber); if (subscriber != null) {
handler.register(subscriber);
} else {
statusHandler.handle(Priority.WARN, NULL_SUBSCRIBER,
new IllegalArgumentException(NULL_SUBSCRIBER));
}
} }
/** /**
@ -50,7 +65,19 @@ public final class EventBus {
* The subscriber to unregister * The subscriber to unregister
*/ */
public static void unregister(Object subscriber) { public static void unregister(Object subscriber) {
handler.unregister(subscriber); if (subscriber != null) {
try {
handler.unregister(subscriber);
} catch (Throwable t) {
statusHandler.handle(Priority.WARN,
"Unable to unregister subscriber of type ["
+ subscriber.getClass().getName()
+ "] from the retrieval event bus!", t);
}
} else {
statusHandler.handle(Priority.WARN, NULL_SUBSCRIBER,
new IllegalArgumentException(NULL_SUBSCRIBER));
}
} }
/** /**

View file

@ -107,8 +107,7 @@ public class EdexBandwidthContextFactory implements BandwidthContextFactory {
* @param instance * @param instance
* the {@link BandwidthManager} instance * the {@link BandwidthManager} instance
*/ */
@SuppressWarnings("unused") EdexBandwidthContextFactory(BandwidthManager instance) {
private EdexBandwidthContextFactory(BandwidthManager instance) {
EdexBandwidthContextFactory.instance = instance; EdexBandwidthContextFactory.instance = instance;
this.bandwidthDao = null; this.bandwidthDao = null;
this.bandwidthInitializer = null; this.bandwidthInitializer = null;

View file

@ -5,6 +5,9 @@ import java.util.ServiceLoader;
import com.raytheon.uf.common.datadelivery.registry.DataSetMetaData; import com.raytheon.uf.common.datadelivery.registry.DataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.Subscription; import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.registry.event.RemoveRegistryEvent; import com.raytheon.uf.common.registry.event.RemoveRegistryEvent;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval; import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled; import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled;
@ -20,13 +23,14 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetriev
* Oct 10, 2012 0726 djohnson Make buses final. * Oct 10, 2012 0726 djohnson Make buses final.
* Dec 11, 2012 1286 djohnson Create a factory to hold Google event buses. * Dec 11, 2012 1286 djohnson Create a factory to hold Google event buses.
* Feb 07, 2013 1543 djohnson Changed to behave similarly to EventBus. * Feb 07, 2013 1543 djohnson Changed to behave similarly to EventBus.
* Apr 29, 2013 1910 djohnson Watch for NPEs and errors unregistering.
* *
* </pre> * </pre>
* *
* @version 1.0 * @version 1.0
*/ */
public class BandwidthEventBus { public class BandwidthEventBus {
private static final BandwidthEventBusFactory eventBusFactory; private static final BandwidthEventBusFactory eventBusFactory;
static { static {
eventBusFactory = ServiceLoader eventBusFactory = ServiceLoader
@ -43,15 +47,25 @@ public class BandwidthEventBus {
private static final com.google.common.eventbus.EventBus retrievalBus = eventBusFactory private static final com.google.common.eventbus.EventBus retrievalBus = eventBusFactory
.getRetrievalBus(); .getRetrievalBus();
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(BandwidthEventBus.class);
private static final String NULL_SUBSCRIBER = "Ignoring a null subscriber.";
/** /**
* Registers an object with the event bus. * Registers an object with the event bus.
* *
* @param subscriber * @param subscriber
*/ */
public static void register(Object subscriber) { public static void register(Object subscriber) {
BandwidthEventBus.retrievalBus.register(subscriber); if (subscriber != null) {
BandwidthEventBus.subscriptionBus.register(subscriber); BandwidthEventBus.retrievalBus.register(subscriber);
BandwidthEventBus.dataSetBus.register(subscriber); BandwidthEventBus.subscriptionBus.register(subscriber);
BandwidthEventBus.dataSetBus.register(subscriber);
} else {
statusHandler.handle(Priority.WARN, NULL_SUBSCRIBER,
new IllegalArgumentException(NULL_SUBSCRIBER));
}
} }
/** /**
@ -60,9 +74,35 @@ public class BandwidthEventBus {
* @param subscriber * @param subscriber
*/ */
public static void unregister(Object subscriber) { public static void unregister(Object subscriber) {
BandwidthEventBus.retrievalBus.unregister(subscriber); if (subscriber != null) {
BandwidthEventBus.subscriptionBus.unregister(subscriber); try {
BandwidthEventBus.dataSetBus.unregister(subscriber); BandwidthEventBus.retrievalBus.unregister(subscriber);
} catch (Throwable t) {
statusHandler.handle(Priority.WARN,
"Unable to unregister subscriber of type ["
+ subscriber.getClass().getName()
+ "] from the retrieval event bus!", t);
}
try {
BandwidthEventBus.subscriptionBus.unregister(subscriber);
} catch (Throwable t) {
statusHandler.handle(Priority.WARN,
"Unable to unregister subscriber of type ["
+ subscriber.getClass().getName()
+ "] from the subscription event bus!", t);
}
try {
BandwidthEventBus.dataSetBus.unregister(subscriber);
} catch (Throwable t) {
statusHandler.handle(Priority.WARN,
"Unable to unregister subscriber of type ["
+ subscriber.getClass().getName()
+ "] from the dataSet event bus!", t);
}
} else {
statusHandler.handle(Priority.WARN, NULL_SUBSCRIBER,
new IllegalArgumentException(NULL_SUBSCRIBER));
}
} }
/** /**

View file

@ -63,6 +63,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Feb 07, 2013 1543 djohnson Remove unnecessary test setup methods. * Feb 07, 2013 1543 djohnson Remove unnecessary test setup methods.
* Feb 20, 2013 1543 djohnson Delegate to sub-classes for which route to create subscriptions for. * Feb 20, 2013 1543 djohnson Delegate to sub-classes for which route to create subscriptions for.
* Mar 28, 2013 1841 djohnson Subscription is now UserSubscription. * Mar 28, 2013 1841 djohnson Subscription is now UserSubscription.
* Apr 29, 2013 1910 djohnson Always shutdown bandwidth managers in tests.
* *
* </pre> * </pre>
* *
@ -136,12 +137,24 @@ public abstract class AbstractBandwidthManagerIntTest {
@After @After
public void tearDown() { public void tearDown() {
PathManagerFactoryTest.initLocalization(); PathManagerFactoryTest.initLocalization();
try { shutdownBandwidthManager(bandwidthManager);
bandwidthManager.shutdown(); shutdownBandwidthManager(EdexBandwidthContextFactory.getInstance());
} catch (IllegalArgumentException iae) { new EdexBandwidthContextFactory(null);
// ignore any exceptions occurring about not being a registered }
// event bus handler
iae.printStackTrace(); /**
* Shutdown the bandwidth manager safely.
*
* @param instance
*/
protected void shutdownBandwidthManager(BandwidthManager bwManager) {
if (bwManager != null) {
try {
bwManager.shutdown();
} catch (IllegalArgumentException iae) {
// ignore any exceptions occurring about not being a registered
// event bus handler
}
} }
} }

View file

@ -104,6 +104,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent;
* Feb 14, 2013 1596 djohnson Add test duplicating errors deleting multiple subscriptions for the same provider/dataset. * Feb 14, 2013 1596 djohnson Add test duplicating errors deleting multiple subscriptions for the same provider/dataset.
* Mar 11, 2013 1645 djohnson Test configuration file modifications. * Mar 11, 2013 1645 djohnson Test configuration file modifications.
* Mar 28, 2013 1841 djohnson Subscription is now UserSubscription. * Mar 28, 2013 1841 djohnson Subscription is now UserSubscription.
* Apr 29, 2013 1910 djohnson Always shutdown bandwidth managers in tests.
* *
* </pre> * </pre>
* *
@ -418,8 +419,10 @@ public class BandwidthManagerIntTest extends AbstractBandwidthManagerIntTest {
public void expiredSubscriptionUpdatedToNonExpiredIsScheduled() public void expiredSubscriptionUpdatedToNonExpiredIsScheduled()
throws Exception { throws Exception {
final Date yesterday = new Date(TimeUtil.currentTimeMillis() - TimeUtil.MILLIS_PER_DAY); final Date yesterday = new Date(TimeUtil.currentTimeMillis()
final Date oneHourAgo = new Date(TimeUtil.currentTimeMillis() - TimeUtil.MILLIS_PER_HOUR); - TimeUtil.MILLIS_PER_DAY);
final Date oneHourAgo = new Date(TimeUtil.currentTimeMillis()
- TimeUtil.MILLIS_PER_HOUR);
Subscription subscription = createSubscriptionThatFillsHalfABucket(); Subscription subscription = createSubscriptionThatFillsHalfABucket();
subscription.setSubscriptionStart(yesterday); subscription.setSubscriptionStart(yesterday);
@ -428,7 +431,8 @@ public class BandwidthManagerIntTest extends AbstractBandwidthManagerIntTest {
bandwidthManager.subscriptionUpdated(subscription); bandwidthManager.subscriptionUpdated(subscription);
// Make sure nothing is scheduled when the subscription is expired // Make sure nothing is scheduled when the subscription is expired
assertThat(bandwidthDao.getBandwidthAllocations(subscription.getRoute()), assertThat(
bandwidthDao.getBandwidthAllocations(subscription.getRoute()),
is(empty())); is(empty()));
// No longer expired // No longer expired
@ -653,77 +657,89 @@ public class BandwidthManagerIntTest extends AbstractBandwidthManagerIntTest {
subscription.getTime().setCycleTimes(Arrays.asList(Integer.valueOf(0))); subscription.getTime().setCycleTimes(Arrays.asList(Integer.valueOf(0)));
bandwidthManager.schedule(subscription); bandwidthManager.schedule(subscription);
final BandwidthManager proposed = bandwidthManager BandwidthManager bwProposed = null;
.startProposedBandwidthManager(BandwidthMap try {
.load(EdexBandwidthContextFactory bwProposed = bandwidthManager
.getBandwidthMapConfig())); .startProposedBandwidthManager(BandwidthMap
.load(EdexBandwidthContextFactory
.getBandwidthMapConfig()));
final BandwidthManager proposed = bwProposed;
final BlockingQueue<Exception> queue = new ArrayBlockingQueue<Exception>( final BlockingQueue<Exception> queue = new ArrayBlockingQueue<Exception>(
1); 1);
final int invocationCount = 10; final int invocationCount = 10;
final CountDownLatch waitForAllThreadsReadyLatch = new CountDownLatch( final CountDownLatch waitForAllThreadsReadyLatch = new CountDownLatch(
invocationCount * 2); invocationCount * 2);
final CountDownLatch doneLatch = new CountDownLatch(invocationCount * 2); final CountDownLatch doneLatch = new CountDownLatch(
for (int i = 0; i < invocationCount; i++) { invocationCount * 2);
final int current = i; for (int i = 0; i < invocationCount; i++) {
Thread thread = new Thread() { final int current = i;
@Override Thread thread = new Thread() {
public void run() { @Override
try { public void run() {
// Wait for all threads to check in, then they all start try {
// working at once // Wait for all threads to check in, then they all
waitForAllThreadsReadyLatch.countDown(); // start
waitForAllThreadsReadyLatch.await(); // working at once
proposed.updateDataSetMetaData(OpenDapGriddedDataSetMetaDataFixture.INSTANCE waitForAllThreadsReadyLatch.countDown();
.get(current)); waitForAllThreadsReadyLatch.await();
} catch (Exception e) { proposed.updateDataSetMetaData(OpenDapGriddedDataSetMetaDataFixture.INSTANCE
queue.offer(e); .get(current));
} catch (Exception e) {
queue.offer(e);
}
doneLatch.countDown();
} }
doneLatch.countDown(); };
} thread.start();
}; }
thread.start();
}
for (int i = 0; i < invocationCount; i++) { for (int i = 0; i < invocationCount; i++) {
final int current = i; final int current = i;
Thread thread = new Thread() { Thread thread = new Thread() {
@Override @Override
public void run() { public void run() {
try { try {
final Subscription subscription2 = SubscriptionFixture.INSTANCE final Subscription subscription2 = SubscriptionFixture.INSTANCE
.get(current); .get(current);
subscription2.addParameter(ParameterFixture.INSTANCE subscription2
.get(1)); .addParameter(ParameterFixture.INSTANCE
subscription2.addParameter(ParameterFixture.INSTANCE .get(1));
.get(2)); subscription2
subscription2.addParameter(ParameterFixture.INSTANCE .addParameter(ParameterFixture.INSTANCE
.get(3)); .get(2));
subscription2.getTime().setCycleTimes( subscription2
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, .addParameter(ParameterFixture.INSTANCE
11, 12, 13, 14, 15, 16, 17)); .get(3));
subscription2.setLatencyInMinutes(current); subscription2.getTime().setCycleTimes(
// Wait for all threads to check in, then they all start Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9,
// working at once 10, 11, 12, 13, 14, 15, 16, 17));
waitForAllThreadsReadyLatch.countDown(); subscription2.setLatencyInMinutes(current);
waitForAllThreadsReadyLatch.await(); // Wait for all threads to check in, then they all
proposed.schedule(subscription2); // start
} catch (Exception e) { // working at once
queue.offer(e); waitForAllThreadsReadyLatch.countDown();
waitForAllThreadsReadyLatch.await();
proposed.schedule(subscription2);
} catch (Exception e) {
queue.offer(e);
}
doneLatch.countDown();
} }
doneLatch.countDown(); };
} thread.start();
}; }
thread.start();
}
// Wait for all threads to finish // Wait for all threads to finish
doneLatch.await(); doneLatch.await();
final Exception exception = queue.poll(); final Exception exception = queue.poll();
if (exception != null) { if (exception != null) {
throw exception; throw exception;
}
} finally {
shutdownBandwidthManager(bwProposed);
} }
} }

View file

@ -64,6 +64,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
* Date Ticket# Engineer Description * Date Ticket# Engineer Description
* ------------ ---------- ----------- -------------------------- * ------------ ---------- ----------- --------------------------
* Apr 23, 2013 1910 djohnson Initial creation * Apr 23, 2013 1910 djohnson Initial creation
* Apr 29, 2013 1910 djohnson Move to integration tests section.
* *
* </pre> * </pre>
* *

View file

@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -84,6 +85,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
* Feb 15, 2013 1543 djohnson Class renames. * Feb 15, 2013 1543 djohnson Class renames.
* Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor. * Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor.
* Mar 19, 2013 1794 djohnson RetrievalTasks integrate at a queue. * Mar 19, 2013 1794 djohnson RetrievalTasks integrate at a queue.
* Apr 29, 2013 1910 djohnson Unregister from EventBus after each test.
* *
* </pre> * </pre>
* *
@ -165,6 +167,11 @@ public class RetrievalTaskTest {
EventBus.register(this); EventBus.register(this);
} }
@After
public void tearDown() {
EventBus.unregister(this);
}
@Test @Test
public void processesRetrievalForItsSpecifiedNetwork() public void processesRetrievalForItsSpecifiedNetwork()
throws DataAccessLayerException { throws DataAccessLayerException {