Issue #2726: Add endpoint tracking to MessageProducer

Change-Id: I2368b112dd008dba6d699642b0b25abec781927d

Former-commit-id: 5860aa7fed [formerly 33c3a71df1] [formerly 5860aa7fed [formerly 33c3a71df1] [formerly d80fe56528 [formerly 7ceffcc229824730e54c34a38cf178a2282cd920]]]
Former-commit-id: d80fe56528
Former-commit-id: f319040439 [formerly 01eaeb6034]
Former-commit-id: a09972d3ce
This commit is contained in:
Richard Peter 2014-04-18 12:48:39 -05:00
parent 953dd37865
commit 8d4b22c3f8
2 changed files with 54 additions and 15 deletions

View file

@ -31,6 +31,7 @@ import javax.naming.ConfigurationException;
import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor; import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext; import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor; import org.apache.camel.Processor;
@ -43,6 +44,7 @@ import com.raytheon.uf.common.message.IMessage;
import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.util.Pair; import com.raytheon.uf.common.util.Pair;
import com.raytheon.uf.common.util.collections.BoundedMap;
import com.raytheon.uf.edex.core.EDEXUtil; import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.EdexException; import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.core.IMessageProducer; import com.raytheon.uf.edex.core.IMessageProducer;
@ -59,7 +61,8 @@ import com.raytheon.uf.edex.esb.camel.context.ContextManager;
* Date Ticket# Engineer Description * Date Ticket# Engineer Description
* ------------ ---------- ----------- -------------------------- * ------------ ---------- ----------- --------------------------
* Nov 14, 2008 njensen Initial creation. * Nov 14, 2008 njensen Initial creation.
* Mar 27, 2014 2726 rjpeter Modified for graceful shutdown changes. * Mar 27, 2014 2726 rjpeter Modified for graceful shutdown changes,
* added tracking of endpoints by context.
* </pre> * </pre>
* *
* @author njensen * @author njensen
@ -79,6 +82,8 @@ public class MessageProducer implements IMessageProducer, InterceptStrategy {
private final ConcurrentMap<CamelContext, ProducerTemplate> contextProducerMap = new ConcurrentHashMap<CamelContext, ProducerTemplate>(); private final ConcurrentMap<CamelContext, ProducerTemplate> contextProducerMap = new ConcurrentHashMap<CamelContext, ProducerTemplate>();
private final ConcurrentMap<CamelContext, Map<String, Endpoint>> contextUriEndpointMap = new ConcurrentHashMap<CamelContext, Map<String, Endpoint>>();
/** /**
* List of messages waiting to be sent. * List of messages waiting to be sent.
*/ */
@ -151,14 +156,16 @@ public class MessageProducer implements IMessageProducer, InterceptStrategy {
} }
try { try {
ProducerTemplate template = getProducerTemplateForUri(uri); Pair<ProducerTemplate, Endpoint> ctxAndTemplate = getProducerTemplateAndEndpointForUri(uri);
Map<String, Object> headers = getHeaders(message); Map<String, Object> headers = getHeaders(message);
ProducerTemplate template = ctxAndTemplate.getFirst();
Endpoint ep = ctxAndTemplate.getSecond();
if (headers != null) { if (headers != null) {
template.sendBodyAndHeaders(uri, ExchangePattern.InOnly, template.sendBodyAndHeaders(ep, ExchangePattern.InOnly,
message, headers); message, headers);
} else { } else {
template.sendBody(uri, ExchangePattern.InOnly, message); template.sendBody(ep, ExchangePattern.InOnly, message);
} }
} catch (Exception e) { } catch (Exception e) {
throw new EdexException("Error sending asynchronous message: " throw new EdexException("Error sending asynchronous message: "
@ -184,14 +191,16 @@ public class MessageProducer implements IMessageProducer, InterceptStrategy {
String uri = getContextData().getEndpointUriForRouteId(endpoint); String uri = getContextData().getEndpointUriForRouteId(endpoint);
try { try {
ProducerTemplate template = getProducerTemplateForUri(uri); Pair<ProducerTemplate, Endpoint> ctxAndTemplate = getProducerTemplateAndEndpointForUri(uri);
Map<String, Object> headers = getHeaders(message); Map<String, Object> headers = getHeaders(message);
ProducerTemplate template = ctxAndTemplate.getFirst();
Endpoint ep = ctxAndTemplate.getSecond();
if (headers != null) { if (headers != null) {
return template.sendBodyAndHeaders(uri, ExchangePattern.OutIn, return template.sendBodyAndHeaders(ep, ExchangePattern.OutIn,
message, headers); message, headers);
} else { } else {
return template.sendBody(uri, ExchangePattern.OutIn, message); return template.sendBody(ep, ExchangePattern.OutIn, message);
} }
} catch (Exception e) { } catch (Exception e) {
throw new EdexException("Error sending synchronous message: " throw new EdexException("Error sending synchronous message: "
@ -226,14 +235,14 @@ public class MessageProducer implements IMessageProducer, InterceptStrategy {
/** /**
* Returns the a producer template for the CamelContext this thread is * Returns the a producer template for the CamelContext this thread is
* currently a part of. If thread is not part of a context, will use context * currently a part of and the endpoint for the URI. If thread is not part
* of the uri. If the uri is not registered in this jvm, will use the first * of a context, will use context of the uri. If the uri is not registered
* context available. * in this jvm, will use the first context available.
* *
* @return * @return
*/ */
protected ProducerTemplate getProducerTemplateForUri(String uri) protected Pair<ProducerTemplate, Endpoint> getProducerTemplateAndEndpointForUri(
throws ConfigurationException, EdexException { String uri) throws ConfigurationException, EdexException {
CamelContext ctx = currentThreadContext.get(); CamelContext ctx = currentThreadContext.get();
if (ctx == null) { if (ctx == null) {
@ -273,7 +282,36 @@ public class MessageProducer implements IMessageProducer, InterceptStrategy {
tmp = prev; tmp = prev;
} }
} }
return tmp;
/*
* Caching endpoint for the uri ourselves. Camel considers various
* endpoints non singleton. So for things like jms-topic, a new
* endpoint is created everytime a message is sent to the URI
* instead of reusing one that was already created. This is in part
* due to the lack of tracking per route. We are ok with caching per
* context as we don't operate on routes individually only contexts
* as a whole.
*/
Map<String, Endpoint> endpointMap = contextUriEndpointMap.get(ctx);
if (endpointMap == null) {
endpointMap = new BoundedMap<String, Endpoint>(100);
Map<String, Endpoint> prev = contextUriEndpointMap.putIfAbsent(
ctx, endpointMap);
if (prev != null) {
endpointMap = prev;
}
}
Endpoint ep = null;
synchronized (endpointMap) {
ep = endpointMap.get(uri);
if (ep == null) {
ep = ctx.getEndpoint(uri);
endpointMap.put(uri, ep);
}
}
return new Pair<ProducerTemplate, Endpoint>(tmp, ep);
} }
throw new ConfigurationException( throw new ConfigurationException(

View file

@ -140,7 +140,7 @@ public class ContextManager implements ApplicationContextAware,
* Last time dependency mapping was generated. Used to periodically * Last time dependency mapping was generated. Used to periodically
* regenerate the dependency mappings. * regenerate the dependency mappings.
*/ */
private final long lastDependencyTime = 0; private long lastDependencyTime = 0;
public static ContextManager getInstance() { public static ContextManager getInstance() {
return instance; return instance;
@ -219,7 +219,8 @@ public class ContextManager implements ApplicationContextAware,
synchronized (this) { synchronized (this) {
long millis = System.currentTimeMillis(); long millis = System.currentTimeMillis();
if ((dependencyMapping == null) if ((dependencyMapping == null)
|| (millis > (lastDependencyTime + TimeUtil.MILLIS_PER_MINUTE))) { || (millis > (lastDependencyTime + (3 * TimeUtil.MILLIS_PER_MINUTE)))) {
lastDependencyTime = millis;
dependencyMapping = new ContextDependencyMapping( dependencyMapping = new ContextDependencyMapping(
getContextData(), suppressExceptions); getContextData(), suppressExceptions);
} }