Issue #2726: Add endpoint tracking to MessageProducer

Change-Id: I2368b112dd008dba6d699642b0b25abec781927d

Former-commit-id: 7ceffcc229824730e54c34a38cf178a2282cd920
This commit is contained in:
Richard Peter 2014-04-18 12:48:39 -05:00
parent b4af2bc502
commit d80fe56528
2 changed files with 54 additions and 15 deletions

View file

@ -31,6 +31,7 @@ import javax.naming.ConfigurationException;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
@ -43,6 +44,7 @@ import com.raytheon.uf.common.message.IMessage;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.util.Pair;
import com.raytheon.uf.common.util.collections.BoundedMap;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.core.IMessageProducer;
@ -59,7 +61,8 @@ import com.raytheon.uf.edex.esb.camel.context.ContextManager;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 14, 2008 njensen Initial creation.
* Mar 27, 2014 2726 rjpeter Modified for graceful shutdown changes.
* Mar 27, 2014 2726 rjpeter Modified for graceful shutdown changes,
* added tracking of endpoints by context.
* </pre>
*
* @author njensen
@ -79,6 +82,8 @@ public class MessageProducer implements IMessageProducer, InterceptStrategy {
private final ConcurrentMap<CamelContext, ProducerTemplate> contextProducerMap = new ConcurrentHashMap<CamelContext, ProducerTemplate>();
private final ConcurrentMap<CamelContext, Map<String, Endpoint>> contextUriEndpointMap = new ConcurrentHashMap<CamelContext, Map<String, Endpoint>>();
/**
* List of messages waiting to be sent.
*/
@ -151,14 +156,16 @@ public class MessageProducer implements IMessageProducer, InterceptStrategy {
}
try {
ProducerTemplate template = getProducerTemplateForUri(uri);
Pair<ProducerTemplate, Endpoint> ctxAndTemplate = getProducerTemplateAndEndpointForUri(uri);
Map<String, Object> headers = getHeaders(message);
ProducerTemplate template = ctxAndTemplate.getFirst();
Endpoint ep = ctxAndTemplate.getSecond();
if (headers != null) {
template.sendBodyAndHeaders(uri, ExchangePattern.InOnly,
template.sendBodyAndHeaders(ep, ExchangePattern.InOnly,
message, headers);
} else {
template.sendBody(uri, ExchangePattern.InOnly, message);
template.sendBody(ep, ExchangePattern.InOnly, message);
}
} catch (Exception e) {
throw new EdexException("Error sending asynchronous message: "
@ -184,14 +191,16 @@ public class MessageProducer implements IMessageProducer, InterceptStrategy {
String uri = getContextData().getEndpointUriForRouteId(endpoint);
try {
ProducerTemplate template = getProducerTemplateForUri(uri);
Pair<ProducerTemplate, Endpoint> ctxAndTemplate = getProducerTemplateAndEndpointForUri(uri);
Map<String, Object> headers = getHeaders(message);
ProducerTemplate template = ctxAndTemplate.getFirst();
Endpoint ep = ctxAndTemplate.getSecond();
if (headers != null) {
return template.sendBodyAndHeaders(uri, ExchangePattern.OutIn,
return template.sendBodyAndHeaders(ep, ExchangePattern.OutIn,
message, headers);
} else {
return template.sendBody(uri, ExchangePattern.OutIn, message);
return template.sendBody(ep, ExchangePattern.OutIn, message);
}
} catch (Exception e) {
throw new EdexException("Error sending synchronous message: "
@ -226,14 +235,14 @@ public class MessageProducer implements IMessageProducer, InterceptStrategy {
/**
* Returns the a producer template for the CamelContext this thread is
* currently a part of. If thread is not part of a context, will use context
* of the uri. If the uri is not registered in this jvm, will use the first
* context available.
* currently a part of and the endpoint for the URI. If thread is not part
* of a context, will use context of the uri. If the uri is not registered
* in this jvm, will use the first context available.
*
* @return
*/
protected ProducerTemplate getProducerTemplateForUri(String uri)
throws ConfigurationException, EdexException {
protected Pair<ProducerTemplate, Endpoint> getProducerTemplateAndEndpointForUri(
String uri) throws ConfigurationException, EdexException {
CamelContext ctx = currentThreadContext.get();
if (ctx == null) {
@ -273,7 +282,36 @@ public class MessageProducer implements IMessageProducer, InterceptStrategy {
tmp = prev;
}
}
return tmp;
/*
* Caching endpoint for the uri ourselves. Camel considers various
* endpoints non singleton. So for things like jms-topic, a new
* endpoint is created everytime a message is sent to the URI
* instead of reusing one that was already created. This is in part
* due to the lack of tracking per route. We are ok with caching per
* context as we don't operate on routes individually only contexts
* as a whole.
*/
Map<String, Endpoint> endpointMap = contextUriEndpointMap.get(ctx);
if (endpointMap == null) {
endpointMap = new BoundedMap<String, Endpoint>(100);
Map<String, Endpoint> prev = contextUriEndpointMap.putIfAbsent(
ctx, endpointMap);
if (prev != null) {
endpointMap = prev;
}
}
Endpoint ep = null;
synchronized (endpointMap) {
ep = endpointMap.get(uri);
if (ep == null) {
ep = ctx.getEndpoint(uri);
endpointMap.put(uri, ep);
}
}
return new Pair<ProducerTemplate, Endpoint>(tmp, ep);
}
throw new ConfigurationException(

View file

@ -140,7 +140,7 @@ public class ContextManager implements ApplicationContextAware,
* Last time dependency mapping was generated. Used to periodically
* regenerate the dependency mappings.
*/
private final long lastDependencyTime = 0;
private long lastDependencyTime = 0;
public static ContextManager getInstance() {
return instance;
@ -219,7 +219,8 @@ public class ContextManager implements ApplicationContextAware,
synchronized (this) {
long millis = System.currentTimeMillis();
if ((dependencyMapping == null)
|| (millis > (lastDependencyTime + TimeUtil.MILLIS_PER_MINUTE))) {
|| (millis > (lastDependencyTime + (3 * TimeUtil.MILLIS_PER_MINUTE)))) {
lastDependencyTime = millis;
dependencyMapping = new ContextDependencyMapping(
getContextData(), suppressExceptions);
}