Merge "Issue #2389: Update NotificationManagerJob" into development
Former-commit-id: 9bd7ddeeb1e850eb1f8729f556fa46d5c70dc342
This commit is contained in:
commit
4d7125ecc5
1 changed files with 178 additions and 174 deletions
|
@ -70,7 +70,8 @@ import com.raytheon.uf.viz.core.notification.NotificationMessage;
|
|||
* 05/08/08 1127 randerso Initial Creation
|
||||
* 09/03/08 1448 chammack Refactored notification observer interface
|
||||
* 04/23/13 1939 randerso Add separate connect method to allow application
|
||||
* to complete initialization before connecting to JMS
|
||||
* to complete initialization before connecting to JMS.
|
||||
* 10/15/2013 2389 rjpeter Updated synchronization to remove session leaks.
|
||||
* </pre>
|
||||
*
|
||||
* @author randerso
|
||||
|
@ -107,9 +108,10 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result
|
||||
result = (prime * result)
|
||||
+ ((queryString == null) ? 0 : queryString.hashCode());
|
||||
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
|
||||
result = (prime * result)
|
||||
+ ((topic == null) ? 0 : topic.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -156,11 +158,11 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
private static NotificationManagerJob instance;
|
||||
|
||||
/** The observer map of topic to listeners */
|
||||
protected Map<ListenerKey, NotificationListener> listeners;
|
||||
protected final Map<ListenerKey, NotificationListener> listeners;
|
||||
|
||||
private Connection connection;
|
||||
|
||||
private boolean connected = false;
|
||||
private volatile boolean connected = false;
|
||||
|
||||
/**
|
||||
* Get the active subscription manager job. If one does not exist, start an
|
||||
|
@ -191,18 +193,29 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
|
||||
protected void connect(boolean notifyError) {
|
||||
boolean successful = true;
|
||||
try {
|
||||
ConnectionFactory connectionFactory = JMSConnection.getInstance()
|
||||
.getFactory();
|
||||
disconnect(notifyError);
|
||||
synchronized (this) {
|
||||
try {
|
||||
ConnectionFactory connectionFactory = JMSConnection
|
||||
.getInstance().getFactory();
|
||||
disconnect(notifyError);
|
||||
|
||||
// Create a Connection
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.setExceptionListener(this);
|
||||
// connection.setClientID(VizApp.getWsId().toString());
|
||||
// Create a Connection
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.setExceptionListener(this);
|
||||
// connection.setClientID(VizApp.getWsId().toString());
|
||||
|
||||
connection.start();
|
||||
connected = true;
|
||||
connection.start();
|
||||
connected = true;
|
||||
} catch (JMSException e) {
|
||||
if (notifyError) {
|
||||
statusHandler.handle(Priority.SIGNIFICANT,
|
||||
"NotificationManager failed to connect.", e);
|
||||
}
|
||||
successful = false;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (listeners) {
|
||||
for (NotificationListener listener : listeners.values()) {
|
||||
try {
|
||||
listener.setupConnection(this);
|
||||
|
@ -216,12 +229,6 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
}
|
||||
}
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
if (notifyError) {
|
||||
statusHandler.handle(Priority.SIGNIFICANT,
|
||||
"NotificationManager failed to connect.", e);
|
||||
}
|
||||
successful = false;
|
||||
}
|
||||
|
||||
if (!successful) {
|
||||
|
@ -229,7 +236,22 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
}
|
||||
}
|
||||
|
||||
protected void disconnect(boolean notifyError) {
|
||||
/**
|
||||
* Creates a new AUTO_ACKNOWLEDGE session if connected to JMS, otherwise
|
||||
* returns null.
|
||||
*
|
||||
* @return
|
||||
* @throws JMSException
|
||||
*/
|
||||
protected synchronized Session createSession() throws JMSException {
|
||||
if (connected) {
|
||||
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized void disconnect(boolean notifyError) {
|
||||
if (connection != null) {
|
||||
try {
|
||||
connection.stop();
|
||||
|
@ -271,9 +293,11 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
new Timer().schedule(task, 5 * 1000);
|
||||
}
|
||||
|
||||
// Reset connected bool
|
||||
for (NotificationListener listener : listeners.values()) {
|
||||
listener.connected = false;
|
||||
synchronized (listeners) {
|
||||
// disconnect listeners
|
||||
for (NotificationListener listener : listeners.values()) {
|
||||
listener.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
if (e != null) {
|
||||
|
@ -282,33 +306,38 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
}
|
||||
}
|
||||
|
||||
public static synchronized void addQueueObserver(String queue,
|
||||
INotificationObserver obs) {
|
||||
public static void addQueueObserver(String queue, INotificationObserver obs) {
|
||||
addQueueObserver(queue, obs, null);
|
||||
}
|
||||
|
||||
public static synchronized void addQueueObserver(String queue,
|
||||
public static void addQueueObserver(String queue,
|
||||
INotificationObserver obs, String queryString) {
|
||||
NotificationManagerJob notifMgr = getInstance();
|
||||
ListenerKey key = new ListenerKey(queue, queryString);
|
||||
NotificationListener listener = notifMgr.listeners.get(key);
|
||||
if (listener == null || listener.consumer == null) {
|
||||
try {
|
||||
listener = new NotificationListener(queue, queryString,
|
||||
Type.QUEUE);
|
||||
notifMgr.listeners.put(key, listener);
|
||||
listener.addObserver(obs);
|
||||
if (notifMgr.connected) {
|
||||
listener.setupConnection(notifMgr);
|
||||
|
||||
synchronized (notifMgr.listeners) {
|
||||
NotificationListener listener = notifMgr.listeners.get(key);
|
||||
if (listener == null) {
|
||||
try {
|
||||
listener = new NotificationListener(queue, queryString,
|
||||
Type.QUEUE);
|
||||
notifMgr.listeners.put(key, listener);
|
||||
listener.addObserver(obs);
|
||||
if (notifMgr.connected) {
|
||||
listener.setupConnection(notifMgr);
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
Status s = new Status(
|
||||
IStatus.ERROR,
|
||||
Activator.PLUGIN_ID,
|
||||
0,
|
||||
"NotificationManager failed to create queue consumer.",
|
||||
e);
|
||||
StatusManager.getManager().handle(s);
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
Status s = new Status(IStatus.ERROR, Activator.PLUGIN_ID, 0,
|
||||
"NotificationManager failed to create queue consumer.",
|
||||
e);
|
||||
StatusManager.getManager().handle(s);
|
||||
} else {
|
||||
listener.addObserver(obs);
|
||||
}
|
||||
} else {
|
||||
listener.addObserver(obs);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -320,32 +349,35 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
* @param obs
|
||||
* the alert observer callback
|
||||
*/
|
||||
public static synchronized void addObserver(String topic,
|
||||
INotificationObserver obs) {
|
||||
public static void addObserver(String topic, INotificationObserver obs) {
|
||||
addObserver(topic, obs, null);
|
||||
}
|
||||
|
||||
public static synchronized void addObserver(String topic,
|
||||
INotificationObserver obs, String queryString) {
|
||||
public static void addObserver(String topic, INotificationObserver obs,
|
||||
String queryString) {
|
||||
NotificationManagerJob notifMgr = getInstance();
|
||||
ListenerKey key = new ListenerKey(topic, queryString);
|
||||
NotificationListener listener = notifMgr.listeners.get(key);
|
||||
if (listener == null) {
|
||||
try {
|
||||
listener = new NotificationListener(topic, queryString,
|
||||
Type.TOPIC);
|
||||
notifMgr.listeners.put(key, listener);
|
||||
listener.addObserver(obs);
|
||||
if (notifMgr.connected) {
|
||||
listener.setupConnection(notifMgr);
|
||||
|
||||
synchronized (notifMgr.listeners) {
|
||||
NotificationListener listener = notifMgr.listeners.get(key);
|
||||
if (listener == null) {
|
||||
try {
|
||||
listener = new NotificationListener(topic, queryString,
|
||||
Type.TOPIC);
|
||||
notifMgr.listeners.put(key, listener);
|
||||
listener.addObserver(obs);
|
||||
if (notifMgr.connected) {
|
||||
listener.setupConnection(notifMgr);
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
Status s = new Status(IStatus.ERROR, Activator.PLUGIN_ID,
|
||||
0,
|
||||
"NotificationManager failed to create consumer.", e);
|
||||
StatusManager.getManager().handle(s);
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
Status s = new Status(IStatus.ERROR, Activator.PLUGIN_ID, 0,
|
||||
"NotificationManager failed to create consumer.", e);
|
||||
StatusManager.getManager().handle(s);
|
||||
} else {
|
||||
listener.addObserver(obs);
|
||||
}
|
||||
} else {
|
||||
listener.addObserver(obs);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -358,8 +390,7 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
* @param obs
|
||||
* the observer to remove
|
||||
*/
|
||||
public static synchronized void removeObserver(String topic,
|
||||
INotificationObserver obs) {
|
||||
public static void removeObserver(String topic, INotificationObserver obs) {
|
||||
removeObserver(topic, obs, null);
|
||||
}
|
||||
|
||||
|
@ -373,18 +404,20 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
* the observer to remove
|
||||
* @param queryString
|
||||
*/
|
||||
public static synchronized void removeObserver(String topic,
|
||||
INotificationObserver obs, String queryString) {
|
||||
public static void removeObserver(String topic, INotificationObserver obs,
|
||||
String queryString) {
|
||||
NotificationManagerJob notifMgr = getInstance();
|
||||
ListenerKey key = new ListenerKey(topic, queryString);
|
||||
NotificationListener listener = notifMgr.listeners.get(key);
|
||||
if (listener == null) {
|
||||
return;
|
||||
}
|
||||
listener.removeObserver(obs);
|
||||
if (listener.size() <= 0) {
|
||||
listener.disconnect();
|
||||
notifMgr.listeners.remove(key);
|
||||
synchronized (notifMgr.listeners) {
|
||||
NotificationListener listener = notifMgr.listeners.get(key);
|
||||
if (listener == null) {
|
||||
return;
|
||||
}
|
||||
listener.removeObserver(obs);
|
||||
if (listener.size() <= 0) {
|
||||
listener.disconnect();
|
||||
notifMgr.listeners.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -397,18 +430,20 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
* @param obs
|
||||
* the observer to remove
|
||||
*/
|
||||
public static synchronized void removeQueueObserver(String queue,
|
||||
String queryString, INotificationObserver obs) {
|
||||
public static void removeQueueObserver(String queue, String queryString,
|
||||
INotificationObserver obs) {
|
||||
NotificationManagerJob notifMgr = getInstance();
|
||||
ListenerKey key = new ListenerKey(queue, queryString);
|
||||
NotificationListener listener = notifMgr.listeners.get(key);
|
||||
if (listener == null) {
|
||||
return;
|
||||
}
|
||||
listener.removeObserver(obs);
|
||||
if (listener.size() <= 0) {
|
||||
listener.disconnect();
|
||||
notifMgr.listeners.remove(key);
|
||||
synchronized (notifMgr.listeners) {
|
||||
NotificationListener listener = notifMgr.listeners.get(key);
|
||||
if (listener == null) {
|
||||
return;
|
||||
}
|
||||
listener.removeObserver(obs);
|
||||
if (listener.size() <= 0) {
|
||||
listener.disconnect();
|
||||
notifMgr.listeners.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -428,24 +463,22 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
|
||||
private static class NotificationListener implements MessageListener {
|
||||
|
||||
private Type type;
|
||||
private final Type type;
|
||||
|
||||
private String id;
|
||||
private final String id;
|
||||
|
||||
private String queryString;
|
||||
private final String queryString;
|
||||
|
||||
/** The list of interested parties */
|
||||
protected List<INotificationObserver> observers;
|
||||
protected final List<INotificationObserver> observers;
|
||||
|
||||
/** The map of job threads from observers */
|
||||
protected Map<INotificationObserver, JobWrapper> jobWrappers;
|
||||
protected final Map<INotificationObserver, JobWrapper> jobWrappers;
|
||||
|
||||
protected MessageConsumer consumer;
|
||||
|
||||
protected Session session;
|
||||
|
||||
protected boolean connected = false;
|
||||
|
||||
public NotificationListener(String id, String queryString, Type type) {
|
||||
this.observers = new ArrayList<INotificationObserver>();
|
||||
this.jobWrappers = new HashMap<INotificationObserver, JobWrapper>();
|
||||
|
@ -466,40 +499,34 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
break;
|
||||
}
|
||||
}
|
||||
// If we made it here we are good
|
||||
connected = true;
|
||||
}
|
||||
|
||||
public void disconnect() {
|
||||
if (connected) {
|
||||
if (consumer != null) {
|
||||
try {
|
||||
consumer.close();
|
||||
} catch (JMSException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Error closing consumer connection", e);
|
||||
}
|
||||
consumer = null;
|
||||
public synchronized void disconnect() {
|
||||
if (consumer != null) {
|
||||
try {
|
||||
consumer.close();
|
||||
} catch (JMSException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Error closing consumer connection", e);
|
||||
}
|
||||
consumer = null;
|
||||
}
|
||||
|
||||
if (session != null) {
|
||||
try {
|
||||
session.close();
|
||||
} catch (JMSException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Error closing session", e);
|
||||
}
|
||||
session = null;
|
||||
if (session != null) {
|
||||
try {
|
||||
session.close();
|
||||
} catch (JMSException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Error closing session", e);
|
||||
}
|
||||
connected = false;
|
||||
session = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void setupQueue(NotificationManagerJob manager)
|
||||
private synchronized void setupQueue(NotificationManagerJob manager)
|
||||
throws JMSException {
|
||||
disconnect();
|
||||
session = manager.connection.createSession(false,
|
||||
Session.AUTO_ACKNOWLEDGE);
|
||||
session = manager.createSession();
|
||||
if (session != null) {
|
||||
String queueName = id;
|
||||
Queue t = session.createQueue(queueName);
|
||||
|
@ -510,16 +537,14 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
consumer = session.createConsumer(t);
|
||||
}
|
||||
|
||||
setConsumer(consumer);
|
||||
consumer.setMessageListener(this);
|
||||
}
|
||||
}
|
||||
|
||||
private void setupTopic(NotificationManagerJob manager)
|
||||
private synchronized void setupTopic(NotificationManagerJob manager)
|
||||
throws JMSException {
|
||||
disconnect();
|
||||
session = manager.connection.createSession(false,
|
||||
Session.AUTO_ACKNOWLEDGE);
|
||||
session = manager.createSession();
|
||||
if (session != null) {
|
||||
String topicName = id;
|
||||
Topic t = session.createTopic(topicName);
|
||||
|
@ -534,10 +559,6 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
}
|
||||
}
|
||||
|
||||
public void setConsumer(MessageConsumer consumer) {
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
|
@ -545,48 +566,37 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
*/
|
||||
@Override
|
||||
public void onMessage(Message msg) {
|
||||
if (observers == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (INotificationObserver obs : observers) {
|
||||
sendToObserver(obs, msg);
|
||||
}
|
||||
|
||||
// Iterator<JobWrapper> iterator = jobWrappers.values().iterator();
|
||||
// while (iterator.hasNext()) {
|
||||
// JobWrapper wrapper = iterator.next();
|
||||
// if (!wrapper.isEmpty() && wrapper.getState() != Job.RUNNING) {
|
||||
// wrapper.schedule();
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
public synchronized void addObserver(INotificationObserver obs) {
|
||||
observers.add(obs);
|
||||
}
|
||||
|
||||
public synchronized void removeObserver(INotificationObserver obs) {
|
||||
observers.remove(obs);
|
||||
}
|
||||
|
||||
protected void sendToObserver(INotificationObserver observer,
|
||||
Message msg) {
|
||||
// Get the corresponding job, creating the
|
||||
// wrapper if necessary
|
||||
JobWrapper wrapper = null;
|
||||
synchronized (this) {
|
||||
wrapper = jobWrappers.get(observer);
|
||||
if (wrapper == null) {
|
||||
wrapper = new JobWrapper(observer);
|
||||
jobWrappers.put(observer, wrapper);
|
||||
synchronized (observers) {
|
||||
for (INotificationObserver obs : observers) {
|
||||
// Get the corresponding job, creating the
|
||||
// wrapper if necessary, really on observers lock for sync
|
||||
// purposes
|
||||
JobWrapper wrapper = jobWrappers.get(obs);
|
||||
if (wrapper == null) {
|
||||
wrapper = new JobWrapper(obs);
|
||||
jobWrappers.put(obs, wrapper);
|
||||
}
|
||||
wrapper.put(msg);
|
||||
}
|
||||
}
|
||||
wrapper.put(msg);
|
||||
}
|
||||
|
||||
public void addObserver(INotificationObserver obs) {
|
||||
synchronized (observers) {
|
||||
observers.add(obs);
|
||||
}
|
||||
}
|
||||
|
||||
public void removeObserver(INotificationObserver obs) {
|
||||
synchronized (observers) {
|
||||
observers.remove(obs);
|
||||
}
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return observers.size();
|
||||
synchronized (observers) {
|
||||
return observers.size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -669,7 +679,7 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
if (messageCount.incrementAndGet() > IN_MEM_MESSAGE_LIMIT) {
|
||||
messageCount.decrementAndGet();
|
||||
messages.remove();
|
||||
if (System.currentTimeMillis() - lastErrorPrintTime > 600000) {
|
||||
if ((System.currentTimeMillis() - lastErrorPrintTime) > 600000) {
|
||||
final Status s = new Status(
|
||||
Status.ERROR,
|
||||
Activator.PLUGIN_ID,
|
||||
|
@ -683,15 +693,6 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
this.schedule();
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if is empty.
|
||||
*
|
||||
* @return true, if is empty
|
||||
*/
|
||||
public boolean isEmpty() {
|
||||
return this.messages.isEmpty();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -701,9 +702,12 @@ public class NotificationManagerJob implements ExceptionListener, IDisposable {
|
|||
*/
|
||||
@Override
|
||||
public void dispose() {
|
||||
for (NotificationListener listener : listeners.values()) {
|
||||
listener.disconnect();
|
||||
synchronized (listeners) {
|
||||
for (NotificationListener listener : listeners.values()) {
|
||||
listener.disconnect();
|
||||
}
|
||||
listeners.clear();
|
||||
}
|
||||
listeners.clear();
|
||||
disconnect(true);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue