diff --git a/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/notification/jobs/NotificationManagerJob.java b/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/notification/jobs/NotificationManagerJob.java index 09e372328f..e56ef77af2 100644 --- a/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/notification/jobs/NotificationManagerJob.java +++ b/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/notification/jobs/NotificationManagerJob.java @@ -70,7 +70,8 @@ import com.raytheon.uf.viz.core.notification.NotificationMessage; * 05/08/08 1127 randerso Initial Creation * 09/03/08 1448 chammack Refactored notification observer interface * 04/23/13 1939 randerso Add separate connect method to allow application - * to complete initialization before connecting to JMS + * to complete initialization before connecting to JMS. + * 10/15/2013 2389 rjpeter Updated synchronization to remove session leaks. * * * @author randerso @@ -107,9 +108,10 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + result = (prime * result) + ((queryString == null) ? 0 : queryString.hashCode()); - result = prime * result + ((topic == null) ? 0 : topic.hashCode()); + result = (prime * result) + + ((topic == null) ? 0 : topic.hashCode()); return result; } @@ -156,11 +158,11 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { private static NotificationManagerJob instance; /** The observer map of topic to listeners */ - protected Map listeners; + protected final Map listeners; private Connection connection; - private boolean connected = false; + private volatile boolean connected = false; /** * Get the active subscription manager job. If one does not exist, start an @@ -191,18 +193,29 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { protected void connect(boolean notifyError) { boolean successful = true; - try { - ConnectionFactory connectionFactory = JMSConnection.getInstance() - .getFactory(); - disconnect(notifyError); + synchronized (this) { + try { + ConnectionFactory connectionFactory = JMSConnection + .getInstance().getFactory(); + disconnect(notifyError); - // Create a Connection - connection = connectionFactory.createConnection(); - connection.setExceptionListener(this); - // connection.setClientID(VizApp.getWsId().toString()); + // Create a Connection + connection = connectionFactory.createConnection(); + connection.setExceptionListener(this); + // connection.setClientID(VizApp.getWsId().toString()); - connection.start(); - connected = true; + connection.start(); + connected = true; + } catch (JMSException e) { + if (notifyError) { + statusHandler.handle(Priority.SIGNIFICANT, + "NotificationManager failed to connect.", e); + } + successful = false; + } + } + + synchronized (listeners) { for (NotificationListener listener : listeners.values()) { try { listener.setupConnection(this); @@ -216,12 +229,6 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { } } } - } catch (JMSException e) { - if (notifyError) { - statusHandler.handle(Priority.SIGNIFICANT, - "NotificationManager failed to connect.", e); - } - successful = false; } if (!successful) { @@ -229,7 +236,22 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { } } - protected void disconnect(boolean notifyError) { + /** + * Creates a new AUTO_ACKNOWLEDGE session if connected to JMS, otherwise + * returns null. + * + * @return + * @throws JMSException + */ + protected synchronized Session createSession() throws JMSException { + if (connected) { + return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } else { + return null; + } + } + + protected synchronized void disconnect(boolean notifyError) { if (connection != null) { try { connection.stop(); @@ -271,9 +293,11 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { new Timer().schedule(task, 5 * 1000); } - // Reset connected bool - for (NotificationListener listener : listeners.values()) { - listener.connected = false; + synchronized (listeners) { + // disconnect listeners + for (NotificationListener listener : listeners.values()) { + listener.disconnect(); + } } if (e != null) { @@ -282,33 +306,38 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { } } - public static synchronized void addQueueObserver(String queue, - INotificationObserver obs) { + public static void addQueueObserver(String queue, INotificationObserver obs) { addQueueObserver(queue, obs, null); } - public static synchronized void addQueueObserver(String queue, + public static void addQueueObserver(String queue, INotificationObserver obs, String queryString) { NotificationManagerJob notifMgr = getInstance(); ListenerKey key = new ListenerKey(queue, queryString); - NotificationListener listener = notifMgr.listeners.get(key); - if (listener == null || listener.consumer == null) { - try { - listener = new NotificationListener(queue, queryString, - Type.QUEUE); - notifMgr.listeners.put(key, listener); - listener.addObserver(obs); - if (notifMgr.connected) { - listener.setupConnection(notifMgr); + + synchronized (notifMgr.listeners) { + NotificationListener listener = notifMgr.listeners.get(key); + if (listener == null) { + try { + listener = new NotificationListener(queue, queryString, + Type.QUEUE); + notifMgr.listeners.put(key, listener); + listener.addObserver(obs); + if (notifMgr.connected) { + listener.setupConnection(notifMgr); + } + } catch (JMSException e) { + Status s = new Status( + IStatus.ERROR, + Activator.PLUGIN_ID, + 0, + "NotificationManager failed to create queue consumer.", + e); + StatusManager.getManager().handle(s); } - } catch (JMSException e) { - Status s = new Status(IStatus.ERROR, Activator.PLUGIN_ID, 0, - "NotificationManager failed to create queue consumer.", - e); - StatusManager.getManager().handle(s); + } else { + listener.addObserver(obs); } - } else { - listener.addObserver(obs); } } @@ -320,32 +349,35 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { * @param obs * the alert observer callback */ - public static synchronized void addObserver(String topic, - INotificationObserver obs) { + public static void addObserver(String topic, INotificationObserver obs) { addObserver(topic, obs, null); } - public static synchronized void addObserver(String topic, - INotificationObserver obs, String queryString) { + public static void addObserver(String topic, INotificationObserver obs, + String queryString) { NotificationManagerJob notifMgr = getInstance(); ListenerKey key = new ListenerKey(topic, queryString); - NotificationListener listener = notifMgr.listeners.get(key); - if (listener == null) { - try { - listener = new NotificationListener(topic, queryString, - Type.TOPIC); - notifMgr.listeners.put(key, listener); - listener.addObserver(obs); - if (notifMgr.connected) { - listener.setupConnection(notifMgr); + + synchronized (notifMgr.listeners) { + NotificationListener listener = notifMgr.listeners.get(key); + if (listener == null) { + try { + listener = new NotificationListener(topic, queryString, + Type.TOPIC); + notifMgr.listeners.put(key, listener); + listener.addObserver(obs); + if (notifMgr.connected) { + listener.setupConnection(notifMgr); + } + } catch (JMSException e) { + Status s = new Status(IStatus.ERROR, Activator.PLUGIN_ID, + 0, + "NotificationManager failed to create consumer.", e); + StatusManager.getManager().handle(s); } - } catch (JMSException e) { - Status s = new Status(IStatus.ERROR, Activator.PLUGIN_ID, 0, - "NotificationManager failed to create consumer.", e); - StatusManager.getManager().handle(s); + } else { + listener.addObserver(obs); } - } else { - listener.addObserver(obs); } } @@ -358,8 +390,7 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { * @param obs * the observer to remove */ - public static synchronized void removeObserver(String topic, - INotificationObserver obs) { + public static void removeObserver(String topic, INotificationObserver obs) { removeObserver(topic, obs, null); } @@ -373,18 +404,20 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { * the observer to remove * @param queryString */ - public static synchronized void removeObserver(String topic, - INotificationObserver obs, String queryString) { + public static void removeObserver(String topic, INotificationObserver obs, + String queryString) { NotificationManagerJob notifMgr = getInstance(); ListenerKey key = new ListenerKey(topic, queryString); - NotificationListener listener = notifMgr.listeners.get(key); - if (listener == null) { - return; - } - listener.removeObserver(obs); - if (listener.size() <= 0) { - listener.disconnect(); - notifMgr.listeners.remove(key); + synchronized (notifMgr.listeners) { + NotificationListener listener = notifMgr.listeners.get(key); + if (listener == null) { + return; + } + listener.removeObserver(obs); + if (listener.size() <= 0) { + listener.disconnect(); + notifMgr.listeners.remove(key); + } } } @@ -397,18 +430,20 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { * @param obs * the observer to remove */ - public static synchronized void removeQueueObserver(String queue, - String queryString, INotificationObserver obs) { + public static void removeQueueObserver(String queue, String queryString, + INotificationObserver obs) { NotificationManagerJob notifMgr = getInstance(); ListenerKey key = new ListenerKey(queue, queryString); - NotificationListener listener = notifMgr.listeners.get(key); - if (listener == null) { - return; - } - listener.removeObserver(obs); - if (listener.size() <= 0) { - listener.disconnect(); - notifMgr.listeners.remove(key); + synchronized (notifMgr.listeners) { + NotificationListener listener = notifMgr.listeners.get(key); + if (listener == null) { + return; + } + listener.removeObserver(obs); + if (listener.size() <= 0) { + listener.disconnect(); + notifMgr.listeners.remove(key); + } } } @@ -428,24 +463,22 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { private static class NotificationListener implements MessageListener { - private Type type; + private final Type type; - private String id; + private final String id; - private String queryString; + private final String queryString; /** The list of interested parties */ - protected List observers; + protected final List observers; /** The map of job threads from observers */ - protected Map jobWrappers; + protected final Map jobWrappers; protected MessageConsumer consumer; protected Session session; - protected boolean connected = false; - public NotificationListener(String id, String queryString, Type type) { this.observers = new ArrayList(); this.jobWrappers = new HashMap(); @@ -466,40 +499,34 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { break; } } - // If we made it here we are good - connected = true; } - public void disconnect() { - if (connected) { - if (consumer != null) { - try { - consumer.close(); - } catch (JMSException e) { - statusHandler.handle(Priority.PROBLEM, - "Error closing consumer connection", e); - } - consumer = null; + public synchronized void disconnect() { + if (consumer != null) { + try { + consumer.close(); + } catch (JMSException e) { + statusHandler.handle(Priority.PROBLEM, + "Error closing consumer connection", e); } + consumer = null; + } - if (session != null) { - try { - session.close(); - } catch (JMSException e) { - statusHandler.handle(Priority.PROBLEM, - "Error closing session", e); - } - session = null; + if (session != null) { + try { + session.close(); + } catch (JMSException e) { + statusHandler.handle(Priority.PROBLEM, + "Error closing session", e); } - connected = false; + session = null; } } - private void setupQueue(NotificationManagerJob manager) + private synchronized void setupQueue(NotificationManagerJob manager) throws JMSException { disconnect(); - session = manager.connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); + session = manager.createSession(); if (session != null) { String queueName = id; Queue t = session.createQueue(queueName); @@ -510,16 +537,14 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { consumer = session.createConsumer(t); } - setConsumer(consumer); consumer.setMessageListener(this); } } - private void setupTopic(NotificationManagerJob manager) + private synchronized void setupTopic(NotificationManagerJob manager) throws JMSException { disconnect(); - session = manager.connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); + session = manager.createSession(); if (session != null) { String topicName = id; Topic t = session.createTopic(topicName); @@ -534,10 +559,6 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { } } - public void setConsumer(MessageConsumer consumer) { - this.consumer = consumer; - } - /* * (non-Javadoc) * @@ -545,48 +566,37 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { */ @Override public void onMessage(Message msg) { - if (observers == null) { - return; - } - - for (INotificationObserver obs : observers) { - sendToObserver(obs, msg); - } - - // Iterator iterator = jobWrappers.values().iterator(); - // while (iterator.hasNext()) { - // JobWrapper wrapper = iterator.next(); - // if (!wrapper.isEmpty() && wrapper.getState() != Job.RUNNING) { - // wrapper.schedule(); - // } - // } - } - - public synchronized void addObserver(INotificationObserver obs) { - observers.add(obs); - } - - public synchronized void removeObserver(INotificationObserver obs) { - observers.remove(obs); - } - - protected void sendToObserver(INotificationObserver observer, - Message msg) { - // Get the corresponding job, creating the - // wrapper if necessary - JobWrapper wrapper = null; - synchronized (this) { - wrapper = jobWrappers.get(observer); - if (wrapper == null) { - wrapper = new JobWrapper(observer); - jobWrappers.put(observer, wrapper); + synchronized (observers) { + for (INotificationObserver obs : observers) { + // Get the corresponding job, creating the + // wrapper if necessary, really on observers lock for sync + // purposes + JobWrapper wrapper = jobWrappers.get(obs); + if (wrapper == null) { + wrapper = new JobWrapper(obs); + jobWrappers.put(obs, wrapper); + } + wrapper.put(msg); } } - wrapper.put(msg); + } + + public void addObserver(INotificationObserver obs) { + synchronized (observers) { + observers.add(obs); + } + } + + public void removeObserver(INotificationObserver obs) { + synchronized (observers) { + observers.remove(obs); + } } public int size() { - return observers.size(); + synchronized (observers) { + return observers.size(); + } } } @@ -669,7 +679,7 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { if (messageCount.incrementAndGet() > IN_MEM_MESSAGE_LIMIT) { messageCount.decrementAndGet(); messages.remove(); - if (System.currentTimeMillis() - lastErrorPrintTime > 600000) { + if ((System.currentTimeMillis() - lastErrorPrintTime) > 600000) { final Status s = new Status( Status.ERROR, Activator.PLUGIN_ID, @@ -683,15 +693,6 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { this.schedule(); } - /** - * Checks if is empty. - * - * @return true, if is empty - */ - public boolean isEmpty() { - return this.messages.isEmpty(); - } - } /* @@ -701,9 +702,12 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable { */ @Override public void dispose() { - for (NotificationListener listener : listeners.values()) { - listener.disconnect(); + synchronized (listeners) { + for (NotificationListener listener : listeners.values()) { + listener.disconnect(); + } + listeners.clear(); } - listeners.clear(); + disconnect(true); } }