Issue #1802 Fix event processing in regards to transaction commits/rollbacks
Change-Id: If5988d96b7e7382e04879ca6126a61237d70bd22 Former-commit-id: 25398fcce99b15d3fda71cad34f39222e30458dd
This commit is contained in:
parent
f02b073cc8
commit
ecae75ecd8
12 changed files with 630 additions and 76 deletions
|
@ -2,6 +2,8 @@ package com.raytheon.uf.common.event;
|
|||
|
||||
import java.util.ServiceLoader;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* The EventBus.
|
||||
*
|
||||
|
@ -17,6 +19,7 @@ import java.util.ServiceLoader;
|
|||
* Feb 05, 2013 1580 mpduff Moved to common, use IEventBusHandler.
|
||||
* Apr 29, 2013 1910 djohnson Watch for NPEs and errors unregistering.
|
||||
* May 28, 2013 1650 djohnson Simplify and extract out the general event bus handling for reuse.
|
||||
* Jun 20, 2013 1802 djohnson Allow test code to set an explicit event bus handler.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -25,7 +28,8 @@ import java.util.ServiceLoader;
|
|||
*/
|
||||
public final class EventBus {
|
||||
|
||||
private static final IEventBusHandler handler;
|
||||
@VisibleForTesting
|
||||
static IEventBusHandler handler;
|
||||
static {
|
||||
handler = ServiceLoader.<IEventBusHandler> load(IEventBusHandler.class)
|
||||
.iterator().next();
|
||||
|
|
|
@ -115,6 +115,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
|
|||
* May 02, 2013 1910 djohnson Shutdown proposed bandwidth managers in a finally.
|
||||
* May 20, 2013 1650 djohnson Add in capability to find required dataset size.
|
||||
* Jun 03, 2013 2038 djohnson Add base functionality to handle point data type subscriptions.
|
||||
* Jun 20, 2013 1802 djohnson Check several times for the metadata for now.
|
||||
* </pre>
|
||||
*
|
||||
* @author dhladky
|
||||
|
@ -188,7 +189,20 @@ public abstract class BandwidthManager extends
|
|||
|
||||
if (DataDeliveryRegistryObjectTypes.DATASETMETADATA.equals(objectType)) {
|
||||
|
||||
DataSetMetaData dsmd = getDataSetMetaData(id);
|
||||
DataSetMetaData dsmd = null;
|
||||
int attempts = 0;
|
||||
do {
|
||||
attempts++;
|
||||
dsmd = getDataSetMetaData(id);
|
||||
if (dsmd == null) {
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
e.getLocalizedMessage(), e);
|
||||
}
|
||||
}
|
||||
} while (dsmd == null && attempts < 20);
|
||||
|
||||
if (dsmd != null) {
|
||||
// Repost the Object to the BandwidthEventBus to free
|
||||
|
@ -209,7 +223,7 @@ public abstract class BandwidthManager extends
|
|||
BandwidthEventBus.publish(dsmd);
|
||||
} else {
|
||||
statusHandler.error("No DataSetMetaData found for id [" + id
|
||||
+ "]");
|
||||
+ "] after " + attempts + " attempts");
|
||||
}
|
||||
|
||||
} else if (DataDeliveryRegistryObjectTypes.SITE_SUBSCRIPTION
|
||||
|
@ -265,8 +279,7 @@ public abstract class BandwidthManager extends
|
|||
*/
|
||||
@Subscribe
|
||||
public void updateGriddedDataSetMetaData(
|
||||
GriddedDataSetMetaData dataSetMetaData)
|
||||
throws ParseException {
|
||||
GriddedDataSetMetaData dataSetMetaData) throws ParseException {
|
||||
// Daily/Hourly/Monthly datasets
|
||||
if (dataSetMetaData.getCycle() == GriddedDataSetMetaData.NO_CYCLE) {
|
||||
updateDataSetMetaDataWithoutCycle(dataSetMetaData);
|
||||
|
|
|
@ -28,6 +28,7 @@ import com.raytheon.uf.common.datadelivery.registry.DataDeliveryRegistryObjectTy
|
|||
import com.raytheon.uf.common.datadelivery.registry.DataSet;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Subscription;
|
||||
import com.raytheon.uf.common.datadelivery.registry.handlers.DataDeliveryHandlers;
|
||||
import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetHandler;
|
||||
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
|
||||
import com.raytheon.uf.common.event.EventBus;
|
||||
import com.raytheon.uf.common.registry.event.InsertRegistryEvent;
|
||||
|
@ -51,6 +52,7 @@ import com.raytheon.uf.common.util.CollectionUtil;
|
|||
* 3/18/2013 1802 bphillip Modified to use proper transaction boundaries
|
||||
* May 08, 2013 2000 djohnson Shortcut out if no subscriptions are returned for the dataset.
|
||||
* May 20, 2013 2000 djohnson Shortcut out if no subscription handler is available.
|
||||
* Jun 20, 2013 1802 djohnson Check several times for the dataset for now.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -218,8 +220,24 @@ public class SubscriptionIntegrityVerifier {
|
|||
|
||||
if (DataDeliveryRegistryObjectTypes.DATASET.equals(objectType)) {
|
||||
try {
|
||||
DataSet dataSet = DataDeliveryHandlers.getDataSetHandler()
|
||||
.getById(event.getId());
|
||||
final IDataSetHandler dataSetHandler = DataDeliveryHandlers
|
||||
.getDataSetHandler();
|
||||
|
||||
DataSet dataSet = null;
|
||||
int attempts = 0;
|
||||
do {
|
||||
attempts++;
|
||||
dataSet = dataSetHandler.getById(event.getId());
|
||||
if (dataSet == null) {
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
e.getLocalizedMessage(), e);
|
||||
}
|
||||
}
|
||||
} while (dataSet == null && attempts < 20);
|
||||
|
||||
if (dataSet != null) {
|
||||
dataSetUpdated(dataSet);
|
||||
}
|
||||
|
|
|
@ -24,10 +24,8 @@ import java.util.IdentityHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.eventbus.EventBus;
|
||||
import com.raytheon.uf.common.event.IBaseEventBusHandler;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
|
@ -44,6 +42,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
|
|||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* May 28, 2013 1650 djohnson Simplified and extracted from {@link EdexEventBusHandler}.
|
||||
* Jun 20, 2013 1802 djohnson Thread local is not safe across multiple transaction levels.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -52,20 +51,11 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
|
|||
*/
|
||||
|
||||
public abstract class BaseEdexEventBusHandler<T> implements
|
||||
IBaseEventBusHandler<T>, TransactionSynchronization {
|
||||
IBaseEventBusHandler<T> {
|
||||
|
||||
private static final IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(BaseEdexEventBusHandler.class);
|
||||
|
||||
private final ThreadLocal<List<T>> eventStorageList = new ThreadLocal<List<T>>() {
|
||||
|
||||
@Override
|
||||
protected List<T> initialValue() {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
private static final String NULL_SUBSCRIBER = "Ignoring a null subscriber.";
|
||||
|
||||
// Set that keeps a reference to all objects which have registered, which
|
||||
|
@ -109,13 +99,10 @@ public abstract class BaseEdexEventBusHandler<T> implements
|
|||
if (isTransactionActive()) {
|
||||
|
||||
if (TransactionSynchronizationManager.isSynchronizationActive()) {
|
||||
if (!TransactionSynchronizationManager.getSynchronizations()
|
||||
.contains(this)) {
|
||||
TransactionSynchronizationManager
|
||||
.registerSynchronization(this);
|
||||
.registerSynchronization(new EventTransactionSynchronization(
|
||||
event, googleEventBuses));
|
||||
}
|
||||
}
|
||||
eventStorageList.get().add(event);
|
||||
} else {
|
||||
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
|
||||
statusHandler
|
||||
|
@ -202,48 +189,4 @@ public abstract class BaseEdexEventBusHandler<T> implements
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void suspend() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeCommit(boolean readOnly) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeCompletion() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCompletion(int status) {
|
||||
List<T> list = eventStorageList.get();
|
||||
if (status == TransactionSynchronization.STATUS_COMMITTED) {
|
||||
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
|
||||
statusHandler.debug("Posting " + list.size()
|
||||
+ " objects on the event bus");
|
||||
}
|
||||
for (T event : list) {
|
||||
for (EventBus eventBus : googleEventBuses) {
|
||||
eventBus.post(event);
|
||||
}
|
||||
}
|
||||
} else if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
|
||||
statusHandler.info("Transaction rolled back. Discarding "
|
||||
+ list.size() + " events.");
|
||||
}
|
||||
list.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.edex.event;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
|
||||
import com.google.common.eventbus.EventBus;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
|
||||
/**
|
||||
* Spring {@link TransactionSynchronization} that will post or discard events
|
||||
* based on whether or not the transaction commits/rolls back. Intentionally
|
||||
* package-private as the class is not part of the public API.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jun 20, 2013 1802 djohnson Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author djohnson
|
||||
* @version 1.0
|
||||
*/
|
||||
class EventTransactionSynchronization implements
|
||||
TransactionSynchronization {
|
||||
|
||||
private static final IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(EventTransactionSynchronization.class);
|
||||
|
||||
private final Object event;
|
||||
|
||||
private final Collection<EventBus> googleEventBuses;
|
||||
|
||||
public EventTransactionSynchronization(Object event,
|
||||
Collection<EventBus> googleEventBuses) {
|
||||
this.event = event;
|
||||
this.googleEventBuses = googleEventBuses;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void suspend() {
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void resume() {
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void flush() {
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void beforeCommit(boolean readOnly) {
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void beforeCompletion() {
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCompletion(int status) {
|
||||
if (status == TransactionSynchronization.STATUS_COMMITTED) {
|
||||
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
|
||||
statusHandler.debug("Posting event of type ["
|
||||
+ event.getClass().getName() + "] on the event bus");
|
||||
}
|
||||
|
||||
for (EventBus eventBus : googleEventBuses) {
|
||||
eventBus.post(event);
|
||||
}
|
||||
} else if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
|
||||
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
|
||||
statusHandler.debug("Discarding event of type ["
|
||||
+ event.getClass().getName()
|
||||
+ "] due to transaction rolling back.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1 +1 @@
|
|||
com.raytheon.uf.edex.event.TestEventBusHandler
|
||||
com.raytheon.uf.edex.event.TransactionalSynchronousEventBusHandler
|
|
@ -32,6 +32,7 @@ import com.raytheon.uf.common.stats.ProcessEvent;
|
|||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* May 28, 2013 1650 djohnson Initial creation
|
||||
* Jun 20, 2013 1802 djohnson Allow test code to set an explicit event bus handler.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -41,6 +42,18 @@ import com.raytheon.uf.common.stats.ProcessEvent;
|
|||
|
||||
public class EventBusTest extends BaseEventBusTest<Event, IEventBusHandler> {
|
||||
|
||||
/**
|
||||
* Allows test code to use explicit event bus handlers if the default does
|
||||
* not suffice.
|
||||
*
|
||||
* @param eventBusHandler
|
||||
* the event bus handler
|
||||
*/
|
||||
public static void useExplicitEventBusHandler(
|
||||
IEventBusHandler eventBusHandler) {
|
||||
EventBus.handler = eventBusHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.common.event;
|
||||
|
||||
import static com.raytheon.uf.common.event.TransactionalEventSendingService.OUTER_TRANSACTION_EVENT_MESSAGE;
|
||||
import static com.raytheon.uf.common.event.TransactionalEventSendingService.REQUIRES_NEW_TRANSACTION_EVENT_MESSAGE;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.hibernate.HibernateException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.eventbus.Subscribe;
|
||||
import com.raytheon.uf.common.stats.ProcessEvent;
|
||||
import com.raytheon.uf.common.util.SpringFiles;
|
||||
|
||||
/**
|
||||
* Test {@link EventBus} transactional behavior.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jun 20, 2013 1802 djohnson Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author djohnson
|
||||
* @version 1.0
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration(locations = { SpringFiles.UNIT_TEST_DB_BEANS_XML,
|
||||
EventBusTransactionBehaviorTest.SPRING_FILE })
|
||||
public class EventBusTransactionBehaviorTest {
|
||||
|
||||
protected static final String SPRING_FILE = "eventBusTransactionBehaviorTest.xml";
|
||||
|
||||
@Autowired
|
||||
private TransactionalEventSendingService service;
|
||||
|
||||
private final List<ProcessEvent> eventsReceived = Lists.newArrayList();
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
EventBus.register(this);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
EventBus.unregister(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that events published in an outer transaction are not made
|
||||
* available if that transaction is rolled back, even if a new transaction
|
||||
* is started in the meantime and commits successfully.
|
||||
*/
|
||||
@Test
|
||||
public void rolledBackOuterTransactionEventsAreNotPublishedWhenRequiresNewTransactionCommits() {
|
||||
try {
|
||||
service.sendEventFromTransactionThenInvokeRequiresNewTransactionAndRollbackOriginal();
|
||||
|
||||
fail("A HibernateException should have been thrown!");
|
||||
} catch (HibernateException ex) {
|
||||
// Expected path
|
||||
}
|
||||
|
||||
boolean foundOuterTransactionEventMessage = findProcessEventWithMessage(OUTER_TRANSACTION_EVENT_MESSAGE);
|
||||
|
||||
assertThat(
|
||||
"Should not have received the event sent during a transaction that rolled back!",
|
||||
foundOuterTransactionEventMessage, is(false));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that events published in an outer transaction are not made
|
||||
* available if that transaction is rolled back, even if a new transaction
|
||||
* is started in the meantime and commits successfully.
|
||||
*/
|
||||
@Test
|
||||
public void committedOuterTransactionEventsArePublishedWhenTransactionCommits() {
|
||||
service.sendEventFromTransactionThenInvokeRequiresNewTransactionAndCommitOriginal();
|
||||
|
||||
boolean foundOuterTransactionEventMessage = findProcessEventWithMessage(OUTER_TRANSACTION_EVENT_MESSAGE);
|
||||
|
||||
assertThat(
|
||||
"Should have received the event sent during a transaction that committed!",
|
||||
foundOuterTransactionEventMessage, is(true));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that events published in an outer transaction are published when it
|
||||
* is committed, even if a requires new transaction is rolled back.
|
||||
*/
|
||||
@Test
|
||||
public void committedOuterTransactionEventsArePublishedWhenTransactionCommitsAndRequiresNewTransactionRollsBack() {
|
||||
service.sendEventFromTransactionThenInvokeRequiresNewTransactionThatRollsBackAndCommitOriginal();
|
||||
|
||||
boolean foundRequiresNewEventMessage = findProcessEventWithMessage(OUTER_TRANSACTION_EVENT_MESSAGE);
|
||||
|
||||
assertThat(
|
||||
"Should have received the event sent during a transaction that committed, even when the requires new transaction inside of it rolls back!",
|
||||
foundRequiresNewEventMessage, is(true));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that events published in a requires new transaction are made
|
||||
* available if that transaction is committed, even if the outer transaction
|
||||
* is rolled back.
|
||||
*/
|
||||
@Test
|
||||
public void requiresNewTransactionEventsArePublishedWhenOuterTransactionRollsBack() {
|
||||
try {
|
||||
service.sendEventFromTransactionThenInvokeRequiresNewTransactionAndRollbackOriginal();
|
||||
|
||||
fail("A HibernateException should have been thrown!");
|
||||
} catch (HibernateException ex) {
|
||||
// Expected path
|
||||
}
|
||||
|
||||
boolean foundRequiresNewEventMessage = findProcessEventWithMessage(REQUIRES_NEW_TRANSACTION_EVENT_MESSAGE);
|
||||
|
||||
assertThat(
|
||||
"Should have received the event sent during a requires new transaction when the outer transaction rolls back!",
|
||||
foundRequiresNewEventMessage, is(true));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that events published in a requires new transaction are made
|
||||
* available if that transaction is committed, and the outer transaction is
|
||||
* committed as well.
|
||||
*/
|
||||
@Test
|
||||
public void requiresNewTransactionEventsArePublishedWhenOuterTransactionCommits() {
|
||||
service.sendEventFromTransactionThenInvokeRequiresNewTransactionAndCommitOriginal();
|
||||
|
||||
boolean foundRequiresNewEventMessage = findProcessEventWithMessage(REQUIRES_NEW_TRANSACTION_EVENT_MESSAGE);
|
||||
|
||||
assertThat(
|
||||
"Should have received the event sent during a requires new transaction when the outer transaction rolls back!",
|
||||
foundRequiresNewEventMessage, is(true));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that events published in a requires new transaction are NOT made
|
||||
* available if that transaction is rolled back, even if the outer
|
||||
* transaction is committed.
|
||||
*/
|
||||
@Test
|
||||
public void requiresNewTransactionEventsAreNotPublishedWhenItRollsBackAndOuterTransactionCommits() {
|
||||
service.sendEventFromTransactionThenInvokeRequiresNewTransactionThatRollsBackAndCommitOriginal();
|
||||
|
||||
boolean foundRequiresNewEventMessage = findProcessEventWithMessage(REQUIRES_NEW_TRANSACTION_EVENT_MESSAGE);
|
||||
|
||||
assertThat(
|
||||
"Should NOT have received the event sent during a requires new transaction when it rolls back!",
|
||||
foundRequiresNewEventMessage, is(false));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param outerTransactionEventMessage
|
||||
* @return
|
||||
*/
|
||||
private boolean findProcessEventWithMessage(String message) {
|
||||
for (ProcessEvent event : eventsReceived) {
|
||||
if (message.equals(event.getMessage())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Subscribe
|
||||
public void receivedEvent(ProcessEvent event) {
|
||||
eventsReceived.add(event);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,163 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.common.event;
|
||||
|
||||
import org.hibernate.HibernateException;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import com.raytheon.uf.common.stats.ProcessEvent;
|
||||
|
||||
/**
|
||||
* Provides methods providing several different forms of transaction behavior,
|
||||
* which allows testing of various transaction issues with {@link Event}s.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jun 20, 2013 1802 djohnson Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author djohnson
|
||||
* @version 1.0
|
||||
*/
|
||||
@Ignore
|
||||
@Service
|
||||
@Transactional
|
||||
public class TransactionalEventSendingService implements
|
||||
ApplicationContextAware {
|
||||
|
||||
public static final String OUTER_TRANSACTION_EVENT_MESSAGE = "this event occurs in the outer transaction";
|
||||
|
||||
public static final String REQUIRES_NEW_TRANSACTION_EVENT_MESSAGE = "this event occurs in the requires new transaction";
|
||||
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
/**
|
||||
* Sends a {@link ProcessEvent}, invokes a requires new transaction, and
|
||||
* rolls back the original transaction which means the event should NOT be
|
||||
* published.
|
||||
*
|
||||
* @throws HibernateException
|
||||
* every time
|
||||
*/
|
||||
public void sendEventFromTransactionThenInvokeRequiresNewTransactionAndRollbackOriginal()
|
||||
throws HibernateException {
|
||||
// The event sent from this method should never be received
|
||||
sendOuterTransactionEvent();
|
||||
|
||||
applicationContext.getBean(TransactionalEventSendingService.class)
|
||||
.requiresNewTransactionSendsEvent();
|
||||
|
||||
throw new HibernateException("Rolling back transaction");
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a {@link ProcessEvent}, invokes a requires new transaction, and
|
||||
* commits the original transaction which means the event should be
|
||||
* published.
|
||||
*/
|
||||
public void sendEventFromTransactionThenInvokeRequiresNewTransactionAndCommitOriginal() {
|
||||
sendOuterTransactionEvent();
|
||||
|
||||
applicationContext.getBean(TransactionalEventSendingService.class)
|
||||
.requiresNewTransactionSendsEvent();
|
||||
|
||||
// No exception thrown, will commit successfully
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a {@link ProcessEvent}, invokes a requires new transaction that
|
||||
* rolls back, and commits the original transaction which means the event
|
||||
* should be published but not the requires new transaction event.
|
||||
*
|
||||
* @throws IllegalStateException
|
||||
* if the requires new transaction does not throw an exception
|
||||
* and rollback
|
||||
*/
|
||||
@Test
|
||||
public void sendEventFromTransactionThenInvokeRequiresNewTransactionThatRollsBackAndCommitOriginal()
|
||||
throws IllegalStateException {
|
||||
sendOuterTransactionEvent();
|
||||
|
||||
try {
|
||||
applicationContext.getBean(TransactionalEventSendingService.class)
|
||||
.requiresNewTransactionSendsEventThatRollsBack();
|
||||
|
||||
throw new IllegalStateException(
|
||||
"Expected the requires new transaction to throw a HibernateException!");
|
||||
} catch (HibernateException e) {
|
||||
// Expected path
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts a new transaction, sends an event, and rolls back the transaction
|
||||
* by throwing an exception.
|
||||
*
|
||||
* @throws {@link HibernateException} every time
|
||||
*/
|
||||
@Transactional(propagation = Propagation.REQUIRES_NEW)
|
||||
public void requiresNewTransactionSendsEventThatRollsBack()
|
||||
throws HibernateException {
|
||||
final ProcessEvent event = new ProcessEvent();
|
||||
event.setMessage(REQUIRES_NEW_TRANSACTION_EVENT_MESSAGE);
|
||||
EventBus.publish(event);
|
||||
|
||||
throw new HibernateException(
|
||||
"this requires new transaction will roll back");
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts a new transaction, sends an event, and commits the transaction.
|
||||
*/
|
||||
@Transactional(propagation = Propagation.REQUIRES_NEW)
|
||||
public void requiresNewTransactionSendsEvent() {
|
||||
final ProcessEvent event = new ProcessEvent();
|
||||
event.setMessage(REQUIRES_NEW_TRANSACTION_EVENT_MESSAGE);
|
||||
EventBus.publish(event);
|
||||
}
|
||||
|
||||
private void sendOuterTransactionEvent() {
|
||||
final ProcessEvent event = new ProcessEvent();
|
||||
event.setMessage(OUTER_TRANSACTION_EVENT_MESSAGE);
|
||||
EventBus.publish(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext)
|
||||
throws BeansException {
|
||||
this.applicationContext = applicationContext;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beans xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans
|
||||
http://www.springframework.org/schema/beans/spring-beans.xsd">
|
||||
|
||||
<bean id="transactionService" class="com.raytheon.uf.common.event.TransactionalEventSendingService" />
|
||||
|
||||
|
||||
</beans>
|
|
@ -22,10 +22,14 @@ package com.raytheon.uf.edex.event;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Ignore;
|
||||
|
||||
import com.google.common.eventbus.EventBus;
|
||||
import com.raytheon.uf.common.event.IEventBusHandler;
|
||||
|
||||
/**
|
||||
* Test event bus handler.
|
||||
* {@link IEventBusHandler} which uses a synchronous {@link EventBus} and does
|
||||
* not participate in transactions.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
|
@ -35,15 +39,16 @@ import com.google.common.eventbus.EventBus;
|
|||
* ------------ ---------- ----------- --------------------------
|
||||
* Feb 5, 2013 1580 mpduff Initial creation
|
||||
* May 28, 2013 1650 djohnson Add getEventBuses.
|
||||
* Jun 20, 2013 1802 djohnson Explicitly denote as non-transactional.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author mpduff
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
public class TestEventBusHandler extends EdexEventBusHandler {
|
||||
private static class SynchronousEventBusFactory implements
|
||||
@Ignore
|
||||
public class NonTransactionalSynchronousEventBusHandler extends EdexEventBusHandler {
|
||||
static class SynchronousEventBusFactory implements
|
||||
GoogleEventBusFactory {
|
||||
@Override
|
||||
public List<EventBus> getEventBuses() {
|
||||
|
@ -53,13 +58,13 @@ public class TestEventBusHandler extends EdexEventBusHandler {
|
|||
}
|
||||
}
|
||||
|
||||
public TestEventBusHandler() {
|
||||
public NonTransactionalSynchronousEventBusHandler() {
|
||||
super(new SynchronousEventBusFactory());
|
||||
}
|
||||
|
||||
/**
|
||||
* Overridden to return false, because the transaction semantics are
|
||||
* different with tests.
|
||||
* Overridden to return false, which will force events to be sent
|
||||
* immediately regardless of transaction status.
|
||||
*/
|
||||
@Override
|
||||
protected boolean isTransactionActive() {
|
|
@ -0,0 +1,52 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.edex.event;
|
||||
|
||||
import org.junit.Ignore;
|
||||
|
||||
import com.google.common.eventbus.EventBus;
|
||||
import com.raytheon.uf.common.event.IEventBusHandler;
|
||||
import com.raytheon.uf.edex.event.NonTransactionalSynchronousEventBusHandler.SynchronousEventBusFactory;
|
||||
|
||||
/**
|
||||
* {@link IEventBusHandler} which uses a synchronous {@link EventBus} that
|
||||
* participates in transactions.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jun 20, 2013 1802 djohnson Initial creation.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author mpduff
|
||||
* @version 1.0
|
||||
*/
|
||||
@Ignore
|
||||
public class TransactionalSynchronousEventBusHandler extends EdexEventBusHandler {
|
||||
|
||||
public TransactionalSynchronousEventBusHandler() {
|
||||
super(new SynchronousEventBusFactory());
|
||||
}
|
||||
|
||||
}
|
Loading…
Add table
Reference in a new issue