Issue #1814: Remove slow consumer disconnect. Add time to live to all topics

Change-Id: I5aeee431942e2b246e76035a18414a4b801deba7

Former-commit-id: 6d6233b5f9 [formerly c2dcbedf63] [formerly ae84ced7be] [formerly ae84ced7be [formerly 4f3e86574b]] [formerly 6d6233b5f9 [formerly c2dcbedf63] [formerly ae84ced7be] [formerly ae84ced7be [formerly 4f3e86574b]] [formerly 697297803c [formerly ae84ced7be [formerly 4f3e86574b] [formerly 697297803c [formerly c46f5e242493d67f2bb89b2bbba7dcaf9b0f81f4]]]]]
Former-commit-id: 697297803c
Former-commit-id: 981aae5170 [formerly 8917cc0b8d] [formerly 139c234452] [formerly 5e82069cb1df74459c5e285080c6b53dbc08ef17 [formerly c40599aa14bfde53120b5b8c8af969cd6cc57c9e] [formerly 139c234452 [formerly cfaeeaff5f]]]
Former-commit-id: c3fa3f4e6db401b9afa6a79aeec3409e95cbfe5c [formerly f2f7acf2e73af54b0346602508ceed49c091f885] [formerly a2cfd38f6a [formerly dfba22e206]]
Former-commit-id: a2cfd38f6a
Former-commit-id: 2649d1d442
This commit is contained in:
Richard Peter 2013-05-08 17:28:01 -05:00
parent 140cfc4e76
commit 818da9433a
15 changed files with 281 additions and 301 deletions

View file

@ -22,6 +22,7 @@ package com.raytheon.uf.viz.core.comm;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import javax.jms.ConnectionFactory;
import org.apache.qpid.client.AMQConnectionFactory;
@ -41,6 +42,7 @@ import com.raytheon.uf.viz.core.VizApp;
* Nov 2, 2009 #3067 chammack Send all jms connections through failover:// to properly reconnect
* Nov 2, 2011 #7391 bkowal Ensure that the generated WsId is properly formatted to be
* included in a url.
* May 09, 2013 1814 rjpeter Updated prefetch to 10.
* </pre>
*
* @author chammack
@ -50,7 +52,7 @@ public class JMSConnection {
private static JMSConnection instance;
private String jndiProviderUrl;
private final String jndiProviderUrl;
private AMQConnectionFactory factory;
@ -76,10 +78,11 @@ public class JMSConnection {
// reconnect
this.factory = new AMQConnectionFactory(
"amqp://guest:guest@"
+ URLEncoder.encode(VizApp.getWsId().toString(), "UTF-8")
+ URLEncoder.encode(VizApp.getWsId().toString(),
"UTF-8")
+ "/edex?brokerlist='"
+ this.jndiProviderUrl
+ "?connecttimeout='5000'&heartbeat='0''&maxprefetch='0'&sync_publish='all'&failover='nofailover'");
+ "?connecttimeout='5000'&heartbeat='0''&maxprefetch='10'&sync_publish='all'&failover='nofailover'");
} catch (URLSyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();

View file

@ -18,7 +18,7 @@
<!-- specify the connection to the broker (qpid) -->
<!-- MaxPrefetch set at 0, due to DataPool routers getting messages backed up behind long running tasks -->
<bean id="amqConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
<constructor-arg type="java.lang.String" value="amqp://guest:guest@/edex?brokerlist='tcp://${broker.addr}?retries='9999'&amp;connecttimeout='5000'&amp;connectdelay='5000''&amp;maxprefetch='0'&amp;sync_publish='all'&amp;sync_ack='true'"/>
<constructor-arg type="java.lang.String" value="amqp://guest:guest@/edex?brokerlist='tcp://${broker.addr}?retries='9999'&amp;connecttimeout='5000'&amp;connectdelay='5000'&amp;heartbeat='0''&amp;maxprefetch='0'&amp;sync_publish='all'&amp;sync_ack='true'"/>
</bean>
<bean id="jmsPooledConnectionFactory" class="com.raytheon.uf.common.jms.JmsPooledConnectionFactory">
@ -239,14 +239,14 @@
<route id="alertVizNotify">
<from uri="vm:edex.alertVizNotification" />
<bean ref="serializationUtil" method="transformToThrift" />
<to uri="jms-generic:topic:edex.alerts.msg?deliveryPersistent=false" />
<to uri="jms-generic:topic:edex.alerts.msg?deliveryPersistent=false&amp;timeToLive=60000" />
</route>
<!-- Route to send text products to alarm/alert -->
<route id="alarmAlertNotify">
<from uri="vm:edex.alarmAlertNotification" />
<bean ref="serializationUtil" method="transformToThrift" />
<to uri="jms-generic:topic:edex.alarms.msg?deliveryPersistent=false" />
<to uri="jms-generic:topic:edex.alarms.msg?deliveryPersistent=false&amp;timeToLive=60000" />
</route>
<route id="siteActivationRoute">
@ -267,7 +267,7 @@
<method bean="siteActivateNotifyFilter" method="isSiteActivateNotification" />
<bean ref="siteActivationMonitor" method="handleNotification"/>
<bean ref="serializationUtil" method="transformToThrift" />
<to uri="jms-generic:topic:edex.alerts.siteActivate" />
<to uri="jms-generic:topic:edex.alerts.siteActivate?timeToLive=60000" />
</filter>
</route>

View file

@ -70,7 +70,7 @@
<filter>
<method bean="gfeNotifyFilter" method="isGfeNotification" />
<bean ref="serializationUtil" method="transformToThrift" />
<to uri="jms-generic:topic:edex.alerts.gfe" />
<to uri="jms-generic:topic:edex.alerts.gfe?timeToLive=60000" />
</filter>
<doCatch>
<exception>java.lang.Throwable</exception>

View file

@ -118,7 +118,7 @@
<!-- Convert the topic into a queue so only one consumer gets each message and we still have competing consumers. -->
<route id="gfeDataURINotificationQueueRoute">
<from uri="jms-gfe-notify:topic:edex.alerts?durableSubscriptionName=gfeNotificationSubscription" />
<from uri="jms-gfe-notify:topic:edex.alerts?clientId=gfeNotify&amp;durableSubscriptionName=gfeNotificationSubscription" />
<doTry>
<to uri="jms-generic:queue:gfeDataURINotification"/>
<doCatch>

View file

@ -204,7 +204,7 @@ public class GfeIngestNotificationFilter {
// if we don't have the other component for this
// fcstHour
if (otherTimes == null
if ((otherTimes == null)
|| !otherTimes.remove(fcstHour)) {
// need to wait for other component
ParmID compPid = new ParmID(d2dParamName,
@ -371,7 +371,8 @@ public class GfeIngestNotificationFilter {
throws Exception {
byte[] message = SerializationUtil.transformToThrift(notifications);
EDEXUtil.getMessageProducer().sendAsyncUri(
"jms-generic:topic:gfeGribNotification", message);
"jms-generic:topic:gfeGribNotification?timeToLive=60000",
message);
SendNotifications.send(notifications);
}

View file

@ -83,7 +83,7 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery;
* that sent notification to D2DParmIdCache.
* 01/14/13 #1469 bkowal Removed the hdf5 data directory
* 04/08/13 #1293 bkowal Removed references to hdffileid.
*
* 05/08/13 1814 rjpeter Added time to live to topic message
* </pre>
*
* @author bphillip
@ -101,7 +101,7 @@ public class GribDao extends PluginDao {
private static final String THINNED_PTS = "thinnedPts";
private static final String PURGE_MODEL_CACHE_TOPIC = "jms-generic:topic:purgeGribModelCache";
private static final String PURGE_MODEL_CACHE_TOPIC = "jms-generic:topic:purgeGribModelCache?timeToLive=60000";
/**
* Creates a new GribPyDao object
@ -171,7 +171,7 @@ public class GribDao extends PluginDao {
IPersistable obj) throws Exception {
GribRecord gribRec = (GribRecord) obj;
if (gribRec.getMessageData() != null
if ((gribRec.getMessageData() != null)
&& !gribRec.getModelInfo().getParameterName().equals("Missing")) {
AbstractStorageRecord storageRecord = null;
AbstractStorageRecord localSection = null;
@ -182,8 +182,8 @@ public class GribDao extends PluginDao {
* Stores the binary data to the HDF5 data store
*/
if (gribRec.getMessageData() instanceof float[]) {
if (gribRec.getSpatialObject() != null
&& gribRec.getMessageData() != null) {
if ((gribRec.getSpatialObject() != null)
&& (gribRec.getMessageData() != null)) {
long[] sizes = new long[] {
(gribRec.getSpatialObject()).getNx(),
(gribRec.getSpatialObject()).getNy() };
@ -316,7 +316,7 @@ public class GribDao extends PluginDao {
for (PluginDataObject record : records) {
GribRecord rec = (GribRecord) record;
GribModel model = rec.getModelInfo();
if (model.getParameterName() == null
if ((model.getParameterName() == null)
|| model.getParameterName().equals("Missing")) {
logger.info("Discarding record due to missing or unknown parameter mapping: "
+ record);
@ -327,7 +327,7 @@ public class GribDao extends PluginDao {
if (level != null) {
MasterLevel ml = level.getMasterLevel();
if (ml != null
if ((ml != null)
&& !LevelFactory.UNKNOWN_LEVEL.equals(ml.getName())) {
validLevel = true;
}
@ -362,7 +362,7 @@ public class GribDao extends PluginDao {
for (PluginDataObject record : records) {
GribRecord rec = (GribRecord) record;
GribModel model = rec.getModelInfo();
if (model.getParameterName() == null
if ((model.getParameterName() == null)
|| model.getParameterName().equals("Missing")) {
logger.info("Discarding record due to missing or unknown parameter mapping: "
+ record);
@ -373,7 +373,7 @@ public class GribDao extends PluginDao {
if (level != null) {
MasterLevel ml = level.getMasterLevel();
if (ml != null
if ((ml != null)
&& !LevelFactory.UNKNOWN_LEVEL.equals(ml.getName())) {
validLevel = true;
}

View file

@ -152,7 +152,7 @@
<method bean="textDecoder" method="separator" />
<bean ref="textDecoder" method="transformToSimpleString" />
<bean ref="serializationUtil" method="transformToThrift"/>
<to uri="jms-text:topic:edex.alarms.msg" />
<to uri="jms-text:topic:edex.alarms.msg?timeToLive=60000" />
</split>
</route>

View file

@ -44,7 +44,7 @@
<route id="utilityNotify">
<from uri="vm://utilityNotify" />
<bean ref="serializationUtil" method="transformToThrift" />
<to uri="jms-generic:topic:edex.alerts.utility" />
<to uri="jms-generic:topic:edex.alerts.utility?timeToLive=60000" />
</route>
</camelContext>

View file

@ -44,7 +44,7 @@
<route id="vtecNotify">
<from uri="vm:edex.vtecAlert" />
<bean ref="serializationUtil" method="transformToThrift" />
<to uri="jms-generic:topic:edex.alerts.vtec" />
<to uri="jms-generic:topic:edex.alerts.vtec?timeToLive=60000" />
</route>
<route id="practiceVtecRoute">
<from uri="jms-activetable:queue:practiceActiveTable?concurrentConsumers=1" />

View file

@ -46,7 +46,7 @@
<bean class="com.raytheon.uf.edex.plugin.grid.dao.GridDao"
factory-method="setPurgeModelCacheTopic">
<constructor-arg value="jms-generic:topic:purgeGridInfoCache" />
<constructor-arg value="jms-generic:topic:purgeGridInfoCache?timeToLive=60000" />
</bean>
<camelContext id="grid-common-camel" xmlns="http://camel.apache.org/schema/spring"

View file

@ -37,7 +37,7 @@ import com.raytheon.uf.edex.plugin.grid.dao.GridDao;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 2, 2012 dgilling Initial creation
*
* May 08, 2013 1814 rjpeter Added time to live to topic.
* </pre>
*
* @author dgilling
@ -47,7 +47,7 @@ import com.raytheon.uf.edex.plugin.grid.dao.GridDao;
public class DeleteAllGridDataHandler implements
IRequestHandler<DeleteAllGridDataRequest> {
private static final String PLUGIN_PURGED_TOPIC = "jms-generic:topic:pluginPurged";
private static final String PLUGIN_PURGED_TOPIC = "jms-generic:topic:pluginPurged?timeToLive=60000";
/*
* (non-Javadoc)

View file

@ -45,7 +45,7 @@ import com.raytheon.uf.edex.database.purge.PurgeLogger;
* ------------ ---------- ----------- --------------------------
* Apr 19, 2012 #470 bphillip Initial creation
* Jun 20, 2012 NC#606 ghull send purge-complete messages
*
* May 08, 2013 1814 rjpeter Added time to live to topic
* </pre>
*
* @author bphillip
@ -58,7 +58,7 @@ public class PurgeJob extends Thread {
PURGE_ALL, PURGE_EXPIRED
}
public static final String PLUGIN_PURGED_TOPIC = "jms-generic:topic:pluginPurged";
public static final String PLUGIN_PURGED_TOPIC = "jms-generic:topic:pluginPurged?timeToLive=60000";
private long startTime;
@ -66,13 +66,13 @@ public class PurgeJob extends Thread {
public static final String TASK_NAME = "Purge Plugin Data";
/** The plugin associated with this purge job */
private String pluginName;
private final String pluginName;
/** The type of purge job being executed */
private PURGE_JOB_TYPE purgeType;
private final PURGE_JOB_TYPE purgeType;
/** Last time job has printed a timed out message */
private long lastTimeOutMessage = 0;
private final long lastTimeOutMessage = 0;
/**
* Creates a new Purge job for the specified plugin.
@ -89,6 +89,7 @@ public class PurgeJob extends Thread {
this.purgeType = purgeType;
}
@Override
public void run() {
// Flag used to track if this job has failed
@ -104,7 +105,8 @@ public class PurgeJob extends Thread {
PurgeLogger.logInfo("Data successfully Purged!", pluginName);
EDEXUtil.getMessageProducer().sendAsyncUri( PLUGIN_PURGED_TOPIC, pluginName );
EDEXUtil.getMessageProducer().sendAsyncUri(PLUGIN_PURGED_TOPIC,
pluginName);
} else {
Method m = dao.getClass().getMethod("purgeExpiredData",
@ -122,9 +124,11 @@ public class PurgeJob extends Thread {
dao.purgeAllData();
}
PurgeLogger.logInfo("Data successfully Purged!", pluginName);
PurgeLogger.logInfo("Data successfully Purged!",
pluginName);
EDEXUtil.getMessageProducer().sendAsyncUri( PLUGIN_PURGED_TOPIC, pluginName );
EDEXUtil.getMessageProducer().sendAsyncUri(
PLUGIN_PURGED_TOPIC, pluginName);
}
}
}
@ -295,8 +299,8 @@ public class PurgeJob extends Thread {
if (stack == null) {
buffer.append("No stack trace could be retrieved for this thread");
} else {
for (int i = 0; i < stack.length; i++) {
buffer.append("\t\t").append(stack[i]).append("\n");
for (StackTraceElement element : stack) {
buffer.append("\t\t").append(element).append("\n");
}
}
return buffer.toString();

View file

@ -45,7 +45,7 @@ import com.raytheon.uf.edex.database.DataAccessLayerException;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* May 1, 2012 14715 rferrel Initial creation
*
* May 08, 2013 1814 rjpeter Added time to live to topic
* </pre>
*
* @author rferrel
@ -92,8 +92,7 @@ public class TafQueueRequestHandler implements IRequestHandler<TafQueueRequest>
case GET_TAFS:
response = new ServerResponse<String>();
idList = (List<String>) request.getArgument();
List<TafQueueRecord> records = (List<TafQueueRecord>) dao
.getRecordsById(idList);
List<TafQueueRecord> records = dao.getRecordsById(idList);
makeTafs(records, response);
break;
case REMOVE_SELECTED:
@ -111,7 +110,7 @@ public class TafQueueRequestHandler implements IRequestHandler<TafQueueRequest>
+ " forecast(s) removed.");
}
makeList(state, dao, response);
if (state == TafQueueState.PENDING && numRemoved > 0) {
if ((state == TafQueueState.PENDING) && (numRemoved > 0)) {
sendNotification(Type.REMOVE_SELECTED);
}
break;
@ -193,6 +192,6 @@ public class TafQueueRequestHandler implements IRequestHandler<TafQueueRequest>
throws SerializationException, EdexException {
byte[] message = SerializationUtil.transformToThrift(type.toString());
EDEXUtil.getMessageProducer().sendAsyncUri(
"jms-generic:topic:tafQueueChanged", message);
"jms-generic:topic:tafQueueChanged?timeToLive=60000", message);
}
}

View file

@ -10,7 +10,7 @@
<bean id="userAuthenticationDataChangedHandler"
class="com.raytheon.uf.edex.useradmin.services.UserAuthenticationDataChangedHandler">
<constructor-arg type="java.lang.String"
value="jms-generic:topic:user.authentication.changed?destinationResolver=#qpidDurableResolver" />
value="jms-generic:topic:user.authentication.changed?timeToLive=60000&amp;destinationResolver=#qpidDurableResolver" />
</bean>
<bean factory-bean="handlerRegistry" factory-method="register">

View file

@ -27,6 +27,7 @@
- Date Ticket# Engineer Description
- ============ ========== =========== ==========================
- Mar 18, 2013 1814 rjpeter Initial Creation
- May 08, 2013 1814 rjpeter Remove slow consumer disconnect
-
-->
<virtualhosts>
@ -39,11 +40,6 @@
<class>org.apache.qpid.server.store.derby.DerbyMessageStore</class>
<environment-path>${QPID_WORK}/messageStore</environment-path>
</store>
<slow-consumer-detection>
<!-- Only check every 5 minutes -->
<delay>5</delay>
<timeunit>minutes</timeunit>
</slow-consumer-detection>
<queues>
<!-- Define default exchange -->
<exchange>amq.direct</exchange>
@ -63,6 +59,8 @@
<durable>true</durable>
<!-- Configure queues
Queues created on demand for AWIPS II
<queue>
<name>external.dropbox</name>
<external..dropbox>
@ -71,31 +69,6 @@
</queue>
-->
</queues>
<topics>
<slow-consumer-detection>
<!-- The maximum depth in bytes before -->
<!-- the policy will be applied-->
<depth>104857600</depth>
<!-- The maximum message age in milliseconds -->
<!-- before the policy will be applied -->
<messageAge>600000</messageAge>
<!-- The maximum number of message before -->
<!-- which the policy will be applied-->
<messageCount>5000</messageCount>
<!-- Policy Selection -->
<policy>
<name>topicDelete</name>
<topicDelete>
<!-- Uncomment to enable deletion of durable subscriptions that fall behind -->
<!--delete-persistent/-->
</topicDelete>
</policy>
</slow-consumer-detection>
<!-- Slow Consumer disconnect could be configured per topic. Use global configuration for now -->
</topics>
</edex>
</virtualhost>
</virtualhosts>