From 681f174d88b5662a21e47537eeefd4a9d8dda2dc Mon Sep 17 00:00:00 2001 From: Richard Peter Date: Fri, 4 Oct 2013 14:22:40 -0500 Subject: [PATCH] Issue #2357: Remove edex jms resource reuse by other threads. Change-Id: I73f5a01f2d943bb9f7f4e51028cc16bfcee3d823 Former-commit-id: d68188c3c23e0a0d2e9d5087341894b1f58b6331 [formerly c146f90a09dd0967b5f4e7a72f84556cead72969] [formerly be99174b65dd35bbaf40c60af2eed890e8c5e7a1] [formerly 802ab87e57ab871108fa8255e27966ad28a27483 [formerly be99174b65dd35bbaf40c60af2eed890e8c5e7a1 [formerly a13a027b07a8d080c4506748afe5882575ae6532]]] Former-commit-id: 802ab87e57ab871108fa8255e27966ad28a27483 Former-commit-id: a391c7d1d91621bfca9c5deeb80e3d2239e0295a [formerly 5320536f54dd30f2e1bfaeab0e8aea6ea7e3222c] Former-commit-id: 7dc61aceeee3558323d542f1692c7e72456f4c0b --- edexOsgi/build.edex/esb/conf/spring/edex.xml | 9 +- .../jms/JmsPooledConnectionFactory.java | 194 +++++------------- 2 files changed, 51 insertions(+), 152 deletions(-) diff --git a/edexOsgi/build.edex/esb/conf/spring/edex.xml b/edexOsgi/build.edex/esb/conf/spring/edex.xml index 390e9aec2b..96d436518b 100644 --- a/edexOsgi/build.edex/esb/conf/spring/edex.xml +++ b/edexOsgi/build.edex/esb/conf/spring/edex.xml @@ -14,18 +14,15 @@ - + - - - - - + + * @@ -48,7 +56,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Apr 15, 2011 rjpeter Initial creation - * + * Oct 04, 2013 2357 rjpeter Removed pooling, keeps resources open for the + * thread that created them for a configured amount of time * * * @author rjpeter @@ -69,20 +78,12 @@ public class JmsPooledConnectionFactory implements ConnectionFactory { // connections that were recently returned, key is "threadId-threadName" private final Map> pendingConnections = new HashMap>(); - // connections that have been released from pendingConnections and are - // awaiting being closed. - private final Deque> availableConnections = new LinkedList>(); - private final ConcurrentLinkedQueue deadConnections = new ConcurrentLinkedQueue(); private int reconnectInterval = 30000; - private int connectionHoldTime = 120000; - private int resourceRetention = 180000; - private int maxSpareConnections = 10; - public JmsPooledConnectionFactory(ConnectionFactory factory) { this.connFactory = factory; } @@ -137,33 +138,6 @@ public class JmsPooledConnectionFactory implements ConnectionFactory { } } - // check available connections - boolean keepChecking = true; - - while (keepChecking) { - synchronized (availableConnections) { - wrapper = availableConnections.poll(); - } - - if (wrapper != null) { - conn = wrapper.getPooledObject(); - } else { - keepChecking = false; - } - - if (conn != null) { - // was retrieved connection valid - JmsConnectionWrapper ref = getConnectionWrapper(threadKey, conn); - - if (ref != null) { - return ref; - } else { - deadConnections.add(conn); - conn = null; - } - } - } - // create new connection? if (conn == null) { conn = new JmsPooledConnection(this); @@ -272,11 +246,6 @@ public class JmsPooledConnectionFactory implements ConnectionFactory { } } } - - // remove it from availableConnections - synchronized (availableConnections) { - availableConnections.remove(conn); - } } public boolean returnConnectionToPool(JmsPooledConnection conn) { @@ -314,16 +283,13 @@ public class JmsPooledConnectionFactory implements ConnectionFactory { new AvailableJmsPooledObject( conn)); } - if (prev != null) { + if ((prev != null) && (prev.getPooledObject() != conn)) { // there was a previous connection registered to - // this thread, move it to available + // this thread, close it statusHandler .handle(Priority.WARN, - "Another connection already pooled for this thread, moving previous connection to available"); - prev.reset(); - synchronized (availableConnections) { - availableConnections.add(prev); - } + "Another connection already pooled for this thread, closing previous connection"); + deadConnections.add(prev.getPooledObject()); } } else { success = false; @@ -333,11 +299,11 @@ public class JmsPooledConnectionFactory implements ConnectionFactory { } public void checkPooledResources() { - long curTime = System.currentTimeMillis(); - List> connectionsToProcess = new LinkedList>(); int connectionsClosed = 0; - // grab connections to move from pending to available + long curTime = System.currentTimeMillis(); + + // check for connections to close synchronized (pendingConnections) { Iterator> iter = pendingConnections .values().iterator(); @@ -346,44 +312,9 @@ public class JmsPooledConnectionFactory implements ConnectionFactory { .next(); if (wrapper.expired(curTime, resourceRetention)) { iter.remove(); - connectionsToProcess.add(wrapper); - } - } - } - - synchronized (availableConnections) { - for (AvailableJmsPooledObject wrapper : connectionsToProcess) { - wrapper.reset(); - - // putting to available pool - JmsPooledConnection conn = wrapper.getPooledObject(); - conn.setKey(null); - availableConnections.add(wrapper); - } - } - - connectionsToProcess.clear(); - - synchronized (availableConnections) { - Iterator> iter = availableConnections - .iterator(); - while (iter.hasNext()) { - AvailableJmsPooledObject wrapper = iter - .next(); - // available sessions added based on time, so oldest is front of - // queue - if (wrapper.expired(curTime, connectionHoldTime) - || (availableConnections.size() > maxSpareConnections)) { - // not immediately closing connection so that we minimize - // time in sync block deadConnections.add(wrapper.getPooledObject()); - iter.remove(); - } else { - // connections ordered in reverse order - break; } } - } while (!deadConnections.isEmpty()) { @@ -394,29 +325,29 @@ public class JmsPooledConnectionFactory implements ConnectionFactory { } } - List connectionsToCheck = null; + ArrayList connectionsToCheck = null; synchronized (inUseConnections) { connectionsToCheck = new ArrayList( inUseConnections.values()); } + int resourcesClosed = 0; for (JmsPooledConnection conn : connectionsToCheck) { resourcesClosed += conn.closeOldPooledResources(resourceRetention); } + connectionsToCheck.clear(); // close pooled resources on pending connections synchronized (pendingConnections) { - connectionsToProcess = new ArrayList>( - pendingConnections.values()); + connectionsToCheck.ensureCapacity(pendingConnections.size()); + for (AvailableJmsPooledObject wrapper : pendingConnections + .values()) { + connectionsToCheck.add(wrapper.getPooledObject()); + } } - synchronized (availableConnections) { - connectionsToProcess.addAll(availableConnections); - } - - for (AvailableJmsPooledObject wrapper : connectionsToProcess) { - resourcesClosed += wrapper.getPooledObject() - .closeOldPooledResources(resourceRetention); + for (JmsPooledConnection conn : connectionsToCheck) { + resourcesClosed += conn.closeOldPooledResources(resourceRetention); } if ((connectionsClosed > 0) || (resourcesClosed > 0)) { @@ -429,33 +360,4 @@ public class JmsPooledConnectionFactory implements ConnectionFactory { } } - /** - * @return the connectionHoldTime - */ - public int getConnectionHoldTime() { - return connectionHoldTime; - } - - /** - * @param connectionHoldTime - * the connectionHoldTime to set - */ - public void setConnectionHoldTime(int connectionHoldTime) { - this.connectionHoldTime = connectionHoldTime; - } - - /** - * @return the maxSpareConnections - */ - public int getMaxSpareConnections() { - return maxSpareConnections; - } - - /** - * @param maxSpareConnections - * the maxSpareConnections to set - */ - public void setMaxSpareConnections(int maxSpareConnections) { - this.maxSpareConnections = maxSpareConnections; - } }