Merge "Issue #1910 EventBus register/unregister fixes and test memory management" into development
Former-commit-id:a09592538b
[formerly65fe5d4aab
] [formerly60b973021f
] [formerly812995d587
[formerly60b973021f
[formerly 33e45869e43a6c8ea8a631fd93dce357eaf414bf]]] Former-commit-id:812995d587
Former-commit-id: 47ca9057ef6220bb37fcd6b4fe528ceb4cc7f72b [formerlyaa7bf81d0a
] Former-commit-id:125167a8bf
This commit is contained in:
commit
c91d7501c7
7 changed files with 187 additions and 84 deletions
|
@ -2,6 +2,10 @@ package com.raytheon.uf.common.event;
|
|||
|
||||
import java.util.ServiceLoader;
|
||||
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
|
||||
/**
|
||||
* The EventBus.
|
||||
*
|
||||
|
@ -15,6 +19,7 @@ import java.util.ServiceLoader;
|
|||
* add unregister.
|
||||
* Dec 11, 2012 1407 djohnson Separate the creation of the Google EventBus from the wrapper class.
|
||||
* Feb 05, 2013 1580 mpduff Moved to common, use IEventBusHandler.
|
||||
* Apr 29, 2013 1910 djohnson Watch for NPEs and errors unregistering.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -29,6 +34,11 @@ public final class EventBus {
|
|||
.iterator().next();
|
||||
}
|
||||
|
||||
private static final IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(EventBus.class);
|
||||
|
||||
private static final String NULL_SUBSCRIBER = "Ignoring a null subscriber.";
|
||||
|
||||
private EventBus() {
|
||||
|
||||
}
|
||||
|
@ -40,7 +50,12 @@ public final class EventBus {
|
|||
* The subscriber to register
|
||||
*/
|
||||
public static void register(Object subscriber) {
|
||||
if (subscriber != null) {
|
||||
handler.register(subscriber);
|
||||
} else {
|
||||
statusHandler.handle(Priority.WARN, NULL_SUBSCRIBER,
|
||||
new IllegalArgumentException(NULL_SUBSCRIBER));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -50,7 +65,19 @@ public final class EventBus {
|
|||
* The subscriber to unregister
|
||||
*/
|
||||
public static void unregister(Object subscriber) {
|
||||
if (subscriber != null) {
|
||||
try {
|
||||
handler.unregister(subscriber);
|
||||
} catch (Throwable t) {
|
||||
statusHandler.handle(Priority.WARN,
|
||||
"Unable to unregister subscriber of type ["
|
||||
+ subscriber.getClass().getName()
|
||||
+ "] from the retrieval event bus!", t);
|
||||
}
|
||||
} else {
|
||||
statusHandler.handle(Priority.WARN, NULL_SUBSCRIBER,
|
||||
new IllegalArgumentException(NULL_SUBSCRIBER));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -107,8 +107,7 @@ public class EdexBandwidthContextFactory implements BandwidthContextFactory {
|
|||
* @param instance
|
||||
* the {@link BandwidthManager} instance
|
||||
*/
|
||||
@SuppressWarnings("unused")
|
||||
private EdexBandwidthContextFactory(BandwidthManager instance) {
|
||||
EdexBandwidthContextFactory(BandwidthManager instance) {
|
||||
EdexBandwidthContextFactory.instance = instance;
|
||||
this.bandwidthDao = null;
|
||||
this.bandwidthInitializer = null;
|
||||
|
|
|
@ -5,6 +5,9 @@ import java.util.ServiceLoader;
|
|||
import com.raytheon.uf.common.datadelivery.registry.DataSetMetaData;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Subscription;
|
||||
import com.raytheon.uf.common.registry.event.RemoveRegistryEvent;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled;
|
||||
|
||||
|
@ -20,6 +23,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetriev
|
|||
* Oct 10, 2012 0726 djohnson Make buses final.
|
||||
* Dec 11, 2012 1286 djohnson Create a factory to hold Google event buses.
|
||||
* Feb 07, 2013 1543 djohnson Changed to behave similarly to EventBus.
|
||||
* Apr 29, 2013 1910 djohnson Watch for NPEs and errors unregistering.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -43,15 +47,25 @@ public class BandwidthEventBus {
|
|||
private static final com.google.common.eventbus.EventBus retrievalBus = eventBusFactory
|
||||
.getRetrievalBus();
|
||||
|
||||
private static final IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(BandwidthEventBus.class);
|
||||
|
||||
private static final String NULL_SUBSCRIBER = "Ignoring a null subscriber.";
|
||||
|
||||
/**
|
||||
* Registers an object with the event bus.
|
||||
*
|
||||
* @param subscriber
|
||||
*/
|
||||
public static void register(Object subscriber) {
|
||||
if (subscriber != null) {
|
||||
BandwidthEventBus.retrievalBus.register(subscriber);
|
||||
BandwidthEventBus.subscriptionBus.register(subscriber);
|
||||
BandwidthEventBus.dataSetBus.register(subscriber);
|
||||
} else {
|
||||
statusHandler.handle(Priority.WARN, NULL_SUBSCRIBER,
|
||||
new IllegalArgumentException(NULL_SUBSCRIBER));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -60,9 +74,35 @@ public class BandwidthEventBus {
|
|||
* @param subscriber
|
||||
*/
|
||||
public static void unregister(Object subscriber) {
|
||||
if (subscriber != null) {
|
||||
try {
|
||||
BandwidthEventBus.retrievalBus.unregister(subscriber);
|
||||
} catch (Throwable t) {
|
||||
statusHandler.handle(Priority.WARN,
|
||||
"Unable to unregister subscriber of type ["
|
||||
+ subscriber.getClass().getName()
|
||||
+ "] from the retrieval event bus!", t);
|
||||
}
|
||||
try {
|
||||
BandwidthEventBus.subscriptionBus.unregister(subscriber);
|
||||
} catch (Throwable t) {
|
||||
statusHandler.handle(Priority.WARN,
|
||||
"Unable to unregister subscriber of type ["
|
||||
+ subscriber.getClass().getName()
|
||||
+ "] from the subscription event bus!", t);
|
||||
}
|
||||
try {
|
||||
BandwidthEventBus.dataSetBus.unregister(subscriber);
|
||||
} catch (Throwable t) {
|
||||
statusHandler.handle(Priority.WARN,
|
||||
"Unable to unregister subscriber of type ["
|
||||
+ subscriber.getClass().getName()
|
||||
+ "] from the dataSet event bus!", t);
|
||||
}
|
||||
} else {
|
||||
statusHandler.handle(Priority.WARN, NULL_SUBSCRIBER,
|
||||
new IllegalArgumentException(NULL_SUBSCRIBER));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -63,6 +63,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
|
|||
* Feb 07, 2013 1543 djohnson Remove unnecessary test setup methods.
|
||||
* Feb 20, 2013 1543 djohnson Delegate to sub-classes for which route to create subscriptions for.
|
||||
* Mar 28, 2013 1841 djohnson Subscription is now UserSubscription.
|
||||
* Apr 29, 2013 1910 djohnson Always shutdown bandwidth managers in tests.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -136,12 +137,24 @@ public abstract class AbstractBandwidthManagerIntTest {
|
|||
@After
|
||||
public void tearDown() {
|
||||
PathManagerFactoryTest.initLocalization();
|
||||
shutdownBandwidthManager(bandwidthManager);
|
||||
shutdownBandwidthManager(EdexBandwidthContextFactory.getInstance());
|
||||
new EdexBandwidthContextFactory(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the bandwidth manager safely.
|
||||
*
|
||||
* @param instance
|
||||
*/
|
||||
protected void shutdownBandwidthManager(BandwidthManager bwManager) {
|
||||
if (bwManager != null) {
|
||||
try {
|
||||
bandwidthManager.shutdown();
|
||||
bwManager.shutdown();
|
||||
} catch (IllegalArgumentException iae) {
|
||||
// ignore any exceptions occurring about not being a registered
|
||||
// event bus handler
|
||||
iae.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -104,6 +104,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.RetrievalManagerNotifyEvent;
|
|||
* Feb 14, 2013 1596 djohnson Add test duplicating errors deleting multiple subscriptions for the same provider/dataset.
|
||||
* Mar 11, 2013 1645 djohnson Test configuration file modifications.
|
||||
* Mar 28, 2013 1841 djohnson Subscription is now UserSubscription.
|
||||
* Apr 29, 2013 1910 djohnson Always shutdown bandwidth managers in tests.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -418,8 +419,10 @@ public class BandwidthManagerIntTest extends AbstractBandwidthManagerIntTest {
|
|||
public void expiredSubscriptionUpdatedToNonExpiredIsScheduled()
|
||||
throws Exception {
|
||||
|
||||
final Date yesterday = new Date(TimeUtil.currentTimeMillis() - TimeUtil.MILLIS_PER_DAY);
|
||||
final Date oneHourAgo = new Date(TimeUtil.currentTimeMillis() - TimeUtil.MILLIS_PER_HOUR);
|
||||
final Date yesterday = new Date(TimeUtil.currentTimeMillis()
|
||||
- TimeUtil.MILLIS_PER_DAY);
|
||||
final Date oneHourAgo = new Date(TimeUtil.currentTimeMillis()
|
||||
- TimeUtil.MILLIS_PER_HOUR);
|
||||
|
||||
Subscription subscription = createSubscriptionThatFillsHalfABucket();
|
||||
subscription.setSubscriptionStart(yesterday);
|
||||
|
@ -428,7 +431,8 @@ public class BandwidthManagerIntTest extends AbstractBandwidthManagerIntTest {
|
|||
bandwidthManager.subscriptionUpdated(subscription);
|
||||
|
||||
// Make sure nothing is scheduled when the subscription is expired
|
||||
assertThat(bandwidthDao.getBandwidthAllocations(subscription.getRoute()),
|
||||
assertThat(
|
||||
bandwidthDao.getBandwidthAllocations(subscription.getRoute()),
|
||||
is(empty()));
|
||||
|
||||
// No longer expired
|
||||
|
@ -653,10 +657,13 @@ public class BandwidthManagerIntTest extends AbstractBandwidthManagerIntTest {
|
|||
subscription.getTime().setCycleTimes(Arrays.asList(Integer.valueOf(0)));
|
||||
|
||||
bandwidthManager.schedule(subscription);
|
||||
final BandwidthManager proposed = bandwidthManager
|
||||
BandwidthManager bwProposed = null;
|
||||
try {
|
||||
bwProposed = bandwidthManager
|
||||
.startProposedBandwidthManager(BandwidthMap
|
||||
.load(EdexBandwidthContextFactory
|
||||
.getBandwidthMapConfig()));
|
||||
final BandwidthManager proposed = bwProposed;
|
||||
|
||||
final BlockingQueue<Exception> queue = new ArrayBlockingQueue<Exception>(
|
||||
1);
|
||||
|
@ -664,14 +671,16 @@ public class BandwidthManagerIntTest extends AbstractBandwidthManagerIntTest {
|
|||
final int invocationCount = 10;
|
||||
final CountDownLatch waitForAllThreadsReadyLatch = new CountDownLatch(
|
||||
invocationCount * 2);
|
||||
final CountDownLatch doneLatch = new CountDownLatch(invocationCount * 2);
|
||||
final CountDownLatch doneLatch = new CountDownLatch(
|
||||
invocationCount * 2);
|
||||
for (int i = 0; i < invocationCount; i++) {
|
||||
final int current = i;
|
||||
Thread thread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// Wait for all threads to check in, then they all start
|
||||
// Wait for all threads to check in, then they all
|
||||
// start
|
||||
// working at once
|
||||
waitForAllThreadsReadyLatch.countDown();
|
||||
waitForAllThreadsReadyLatch.await();
|
||||
|
@ -694,17 +703,21 @@ public class BandwidthManagerIntTest extends AbstractBandwidthManagerIntTest {
|
|||
try {
|
||||
final Subscription subscription2 = SubscriptionFixture.INSTANCE
|
||||
.get(current);
|
||||
subscription2.addParameter(ParameterFixture.INSTANCE
|
||||
subscription2
|
||||
.addParameter(ParameterFixture.INSTANCE
|
||||
.get(1));
|
||||
subscription2.addParameter(ParameterFixture.INSTANCE
|
||||
subscription2
|
||||
.addParameter(ParameterFixture.INSTANCE
|
||||
.get(2));
|
||||
subscription2.addParameter(ParameterFixture.INSTANCE
|
||||
subscription2
|
||||
.addParameter(ParameterFixture.INSTANCE
|
||||
.get(3));
|
||||
subscription2.getTime().setCycleTimes(
|
||||
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
|
||||
11, 12, 13, 14, 15, 16, 17));
|
||||
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9,
|
||||
10, 11, 12, 13, 14, 15, 16, 17));
|
||||
subscription2.setLatencyInMinutes(current);
|
||||
// Wait for all threads to check in, then they all start
|
||||
// Wait for all threads to check in, then they all
|
||||
// start
|
||||
// working at once
|
||||
waitForAllThreadsReadyLatch.countDown();
|
||||
waitForAllThreadsReadyLatch.await();
|
||||
|
@ -725,6 +738,9 @@ public class BandwidthManagerIntTest extends AbstractBandwidthManagerIntTest {
|
|||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
} finally {
|
||||
shutdownBandwidthManager(bwProposed);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -64,6 +64,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
|
|||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Apr 23, 2013 1910 djohnson Initial creation
|
||||
* Apr 29, 2013 1910 djohnson Move to integration tests section.
|
||||
*
|
||||
* </pre>
|
||||
*
|
|
@ -33,6 +33,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
@ -84,6 +85,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
|
|||
* Feb 15, 2013 1543 djohnson Class renames.
|
||||
* Mar 05, 2013 1647 djohnson Pass wmo header strategy to constructor.
|
||||
* Mar 19, 2013 1794 djohnson RetrievalTasks integrate at a queue.
|
||||
* Apr 29, 2013 1910 djohnson Unregister from EventBus after each test.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -165,6 +167,11 @@ public class RetrievalTaskTest {
|
|||
EventBus.register(this);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
EventBus.unregister(this);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void processesRetrievalForItsSpecifiedNetwork()
|
||||
throws DataAccessLayerException {
|
||||
|
|
Loading…
Add table
Reference in a new issue