diff --git a/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/comm/JMSConnection.java b/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/comm/JMSConnection.java
index b9dcbaf012..7d06157abf 100644
--- a/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/comm/JMSConnection.java
+++ b/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/comm/JMSConnection.java
@@ -22,6 +22,7 @@ package com.raytheon.uf.viz.core.comm;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
+
import javax.jms.ConnectionFactory;
import org.apache.qpid.client.AMQConnectionFactory;
@@ -41,6 +42,7 @@ import com.raytheon.uf.viz.core.VizApp;
* Nov 2, 2009 #3067 chammack Send all jms connections through failover:// to properly reconnect
* Nov 2, 2011 #7391 bkowal Ensure that the generated WsId is properly formatted to be
* included in a url.
+ * May 09, 2013 1814 rjpeter Updated prefetch to 10.
*
*
* @author chammack
@@ -50,7 +52,7 @@ public class JMSConnection {
private static JMSConnection instance;
- private String jndiProviderUrl;
+ private final String jndiProviderUrl;
private AMQConnectionFactory factory;
@@ -76,17 +78,18 @@ public class JMSConnection {
// reconnect
this.factory = new AMQConnectionFactory(
"amqp://guest:guest@"
- + URLEncoder.encode(VizApp.getWsId().toString(), "UTF-8")
+ + URLEncoder.encode(VizApp.getWsId().toString(),
+ "UTF-8")
+ "/edex?brokerlist='"
+ this.jndiProviderUrl
- + "?connecttimeout='5000'&heartbeat='0''&maxprefetch='0'&sync_publish='all'&failover='nofailover'");
+ + "?connecttimeout='5000'&heartbeat='0''&maxprefetch='10'&sync_publish='all'&failover='nofailover'");
} catch (URLSyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (UnsupportedEncodingException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- }
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
}
/**
diff --git a/edexOsgi/build.edex/esb/conf/spring/edex.xml b/edexOsgi/build.edex/esb/conf/spring/edex.xml
index 4874cf0304..f5e3dd5f5e 100644
--- a/edexOsgi/build.edex/esb/conf/spring/edex.xml
+++ b/edexOsgi/build.edex/esb/conf/spring/edex.xml
@@ -18,7 +18,7 @@
-
+
@@ -239,14 +239,14 @@
-
+
-
+
@@ -267,7 +267,7 @@
-
+
diff --git a/edexOsgi/com.raytheon.edex.plugin.gfe/res/spring/gfe-common.xml b/edexOsgi/com.raytheon.edex.plugin.gfe/res/spring/gfe-common.xml
index d7e8da91d3..95e20d6c2e 100644
--- a/edexOsgi/com.raytheon.edex.plugin.gfe/res/spring/gfe-common.xml
+++ b/edexOsgi/com.raytheon.edex.plugin.gfe/res/spring/gfe-common.xml
@@ -70,7 +70,7 @@
-
+
java.lang.Throwable
diff --git a/edexOsgi/com.raytheon.edex.plugin.gfe/res/spring/gfe-spring.xml b/edexOsgi/com.raytheon.edex.plugin.gfe/res/spring/gfe-spring.xml
index 385d4d6b26..97bbfea36d 100644
--- a/edexOsgi/com.raytheon.edex.plugin.gfe/res/spring/gfe-spring.xml
+++ b/edexOsgi/com.raytheon.edex.plugin.gfe/res/spring/gfe-spring.xml
@@ -118,7 +118,7 @@
-
+
diff --git a/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/server/notify/GfeIngestNotificationFilter.java b/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/server/notify/GfeIngestNotificationFilter.java
index 87ec08bec5..39dea740ea 100644
--- a/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/server/notify/GfeIngestNotificationFilter.java
+++ b/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/server/notify/GfeIngestNotificationFilter.java
@@ -204,7 +204,7 @@ public class GfeIngestNotificationFilter {
// if we don't have the other component for this
// fcstHour
- if (otherTimes == null
+ if ((otherTimes == null)
|| !otherTimes.remove(fcstHour)) {
// need to wait for other component
ParmID compPid = new ParmID(d2dParamName,
@@ -371,7 +371,8 @@ public class GfeIngestNotificationFilter {
throws Exception {
byte[] message = SerializationUtil.transformToThrift(notifications);
EDEXUtil.getMessageProducer().sendAsyncUri(
- "jms-generic:topic:gfeGribNotification", message);
+ "jms-generic:topic:gfeGribNotification?timeToLive=60000",
+ message);
SendNotifications.send(notifications);
}
diff --git a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/dao/GribDao.java b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/dao/GribDao.java
index e36ba027f9..25f624eaa3 100644
--- a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/dao/GribDao.java
+++ b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/dao/GribDao.java
@@ -83,7 +83,7 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery;
* that sent notification to D2DParmIdCache.
* 01/14/13 #1469 bkowal Removed the hdf5 data directory
* 04/08/13 #1293 bkowal Removed references to hdffileid.
- *
+ * 05/08/13 1814 rjpeter Added time to live to topic message
*
*
* @author bphillip
@@ -101,7 +101,7 @@ public class GribDao extends PluginDao {
private static final String THINNED_PTS = "thinnedPts";
- private static final String PURGE_MODEL_CACHE_TOPIC = "jms-generic:topic:purgeGribModelCache";
+ private static final String PURGE_MODEL_CACHE_TOPIC = "jms-generic:topic:purgeGribModelCache?timeToLive=60000";
/**
* Creates a new GribPyDao object
@@ -171,7 +171,7 @@ public class GribDao extends PluginDao {
IPersistable obj) throws Exception {
GribRecord gribRec = (GribRecord) obj;
- if (gribRec.getMessageData() != null
+ if ((gribRec.getMessageData() != null)
&& !gribRec.getModelInfo().getParameterName().equals("Missing")) {
AbstractStorageRecord storageRecord = null;
AbstractStorageRecord localSection = null;
@@ -182,8 +182,8 @@ public class GribDao extends PluginDao {
* Stores the binary data to the HDF5 data store
*/
if (gribRec.getMessageData() instanceof float[]) {
- if (gribRec.getSpatialObject() != null
- && gribRec.getMessageData() != null) {
+ if ((gribRec.getSpatialObject() != null)
+ && (gribRec.getMessageData() != null)) {
long[] sizes = new long[] {
(gribRec.getSpatialObject()).getNx(),
(gribRec.getSpatialObject()).getNy() };
@@ -316,7 +316,7 @@ public class GribDao extends PluginDao {
for (PluginDataObject record : records) {
GribRecord rec = (GribRecord) record;
GribModel model = rec.getModelInfo();
- if (model.getParameterName() == null
+ if ((model.getParameterName() == null)
|| model.getParameterName().equals("Missing")) {
logger.info("Discarding record due to missing or unknown parameter mapping: "
+ record);
@@ -327,7 +327,7 @@ public class GribDao extends PluginDao {
if (level != null) {
MasterLevel ml = level.getMasterLevel();
- if (ml != null
+ if ((ml != null)
&& !LevelFactory.UNKNOWN_LEVEL.equals(ml.getName())) {
validLevel = true;
}
@@ -362,7 +362,7 @@ public class GribDao extends PluginDao {
for (PluginDataObject record : records) {
GribRecord rec = (GribRecord) record;
GribModel model = rec.getModelInfo();
- if (model.getParameterName() == null
+ if ((model.getParameterName() == null)
|| model.getParameterName().equals("Missing")) {
logger.info("Discarding record due to missing or unknown parameter mapping: "
+ record);
@@ -373,7 +373,7 @@ public class GribDao extends PluginDao {
if (level != null) {
MasterLevel ml = level.getMasterLevel();
- if (ml != null
+ if ((ml != null)
&& !LevelFactory.UNKNOWN_LEVEL.equals(ml.getName())) {
validLevel = true;
}
diff --git a/edexOsgi/com.raytheon.edex.plugin.text/res/spring/text-ingest.xml b/edexOsgi/com.raytheon.edex.plugin.text/res/spring/text-ingest.xml
index edff6f5a3d..df1e3386d9 100644
--- a/edexOsgi/com.raytheon.edex.plugin.text/res/spring/text-ingest.xml
+++ b/edexOsgi/com.raytheon.edex.plugin.text/res/spring/text-ingest.xml
@@ -152,7 +152,7 @@
-
+
diff --git a/edexOsgi/com.raytheon.edex.utilitysrv/res/spring/utility-request.xml b/edexOsgi/com.raytheon.edex.utilitysrv/res/spring/utility-request.xml
index 7513b3a613..b82b7cba36 100644
--- a/edexOsgi/com.raytheon.edex.utilitysrv/res/spring/utility-request.xml
+++ b/edexOsgi/com.raytheon.edex.utilitysrv/res/spring/utility-request.xml
@@ -44,7 +44,7 @@
-
+
diff --git a/edexOsgi/com.raytheon.uf.edex.activetable/res/spring/activetable-common.xml b/edexOsgi/com.raytheon.uf.edex.activetable/res/spring/activetable-common.xml
index d62836660a..aa91c12305 100644
--- a/edexOsgi/com.raytheon.uf.edex.activetable/res/spring/activetable-common.xml
+++ b/edexOsgi/com.raytheon.uf.edex.activetable/res/spring/activetable-common.xml
@@ -44,7 +44,7 @@
-
+
diff --git a/edexOsgi/com.raytheon.uf.edex.plugin.grid/res/spring/grid-common.xml b/edexOsgi/com.raytheon.uf.edex.plugin.grid/res/spring/grid-common.xml
index 2c946935c5..ad337089a3 100644
--- a/edexOsgi/com.raytheon.uf.edex.plugin.grid/res/spring/grid-common.xml
+++ b/edexOsgi/com.raytheon.uf.edex.plugin.grid/res/spring/grid-common.xml
@@ -46,7 +46,7 @@
-
+
*
* @author dgilling
@@ -47,7 +47,7 @@ import com.raytheon.uf.edex.plugin.grid.dao.GridDao;
public class DeleteAllGridDataHandler implements
IRequestHandler {
- private static final String PLUGIN_PURGED_TOPIC = "jms-generic:topic:pluginPurged";
+ private static final String PLUGIN_PURGED_TOPIC = "jms-generic:topic:pluginPurged?timeToLive=60000";
/*
* (non-Javadoc)
diff --git a/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeJob.java b/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeJob.java
index 38009b518d..7f874b5095 100644
--- a/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeJob.java
+++ b/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeJob.java
@@ -45,7 +45,7 @@ import com.raytheon.uf.edex.database.purge.PurgeLogger;
* ------------ ---------- ----------- --------------------------
* Apr 19, 2012 #470 bphillip Initial creation
* Jun 20, 2012 NC#606 ghull send purge-complete messages
- *
+ * May 08, 2013 1814 rjpeter Added time to live to topic
*
*
* @author bphillip
@@ -53,260 +53,264 @@ import com.raytheon.uf.edex.database.purge.PurgeLogger;
*/
public class PurgeJob extends Thread {
- /** The type of purge */
- public enum PURGE_JOB_TYPE {
- PURGE_ALL, PURGE_EXPIRED
- }
-
- public static final String PLUGIN_PURGED_TOPIC = "jms-generic:topic:pluginPurged";
+ /** The type of purge */
+ public enum PURGE_JOB_TYPE {
+ PURGE_ALL, PURGE_EXPIRED
+ }
- private long startTime;
+ public static final String PLUGIN_PURGED_TOPIC = "jms-generic:topic:pluginPurged?timeToLive=60000";
- /** The cluster task name to use for purge jobs */
- public static final String TASK_NAME = "Purge Plugin Data";
+ private long startTime;
- /** The plugin associated with this purge job */
- private String pluginName;
+ /** The cluster task name to use for purge jobs */
+ public static final String TASK_NAME = "Purge Plugin Data";
- /** The type of purge job being executed */
- private PURGE_JOB_TYPE purgeType;
+ /** The plugin associated with this purge job */
+ private final String pluginName;
- /** Last time job has printed a timed out message */
- private long lastTimeOutMessage = 0;
+ /** The type of purge job being executed */
+ private final PURGE_JOB_TYPE purgeType;
- /**
- * Creates a new Purge job for the specified plugin.
- *
- * @param pluginName
- * The plugin to be purged
- * @param purgeType
- * The type of purge to be executed
- */
- public PurgeJob(String pluginName, PURGE_JOB_TYPE purgeType) {
- // Give the thread a name
- this.setName("Purge-" + pluginName.toUpperCase() + "-Thread");
- this.pluginName = pluginName;
- this.purgeType = purgeType;
- }
+ /** Last time job has printed a timed out message */
+ private final long lastTimeOutMessage = 0;
- public void run() {
+ /**
+ * Creates a new Purge job for the specified plugin.
+ *
+ * @param pluginName
+ * The plugin to be purged
+ * @param purgeType
+ * The type of purge to be executed
+ */
+ public PurgeJob(String pluginName, PURGE_JOB_TYPE purgeType) {
+ // Give the thread a name
+ this.setName("Purge-" + pluginName.toUpperCase() + "-Thread");
+ this.pluginName = pluginName;
+ this.purgeType = purgeType;
+ }
- // Flag used to track if this job has failed
- boolean failed = false;
- startTime = System.currentTimeMillis();
- PurgeLogger.logInfo("Purging expired data...", pluginName);
- PluginDao dao = null;
+ @Override
+ public void run() {
- try {
- dao = PluginFactory.getInstance().getPluginDao(pluginName);
- if (dao.getDaoClass() != null) {
- dao.purgeExpiredData();
-
- PurgeLogger.logInfo("Data successfully Purged!", pluginName);
+ // Flag used to track if this job has failed
+ boolean failed = false;
+ startTime = System.currentTimeMillis();
+ PurgeLogger.logInfo("Purging expired data...", pluginName);
+ PluginDao dao = null;
- EDEXUtil.getMessageProducer().sendAsyncUri( PLUGIN_PURGED_TOPIC, pluginName );
-
- } else {
- Method m = dao.getClass().getMethod("purgeExpiredData",
- new Class[] {});
- if (m != null) {
- if (m.getDeclaringClass().equals(PluginDao.class)) {
- PurgeLogger
- .logWarn(
- "Unable to purge data. This plugin does not specify a record class and does not implement a custom purger.",
- pluginName);
- } else {
- if (this.purgeType.equals(PURGE_JOB_TYPE.PURGE_EXPIRED)) {
- dao.purgeExpiredData();
- } else {
- dao.purgeAllData();
- }
+ try {
+ dao = PluginFactory.getInstance().getPluginDao(pluginName);
+ if (dao.getDaoClass() != null) {
+ dao.purgeExpiredData();
- PurgeLogger.logInfo("Data successfully Purged!", pluginName);
+ PurgeLogger.logInfo("Data successfully Purged!", pluginName);
- EDEXUtil.getMessageProducer().sendAsyncUri( PLUGIN_PURGED_TOPIC, pluginName );
- }
- }
- }
- } catch (Exception e) {
- failed = true;
- // keep getting next exceptions with sql exceptions to ensure
- // we can see the underlying error
- PurgeLogger
- .logError("Error purging expired data!\n", pluginName, e);
- Throwable t = e.getCause();
- while (t != null) {
- if (t instanceof SQLException) {
- SQLException se = ((SQLException) t).getNextException();
- PurgeLogger.logError("Next exception:", pluginName, se);
- }
- t = t.getCause();
- }
- } finally {
- ClusterTask purgeLock = PurgeManager.getInstance().getPurgeLock();
- try {
- /*
- * Update the status accordingly if the purge failed or
- * succeeded
- */
- PurgeDao purgeDao = new PurgeDao();
- PurgeJobStatus status = purgeDao
- .getJobForPlugin(this.pluginName);
- if (status == null) {
- PurgeLogger.logError(
- "Purge job completed but no status object found!",
- this.pluginName);
- } else {
- if (failed) {
- status.incrementFailedCount();
- if (status.getFailedCount() >= PurgeManager
- .getInstance().getFatalFailureCount()) {
- PurgeLogger
- .logFatal(
- "Purger for this plugin has reached or exceeded consecutive failure limit of "
- + PurgeManager
- .getInstance()
- .getFatalFailureCount()
- + ". Data will no longer being purged for this plugin.",
- pluginName);
- } else {
- PurgeLogger.logError("Purge job has failed "
- + status.getFailedCount()
- + " consecutive times.", this.pluginName);
- // Back the start time off by half an hour to try to
- // purgin soon, don't want to start immediately so
- // it doesn't ping pong between servers in a time
- // out scenario
- Date startTime = status.getStartTime();
- startTime.setTime(startTime.getTime() - (1800000));
- }
- } else {
- status.setFailedCount(0);
- }
+ EDEXUtil.getMessageProducer().sendAsyncUri(PLUGIN_PURGED_TOPIC,
+ pluginName);
- /*
- * This purger thread has exceeded the time out duration but
- * finally finished. Output a message and update the status
- */
- int deadPurgeJobAge = PurgeManager.getInstance()
- .getDeadPurgeJobAge();
- Calendar purgeTimeOutLimit = Calendar.getInstance();
- purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
- purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge);
- if (startTime < purgeTimeOutLimit.getTimeInMillis()) {
- PurgeLogger
- .logInfo(
- "Purge job has recovered from timed out state!!",
- pluginName);
- }
- status.setRunning(false);
- purgeDao.update(status);
- /*
- * Log execution times
- */
- long executionTime = getAge();
- long execTimeInMinutes = executionTime / 60000;
- if (execTimeInMinutes > 0) {
- PurgeLogger.logInfo("Purge run time: " + executionTime
- + " ms (" + execTimeInMinutes + " minutes)",
- this.pluginName);
- } else {
- PurgeLogger.logInfo("Purge run time: " + executionTime
- + " ms", this.pluginName);
- }
- }
- } catch (Throwable e) {
- PurgeLogger
- .logError(
- "An unexpected error occurred upon completion of the purge job",
- this.pluginName, e);
- } finally {
- ClusterLockUtils.unlock(purgeLock, false);
- }
- }
- }
+ } else {
+ Method m = dao.getClass().getMethod("purgeExpiredData",
+ new Class[] {});
+ if (m != null) {
+ if (m.getDeclaringClass().equals(PluginDao.class)) {
+ PurgeLogger
+ .logWarn(
+ "Unable to purge data. This plugin does not specify a record class and does not implement a custom purger.",
+ pluginName);
+ } else {
+ if (this.purgeType.equals(PURGE_JOB_TYPE.PURGE_EXPIRED)) {
+ dao.purgeExpiredData();
+ } else {
+ dao.purgeAllData();
+ }
- public void printTimedOutMessage(int deadPurgeJobAge) {
- // only print message every 5 minutes
- if (System.currentTimeMillis() - lastTimeOutMessage > 300000) {
- PurgeLogger.logFatal(
- "Purger running time has exceeded timeout duration of "
- + deadPurgeJobAge
- + " minutes. Current running time: "
- + (getAge() / 60000) + " minutes", pluginName);
- printStackTrace();
- }
- }
+ PurgeLogger.logInfo("Data successfully Purged!",
+ pluginName);
- /**
- * Prints the stack trace for this job thread.
- */
- public void printStackTrace() {
- StringBuffer buffer = new StringBuffer();
- buffer.append("Stack trace for Purge Job Thread:\n");
- buffer.append(getStackTrace(this));
- // If this thread is blocked, output the stack traces for the other
- // blocked threads to assist in determining the source of the
- // deadlocked
- // threads
- if (this.getState().equals(State.BLOCKED)) {
- buffer.append("\tDUMPING OTHER BLOCKED THREADS\n");
- buffer.append(getBlockedStackTraces());
+ EDEXUtil.getMessageProducer().sendAsyncUri(
+ PLUGIN_PURGED_TOPIC, pluginName);
+ }
+ }
+ }
+ } catch (Exception e) {
+ failed = true;
+ // keep getting next exceptions with sql exceptions to ensure
+ // we can see the underlying error
+ PurgeLogger
+ .logError("Error purging expired data!\n", pluginName, e);
+ Throwable t = e.getCause();
+ while (t != null) {
+ if (t instanceof SQLException) {
+ SQLException se = ((SQLException) t).getNextException();
+ PurgeLogger.logError("Next exception:", pluginName, se);
+ }
+ t = t.getCause();
+ }
+ } finally {
+ ClusterTask purgeLock = PurgeManager.getInstance().getPurgeLock();
+ try {
+ /*
+ * Update the status accordingly if the purge failed or
+ * succeeded
+ */
+ PurgeDao purgeDao = new PurgeDao();
+ PurgeJobStatus status = purgeDao
+ .getJobForPlugin(this.pluginName);
+ if (status == null) {
+ PurgeLogger.logError(
+ "Purge job completed but no status object found!",
+ this.pluginName);
+ } else {
+ if (failed) {
+ status.incrementFailedCount();
+ if (status.getFailedCount() >= PurgeManager
+ .getInstance().getFatalFailureCount()) {
+ PurgeLogger
+ .logFatal(
+ "Purger for this plugin has reached or exceeded consecutive failure limit of "
+ + PurgeManager
+ .getInstance()
+ .getFatalFailureCount()
+ + ". Data will no longer being purged for this plugin.",
+ pluginName);
+ } else {
+ PurgeLogger.logError("Purge job has failed "
+ + status.getFailedCount()
+ + " consecutive times.", this.pluginName);
+ // Back the start time off by half an hour to try to
+ // purge again soon, don't want to start immediately so
+ // it doesn't ping pong between servers in a time
+ // out scenario
+ Date startTime = status.getStartTime();
+ startTime.setTime(startTime.getTime() - (1800000));
+ }
+ } else {
+ status.setFailedCount(0);
+ }
- }
- PurgeLogger.logError(buffer.toString(), this.pluginName);
+ /*
+ * This purger thread has exceeded the time out duration but
+ * finally finished. Output a message and update the status
+ */
+ int deadPurgeJobAge = PurgeManager.getInstance()
+ .getDeadPurgeJobAge();
+ Calendar purgeTimeOutLimit = Calendar.getInstance();
+ purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
+ purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge);
+ if (startTime < purgeTimeOutLimit.getTimeInMillis()) {
+ PurgeLogger
+ .logInfo(
+ "Purge job has recovered from timed out state!!",
+ pluginName);
+ }
+ status.setRunning(false);
+ purgeDao.update(status);
+ /*
+ * Log execution times
+ */
+ long executionTime = getAge();
+ long execTimeInMinutes = executionTime / 60000;
+ if (execTimeInMinutes > 0) {
+ PurgeLogger.logInfo("Purge run time: " + executionTime
+ + " ms (" + execTimeInMinutes + " minutes)",
+ this.pluginName);
+ } else {
+ PurgeLogger.logInfo("Purge run time: " + executionTime
+ + " ms", this.pluginName);
+ }
+ }
+ } catch (Throwable e) {
+ PurgeLogger
+ .logError(
+ "An unexpected error occurred upon completion of the purge job",
+ this.pluginName, e);
+ } finally {
+ ClusterLockUtils.unlock(purgeLock, false);
+ }
+ }
+ }
- }
+ public void printTimedOutMessage(int deadPurgeJobAge) {
+ // only print message every 5 minutes
+ if (System.currentTimeMillis() - lastTimeOutMessage > 300000) {
+ PurgeLogger.logFatal(
+ "Purger running time has exceeded timeout duration of "
+ + deadPurgeJobAge
+ + " minutes. Current running time: "
+ + (getAge() / 60000) + " minutes", pluginName);
+ printStackTrace();
+ }
+ }
- /**
- * Gets the stack traces for all other threads in the BLOCKED state in the
- * JVM
- *
- * @return The stack traces for all other threads in the BLOCKED state in
- * the JVM
- */
- private String getBlockedStackTraces() {
- StringBuffer buffer = new StringBuffer();
- Map threads = Thread.getAllStackTraces();
- for (Thread t : threads.keySet()) {
- if (t.getState().equals(State.BLOCKED)) {
- if (t.getId() != this.getId()) {
- buffer.append(getStackTrace(t));
- }
- }
- }
+ /**
+ * Prints the stack trace for this job thread.
+ */
+ public void printStackTrace() {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("Stack trace for Purge Job Thread:\n");
+ buffer.append(getStackTrace(this));
+ // If this thread is blocked, output the stack traces for the other
+ // blocked threads to assist in determining the source of the
+ // deadlocked
+ // threads
+ if (this.getState().equals(State.BLOCKED)) {
+ buffer.append("\tDUMPING OTHER BLOCKED THREADS\n");
+ buffer.append(getBlockedStackTraces());
- return buffer.toString();
- }
+ }
+ PurgeLogger.logError(buffer.toString(), this.pluginName);
- /**
- * Gets the stack trace for the given thread
- *
- * @param thread
- * The thread to get the stack trace for
- * @return The stack trace as a String
- */
- private String getStackTrace(Thread thread) {
- StringBuffer buffer = new StringBuffer();
- StackTraceElement[] stack = Thread.getAllStackTraces().get(thread);
- buffer.append("\tThread ID: ").append(thread.getId())
- .append(" Thread state: ").append(this.getState())
- .append("\n");
- if (stack == null) {
- buffer.append("No stack trace could be retrieved for this thread");
- } else {
- for (int i = 0; i < stack.length; i++) {
- buffer.append("\t\t").append(stack[i]).append("\n");
- }
- }
- return buffer.toString();
- }
+ }
- public long getStartTime() {
- return startTime;
- }
+ /**
+ * Gets the stack traces for all other threads in the BLOCKED state in the
+ * JVM
+ *
+ * @return The stack traces for all other threads in the BLOCKED state in
+ * the JVM
+ */
+ private String getBlockedStackTraces() {
+ StringBuffer buffer = new StringBuffer();
+ Map threads = Thread.getAllStackTraces();
+ for (Thread t : threads.keySet()) {
+ if (t.getState().equals(State.BLOCKED)) {
+ if (t.getId() != this.getId()) {
+ buffer.append(getStackTrace(t));
+ }
+ }
+ }
- public long getAge() {
- return System.currentTimeMillis() - startTime;
- }
+ return buffer.toString();
+ }
+
+ /**
+ * Gets the stack trace for the given thread
+ *
+ * @param thread
+ * The thread to get the stack trace for
+ * @return The stack trace as a String
+ */
+ private String getStackTrace(Thread thread) {
+ StringBuffer buffer = new StringBuffer();
+ StackTraceElement[] stack = Thread.getAllStackTraces().get(thread);
+ buffer.append("\tThread ID: ").append(thread.getId())
+ .append(" Thread state: ").append(this.getState())
+ .append("\n");
+ if (stack == null) {
+ buffer.append("No stack trace could be retrieved for this thread");
+ } else {
+ for (StackTraceElement element : stack) {
+ buffer.append("\t\t").append(element).append("\n");
+ }
+ }
+ return buffer.toString();
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getAge() {
+ return System.currentTimeMillis() - startTime;
+ }
}
diff --git a/edexOsgi/com.raytheon.uf.edex.tafqueue/src/com/raytheon/uf/edex/tafqueue/TafQueueRequestHandler.java b/edexOsgi/com.raytheon.uf.edex.tafqueue/src/com/raytheon/uf/edex/tafqueue/TafQueueRequestHandler.java
index b148ef0962..d10d80039d 100644
--- a/edexOsgi/com.raytheon.uf.edex.tafqueue/src/com/raytheon/uf/edex/tafqueue/TafQueueRequestHandler.java
+++ b/edexOsgi/com.raytheon.uf.edex.tafqueue/src/com/raytheon/uf/edex/tafqueue/TafQueueRequestHandler.java
@@ -45,7 +45,7 @@ import com.raytheon.uf.edex.database.DataAccessLayerException;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* May 1, 2012 14715 rferrel Initial creation
- *
+ * May 08, 2013 1814 rjpeter Added time to live to topic
*
*
* @author rferrel
@@ -92,8 +92,7 @@ public class TafQueueRequestHandler implements IRequestHandler
case GET_TAFS:
response = new ServerResponse();
idList = (List) request.getArgument();
- List records = (List) dao
- .getRecordsById(idList);
+ List records = dao.getRecordsById(idList);
makeTafs(records, response);
break;
case REMOVE_SELECTED:
@@ -111,7 +110,7 @@ public class TafQueueRequestHandler implements IRequestHandler
+ " forecast(s) removed.");
}
makeList(state, dao, response);
- if (state == TafQueueState.PENDING && numRemoved > 0) {
+ if ((state == TafQueueState.PENDING) && (numRemoved > 0)) {
sendNotification(Type.REMOVE_SELECTED);
}
break;
@@ -193,6 +192,6 @@ public class TafQueueRequestHandler implements IRequestHandler
throws SerializationException, EdexException {
byte[] message = SerializationUtil.transformToThrift(type.toString());
EDEXUtil.getMessageProducer().sendAsyncUri(
- "jms-generic:topic:tafQueueChanged", message);
+ "jms-generic:topic:tafQueueChanged?timeToLive=60000", message);
}
}
diff --git a/edexOsgi/com.raytheon.uf.edex.useradmin/res/spring/useradmin-request.xml b/edexOsgi/com.raytheon.uf.edex.useradmin/res/spring/useradmin-request.xml
index 57271e585e..cc9ccca7b4 100644
--- a/edexOsgi/com.raytheon.uf.edex.useradmin/res/spring/useradmin-request.xml
+++ b/edexOsgi/com.raytheon.uf.edex.useradmin/res/spring/useradmin-request.xml
@@ -10,7 +10,7 @@
+ value="jms-generic:topic:user.authentication.changed?timeToLive=60000&destinationResolver=#qpidDurableResolver" />
diff --git a/rpms/awips2.qpid/0.18/SOURCES/virtualhosts.xml b/rpms/awips2.qpid/0.18/SOURCES/virtualhosts.xml
index 5bf0230387..3fa62abf79 100644
--- a/rpms/awips2.qpid/0.18/SOURCES/virtualhosts.xml
+++ b/rpms/awips2.qpid/0.18/SOURCES/virtualhosts.xml
@@ -27,6 +27,7 @@
- Date Ticket# Engineer Description
- ============ ========== =========== ==========================
- Mar 18, 2013 1814 rjpeter Initial Creation
+ - May 08, 2013 1814 rjpeter Remove slow consumer disconnect
-
-->
@@ -39,11 +40,6 @@
org.apache.qpid.server.store.derby.DerbyMessageStore
${QPID_WORK}/messageStore
-
-
- 5
- minutes
-
amq.direct
@@ -63,6 +59,8 @@
true
-
-
-
-
- 104857600
-
-
-
- 600000
-
-
-
- 5000
-
-
-
- topicDelete
-
-
-
-
-
-
-
-