Merge "Issue #2389: Update NotificationManagerJob" into development

Former-commit-id: 7617fa37e3 [formerly 4d7125ecc5] [formerly 697d0f8853 [formerly 9bd7ddeeb1e850eb1f8729f556fa46d5c70dc342]]
Former-commit-id: 697d0f8853
Former-commit-id: b11c8e5c06
This commit is contained in:
Nate Jensen 2013-10-16 09:33:25 -05:00 committed by Gerrit Code Review
commit 8f9ca27d50

View file

@ -70,7 +70,8 @@ import com.raytheon.uf.viz.core.notification.NotificationMessage;
* 05/08/08 1127 randerso Initial Creation
* 09/03/08 1448 chammack Refactored notification observer interface
* 04/23/13 1939 randerso Add separate connect method to allow application
* to complete initialization before connecting to JMS
* to complete initialization before connecting to JMS.
* 10/15/2013 2389 rjpeter Updated synchronization to remove session leaks.
* </pre>
*
* @author randerso
@ -107,9 +108,10 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
result = (prime * result)
+ ((queryString == null) ? 0 : queryString.hashCode());
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
result = (prime * result)
+ ((topic == null) ? 0 : topic.hashCode());
return result;
}
@ -156,11 +158,11 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
private static NotificationManagerJob instance;
/** The observer map of topic to listeners */
protected Map<ListenerKey, NotificationListener> listeners;
protected final Map<ListenerKey, NotificationListener> listeners;
private Connection connection;
private boolean connected = false;
private volatile boolean connected = false;
/**
* Get the active subscription manager job. If one does not exist, start an
@ -191,18 +193,29 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
protected void connect(boolean notifyError) {
boolean successful = true;
try {
ConnectionFactory connectionFactory = JMSConnection.getInstance()
.getFactory();
disconnect(notifyError);
synchronized (this) {
try {
ConnectionFactory connectionFactory = JMSConnection
.getInstance().getFactory();
disconnect(notifyError);
// Create a Connection
connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
// connection.setClientID(VizApp.getWsId().toString());
// Create a Connection
connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
// connection.setClientID(VizApp.getWsId().toString());
connection.start();
connected = true;
connection.start();
connected = true;
} catch (JMSException e) {
if (notifyError) {
statusHandler.handle(Priority.SIGNIFICANT,
"NotificationManager failed to connect.", e);
}
successful = false;
}
}
synchronized (listeners) {
for (NotificationListener listener : listeners.values()) {
try {
listener.setupConnection(this);
@ -216,12 +229,6 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
}
}
}
} catch (JMSException e) {
if (notifyError) {
statusHandler.handle(Priority.SIGNIFICANT,
"NotificationManager failed to connect.", e);
}
successful = false;
}
if (!successful) {
@ -229,7 +236,22 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
}
}
protected void disconnect(boolean notifyError) {
/**
* Creates a new AUTO_ACKNOWLEDGE session if connected to JMS, otherwise
* returns null.
*
* @return
* @throws JMSException
*/
protected synchronized Session createSession() throws JMSException {
if (connected) {
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} else {
return null;
}
}
protected synchronized void disconnect(boolean notifyError) {
if (connection != null) {
try {
connection.stop();
@ -271,9 +293,11 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
new Timer().schedule(task, 5 * 1000);
}
// Reset connected bool
for (NotificationListener listener : listeners.values()) {
listener.connected = false;
synchronized (listeners) {
// disconnect listeners
for (NotificationListener listener : listeners.values()) {
listener.disconnect();
}
}
if (e != null) {
@ -282,33 +306,38 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
}
}
public static synchronized void addQueueObserver(String queue,
INotificationObserver obs) {
public static void addQueueObserver(String queue, INotificationObserver obs) {
addQueueObserver(queue, obs, null);
}
public static synchronized void addQueueObserver(String queue,
public static void addQueueObserver(String queue,
INotificationObserver obs, String queryString) {
NotificationManagerJob notifMgr = getInstance();
ListenerKey key = new ListenerKey(queue, queryString);
NotificationListener listener = notifMgr.listeners.get(key);
if (listener == null || listener.consumer == null) {
try {
listener = new NotificationListener(queue, queryString,
Type.QUEUE);
notifMgr.listeners.put(key, listener);
listener.addObserver(obs);
if (notifMgr.connected) {
listener.setupConnection(notifMgr);
synchronized (notifMgr.listeners) {
NotificationListener listener = notifMgr.listeners.get(key);
if (listener == null) {
try {
listener = new NotificationListener(queue, queryString,
Type.QUEUE);
notifMgr.listeners.put(key, listener);
listener.addObserver(obs);
if (notifMgr.connected) {
listener.setupConnection(notifMgr);
}
} catch (JMSException e) {
Status s = new Status(
IStatus.ERROR,
Activator.PLUGIN_ID,
0,
"NotificationManager failed to create queue consumer.",
e);
StatusManager.getManager().handle(s);
}
} catch (JMSException e) {
Status s = new Status(IStatus.ERROR, Activator.PLUGIN_ID, 0,
"NotificationManager failed to create queue consumer.",
e);
StatusManager.getManager().handle(s);
} else {
listener.addObserver(obs);
}
} else {
listener.addObserver(obs);
}
}
@ -320,32 +349,35 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
* @param obs
* the alert observer callback
*/
public static synchronized void addObserver(String topic,
INotificationObserver obs) {
public static void addObserver(String topic, INotificationObserver obs) {
addObserver(topic, obs, null);
}
public static synchronized void addObserver(String topic,
INotificationObserver obs, String queryString) {
public static void addObserver(String topic, INotificationObserver obs,
String queryString) {
NotificationManagerJob notifMgr = getInstance();
ListenerKey key = new ListenerKey(topic, queryString);
NotificationListener listener = notifMgr.listeners.get(key);
if (listener == null) {
try {
listener = new NotificationListener(topic, queryString,
Type.TOPIC);
notifMgr.listeners.put(key, listener);
listener.addObserver(obs);
if (notifMgr.connected) {
listener.setupConnection(notifMgr);
synchronized (notifMgr.listeners) {
NotificationListener listener = notifMgr.listeners.get(key);
if (listener == null) {
try {
listener = new NotificationListener(topic, queryString,
Type.TOPIC);
notifMgr.listeners.put(key, listener);
listener.addObserver(obs);
if (notifMgr.connected) {
listener.setupConnection(notifMgr);
}
} catch (JMSException e) {
Status s = new Status(IStatus.ERROR, Activator.PLUGIN_ID,
0,
"NotificationManager failed to create consumer.", e);
StatusManager.getManager().handle(s);
}
} catch (JMSException e) {
Status s = new Status(IStatus.ERROR, Activator.PLUGIN_ID, 0,
"NotificationManager failed to create consumer.", e);
StatusManager.getManager().handle(s);
} else {
listener.addObserver(obs);
}
} else {
listener.addObserver(obs);
}
}
@ -358,8 +390,7 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
* @param obs
* the observer to remove
*/
public static synchronized void removeObserver(String topic,
INotificationObserver obs) {
public static void removeObserver(String topic, INotificationObserver obs) {
removeObserver(topic, obs, null);
}
@ -373,18 +404,20 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
* the observer to remove
* @param queryString
*/
public static synchronized void removeObserver(String topic,
INotificationObserver obs, String queryString) {
public static void removeObserver(String topic, INotificationObserver obs,
String queryString) {
NotificationManagerJob notifMgr = getInstance();
ListenerKey key = new ListenerKey(topic, queryString);
NotificationListener listener = notifMgr.listeners.get(key);
if (listener == null) {
return;
}
listener.removeObserver(obs);
if (listener.size() <= 0) {
listener.disconnect();
notifMgr.listeners.remove(key);
synchronized (notifMgr.listeners) {
NotificationListener listener = notifMgr.listeners.get(key);
if (listener == null) {
return;
}
listener.removeObserver(obs);
if (listener.size() <= 0) {
listener.disconnect();
notifMgr.listeners.remove(key);
}
}
}
@ -397,18 +430,20 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
* @param obs
* the observer to remove
*/
public static synchronized void removeQueueObserver(String queue,
String queryString, INotificationObserver obs) {
public static void removeQueueObserver(String queue, String queryString,
INotificationObserver obs) {
NotificationManagerJob notifMgr = getInstance();
ListenerKey key = new ListenerKey(queue, queryString);
NotificationListener listener = notifMgr.listeners.get(key);
if (listener == null) {
return;
}
listener.removeObserver(obs);
if (listener.size() <= 0) {
listener.disconnect();
notifMgr.listeners.remove(key);
synchronized (notifMgr.listeners) {
NotificationListener listener = notifMgr.listeners.get(key);
if (listener == null) {
return;
}
listener.removeObserver(obs);
if (listener.size() <= 0) {
listener.disconnect();
notifMgr.listeners.remove(key);
}
}
}
@ -428,24 +463,22 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
private static class NotificationListener implements MessageListener {
private Type type;
private final Type type;
private String id;
private final String id;
private String queryString;
private final String queryString;
/** The list of interested parties */
protected List<INotificationObserver> observers;
protected final List<INotificationObserver> observers;
/** The map of job threads from observers */
protected Map<INotificationObserver, JobWrapper> jobWrappers;
protected final Map<INotificationObserver, JobWrapper> jobWrappers;
protected MessageConsumer consumer;
protected Session session;
protected boolean connected = false;
public NotificationListener(String id, String queryString, Type type) {
this.observers = new ArrayList<INotificationObserver>();
this.jobWrappers = new HashMap<INotificationObserver, JobWrapper>();
@ -466,40 +499,34 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
break;
}
}
// If we made it here we are good
connected = true;
}
public void disconnect() {
if (connected) {
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
statusHandler.handle(Priority.PROBLEM,
"Error closing consumer connection", e);
}
consumer = null;
public synchronized void disconnect() {
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
statusHandler.handle(Priority.PROBLEM,
"Error closing consumer connection", e);
}
consumer = null;
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
statusHandler.handle(Priority.PROBLEM,
"Error closing session", e);
}
session = null;
if (session != null) {
try {
session.close();
} catch (JMSException e) {
statusHandler.handle(Priority.PROBLEM,
"Error closing session", e);
}
connected = false;
session = null;
}
}
private void setupQueue(NotificationManagerJob manager)
private synchronized void setupQueue(NotificationManagerJob manager)
throws JMSException {
disconnect();
session = manager.connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
session = manager.createSession();
if (session != null) {
String queueName = id;
Queue t = session.createQueue(queueName);
@ -510,16 +537,14 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
consumer = session.createConsumer(t);
}
setConsumer(consumer);
consumer.setMessageListener(this);
}
}
private void setupTopic(NotificationManagerJob manager)
private synchronized void setupTopic(NotificationManagerJob manager)
throws JMSException {
disconnect();
session = manager.connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
session = manager.createSession();
if (session != null) {
String topicName = id;
Topic t = session.createTopic(topicName);
@ -534,10 +559,6 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
}
}
public void setConsumer(MessageConsumer consumer) {
this.consumer = consumer;
}
/*
* (non-Javadoc)
*
@ -545,48 +566,37 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
*/
@Override
public void onMessage(Message msg) {
if (observers == null) {
return;
}
for (INotificationObserver obs : observers) {
sendToObserver(obs, msg);
}
// Iterator<JobWrapper> iterator = jobWrappers.values().iterator();
// while (iterator.hasNext()) {
// JobWrapper wrapper = iterator.next();
// if (!wrapper.isEmpty() && wrapper.getState() != Job.RUNNING) {
// wrapper.schedule();
// }
// }
}
public synchronized void addObserver(INotificationObserver obs) {
observers.add(obs);
}
public synchronized void removeObserver(INotificationObserver obs) {
observers.remove(obs);
}
protected void sendToObserver(INotificationObserver observer,
Message msg) {
// Get the corresponding job, creating the
// wrapper if necessary
JobWrapper wrapper = null;
synchronized (this) {
wrapper = jobWrappers.get(observer);
if (wrapper == null) {
wrapper = new JobWrapper(observer);
jobWrappers.put(observer, wrapper);
synchronized (observers) {
for (INotificationObserver obs : observers) {
// Get the corresponding job, creating the
// wrapper if necessary, really on observers lock for sync
// purposes
JobWrapper wrapper = jobWrappers.get(obs);
if (wrapper == null) {
wrapper = new JobWrapper(obs);
jobWrappers.put(obs, wrapper);
}
wrapper.put(msg);
}
}
wrapper.put(msg);
}
public void addObserver(INotificationObserver obs) {
synchronized (observers) {
observers.add(obs);
}
}
public void removeObserver(INotificationObserver obs) {
synchronized (observers) {
observers.remove(obs);
}
}
public int size() {
return observers.size();
synchronized (observers) {
return observers.size();
}
}
}
@ -669,7 +679,7 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
if (messageCount.incrementAndGet() > IN_MEM_MESSAGE_LIMIT) {
messageCount.decrementAndGet();
messages.remove();
if (System.currentTimeMillis() - lastErrorPrintTime > 600000) {
if ((System.currentTimeMillis() - lastErrorPrintTime) > 600000) {
final Status s = new Status(
Status.ERROR,
Activator.PLUGIN_ID,
@ -683,15 +693,6 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
this.schedule();
}
/**
* Checks if is empty.
*
* @return true, if is empty
*/
public boolean isEmpty() {
return this.messages.isEmpty();
}
}
/*
@ -701,9 +702,12 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
*/
@Override
public void dispose() {
for (NotificationListener listener : listeners.values()) {
listener.disconnect();
synchronized (listeners) {
for (NotificationListener listener : listeners.values()) {
listener.disconnect();
}
listeners.clear();
}
listeners.clear();
disconnect(true);
}
}