Issue #2357: Remove edex jms resource reuse by other threads.

Change-Id: I73f5a01f2d943bb9f7f4e51028cc16bfcee3d823

Former-commit-id: d68188c3c2 [formerly c146f90a09] [formerly be99174b65] [formerly 802ab87e57 [formerly be99174b65 [formerly a13a027b07a8d080c4506748afe5882575ae6532]]]
Former-commit-id: 802ab87e57
Former-commit-id: a391c7d1d91621bfca9c5deeb80e3d2239e0295a [formerly 5320536f54]
Former-commit-id: 7dc61aceee
This commit is contained in:
Richard Peter 2013-10-04 14:22:40 -05:00
parent 4cd2815705
commit 681f174d88
2 changed files with 51 additions and 152 deletions

View file

@ -14,18 +14,15 @@
<!-- specify the connection to the broker (qpid) -->
<!-- MaxPrefetch set at 0, due to DataPool routers getting messages backed up behind long running tasks -->
<bean id="amqConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
<constructor-arg type="java.lang.String" value="amqp://guest:guest@/edex?brokerlist='tcp://${BROKER_ADDR}?retries='9999'&amp;connecttimeout='5000'&amp;connectdelay='5000''&amp;maxprefetch='0'&amp;sync_publish='all'&amp;sync_ack='true'"/>
<constructor-arg type="java.lang.String" value="amqp://guest:guest@/edex?brokerlist='tcp://${BROKER_ADDR}?retries='9999'&amp;heartbeat='0'&amp;connecttimeout='5000'&amp;connectdelay='5000''&amp;maxprefetch='0'&amp;sync_publish='all'&amp;sync_ack='true'"/>
</bean>
<bean id="jmsPooledConnectionFactory" class="com.raytheon.uf.common.jms.JmsPooledConnectionFactory">
<constructor-arg ref="amqConnectionFactory"/>
<property name="provider" value="QPID"/>
<property name="reconnectInterval" value="5000"/>
<!-- After connection has been closed by thread keep it allocated for another 90 seconds in case thread needs it again -->
<property name="connectionHoldTime" value="90000"/>
<!-- Any resource that has been available in the pool for more than 1 minute will be closed -->
<property name="resourceRetention" value="60000"/>
<property name="maxSpareConnections" value="10"/>
<!-- After resource has been closed by thread keep it allocated for another 2 minutes in case thread needs it again -->
<property name="resourceRetention" value="120000"/>
</bean>
<bean id="genericThreadPool"

View file

@ -1,23 +1,3 @@
package com.raytheon.uf.common.jms;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import com.raytheon.uf.common.jms.wrapper.JmsConnectionWrapper;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
@ -37,9 +17,37 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.jms;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import com.raytheon.uf.common.jms.wrapper.JmsConnectionWrapper;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
/**
* TODO Add Description
* Connection Factory that keep underlying JMS resources used by a thread open
* for re-use by that same thread. This is to get around Spring's opening and
* closing the full jms stack for each message. We cannot use Spring caching
* mechanism since it requires one connection object to be used for all jms
* sessions. In that scenario one error causes every jms resource to disconnect
* and has been known to dead lock in the qpid code.
*
* The close action puts the resource into a pool for reuse. The jms resource
* may only be reused by the same thread. This is in part since each thread
* always connects to the same set of jms resources. Also on the QPID broker
* transient data is only removed when the session itself is closed. So reusing
* a resource on a different thread can cause transient topic resources with no
* consumers.
*
* <pre>
*
@ -48,7 +56,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 15, 2011 rjpeter Initial creation
*
* Oct 04, 2013 2357 rjpeter Removed pooling, keeps resources open for the
* thread that created them for a configured amount of time
* </pre>
*
* @author rjpeter
@ -69,20 +78,12 @@ public class JmsPooledConnectionFactory implements ConnectionFactory {
// connections that were recently returned, key is "threadId-threadName"
private final Map<String, AvailableJmsPooledObject<JmsPooledConnection>> pendingConnections = new HashMap<String, AvailableJmsPooledObject<JmsPooledConnection>>();
// connections that have been released from pendingConnections and are
// awaiting being closed.
private final Deque<AvailableJmsPooledObject<JmsPooledConnection>> availableConnections = new LinkedList<AvailableJmsPooledObject<JmsPooledConnection>>();
private final ConcurrentLinkedQueue<JmsPooledConnection> deadConnections = new ConcurrentLinkedQueue<JmsPooledConnection>();
private int reconnectInterval = 30000;
private int connectionHoldTime = 120000;
private int resourceRetention = 180000;
private int maxSpareConnections = 10;
public JmsPooledConnectionFactory(ConnectionFactory factory) {
this.connFactory = factory;
}
@ -137,33 +138,6 @@ public class JmsPooledConnectionFactory implements ConnectionFactory {
}
}
// check available connections
boolean keepChecking = true;
while (keepChecking) {
synchronized (availableConnections) {
wrapper = availableConnections.poll();
}
if (wrapper != null) {
conn = wrapper.getPooledObject();
} else {
keepChecking = false;
}
if (conn != null) {
// was retrieved connection valid
JmsConnectionWrapper ref = getConnectionWrapper(threadKey, conn);
if (ref != null) {
return ref;
} else {
deadConnections.add(conn);
conn = null;
}
}
}
// create new connection?
if (conn == null) {
conn = new JmsPooledConnection(this);
@ -272,11 +246,6 @@ public class JmsPooledConnectionFactory implements ConnectionFactory {
}
}
}
// remove it from availableConnections
synchronized (availableConnections) {
availableConnections.remove(conn);
}
}
public boolean returnConnectionToPool(JmsPooledConnection conn) {
@ -314,16 +283,13 @@ public class JmsPooledConnectionFactory implements ConnectionFactory {
new AvailableJmsPooledObject<JmsPooledConnection>(
conn));
}
if (prev != null) {
if ((prev != null) && (prev.getPooledObject() != conn)) {
// there was a previous connection registered to
// this thread, move it to available
// this thread, close it
statusHandler
.handle(Priority.WARN,
"Another connection already pooled for this thread, moving previous connection to available");
prev.reset();
synchronized (availableConnections) {
availableConnections.add(prev);
}
"Another connection already pooled for this thread, closing previous connection");
deadConnections.add(prev.getPooledObject());
}
} else {
success = false;
@ -333,11 +299,11 @@ public class JmsPooledConnectionFactory implements ConnectionFactory {
}
public void checkPooledResources() {
long curTime = System.currentTimeMillis();
List<AvailableJmsPooledObject<JmsPooledConnection>> connectionsToProcess = new LinkedList<AvailableJmsPooledObject<JmsPooledConnection>>();
int connectionsClosed = 0;
// grab connections to move from pending to available
long curTime = System.currentTimeMillis();
// check for connections to close
synchronized (pendingConnections) {
Iterator<AvailableJmsPooledObject<JmsPooledConnection>> iter = pendingConnections
.values().iterator();
@ -346,44 +312,9 @@ public class JmsPooledConnectionFactory implements ConnectionFactory {
.next();
if (wrapper.expired(curTime, resourceRetention)) {
iter.remove();
connectionsToProcess.add(wrapper);
}
}
}
synchronized (availableConnections) {
for (AvailableJmsPooledObject<JmsPooledConnection> wrapper : connectionsToProcess) {
wrapper.reset();
// putting to available pool
JmsPooledConnection conn = wrapper.getPooledObject();
conn.setKey(null);
availableConnections.add(wrapper);
}
}
connectionsToProcess.clear();
synchronized (availableConnections) {
Iterator<AvailableJmsPooledObject<JmsPooledConnection>> iter = availableConnections
.iterator();
while (iter.hasNext()) {
AvailableJmsPooledObject<JmsPooledConnection> wrapper = iter
.next();
// available sessions added based on time, so oldest is front of
// queue
if (wrapper.expired(curTime, connectionHoldTime)
|| (availableConnections.size() > maxSpareConnections)) {
// not immediately closing connection so that we minimize
// time in sync block
deadConnections.add(wrapper.getPooledObject());
iter.remove();
} else {
// connections ordered in reverse order
break;
}
}
}
while (!deadConnections.isEmpty()) {
@ -394,29 +325,29 @@ public class JmsPooledConnectionFactory implements ConnectionFactory {
}
}
List<JmsPooledConnection> connectionsToCheck = null;
ArrayList<JmsPooledConnection> connectionsToCheck = null;
synchronized (inUseConnections) {
connectionsToCheck = new ArrayList<JmsPooledConnection>(
inUseConnections.values());
}
int resourcesClosed = 0;
for (JmsPooledConnection conn : connectionsToCheck) {
resourcesClosed += conn.closeOldPooledResources(resourceRetention);
}
connectionsToCheck.clear();
// close pooled resources on pending connections
synchronized (pendingConnections) {
connectionsToProcess = new ArrayList<AvailableJmsPooledObject<JmsPooledConnection>>(
pendingConnections.values());
connectionsToCheck.ensureCapacity(pendingConnections.size());
for (AvailableJmsPooledObject<JmsPooledConnection> wrapper : pendingConnections
.values()) {
connectionsToCheck.add(wrapper.getPooledObject());
}
}
synchronized (availableConnections) {
connectionsToProcess.addAll(availableConnections);
}
for (AvailableJmsPooledObject<JmsPooledConnection> wrapper : connectionsToProcess) {
resourcesClosed += wrapper.getPooledObject()
.closeOldPooledResources(resourceRetention);
for (JmsPooledConnection conn : connectionsToCheck) {
resourcesClosed += conn.closeOldPooledResources(resourceRetention);
}
if ((connectionsClosed > 0) || (resourcesClosed > 0)) {
@ -429,33 +360,4 @@ public class JmsPooledConnectionFactory implements ConnectionFactory {
}
}
/**
* @return the connectionHoldTime
*/
public int getConnectionHoldTime() {
return connectionHoldTime;
}
/**
* @param connectionHoldTime
* the connectionHoldTime to set
*/
public void setConnectionHoldTime(int connectionHoldTime) {
this.connectionHoldTime = connectionHoldTime;
}
/**
* @return the maxSpareConnections
*/
public int getMaxSpareConnections() {
return maxSpareConnections;
}
/**
* @param maxSpareConnections
* the maxSpareConnections to set
*/
public void setMaxSpareConnections(int maxSpareConnections) {
this.maxSpareConnections = maxSpareConnections;
}
}