Issue #1642: Fix QPID Deadlock scenario

- Address peer review comments

Change-Id: I3c52fd57f762a762d54fdce3651c7f17a8fc6462

Former-commit-id: 2071d4f3cc54a3da43443bc71cecf038736194d8
This commit is contained in:
Richard Peter 2013-02-25 17:44:18 -06:00
parent 9cf3ca07b7
commit 9a0177861f
10 changed files with 307 additions and 331 deletions

View file

@ -35,7 +35,16 @@ import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
/**
* TODO Add Description
* Jms Pooled Connection. Tracks references to the connection to know when the
* connection can be released to the pool. Any exception will close pooled
* session instead of returning to the pool. The sessions are tracked in both
* active and available states. An available session can be reused by the next
* client.
*
* Synchronization Principle To prevent deadlocks: Chained sync blocks can only
* happen in a downward direction. A manager has a synchronized lock can make a
* call down to a wrapper, but not nice versa. Also a session inside a sync
* block can make a call down to a producer but not vice versa.
*
* <pre>
*
@ -44,8 +53,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 15, 2011 rjpeter Initial creation
* Mar 08, 2012 194 njensen Improved safety of close()
*
* Mar 08, 2012 194 njensen Improved safety of close()
* Feb 21, 2013 1642 rjpeter Fix deadlock scenario
* </pre>
*
* @author rjpeter
@ -57,35 +66,33 @@ public class JmsPooledConnection implements ExceptionListener {
private final IUFStatusHandler statusHandler = UFStatus
.getHandler(JmsPooledConnection.class);
private JmsPooledConnectionFactory connFactory = null;
private final JmsPooledConnectionFactory connFactory;
private Connection conn = null;
private volatile Connection conn = null;
// keeps track of number of creates vs. closes to know when it can be
// returned to the pool
List<JmsConnectionWrapper> references = new ArrayList<JmsConnectionWrapper>(
private final List<JmsConnectionWrapper> references = new ArrayList<JmsConnectionWrapper>(
1);
private Object stateLock = new Object();
private final Object stateLock = new Object();
private State state = State.InUse;
private volatile State state = State.InUse;
// technically can have multiple sessions to one connection and sessions can
// differ by transaction mode and acknowledgement mode, currently not
// supported
private JmsPooledSession session = null;
private volatile JmsPooledSession session = null;
private AvailableJmsPooledObject<JmsPooledSession> availableSession = null;
private volatile AvailableJmsPooledObject<JmsPooledSession> availableSession = null;
private String key = null;
private volatile String key = null;
private String clientId = null;
private final String clientId;
private long connectionStartTime = 0;
private volatile long connectionStartTime = 0;
private boolean exceptionOccurred = false;
private Throwable trappedExc;
private volatile boolean exceptionOccurred = false;
public JmsPooledConnection(JmsPooledConnectionFactory connFactory) {
this.connFactory = connFactory;
@ -101,7 +108,7 @@ public class JmsPooledConnection implements ExceptionListener {
throw new IllegalStateException("Connection is closed");
}
synchronized (this) {
synchronized (stateLock) {
// TODO: Add multiple session support
if (session != null) {
JmsSessionWrapper ref = session.createReference();
@ -112,6 +119,7 @@ public class JmsPooledConnection implements ExceptionListener {
this.session = null;
}
}
if (availableSession != null) {
JmsPooledSession availSess = availableSession.getPooledObject();
synchronized (availSess.getStateLock()) {
@ -130,6 +138,7 @@ public class JmsPooledConnection implements ExceptionListener {
transacted, acknowledgeMode));
}
}
return session.createReference();
}
@ -160,64 +169,61 @@ public class JmsPooledConnection implements ExceptionListener {
public void onException(JMSException jmsExc) {
// need to worry about multiple exceptions causing repeated
// disconnect/reconnect
statusHandler.handle(Priority.INFO, "Caught Exception on "
statusHandler.handle(Priority.WARN, "Caught Exception on "
+ connFactory.getProvider()
+ " connection. Closing connection", jmsExc);
close();
}
public void close() {
// synchronize on the connection to avoid deadlock conditions in qpid
boolean canClose = false;
synchronized (stateLock) {
if (!State.Closed.equals(state)) {
canClose = true;
state = State.Closed;
if (trappedExc != null) {
statusHandler.handle(Priority.INFO,
"Trapped internal exception", trappedExc);
}
}
}
if (canClose) {
synchronized (this) {
// njensen: I moved removing the connection from the pool to be
// the first thing in this block instead of last thing so
// there's no chance it could be closed and then retrieved from
// the pool by something else
connFactory.removeConnectionFromPool(this);
if (conn != null) {
try {
conn.stop();
} catch (Exception e) {
statusHandler.handle(Priority.INFO,
"Failed to stop connection", e);
}
}
synchronized (references) {
for (JmsConnectionWrapper wrapper : references) {
wrapper.closeInternal();
}
references.clear();
}
if (session != null) {
session.close();
session = null;
}
// njensen: I moved removing the connection from the pool to be
// the first thing in this block instead of last thing so
// there's no chance it could be closed and then retrieved from
// the pool by something else
connFactory.removeConnectionFromPool(this);
if (conn != null) {
try {
conn.close();
conn.stop();
} catch (Exception e) {
statusHandler.handle(Priority.INFO,
"Failed to close connection " + conn, e);
statusHandler.handle(Priority.WARN,
"Failed to stop connection", e);
}
}
conn = null;
synchronized (references) {
for (JmsConnectionWrapper wrapper : references) {
wrapper.closeWrapper();
}
references.clear();
}
if (session != null) {
session.close();
session = null;
}
if (availableSession != null) {
availableSession.getPooledObject().close();
availableSession = null;
}
try {
conn.close();
} catch (Exception e) {
statusHandler.handle(Priority.WARN,
"Failed to close connection " + conn, e);
}
}
conn = null;
}
/**
@ -228,8 +234,13 @@ public class JmsPooledConnection implements ExceptionListener {
* @return
*/
public int closeOldPooledResources(int resourceRetention) {
if (!isValid()) {
return 0;
}
int count = 0;
JmsPooledSession sessionToCheck = null;
synchronized (stateLock) {
if (session != null) {
sessionToCheck = session;
@ -265,8 +276,12 @@ public class JmsPooledConnection implements ExceptionListener {
}
public Connection getConnection() {
// lazy initialized so that we can handle reconnect and logging since
// spring reconnect doesn't give any status information and appears to
// the user that the process is hung
if (conn == null) {
synchronized (this) {
// safe since conn is volatile
synchronized (stateLock) {
if (conn == null) {
long exceptionLastHandled = 0;
boolean connected = false;
@ -345,12 +360,12 @@ public class JmsPooledConnection implements ExceptionListener {
if (!valid) {
removeSession(sess);
} else {
synchronized (this) {
synchronized (stateLock) {
// should only be able to have 1 session
if (availableSession != null) {
availableSession.getPooledObject().close();
statusHandler
.warn("Pooled session already existed for this connection, closing previous session");
availableSession.getPooledObject().close();
}
availableSession = new AvailableJmsPooledObject<JmsPooledSession>(
session);
@ -361,13 +376,19 @@ public class JmsPooledConnection implements ExceptionListener {
return valid;
}
/**
* Removes this pooled session from the pooled connection. Does NOT close
* the session, this should be handled independently.
*
* @param sess
*/
public void removeSession(JmsPooledSession sess) {
synchronized (this) {
synchronized (stateLock) {
if (sess != null) {
if (this.session == sess) {
this.session = null;
} else if (availableSession != null
&& availableSession.getPooledObject() == sess) {
} else if ((availableSession != null)
&& (availableSession.getPooledObject() == sess)) {
this.availableSession = null;
}
}
@ -378,7 +399,11 @@ public class JmsPooledConnection implements ExceptionListener {
synchronized (stateLock) {
if (isValid(State.InUse, true)) {
JmsConnectionWrapper wrapper = new JmsConnectionWrapper(this);
references.add(wrapper);
synchronized (references) {
references.add(wrapper);
}
return wrapper;
}
}
@ -433,30 +458,35 @@ public class JmsPooledConnection implements ExceptionListener {
public void removeReference(JmsConnectionWrapper wrapper) {
boolean returnToPool = false;
synchronized (stateLock) {
if (references.remove(wrapper) && references.isEmpty()
&& State.InUse.equals(state)) {
state = State.Available;
returnToPool = true;
synchronized (references) {
if (references.remove(wrapper) && references.isEmpty()
&& State.InUse.equals(state)) {
state = State.Available;
returnToPool = true;
// double check state of session, should be available
if (session != null) {
statusHandler
.warn("Connection marked available, but Session not Available. Sessions state is: "
+ session.getState());
session.close();
session = null;
}
if (availableSession != null) {
JmsPooledSession availSess = availableSession
.getPooledObject();
if (availSess != null
&& !State.Available.equals(availSess.getState())) {
// double check state of session, should be available
if (session != null) {
statusHandler
.warn("Connection marked available, but Session not Available. Sessions state is: "
+ availSess.getState());
availSess.close();
availableSession = null;
+ session.getState());
session.close();
session = null;
}
if (availableSession != null) {
JmsPooledSession availSess = availableSession
.getPooledObject();
synchronized (availSess.getStateLock()) {
if ((availSess != null)
&& !State.Available.equals(availSess
.getState())) {
statusHandler
.warn("Connection marked available, but Session not Available. Sessions state is: "
+ availSess.getState());
availSess.close();
availableSession = null;
}
}
}
}
}

View file

@ -1,6 +1,7 @@
package com.raytheon.uf.common.jms;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@ -63,16 +64,16 @@ public class JmsPooledConnectionFactory implements ConnectionFactory {
private String provider = "QPID";
// connections in use, key is "threadId-threadName"
private Map<String, JmsPooledConnection> inUseConnections = new HashMap<String, JmsPooledConnection>();
private final Map<String, JmsPooledConnection> inUseConnections = new HashMap<String, JmsPooledConnection>();
// connections that were recently returned, key is "threadId-threadName"
private Map<String, AvailableJmsPooledObject<JmsPooledConnection>> pendingConnections = new HashMap<String, AvailableJmsPooledObject<JmsPooledConnection>>();
private final Map<String, AvailableJmsPooledObject<JmsPooledConnection>> pendingConnections = new HashMap<String, AvailableJmsPooledObject<JmsPooledConnection>>();
// connections that have been released from pendingConnections and are
// awaiting being closed.
private LinkedList<AvailableJmsPooledObject<JmsPooledConnection>> availableConnections = new LinkedList<AvailableJmsPooledObject<JmsPooledConnection>>();
private final Deque<AvailableJmsPooledObject<JmsPooledConnection>> availableConnections = new LinkedList<AvailableJmsPooledObject<JmsPooledConnection>>();
private ConcurrentLinkedQueue<JmsPooledConnection> deadConnections = new ConcurrentLinkedQueue<JmsPooledConnection>();
private final ConcurrentLinkedQueue<JmsPooledConnection> deadConnections = new ConcurrentLinkedQueue<JmsPooledConnection>();
private int reconnectInterval = 30000;
@ -350,13 +351,13 @@ public class JmsPooledConnectionFactory implements ConnectionFactory {
}
}
for (AvailableJmsPooledObject<JmsPooledConnection> wrapper : connectionsToProcess) {
wrapper.reset();
// putting to available pool
JmsPooledConnection conn = wrapper.getPooledObject();
conn.setKey(null);
synchronized (availableConnections) {
for (AvailableJmsPooledObject<JmsPooledConnection> wrapper : connectionsToProcess) {
wrapper.reset();
synchronized (availableConnections) {
// putting to available pool
JmsPooledConnection conn = wrapper.getPooledObject();
conn.setKey(null);
availableConnections.add(wrapper);
}
}
@ -372,10 +373,9 @@ public class JmsPooledConnectionFactory implements ConnectionFactory {
// available sessions added based on time, so oldest is front of
// queue
if (wrapper.expired(curTime, connectionHoldTime)
|| availableConnections.size() > maxSpareConnections) {
|| (availableConnections.size() > maxSpareConnections)) {
// not immediately closing connection so that we minimize
// time
// in sync block
// time in sync block
deadConnections.add(wrapper.getPooledObject());
iter.remove();
} else {
@ -409,15 +409,17 @@ public class JmsPooledConnectionFactory implements ConnectionFactory {
connectionsToProcess = new ArrayList<AvailableJmsPooledObject<JmsPooledConnection>>(
pendingConnections.values());
}
synchronized (availableConnections) {
connectionsToProcess.addAll(availableConnections);
}
for (AvailableJmsPooledObject<JmsPooledConnection> wrapper : connectionsToProcess) {
resourcesClosed += wrapper.getPooledObject()
.closeOldPooledResources(resourceRetention);
}
if (connectionsClosed > 0 || resourcesClosed > 0) {
if ((connectionsClosed > 0) || (resourcesClosed > 0)) {
statusHandler.handle(
Priority.INFO,
"Closed unused jms pooled resources: connections closed: "

View file

@ -32,7 +32,14 @@ import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
/**
* TODO Add Description
* Jms Pooled Consumer. Tracks references to the consumers to know when consumer
* can be released to pool. Any exception will close pooled consumer instead of
* returning to pool.
*
* Synchronization Principle To prevent deadlocks: Chained sync blocks can only
* happen in a downward direction. A manager has a synchronized lock can make a
* call down to a wrapper, but not nice versa. Also a session inside a sync
* block can make a call down to a producer but not vice versa.
*
* <pre>
*
@ -41,8 +48,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 18, 2011 rjpeter Initial creation
* Mar 08, 2012 194 njensen Improved logging
*
* Mar 08, 2012 194 njensen Improved logging
* Feb 26, 2013 1642 rjpeter Removed lazy initialization
* </pre>
*
* @author rjpeter
@ -55,34 +62,31 @@ public class JmsPooledConsumer {
private final JmsPooledSession sess;
private final Destination destination;
private final String messageSelector;
private MessageConsumer consumer;
private final MessageConsumer consumer;
private final String destKey;
private boolean exceptionOccurred = false;
private volatile boolean exceptionOccurred = false;
private Object stateLock = new Object();
private final Object stateLock = new Object();
private State state = State.InUse;
private volatile State state = State.InUse;
/**
* Technically a pooled consumer should only have 1 reference at a time.
* Bullet proofing in case 3rd party ever tries to get multiple consumers to
* the same destination.
*/
private List<JmsConsumerWrapper> references = new ArrayList<JmsConsumerWrapper>(
private final List<JmsConsumerWrapper> references = new ArrayList<JmsConsumerWrapper>(
1);
public JmsPooledConsumer(JmsPooledSession sess, String destKey,
Destination destination, String messageSelector) {
Destination destination, String messageSelector)
throws JMSException {
this.sess = sess;
this.destKey = destKey;
this.destination = destination;
this.messageSelector = messageSelector;
consumer = sess.getSession().createConsumer(destination,
messageSelector);
}
public String getDestKey() {
@ -151,7 +155,7 @@ public class JmsPooledConsumer {
close = true;
for (JmsConsumerWrapper wrapper : references) {
wrapper.closeInternal();
wrapper.closeWrapper();
}
references.clear();
@ -159,15 +163,12 @@ public class JmsPooledConsumer {
}
if (close) {
if (consumer != null) {
try {
statusHandler.info("Closing consumer " + consumer); // njensen
consumer.close();
} catch (Throwable e) {
statusHandler.handle(Priority.INFO,
"Failed to close consumer " + consumer, e);
}
consumer = null;
try {
statusHandler.info("Closing consumer " + consumer); // njensen
consumer.close();
} catch (Throwable e) {
statusHandler.handle(Priority.WARN, "Failed to close consumer "
+ consumer, e);
}
sess.removeConsumerFromPool(this);
@ -206,18 +207,6 @@ public class JmsPooledConsumer {
}
public MessageConsumer getConsumer() throws JMSException {
// TODO: allow this to automatically grab a new consumer if the current
// one fails, try up to 3 times so that we don't always drop messages on
// a single failure
if (consumer == null) {
synchronized (this) {
if (consumer == null) {
consumer = sess.getSession().createConsumer(destination,
messageSelector);
}
}
}
return consumer;
}

View file

@ -22,7 +22,6 @@ package com.raytheon.uf.common.jms;
import java.util.ArrayList;
import java.util.List;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
@ -32,8 +31,12 @@ import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
/**
* Jms Pooled Producer. Tracks references to the producers to know when the
* producer can be released to the pool. Any exception will close pooled
* producer instead of returning to the pool.
*
* Synchronization Principle To prevent deadlocks: Chained sync blocks can only
* happen in a doward direction. A manager has a synchonized lock can make a
* happen in a downward direction. A manager has a synchronized lock can make a
* call down to a wrapper, but not nice versa. Also a session inside a sync
* block can make a call down to a producer but not vice versa.
*
@ -44,8 +47,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 18, 2011 rjpeter Initial creation
* Mar 08, 2012 194 njensen Improved logging
*
* Mar 08, 2012 194 njensen Improved logging
* Feb 26, 2013 1642 rjpeter Removed lazy initialization
* </pre>
*
* @author rjpeter
@ -58,31 +61,29 @@ public class JmsPooledProducer {
private final JmsPooledSession sess;
private final Destination destination;
private MessageProducer producer;
private final MessageProducer producer;
private final String destKey;
private boolean exceptionOccurred = false;
private volatile boolean exceptionOccurred = false;
private Object stateLock = new Object();
private final Object stateLock = new Object();
private State state = State.InUse;
private volatile State state = State.InUse;
/**
* Technically a pooled producer should only have 1 reference at a time.
* Bullet proofing in case 3rd party ever tries to get multiple producers to
* the same destination.
*/
private List<JmsProducerWrapper> references = new ArrayList<JmsProducerWrapper>(
private final List<JmsProducerWrapper> references = new ArrayList<JmsProducerWrapper>(
1);
public JmsPooledProducer(JmsPooledSession sess, String destKey,
Destination destination) {
MessageProducer producer) {
this.sess = sess;
this.destKey = destKey;
this.destination = destination;
this.producer = producer;
}
public String getDestKey() {
@ -151,7 +152,7 @@ public class JmsPooledProducer {
close = true;
for (JmsProducerWrapper wrapper : references) {
wrapper.closeInternal();
wrapper.closeWrapper();
}
references.clear();
@ -159,15 +160,12 @@ public class JmsPooledProducer {
}
if (close) {
if (producer != null) {
try {
statusHandler.info("Closing producer " + producer); // njensen
producer.close();
} catch (Throwable e) {
statusHandler.handle(Priority.INFO,
"Failed to close producer", e);
}
producer = null;
try {
statusHandler.info("Closing producer " + producer); // njensen
producer.close();
} catch (Throwable e) {
statusHandler.handle(Priority.WARN, "Failed to close producer",
e);
}
sess.removeProducerFromPool(this);
@ -207,17 +205,6 @@ public class JmsPooledProducer {
}
public MessageProducer getProducer() throws JMSException {
// TODO: allow this to automatically grab a new producer if the current
// one fails, try up to 3 times so that we don't always drop messages on
// a single failure
if (producer == null) {
synchronized (this) {
if (producer == null) {
producer = sess.getSession().createProducer(destination);
}
}
}
return producer;
}

View file

@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
@ -39,7 +40,16 @@ import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
/**
* TODO Add Description
* Jms Pooled Session. Tracks references to the session to know when the session
* can be released to the pool. Any exception will close pooled session instead
* of returning to the pool. The consumers/producers are tracked in both active
* and available states. An available consumer/producer can be reused by the
* next client.
*
* Synchronization Principle To prevent deadlocks: Chained sync blocks can only
* happen in a downward direction. A manager has a synchronized lock can make a
* call down to a wrapper, but not nice versa. Also a session inside a sync
* block can make a call down to a producer but not vice versa.
*
* <pre>
*
@ -48,8 +58,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 15, 2011 rjpeter Initial creation
* Mar 08, 2012 194 njensen Improved logging
*
* Mar 08, 2012 194 njensen Improved logging
* Feb 21, 2013 1642 rjpeter Fix deadlock scenario
* </pre>
*
* @author rjpeter
@ -65,27 +75,28 @@ public class JmsPooledSession {
private final Session sess;
// The thread this session was most recently used by for tracking a pending
// session that is being reserved for a given thread.
private String threadKey;
private boolean exceptionOccurred = false;
private volatile boolean exceptionOccurred = false;
private Throwable trappedExc = null;
private final Object stateLock = new Object();
private Object stateLock = new Object();
private State state = State.InUse;
private volatile State state = State.InUse;
// keeps track of number of creates vs. closes to know when it can be
// returned to the pool
List<JmsSessionWrapper> references = new ArrayList<JmsSessionWrapper>(1);
private final List<JmsSessionWrapper> references = new ArrayList<JmsSessionWrapper>(
1);
private HashMap<String, AvailableJmsPooledObject<JmsPooledConsumer>> availableConsumers = new HashMap<String, AvailableJmsPooledObject<JmsPooledConsumer>>();
private final Map<String, AvailableJmsPooledObject<JmsPooledConsumer>> availableConsumers = new HashMap<String, AvailableJmsPooledObject<JmsPooledConsumer>>();
private HashMap<String, AvailableJmsPooledObject<JmsPooledProducer>> availableProducers = new HashMap<String, AvailableJmsPooledObject<JmsPooledProducer>>();
private final Map<String, AvailableJmsPooledObject<JmsPooledProducer>> availableProducers = new HashMap<String, AvailableJmsPooledObject<JmsPooledProducer>>();
private HashMap<String, JmsPooledConsumer> inUseConsumers = new HashMap<String, JmsPooledConsumer>();
private final Map<String, JmsPooledConsumer> inUseConsumers = new HashMap<String, JmsPooledConsumer>();
private HashMap<String, JmsPooledProducer> inUseProducers = new HashMap<String, JmsPooledProducer>();
private final Map<String, JmsPooledProducer> inUseProducers = new HashMap<String, JmsPooledProducer>();
public JmsPooledSession(JmsPooledConnection conn, Session sess) {
this.conn = conn;
@ -250,10 +261,12 @@ public class JmsPooledSession {
List<JmsPooledConsumer> consumersToClose = new ArrayList<JmsPooledConsumer>(
inUseConsumers.size() + availableConsumers.size());
synchronized (inUseConsumers) {
consumersToClose.addAll(inUseConsumers.values());
inUseConsumers.clear();
}
synchronized (availableConsumers) {
for (AvailableJmsPooledObject<JmsPooledConsumer> wrapper : availableConsumers
.values()) {
@ -288,7 +301,7 @@ public class JmsPooledSession {
// of time this is correct
success = inUse == producer;
if (!success && inUse != null) {
if (!success && (inUse != null)) {
// put the bad removal back in. Done this way instead of
// get/remove as 99% of time remove is correct, this
// really only here for bullet proofing code against bad
@ -340,7 +353,7 @@ public class JmsPooledSession {
JmsPooledProducer inUse = inUseProducers.remove(destKey);
removed = inUse == producer;
if (!removed && inUse != null) {
if (!removed && (inUse != null)) {
// put the bad removal back in. Done this way instead of
// get/remove as 95% of time remove is correct, this
// really only here for bullet proofing code against bad
@ -427,7 +440,7 @@ public class JmsPooledSession {
JmsPooledConsumer inUse = inUseConsumers.remove(destKey);
removed = inUse == consumer;
if (!removed && inUse != null) {
if (!removed && (inUse != null)) {
// put the bad removal back in. Done this way instead of
// get/remove as 95% of time remove is correct, this
// really only here for bullet proofing code against bad
@ -449,31 +462,22 @@ public class JmsPooledSession {
}
if (canClose) {
if (trappedExc != null) {
statusHandler.handle(Priority.INFO,
"Trapped internal exception", trappedExc);
}
closePooledConsumersProducers();
// need to close down all wrappers
for (JmsSessionWrapper wrapper : references) {
wrapper.closeInternal();
wrapper.closeWrapper();
}
references.clear();
conn.removeSession(this);
// synchronize on the connection to avoid deadlock conditions in
// qpid
synchronized (conn) {
try {
sess.close();
} catch (Exception e) {
statusHandler.handle(Priority.INFO,
"Failed to close session " + sess, e);
}
try {
sess.close();
} catch (Exception e) {
statusHandler.handle(Priority.WARN, "Failed to close session "
+ sess, e);
}
}
}
@ -603,7 +607,8 @@ public class JmsPooledSession {
}
if (producer == null) {
producer = new JmsPooledProducer(this, destKey, destination);
producer = new JmsPooledProducer(this, destKey,
sess.createProducer(destination));
}
inUseProducers.put(destKey, producer);

View file

@ -20,7 +20,7 @@
package com.raytheon.uf.common.jms;
/**
* TODO Add Description
* State of Jms Pooled object.
*
* <pre>
*

View file

@ -36,7 +36,9 @@ import javax.jms.Topic;
import com.raytheon.uf.common.jms.JmsPooledConnection;
/**
* TODO Add Description
* Wrapper class for jms connection pooling. Tracks wrapped sessions created
* from this wrapped connection to know when the connection can be returned to
* the pool.
*
* <pre>
*
@ -45,7 +47,7 @@ import com.raytheon.uf.common.jms.JmsPooledConnection;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 15, 2011 rjpeter Initial creation
*
* Feb 21, 2013 1642 rjpeter Added volatile references for better concurrency handling.
* </pre>
*
* @author rjpeter
@ -53,18 +55,16 @@ import com.raytheon.uf.common.jms.JmsPooledConnection;
*/
public class JmsConnectionWrapper implements Connection {
private JmsPooledConnection mgr = null;
private final JmsPooledConnection mgr;
private boolean closed = false;
private volatile boolean closed = false;
private boolean exceptionOccurred = false;
private volatile boolean exceptionOccurred = false;
private Throwable trappedExc = null;
private List<JmsSessionWrapper> sessions = new ArrayList<JmsSessionWrapper>(
private final List<JmsSessionWrapper> sessions = new ArrayList<JmsSessionWrapper>(
1);
private String clientId = null;
private final String clientId = null;
public JmsConnectionWrapper(JmsPooledConnection mgr) {
this.mgr = mgr;
@ -76,20 +76,16 @@ public class JmsConnectionWrapper implements Connection {
*
* @return True if this wrapper hasn't been closed before, false otherwise.
*/
public boolean closeInternal() {
public boolean closeWrapper() {
synchronized (this) {
if (!closed) {
closed = true;
if (sessions != null) {
for (JmsSessionWrapper session : sessions) {
try {
session.close();
} catch (JMSException e) {
}
for (JmsSessionWrapper session : sessions) {
try {
session.close();
} catch (JMSException e) {
// closing of wrapper doesn't throw an exception
}
sessions = null;
}
if (exceptionOccurred) {
@ -109,7 +105,7 @@ public class JmsConnectionWrapper implements Connection {
*/
@Override
public void close() throws JMSException {
if (closeInternal()) {
if (closeWrapper()) {
// remove this wrapper from the manager
mgr.removeReference(this);
@ -169,9 +165,6 @@ public class JmsConnectionWrapper implements Connection {
JmsSessionWrapper session = mgr.getSession(transacted,
acknowledgeMode);
if (session != null) {
if (sessions == null) {
sessions = new ArrayList<JmsSessionWrapper>(1);
}
sessions.add(session);
} else {
throw new IllegalStateException("Underlying session is closed");
@ -179,7 +172,6 @@ public class JmsConnectionWrapper implements Connection {
return session;
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled connection");
exc.initCause(e);
@ -220,7 +212,6 @@ public class JmsConnectionWrapper implements Connection {
return conn.getMetaData();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled connection");
exc.initCause(e);

View file

@ -28,7 +28,8 @@ import javax.jms.MessageListener;
import com.raytheon.uf.common.jms.JmsPooledConsumer;
/**
* TODO Add Description
* Wrapper class for jms consumer pooling. Helps track references to the pooled
* consumer to know when the consumer can be closed.
*
* <pre>
*
@ -37,7 +38,7 @@ import com.raytheon.uf.common.jms.JmsPooledConsumer;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 18, 2011 rjpeter Initial creation
*
* Feb 26, 2013 1642 rjpeter Added volatile references for better concurrency handling.
* </pre>
*
* @author rjpeter
@ -45,11 +46,11 @@ import com.raytheon.uf.common.jms.JmsPooledConsumer;
*/
public class JmsConsumerWrapper implements MessageConsumer {
private JmsPooledConsumer mgr = null;
private final JmsPooledConsumer mgr;
private boolean exceptionOccurred = false;
private volatile boolean exceptionOccurred = false;
private boolean closed = false;
private volatile boolean closed = false;
public JmsConsumerWrapper(JmsPooledConsumer mgr) {
this.mgr = mgr;
@ -74,21 +75,20 @@ public class JmsConsumerWrapper implements MessageConsumer {
*
* @return True if this wrapper hasn't been closed before, false otherwise.
*/
public boolean closeInternal() {
boolean close = false;
public boolean closeWrapper() {
synchronized (this) {
if (!closed) {
closed = true;
close = true;
if (exceptionOccurred) {
mgr.setExceptionOccurred(true);
}
return true;
}
}
if (close && exceptionOccurred) {
mgr.setExceptionOccurred(true);
}
return close;
return false;
}
/*
@ -101,7 +101,7 @@ public class JmsConsumerWrapper implements MessageConsumer {
*/
@Override
public void close() throws JMSException {
if (closeInternal()) {
if (closeWrapper()) {
mgr.removeReference(this);
if (exceptionOccurred) {

View file

@ -28,7 +28,8 @@ import javax.jms.MessageProducer;
import com.raytheon.uf.common.jms.JmsPooledProducer;
/**
* TODO Add Description
* Wrapper class for jms producer pooling. Helps track references to the pooled
* producer to know when the producer can be closed.
*
* <pre>
*
@ -36,8 +37,8 @@ import com.raytheon.uf.common.jms.JmsPooledProducer;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Dec 8, 2011 rjpeter Initial creation
*
* Dec fi8, 2011 rjpeter Initial creation
* Feb 26, 2013 1642 rjpeter Added volatile references for better concurrency handling.
* </pre>
*
* @author rjpeter
@ -45,11 +46,11 @@ import com.raytheon.uf.common.jms.JmsPooledProducer;
*/
public class JmsProducerWrapper implements MessageProducer {
private JmsPooledProducer mgr = null;
private final JmsPooledProducer mgr;
private boolean exceptionOccurred = false;
private volatile boolean exceptionOccurred = false;
private boolean closed = false;
private volatile boolean closed = false;
public JmsProducerWrapper(JmsPooledProducer mgr) {
this.mgr = mgr;
@ -61,21 +62,18 @@ public class JmsProducerWrapper implements MessageProducer {
*
* @return True if this wrapper hasn't been closed before, false otherwise.
*/
public boolean closeInternal() {
boolean close = false;
public boolean closeWrapper() {
synchronized (this) {
if (!closed) {
closed = true;
close = true;
if (exceptionOccurred) {
mgr.setExceptionOccurred(true);
}
}
}
if (close && exceptionOccurred) {
mgr.setExceptionOccurred(true);
}
return close;
return false;
}
/*
@ -88,7 +86,7 @@ public class JmsProducerWrapper implements MessageProducer {
*/
@Override
public void close() throws JMSException {
if (closeInternal()) {
if (closeWrapper()) {
mgr.removeReference(this);
if (exceptionOccurred) {

View file

@ -46,7 +46,9 @@ import javax.jms.TopicSubscriber;
import com.raytheon.uf.common.jms.JmsPooledSession;
/**
* TODO Add Description
* Wrapper class for jms session pooling. Tracks wrapped consumers/producers
* created from this wrapped session to know when the session can be returned to
* the pool.
*
* <pre>
*
@ -55,7 +57,7 @@ import com.raytheon.uf.common.jms.JmsPooledSession;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 30, 2011 rjpeter Initial creation
*
* Feb 26, 2013 1642 rjpeter Added volatile references for better concurrency handling.
* </pre>
*
* @author rjpeter
@ -63,20 +65,18 @@ import com.raytheon.uf.common.jms.JmsPooledSession;
*/
public class JmsSessionWrapper implements Session {
private JmsPooledSession mgr = null;
private final JmsPooledSession mgr;
private boolean closed = false;
private volatile boolean closed = false;
private boolean exceptionOccurred = false;
private volatile boolean exceptionOccurred = false;
private Throwable trappedExc = null;
private final List<JmsProducerWrapper> producers = new ArrayList<JmsProducerWrapper>(
1);;
private List<JmsProducerWrapper> producers = null;
private final List<JmsConsumerWrapper> consumers = new ArrayList<JmsConsumerWrapper>(
1);
private List<JmsConsumerWrapper> consumers = null;
// TODO: needs to track the wrappers opened by this wrapped session so when
// wrapped session is closed, all underlying wrappers are closed.
public JmsSessionWrapper(JmsPooledSession mgr) {
this.mgr = mgr;
}
@ -87,32 +87,24 @@ public class JmsSessionWrapper implements Session {
*
* @return True if this wrapper hasn't been closed before, false otherwise.
*/
public boolean closeInternal() {
public boolean closeWrapper() {
synchronized (this) {
if (!closed) {
closed = true;
if (consumers != null) {
for (JmsConsumerWrapper consumer : consumers) {
try {
consumer.close();
} catch (JMSException e) {
for (JmsConsumerWrapper consumer : consumers) {
try {
consumer.close();
} catch (JMSException e) {
}
}
consumers = null;
}
if (producers != null) {
for (JmsProducerWrapper producer : producers) {
try {
producer.close();
} catch (JMSException e) {
for (JmsProducerWrapper producer : producers) {
try {
producer.close();
} catch (JMSException e) {
}
}
producers = null;
}
if (exceptionOccurred) {
@ -132,7 +124,7 @@ public class JmsSessionWrapper implements Session {
*/
@Override
public void close() throws JMSException {
if (closeInternal()) {
if (closeWrapper()) {
// remove this wrapper from the manager
mgr.removeReference(this);
@ -164,7 +156,6 @@ public class JmsSessionWrapper implements Session {
sess.commit();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -185,7 +176,6 @@ public class JmsSessionWrapper implements Session {
return sess.createBrowser(queue);
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -207,7 +197,6 @@ public class JmsSessionWrapper implements Session {
return sess.createBrowser(queue, messageSelector);
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -228,7 +217,6 @@ public class JmsSessionWrapper implements Session {
return sess.createBytesMessage();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -256,20 +244,24 @@ public class JmsSessionWrapper implements Session {
@Override
public MessageConsumer createConsumer(Destination destination,
String messageSelector) throws JMSException {
JmsConsumerWrapper consumer = mgr.getConsumer(destination,
messageSelector);
try {
JmsConsumerWrapper consumer = mgr.getConsumer(destination,
messageSelector);
if (consumer != null) {
if (consumers == null) {
consumers = new ArrayList<JmsConsumerWrapper>(1);
if (consumer != null) {
consumers.add(consumer);
} else {
throw new IllegalStateException("Underlying consumer is closed");
}
consumers.add(consumer);
} else {
throw new IllegalStateException("Underlying consumer is closed");
return consumer;
} catch (Throwable e) {
exceptionOccurred = true;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
throw exc;
}
return consumer;
}
/*
@ -290,7 +282,6 @@ public class JmsSessionWrapper implements Session {
noLocal);
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -317,7 +308,6 @@ public class JmsSessionWrapper implements Session {
return sess.createDurableSubscriber(topic, name);
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -342,7 +332,6 @@ public class JmsSessionWrapper implements Session {
noLocal);
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -363,7 +352,6 @@ public class JmsSessionWrapper implements Session {
return sess.createMapMessage();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -384,7 +372,6 @@ public class JmsSessionWrapper implements Session {
return sess.createMessage();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -405,7 +392,6 @@ public class JmsSessionWrapper implements Session {
return sess.createObjectMessage();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -427,7 +413,6 @@ public class JmsSessionWrapper implements Session {
return sess.createObjectMessage(obj);
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -443,19 +428,23 @@ public class JmsSessionWrapper implements Session {
@Override
public MessageProducer createProducer(Destination destination)
throws JMSException {
JmsProducerWrapper producer = mgr.getProducer(destination);
try {
JmsProducerWrapper producer = mgr.getProducer(destination);
if (producer != null) {
if (producers == null) {
producers = new ArrayList<JmsProducerWrapper>(1);
if (producer != null) {
producers.add(producer);
} else {
throw new IllegalStateException("Underlying producer is closed");
}
producers.add(producer);
} else {
throw new IllegalStateException("Underlying producer is closed");
return producer;
} catch (Throwable e) {
exceptionOccurred = true;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
throw exc;
}
return producer;
}
/*
@ -471,7 +460,6 @@ public class JmsSessionWrapper implements Session {
return sess.createQueue(queueName);
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -492,7 +480,6 @@ public class JmsSessionWrapper implements Session {
return sess.createStreamMessage();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -513,7 +500,6 @@ public class JmsSessionWrapper implements Session {
return sess.createTemporaryQueue();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -534,7 +520,6 @@ public class JmsSessionWrapper implements Session {
return sess.createTemporaryTopic();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -555,7 +540,6 @@ public class JmsSessionWrapper implements Session {
return sess.createTextMessage();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -576,7 +560,6 @@ public class JmsSessionWrapper implements Session {
return sess.createTextMessage(text);
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -597,7 +580,6 @@ public class JmsSessionWrapper implements Session {
return sess.createTopic(topicName);
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -618,7 +600,6 @@ public class JmsSessionWrapper implements Session {
return sess.getAcknowledgeMode();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -639,7 +620,6 @@ public class JmsSessionWrapper implements Session {
return sess.getMessageListener();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -660,7 +640,6 @@ public class JmsSessionWrapper implements Session {
return sess.getTransacted();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -681,7 +660,6 @@ public class JmsSessionWrapper implements Session {
sess.recover();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -702,7 +680,6 @@ public class JmsSessionWrapper implements Session {
sess.rollback();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -722,7 +699,6 @@ public class JmsSessionWrapper implements Session {
sess.run();
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
RuntimeException exc = new RuntimeException(
"Exception occurred on pooled session", e);
throw exc;
@ -743,7 +719,6 @@ public class JmsSessionWrapper implements Session {
sess.setMessageListener(listener);
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);
@ -764,7 +739,6 @@ public class JmsSessionWrapper implements Session {
sess.unsubscribe(name);
} catch (Throwable e) {
exceptionOccurred = true;
trappedExc = e;
JMSException exc = new JMSException(
"Exception occurred on pooled session");
exc.initCause(e);