diff --git a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledConnection.java b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledConnection.java index 8f4cfd2b35..27076515d6 100644 --- a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledConnection.java +++ b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledConnection.java @@ -35,7 +35,16 @@ import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus.Priority; /** - * TODO Add Description + * Jms Pooled Connection. Tracks references to the connection to know when the + * connection can be released to the pool. Any exception will close pooled + * session instead of returning to the pool. The sessions are tracked in both + * active and available states. An available session can be reused by the next + * client. + * + * Synchronization Principle To prevent deadlocks: Chained sync blocks can only + * happen in a downward direction. A manager has a synchronized lock can make a + * call down to a wrapper, but not nice versa. Also a session inside a sync + * block can make a call down to a producer but not vice versa. * *
  * 
@@ -44,8 +53,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
  * Date         Ticket#    Engineer    Description
  * ------------ ---------- ----------- --------------------------
  * Apr 15, 2011            rjpeter     Initial creation
- * Mar 08, 2012   194   njensen   Improved safety of close()
- * 
+ * Mar 08, 2012 194        njensen     Improved safety of close()
+ * Feb 21, 2013 1642       rjpeter     Fix deadlock scenario
  * 
* * @author rjpeter @@ -57,35 +66,33 @@ public class JmsPooledConnection implements ExceptionListener { private final IUFStatusHandler statusHandler = UFStatus .getHandler(JmsPooledConnection.class); - private JmsPooledConnectionFactory connFactory = null; + private final JmsPooledConnectionFactory connFactory; - private Connection conn = null; + private volatile Connection conn = null; // keeps track of number of creates vs. closes to know when it can be // returned to the pool - List references = new ArrayList( + private final List references = new ArrayList( 1); - private Object stateLock = new Object(); + private final Object stateLock = new Object(); - private State state = State.InUse; + private volatile State state = State.InUse; // technically can have multiple sessions to one connection and sessions can // differ by transaction mode and acknowledgement mode, currently not // supported - private JmsPooledSession session = null; + private volatile JmsPooledSession session = null; - private AvailableJmsPooledObject availableSession = null; + private volatile AvailableJmsPooledObject availableSession = null; - private String key = null; + private volatile String key = null; - private String clientId = null; + private final String clientId; - private long connectionStartTime = 0; + private volatile long connectionStartTime = 0; - private boolean exceptionOccurred = false; - - private Throwable trappedExc; + private volatile boolean exceptionOccurred = false; public JmsPooledConnection(JmsPooledConnectionFactory connFactory) { this.connFactory = connFactory; @@ -101,7 +108,7 @@ public class JmsPooledConnection implements ExceptionListener { throw new IllegalStateException("Connection is closed"); } - synchronized (this) { + synchronized (stateLock) { // TODO: Add multiple session support if (session != null) { JmsSessionWrapper ref = session.createReference(); @@ -112,6 +119,7 @@ public class JmsPooledConnection implements ExceptionListener { this.session = null; } } + if (availableSession != null) { JmsPooledSession availSess = availableSession.getPooledObject(); synchronized (availSess.getStateLock()) { @@ -130,6 +138,7 @@ public class JmsPooledConnection implements ExceptionListener { transacted, acknowledgeMode)); } } + return session.createReference(); } @@ -160,64 +169,61 @@ public class JmsPooledConnection implements ExceptionListener { public void onException(JMSException jmsExc) { // need to worry about multiple exceptions causing repeated // disconnect/reconnect - statusHandler.handle(Priority.INFO, "Caught Exception on " + statusHandler.handle(Priority.WARN, "Caught Exception on " + connFactory.getProvider() + " connection. Closing connection", jmsExc); close(); } public void close() { - // synchronize on the connection to avoid deadlock conditions in qpid boolean canClose = false; synchronized (stateLock) { if (!State.Closed.equals(state)) { canClose = true; state = State.Closed; - - if (trappedExc != null) { - statusHandler.handle(Priority.INFO, - "Trapped internal exception", trappedExc); - } } } if (canClose) { - synchronized (this) { - // njensen: I moved removing the connection from the pool to be - // the first thing in this block instead of last thing so - // there's no chance it could be closed and then retrieved from - // the pool by something else - connFactory.removeConnectionFromPool(this); - if (conn != null) { - try { - conn.stop(); - } catch (Exception e) { - statusHandler.handle(Priority.INFO, - "Failed to stop connection", e); - } - } - - synchronized (references) { - for (JmsConnectionWrapper wrapper : references) { - wrapper.closeInternal(); - } - references.clear(); - } - - if (session != null) { - session.close(); - session = null; - } - + // njensen: I moved removing the connection from the pool to be + // the first thing in this block instead of last thing so + // there's no chance it could be closed and then retrieved from + // the pool by something else + connFactory.removeConnectionFromPool(this); + if (conn != null) { try { - conn.close(); + conn.stop(); } catch (Exception e) { - statusHandler.handle(Priority.INFO, - "Failed to close connection " + conn, e); + statusHandler.handle(Priority.WARN, + "Failed to stop connection", e); } } - conn = null; + + synchronized (references) { + for (JmsConnectionWrapper wrapper : references) { + wrapper.closeWrapper(); + } + references.clear(); + } + + if (session != null) { + session.close(); + session = null; + } + + if (availableSession != null) { + availableSession.getPooledObject().close(); + availableSession = null; + } + + try { + conn.close(); + } catch (Exception e) { + statusHandler.handle(Priority.WARN, + "Failed to close connection " + conn, e); + } } + conn = null; } /** @@ -228,8 +234,13 @@ public class JmsPooledConnection implements ExceptionListener { * @return */ public int closeOldPooledResources(int resourceRetention) { + if (!isValid()) { + return 0; + } + int count = 0; JmsPooledSession sessionToCheck = null; + synchronized (stateLock) { if (session != null) { sessionToCheck = session; @@ -265,8 +276,12 @@ public class JmsPooledConnection implements ExceptionListener { } public Connection getConnection() { + // lazy initialized so that we can handle reconnect and logging since + // spring reconnect doesn't give any status information and appears to + // the user that the process is hung if (conn == null) { - synchronized (this) { + // safe since conn is volatile + synchronized (stateLock) { if (conn == null) { long exceptionLastHandled = 0; boolean connected = false; @@ -345,12 +360,12 @@ public class JmsPooledConnection implements ExceptionListener { if (!valid) { removeSession(sess); } else { - synchronized (this) { + synchronized (stateLock) { // should only be able to have 1 session if (availableSession != null) { - availableSession.getPooledObject().close(); statusHandler .warn("Pooled session already existed for this connection, closing previous session"); + availableSession.getPooledObject().close(); } availableSession = new AvailableJmsPooledObject( session); @@ -361,13 +376,19 @@ public class JmsPooledConnection implements ExceptionListener { return valid; } + /** + * Removes this pooled session from the pooled connection. Does NOT close + * the session, this should be handled independently. + * + * @param sess + */ public void removeSession(JmsPooledSession sess) { - synchronized (this) { + synchronized (stateLock) { if (sess != null) { if (this.session == sess) { this.session = null; - } else if (availableSession != null - && availableSession.getPooledObject() == sess) { + } else if ((availableSession != null) + && (availableSession.getPooledObject() == sess)) { this.availableSession = null; } } @@ -378,7 +399,11 @@ public class JmsPooledConnection implements ExceptionListener { synchronized (stateLock) { if (isValid(State.InUse, true)) { JmsConnectionWrapper wrapper = new JmsConnectionWrapper(this); - references.add(wrapper); + + synchronized (references) { + references.add(wrapper); + } + return wrapper; } } @@ -433,30 +458,35 @@ public class JmsPooledConnection implements ExceptionListener { public void removeReference(JmsConnectionWrapper wrapper) { boolean returnToPool = false; synchronized (stateLock) { - if (references.remove(wrapper) && references.isEmpty() - && State.InUse.equals(state)) { - state = State.Available; - returnToPool = true; + synchronized (references) { + if (references.remove(wrapper) && references.isEmpty() + && State.InUse.equals(state)) { + state = State.Available; + returnToPool = true; - // double check state of session, should be available - if (session != null) { - statusHandler - .warn("Connection marked available, but Session not Available. Sessions state is: " - + session.getState()); - session.close(); - session = null; - } - - if (availableSession != null) { - JmsPooledSession availSess = availableSession - .getPooledObject(); - if (availSess != null - && !State.Available.equals(availSess.getState())) { + // double check state of session, should be available + if (session != null) { statusHandler .warn("Connection marked available, but Session not Available. Sessions state is: " - + availSess.getState()); - availSess.close(); - availableSession = null; + + session.getState()); + session.close(); + session = null; + } + + if (availableSession != null) { + JmsPooledSession availSess = availableSession + .getPooledObject(); + synchronized (availSess.getStateLock()) { + if ((availSess != null) + && !State.Available.equals(availSess + .getState())) { + statusHandler + .warn("Connection marked available, but Session not Available. Sessions state is: " + + availSess.getState()); + availSess.close(); + availableSession = null; + } + } } } } diff --git a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledConnectionFactory.java b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledConnectionFactory.java index 9e356c30df..b5c5d03be7 100644 --- a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledConnectionFactory.java +++ b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledConnectionFactory.java @@ -1,6 +1,7 @@ package com.raytheon.uf.common.jms; import java.util.ArrayList; +import java.util.Deque; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -63,16 +64,16 @@ public class JmsPooledConnectionFactory implements ConnectionFactory { private String provider = "QPID"; // connections in use, key is "threadId-threadName" - private Map inUseConnections = new HashMap(); + private final Map inUseConnections = new HashMap(); // connections that were recently returned, key is "threadId-threadName" - private Map> pendingConnections = new HashMap>(); + private final Map> pendingConnections = new HashMap>(); // connections that have been released from pendingConnections and are // awaiting being closed. - private LinkedList> availableConnections = new LinkedList>(); + private final Deque> availableConnections = new LinkedList>(); - private ConcurrentLinkedQueue deadConnections = new ConcurrentLinkedQueue(); + private final ConcurrentLinkedQueue deadConnections = new ConcurrentLinkedQueue(); private int reconnectInterval = 30000; @@ -350,13 +351,13 @@ public class JmsPooledConnectionFactory implements ConnectionFactory { } } - for (AvailableJmsPooledObject wrapper : connectionsToProcess) { - wrapper.reset(); - // putting to available pool - JmsPooledConnection conn = wrapper.getPooledObject(); - conn.setKey(null); + synchronized (availableConnections) { + for (AvailableJmsPooledObject wrapper : connectionsToProcess) { + wrapper.reset(); - synchronized (availableConnections) { + // putting to available pool + JmsPooledConnection conn = wrapper.getPooledObject(); + conn.setKey(null); availableConnections.add(wrapper); } } @@ -372,10 +373,9 @@ public class JmsPooledConnectionFactory implements ConnectionFactory { // available sessions added based on time, so oldest is front of // queue if (wrapper.expired(curTime, connectionHoldTime) - || availableConnections.size() > maxSpareConnections) { + || (availableConnections.size() > maxSpareConnections)) { // not immediately closing connection so that we minimize - // time - // in sync block + // time in sync block deadConnections.add(wrapper.getPooledObject()); iter.remove(); } else { @@ -409,15 +409,17 @@ public class JmsPooledConnectionFactory implements ConnectionFactory { connectionsToProcess = new ArrayList>( pendingConnections.values()); } + synchronized (availableConnections) { connectionsToProcess.addAll(availableConnections); } + for (AvailableJmsPooledObject wrapper : connectionsToProcess) { resourcesClosed += wrapper.getPooledObject() .closeOldPooledResources(resourceRetention); } - if (connectionsClosed > 0 || resourcesClosed > 0) { + if ((connectionsClosed > 0) || (resourcesClosed > 0)) { statusHandler.handle( Priority.INFO, "Closed unused jms pooled resources: connections closed: " diff --git a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledConsumer.java b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledConsumer.java index 50d894d38a..5554eb1e4f 100644 --- a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledConsumer.java +++ b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledConsumer.java @@ -32,7 +32,14 @@ import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus.Priority; /** - * TODO Add Description + * Jms Pooled Consumer. Tracks references to the consumers to know when consumer + * can be released to pool. Any exception will close pooled consumer instead of + * returning to pool. + * + * Synchronization Principle To prevent deadlocks: Chained sync blocks can only + * happen in a downward direction. A manager has a synchronized lock can make a + * call down to a wrapper, but not nice versa. Also a session inside a sync + * block can make a call down to a producer but not vice versa. * *
  * 
@@ -41,8 +48,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
  * Date         Ticket#    Engineer    Description
  * ------------ ---------- ----------- --------------------------
  * Apr 18, 2011            rjpeter     Initial creation
- * Mar 08, 2012   194   njensen   Improved logging
- * 
+ * Mar 08, 2012 194        njensen     Improved logging
+ * Feb 26, 2013 1642       rjpeter     Removed lazy initialization
  * 
* * @author rjpeter @@ -55,34 +62,31 @@ public class JmsPooledConsumer { private final JmsPooledSession sess; - private final Destination destination; - - private final String messageSelector; - - private MessageConsumer consumer; + private final MessageConsumer consumer; private final String destKey; - private boolean exceptionOccurred = false; + private volatile boolean exceptionOccurred = false; - private Object stateLock = new Object(); + private final Object stateLock = new Object(); - private State state = State.InUse; + private volatile State state = State.InUse; /** * Technically a pooled consumer should only have 1 reference at a time. * Bullet proofing in case 3rd party ever tries to get multiple consumers to * the same destination. */ - private List references = new ArrayList( + private final List references = new ArrayList( 1); public JmsPooledConsumer(JmsPooledSession sess, String destKey, - Destination destination, String messageSelector) { + Destination destination, String messageSelector) + throws JMSException { this.sess = sess; this.destKey = destKey; - this.destination = destination; - this.messageSelector = messageSelector; + consumer = sess.getSession().createConsumer(destination, + messageSelector); } public String getDestKey() { @@ -151,7 +155,7 @@ public class JmsPooledConsumer { close = true; for (JmsConsumerWrapper wrapper : references) { - wrapper.closeInternal(); + wrapper.closeWrapper(); } references.clear(); @@ -159,15 +163,12 @@ public class JmsPooledConsumer { } if (close) { - if (consumer != null) { - try { - statusHandler.info("Closing consumer " + consumer); // njensen - consumer.close(); - } catch (Throwable e) { - statusHandler.handle(Priority.INFO, - "Failed to close consumer " + consumer, e); - } - consumer = null; + try { + statusHandler.info("Closing consumer " + consumer); // njensen + consumer.close(); + } catch (Throwable e) { + statusHandler.handle(Priority.WARN, "Failed to close consumer " + + consumer, e); } sess.removeConsumerFromPool(this); @@ -206,18 +207,6 @@ public class JmsPooledConsumer { } public MessageConsumer getConsumer() throws JMSException { - // TODO: allow this to automatically grab a new consumer if the current - // one fails, try up to 3 times so that we don't always drop messages on - // a single failure - if (consumer == null) { - synchronized (this) { - if (consumer == null) { - consumer = sess.getSession().createConsumer(destination, - messageSelector); - } - } - } - return consumer; } diff --git a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledProducer.java b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledProducer.java index 892a90a511..fb6d3c8e4d 100644 --- a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledProducer.java +++ b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledProducer.java @@ -22,7 +22,6 @@ package com.raytheon.uf.common.jms; import java.util.ArrayList; import java.util.List; -import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; @@ -32,8 +31,12 @@ import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus.Priority; /** + * Jms Pooled Producer. Tracks references to the producers to know when the + * producer can be released to the pool. Any exception will close pooled + * producer instead of returning to the pool. + * * Synchronization Principle To prevent deadlocks: Chained sync blocks can only - * happen in a doward direction. A manager has a synchonized lock can make a + * happen in a downward direction. A manager has a synchronized lock can make a * call down to a wrapper, but not nice versa. Also a session inside a sync * block can make a call down to a producer but not vice versa. * @@ -44,8 +47,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Apr 18, 2011 rjpeter Initial creation - * Mar 08, 2012 194 njensen Improved logging - * + * Mar 08, 2012 194 njensen Improved logging + * Feb 26, 2013 1642 rjpeter Removed lazy initialization * * * @author rjpeter @@ -58,31 +61,29 @@ public class JmsPooledProducer { private final JmsPooledSession sess; - private final Destination destination; - - private MessageProducer producer; + private final MessageProducer producer; private final String destKey; - private boolean exceptionOccurred = false; + private volatile boolean exceptionOccurred = false; - private Object stateLock = new Object(); + private final Object stateLock = new Object(); - private State state = State.InUse; + private volatile State state = State.InUse; /** * Technically a pooled producer should only have 1 reference at a time. * Bullet proofing in case 3rd party ever tries to get multiple producers to * the same destination. */ - private List references = new ArrayList( + private final List references = new ArrayList( 1); public JmsPooledProducer(JmsPooledSession sess, String destKey, - Destination destination) { + MessageProducer producer) { this.sess = sess; this.destKey = destKey; - this.destination = destination; + this.producer = producer; } public String getDestKey() { @@ -151,7 +152,7 @@ public class JmsPooledProducer { close = true; for (JmsProducerWrapper wrapper : references) { - wrapper.closeInternal(); + wrapper.closeWrapper(); } references.clear(); @@ -159,15 +160,12 @@ public class JmsPooledProducer { } if (close) { - if (producer != null) { - try { - statusHandler.info("Closing producer " + producer); // njensen - producer.close(); - } catch (Throwable e) { - statusHandler.handle(Priority.INFO, - "Failed to close producer", e); - } - producer = null; + try { + statusHandler.info("Closing producer " + producer); // njensen + producer.close(); + } catch (Throwable e) { + statusHandler.handle(Priority.WARN, "Failed to close producer", + e); } sess.removeProducerFromPool(this); @@ -207,17 +205,6 @@ public class JmsPooledProducer { } public MessageProducer getProducer() throws JMSException { - // TODO: allow this to automatically grab a new producer if the current - // one fails, try up to 3 times so that we don't always drop messages on - // a single failure - if (producer == null) { - synchronized (this) { - if (producer == null) { - producer = sess.getSession().createProducer(destination); - } - } - } - return producer; } diff --git a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledSession.java b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledSession.java index c04465fd73..d3ad93562e 100644 --- a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledSession.java +++ b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/JmsPooledSession.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import javax.jms.Destination; import javax.jms.JMSException; @@ -39,7 +40,16 @@ import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus.Priority; /** - * TODO Add Description + * Jms Pooled Session. Tracks references to the session to know when the session + * can be released to the pool. Any exception will close pooled session instead + * of returning to the pool. The consumers/producers are tracked in both active + * and available states. An available consumer/producer can be reused by the + * next client. + * + * Synchronization Principle To prevent deadlocks: Chained sync blocks can only + * happen in a downward direction. A manager has a synchronized lock can make a + * call down to a wrapper, but not nice versa. Also a session inside a sync + * block can make a call down to a producer but not vice versa. * *
  * 
@@ -48,8 +58,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
  * Date         Ticket#    Engineer    Description
  * ------------ ---------- ----------- --------------------------
  * Apr 15, 2011            rjpeter     Initial creation
- * Mar 08, 2012   194   njensen   Improved logging
- * 
+ * Mar 08, 2012 194        njensen     Improved logging
+ * Feb 21, 2013 1642       rjpeter     Fix deadlock scenario
  * 
* * @author rjpeter @@ -65,27 +75,28 @@ public class JmsPooledSession { private final Session sess; + // The thread this session was most recently used by for tracking a pending + // session that is being reserved for a given thread. private String threadKey; - private boolean exceptionOccurred = false; + private volatile boolean exceptionOccurred = false; - private Throwable trappedExc = null; + private final Object stateLock = new Object(); - private Object stateLock = new Object(); - - private State state = State.InUse; + private volatile State state = State.InUse; // keeps track of number of creates vs. closes to know when it can be // returned to the pool - List references = new ArrayList(1); + private final List references = new ArrayList( + 1); - private HashMap> availableConsumers = new HashMap>(); + private final Map> availableConsumers = new HashMap>(); - private HashMap> availableProducers = new HashMap>(); + private final Map> availableProducers = new HashMap>(); - private HashMap inUseConsumers = new HashMap(); + private final Map inUseConsumers = new HashMap(); - private HashMap inUseProducers = new HashMap(); + private final Map inUseProducers = new HashMap(); public JmsPooledSession(JmsPooledConnection conn, Session sess) { this.conn = conn; @@ -250,10 +261,12 @@ public class JmsPooledSession { List consumersToClose = new ArrayList( inUseConsumers.size() + availableConsumers.size()); + synchronized (inUseConsumers) { consumersToClose.addAll(inUseConsumers.values()); inUseConsumers.clear(); } + synchronized (availableConsumers) { for (AvailableJmsPooledObject wrapper : availableConsumers .values()) { @@ -288,7 +301,7 @@ public class JmsPooledSession { // of time this is correct success = inUse == producer; - if (!success && inUse != null) { + if (!success && (inUse != null)) { // put the bad removal back in. Done this way instead of // get/remove as 99% of time remove is correct, this // really only here for bullet proofing code against bad @@ -340,7 +353,7 @@ public class JmsPooledSession { JmsPooledProducer inUse = inUseProducers.remove(destKey); removed = inUse == producer; - if (!removed && inUse != null) { + if (!removed && (inUse != null)) { // put the bad removal back in. Done this way instead of // get/remove as 95% of time remove is correct, this // really only here for bullet proofing code against bad @@ -427,7 +440,7 @@ public class JmsPooledSession { JmsPooledConsumer inUse = inUseConsumers.remove(destKey); removed = inUse == consumer; - if (!removed && inUse != null) { + if (!removed && (inUse != null)) { // put the bad removal back in. Done this way instead of // get/remove as 95% of time remove is correct, this // really only here for bullet proofing code against bad @@ -449,31 +462,22 @@ public class JmsPooledSession { } if (canClose) { - if (trappedExc != null) { - statusHandler.handle(Priority.INFO, - "Trapped internal exception", trappedExc); - } - closePooledConsumersProducers(); // need to close down all wrappers for (JmsSessionWrapper wrapper : references) { - wrapper.closeInternal(); + wrapper.closeWrapper(); } references.clear(); conn.removeSession(this); - // synchronize on the connection to avoid deadlock conditions in - // qpid - synchronized (conn) { - try { - sess.close(); - } catch (Exception e) { - statusHandler.handle(Priority.INFO, - "Failed to close session " + sess, e); - } + try { + sess.close(); + } catch (Exception e) { + statusHandler.handle(Priority.WARN, "Failed to close session " + + sess, e); } } } @@ -603,7 +607,8 @@ public class JmsPooledSession { } if (producer == null) { - producer = new JmsPooledProducer(this, destKey, destination); + producer = new JmsPooledProducer(this, destKey, + sess.createProducer(destination)); } inUseProducers.put(destKey, producer); diff --git a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/State.java b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/State.java index a316b9c76c..021df96178 100644 --- a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/State.java +++ b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/State.java @@ -20,7 +20,7 @@ package com.raytheon.uf.common.jms; /** - * TODO Add Description + * State of Jms Pooled object. * *
  * 
diff --git a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsConnectionWrapper.java b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsConnectionWrapper.java
index 754b753a2e..6a7de9ca0d 100644
--- a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsConnectionWrapper.java
+++ b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsConnectionWrapper.java
@@ -36,7 +36,9 @@ import javax.jms.Topic;
 import com.raytheon.uf.common.jms.JmsPooledConnection;
 
 /**
- * TODO Add Description
+ * Wrapper class for jms connection pooling. Tracks wrapped sessions created
+ * from this wrapped connection to know when the connection can be returned to
+ * the pool.
  * 
  * 
  * 
@@ -45,7 +47,7 @@ import com.raytheon.uf.common.jms.JmsPooledConnection;
  * Date         Ticket#    Engineer    Description
  * ------------ ---------- ----------- --------------------------
  * Apr 15, 2011            rjpeter     Initial creation
- * 
+ * Feb 21, 2013 1642       rjpeter     Added volatile references for better concurrency handling.
  * 
* * @author rjpeter @@ -53,18 +55,16 @@ import com.raytheon.uf.common.jms.JmsPooledConnection; */ public class JmsConnectionWrapper implements Connection { - private JmsPooledConnection mgr = null; + private final JmsPooledConnection mgr; - private boolean closed = false; + private volatile boolean closed = false; - private boolean exceptionOccurred = false; + private volatile boolean exceptionOccurred = false; - private Throwable trappedExc = null; - - private List sessions = new ArrayList( + private final List sessions = new ArrayList( 1); - private String clientId = null; + private final String clientId = null; public JmsConnectionWrapper(JmsPooledConnection mgr) { this.mgr = mgr; @@ -76,20 +76,16 @@ public class JmsConnectionWrapper implements Connection { * * @return True if this wrapper hasn't been closed before, false otherwise. */ - public boolean closeInternal() { + public boolean closeWrapper() { synchronized (this) { if (!closed) { closed = true; - if (sessions != null) { - for (JmsSessionWrapper session : sessions) { - try { - session.close(); - } catch (JMSException e) { - - } + for (JmsSessionWrapper session : sessions) { + try { + session.close(); + } catch (JMSException e) { + // closing of wrapper doesn't throw an exception } - - sessions = null; } if (exceptionOccurred) { @@ -109,7 +105,7 @@ public class JmsConnectionWrapper implements Connection { */ @Override public void close() throws JMSException { - if (closeInternal()) { + if (closeWrapper()) { // remove this wrapper from the manager mgr.removeReference(this); @@ -169,9 +165,6 @@ public class JmsConnectionWrapper implements Connection { JmsSessionWrapper session = mgr.getSession(transacted, acknowledgeMode); if (session != null) { - if (sessions == null) { - sessions = new ArrayList(1); - } sessions.add(session); } else { throw new IllegalStateException("Underlying session is closed"); @@ -179,7 +172,6 @@ public class JmsConnectionWrapper implements Connection { return session; } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled connection"); exc.initCause(e); @@ -220,7 +212,6 @@ public class JmsConnectionWrapper implements Connection { return conn.getMetaData(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled connection"); exc.initCause(e); diff --git a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsConsumerWrapper.java b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsConsumerWrapper.java index 7aa960dd80..ec978fe7b8 100644 --- a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsConsumerWrapper.java +++ b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsConsumerWrapper.java @@ -28,7 +28,8 @@ import javax.jms.MessageListener; import com.raytheon.uf.common.jms.JmsPooledConsumer; /** - * TODO Add Description + * Wrapper class for jms consumer pooling. Helps track references to the pooled + * consumer to know when the consumer can be closed. * *
  * 
@@ -37,7 +38,7 @@ import com.raytheon.uf.common.jms.JmsPooledConsumer;
  * Date         Ticket#    Engineer    Description
  * ------------ ---------- ----------- --------------------------
  * Apr 18, 2011            rjpeter     Initial creation
- * 
+ * Feb 26, 2013 1642       rjpeter     Added volatile references for better concurrency handling.
  * 
* * @author rjpeter @@ -45,11 +46,11 @@ import com.raytheon.uf.common.jms.JmsPooledConsumer; */ public class JmsConsumerWrapper implements MessageConsumer { - private JmsPooledConsumer mgr = null; + private final JmsPooledConsumer mgr; - private boolean exceptionOccurred = false; + private volatile boolean exceptionOccurred = false; - private boolean closed = false; + private volatile boolean closed = false; public JmsConsumerWrapper(JmsPooledConsumer mgr) { this.mgr = mgr; @@ -74,21 +75,20 @@ public class JmsConsumerWrapper implements MessageConsumer { * * @return True if this wrapper hasn't been closed before, false otherwise. */ - public boolean closeInternal() { - boolean close = false; - + public boolean closeWrapper() { synchronized (this) { if (!closed) { closed = true; - close = true; + + if (exceptionOccurred) { + mgr.setExceptionOccurred(true); + } + + return true; } } - if (close && exceptionOccurred) { - mgr.setExceptionOccurred(true); - } - - return close; + return false; } /* @@ -101,7 +101,7 @@ public class JmsConsumerWrapper implements MessageConsumer { */ @Override public void close() throws JMSException { - if (closeInternal()) { + if (closeWrapper()) { mgr.removeReference(this); if (exceptionOccurred) { diff --git a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsProducerWrapper.java b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsProducerWrapper.java index d2500fa400..1c9439d9c7 100644 --- a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsProducerWrapper.java +++ b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsProducerWrapper.java @@ -28,7 +28,8 @@ import javax.jms.MessageProducer; import com.raytheon.uf.common.jms.JmsPooledProducer; /** - * TODO Add Description + * Wrapper class for jms producer pooling. Helps track references to the pooled + * producer to know when the producer can be closed. * *
  * 
@@ -36,8 +37,8 @@ import com.raytheon.uf.common.jms.JmsPooledProducer;
  * 
  * Date         Ticket#    Engineer    Description
  * ------------ ---------- ----------- --------------------------
- * Dec 8, 2011            rjpeter     Initial creation
- * 
+ * Dec  fi8, 2011            rjpeter     Initial creation
+ * Feb 26, 2013 1642       rjpeter     Added volatile references for better concurrency handling.
  * 
* * @author rjpeter @@ -45,11 +46,11 @@ import com.raytheon.uf.common.jms.JmsPooledProducer; */ public class JmsProducerWrapper implements MessageProducer { - private JmsPooledProducer mgr = null; + private final JmsPooledProducer mgr; - private boolean exceptionOccurred = false; + private volatile boolean exceptionOccurred = false; - private boolean closed = false; + private volatile boolean closed = false; public JmsProducerWrapper(JmsPooledProducer mgr) { this.mgr = mgr; @@ -61,21 +62,18 @@ public class JmsProducerWrapper implements MessageProducer { * * @return True if this wrapper hasn't been closed before, false otherwise. */ - public boolean closeInternal() { - boolean close = false; - + public boolean closeWrapper() { synchronized (this) { if (!closed) { closed = true; - close = true; + + if (exceptionOccurred) { + mgr.setExceptionOccurred(true); + } } } - if (close && exceptionOccurred) { - mgr.setExceptionOccurred(true); - } - - return close; + return false; } /* @@ -88,7 +86,7 @@ public class JmsProducerWrapper implements MessageProducer { */ @Override public void close() throws JMSException { - if (closeInternal()) { + if (closeWrapper()) { mgr.removeReference(this); if (exceptionOccurred) { diff --git a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsSessionWrapper.java b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsSessionWrapper.java index 5ffe5f2295..4af46d7ccb 100644 --- a/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsSessionWrapper.java +++ b/edexOsgi/com.raytheon.uf.common.jms/src/com/raytheon/uf/common/jms/wrapper/JmsSessionWrapper.java @@ -46,7 +46,9 @@ import javax.jms.TopicSubscriber; import com.raytheon.uf.common.jms.JmsPooledSession; /** - * TODO Add Description + * Wrapper class for jms session pooling. Tracks wrapped consumers/producers + * created from this wrapped session to know when the session can be returned to + * the pool. * *
  * 
@@ -55,7 +57,7 @@ import com.raytheon.uf.common.jms.JmsPooledSession;
  * Date         Ticket#    Engineer    Description
  * ------------ ---------- ----------- --------------------------
  * Nov 30, 2011            rjpeter     Initial creation
- * 
+ * Feb 26, 2013 1642       rjpeter     Added volatile references for better concurrency handling.
  * 
* * @author rjpeter @@ -63,20 +65,18 @@ import com.raytheon.uf.common.jms.JmsPooledSession; */ public class JmsSessionWrapper implements Session { - private JmsPooledSession mgr = null; + private final JmsPooledSession mgr; - private boolean closed = false; + private volatile boolean closed = false; - private boolean exceptionOccurred = false; + private volatile boolean exceptionOccurred = false; - private Throwable trappedExc = null; + private final List producers = new ArrayList( + 1);; - private List producers = null; + private final List consumers = new ArrayList( + 1); - private List consumers = null; - - // TODO: needs to track the wrappers opened by this wrapped session so when - // wrapped session is closed, all underlying wrappers are closed. public JmsSessionWrapper(JmsPooledSession mgr) { this.mgr = mgr; } @@ -87,32 +87,24 @@ public class JmsSessionWrapper implements Session { * * @return True if this wrapper hasn't been closed before, false otherwise. */ - public boolean closeInternal() { + public boolean closeWrapper() { synchronized (this) { if (!closed) { closed = true; - if (consumers != null) { - for (JmsConsumerWrapper consumer : consumers) { - try { - consumer.close(); - } catch (JMSException e) { + for (JmsConsumerWrapper consumer : consumers) { + try { + consumer.close(); + } catch (JMSException e) { - } } - - consumers = null; } - if (producers != null) { - for (JmsProducerWrapper producer : producers) { - try { - producer.close(); - } catch (JMSException e) { + for (JmsProducerWrapper producer : producers) { + try { + producer.close(); + } catch (JMSException e) { - } } - - producers = null; } if (exceptionOccurred) { @@ -132,7 +124,7 @@ public class JmsSessionWrapper implements Session { */ @Override public void close() throws JMSException { - if (closeInternal()) { + if (closeWrapper()) { // remove this wrapper from the manager mgr.removeReference(this); @@ -164,7 +156,6 @@ public class JmsSessionWrapper implements Session { sess.commit(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -185,7 +176,6 @@ public class JmsSessionWrapper implements Session { return sess.createBrowser(queue); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -207,7 +197,6 @@ public class JmsSessionWrapper implements Session { return sess.createBrowser(queue, messageSelector); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -228,7 +217,6 @@ public class JmsSessionWrapper implements Session { return sess.createBytesMessage(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -256,20 +244,24 @@ public class JmsSessionWrapper implements Session { @Override public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { - JmsConsumerWrapper consumer = mgr.getConsumer(destination, - messageSelector); + try { + JmsConsumerWrapper consumer = mgr.getConsumer(destination, + messageSelector); - if (consumer != null) { - if (consumers == null) { - consumers = new ArrayList(1); + if (consumer != null) { + consumers.add(consumer); + } else { + throw new IllegalStateException("Underlying consumer is closed"); } - consumers.add(consumer); - } else { - throw new IllegalStateException("Underlying consumer is closed"); + return consumer; + } catch (Throwable e) { + exceptionOccurred = true; + JMSException exc = new JMSException( + "Exception occurred on pooled session"); + exc.initCause(e); + throw exc; } - - return consumer; } /* @@ -290,7 +282,6 @@ public class JmsSessionWrapper implements Session { noLocal); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -317,7 +308,6 @@ public class JmsSessionWrapper implements Session { return sess.createDurableSubscriber(topic, name); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -342,7 +332,6 @@ public class JmsSessionWrapper implements Session { noLocal); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -363,7 +352,6 @@ public class JmsSessionWrapper implements Session { return sess.createMapMessage(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -384,7 +372,6 @@ public class JmsSessionWrapper implements Session { return sess.createMessage(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -405,7 +392,6 @@ public class JmsSessionWrapper implements Session { return sess.createObjectMessage(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -427,7 +413,6 @@ public class JmsSessionWrapper implements Session { return sess.createObjectMessage(obj); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -443,19 +428,23 @@ public class JmsSessionWrapper implements Session { @Override public MessageProducer createProducer(Destination destination) throws JMSException { - JmsProducerWrapper producer = mgr.getProducer(destination); + try { + JmsProducerWrapper producer = mgr.getProducer(destination); - if (producer != null) { - if (producers == null) { - producers = new ArrayList(1); + if (producer != null) { + producers.add(producer); + } else { + throw new IllegalStateException("Underlying producer is closed"); } - producers.add(producer); - } else { - throw new IllegalStateException("Underlying producer is closed"); + return producer; + } catch (Throwable e) { + exceptionOccurred = true; + JMSException exc = new JMSException( + "Exception occurred on pooled session"); + exc.initCause(e); + throw exc; } - - return producer; } /* @@ -471,7 +460,6 @@ public class JmsSessionWrapper implements Session { return sess.createQueue(queueName); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -492,7 +480,6 @@ public class JmsSessionWrapper implements Session { return sess.createStreamMessage(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -513,7 +500,6 @@ public class JmsSessionWrapper implements Session { return sess.createTemporaryQueue(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -534,7 +520,6 @@ public class JmsSessionWrapper implements Session { return sess.createTemporaryTopic(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -555,7 +540,6 @@ public class JmsSessionWrapper implements Session { return sess.createTextMessage(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -576,7 +560,6 @@ public class JmsSessionWrapper implements Session { return sess.createTextMessage(text); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -597,7 +580,6 @@ public class JmsSessionWrapper implements Session { return sess.createTopic(topicName); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -618,7 +600,6 @@ public class JmsSessionWrapper implements Session { return sess.getAcknowledgeMode(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -639,7 +620,6 @@ public class JmsSessionWrapper implements Session { return sess.getMessageListener(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -660,7 +640,6 @@ public class JmsSessionWrapper implements Session { return sess.getTransacted(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -681,7 +660,6 @@ public class JmsSessionWrapper implements Session { sess.recover(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -702,7 +680,6 @@ public class JmsSessionWrapper implements Session { sess.rollback(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -722,7 +699,6 @@ public class JmsSessionWrapper implements Session { sess.run(); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; RuntimeException exc = new RuntimeException( "Exception occurred on pooled session", e); throw exc; @@ -743,7 +719,6 @@ public class JmsSessionWrapper implements Session { sess.setMessageListener(listener); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e); @@ -764,7 +739,6 @@ public class JmsSessionWrapper implements Session { sess.unsubscribe(name); } catch (Throwable e) { exceptionOccurred = true; - trappedExc = e; JMSException exc = new JMSException( "Exception occurred on pooled session"); exc.initCause(e);