Issue #2726: Autoset route startup order.

Change-Id: I1f4165b6eab12207cf6b6d31964e8170ebe1bddc

Former-commit-id: b148d11ec9 [formerly b148d11ec9 [formerly 9b9d2b475f4777febf125e4013cba0d971eaf8e8]]
Former-commit-id: 4dc91f494f
Former-commit-id: c6719da76b
This commit is contained in:
Richard Peter 2014-05-05 19:30:20 -05:00
parent f48d571cb6
commit 4a140afac6
4 changed files with 101 additions and 42 deletions

View file

@ -86,8 +86,35 @@
-->
<!--
Text routes
Text routes.
If an internal route (see ContextManager.INTERNAL_ENDPOINT_TYPES) is being sent data from another internal
route in the same context it needs to come after the route that sends it data for proper startup/shutdown order.
-->
<route id="textUndecodedIngestRoute">
<from uri="jms-durable:queue:Ingest.Text?concurrentConsumers=2" />
<setHeader headerName="pluginName">
<constant>text</constant>
</setHeader>
<doTry>
<pipeline>
<bean ref="stringToFile" />
<bean ref="textDecoder" method="decodeFile" />
<bean ref="processUtil" method="log"/>
<multicast>
<!-- This route needs to go to Mark's -->
<to uri="direct:textToWatchWarn"/>
<to uri="direct:textSerializeRoute" />
<to uri="direct:textPurgeAccumulate" />
<to uri="vm:autoFaxRoute" />
</multicast>
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:text?level=INFO"/>
</doCatch>
</doTry>
</route>
<route id="textDirectDecodedIngestRoute">
<from uri="direct-vm:textDirectDecodedIngestRoute" />
<setHeader headerName="pluginName">
@ -134,31 +161,6 @@
</doTry>
</route>
<route id="textUndecodedIngestRoute">
<from uri="jms-durable:queue:Ingest.Text?concurrentConsumers=2" />
<setHeader headerName="pluginName">
<constant>text</constant>
</setHeader>
<doTry>
<pipeline>
<bean ref="stringToFile" />
<bean ref="textDecoder" method="decodeFile" />
<bean ref="processUtil" method="log"/>
<multicast>
<!-- This route needs to go to Mark's -->
<to uri="direct:textToWatchWarn"/>
<to uri="direct:textSerializeRoute" />
<to uri="direct:textPurgeAccumulate" />
<to uri="vm:autoFaxRoute" />
</multicast>
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:text?level=INFO"/>
</doCatch>
</doTry>
</route>
<route id="textToWatchWarnRoute">
<from uri="direct:textToWatchWarn" />
<bean ref="textDecoder" method="transformToProductIds" />

View file

@ -20,6 +20,7 @@
package com.raytheon.uf.edex.esb.camel.context;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -38,6 +39,7 @@ import javax.naming.ConfigurationException;
import org.apache.camel.CamelContext;
import org.apache.camel.Route;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.RouteDefinition;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
@ -78,6 +80,21 @@ public class ContextManager implements ApplicationContextAware,
private static ContextManager instance = new ContextManager();
/**
* Endpoint types that are internal only. Mainly used at shutdown time to
* designate routes that shouldn't be shutdown immediately.
*/
private static final Set<String> INTERNAL_ENDPOINT_TYPES;
static {
HashSet<String> set = new HashSet<String>(
ContextDependencyMapping.DEPENDENCY_ENDPOINT_TYPES);
set.add("timer");
set.add("quartz");
set.add("direct");
INTERNAL_ENDPOINT_TYPES = Collections.unmodifiableSet(set);
}
/**
* Service used for start up and shut down threading.
*/
@ -102,12 +119,6 @@ public class ContextManager implements ApplicationContextAware,
*/
private ApplicationContext springCtx = null;
/**
* Endpoint types that are internal only. Mainly used at shutdown time to
* designate routes that shouldn't be shutdown immediately.
*/
private final Set<String> internalEndpointTypes = new HashSet<String>();
/**
* Map of context processors that have been registered for a given context.
* Used to allow contexts to do custom worn on startup/shutdown.
@ -147,15 +158,19 @@ public class ContextManager implements ApplicationContextAware,
}
/**
* Private constructor. Sets up internal types for prioritized stopping of
* routes on shutdown.
* Private constructor.
*/
private ContextManager() {
internalEndpointTypes
.addAll(ContextDependencyMapping.DEPENDENCY_ENDPOINT_TYPES);
internalEndpointTypes.add("timer");
internalEndpointTypes.add("quartz");
internalEndpointTypes.add("direct");
}
/**
* Returns a set of endpoint types that are considered internal for routing
* purposes.
*
* @return
*/
public Set<String> getInternalEndpointTypes() {
return INTERNAL_ENDPOINT_TYPES;
}
/**
@ -242,6 +257,31 @@ public class ContextManager implements ApplicationContextAware,
List<Future<Pair<CamelContext, Boolean>>> callbacks = new LinkedList<Future<Pair<CamelContext, Boolean>>>();
for (final CamelContext context : cxtData.getContexts()) {
/*
* Enforce startup order so that internal endpoints start first
* and shutdown last. Each route must have a unique number under
* 1000. Camel documentation doesn't state if numbers can be
* negative or not. Order is reverse of how they are found in
* the file with internal types going first followed by external
* types.
*/
int externalCount = 999;
int internalCount = externalCount - context.getRoutes().size();
for (Route route : context.getRoutes()) {
String uri = route.getEndpoint().getEndpointUri();
Pair<String, String> typeAndName = ContextData
.getEndpointTypeAndName(uri);
String type = typeAndName.getFirst();
RouteDefinition def = route.getRouteContext().getRoute();
if (INTERNAL_ENDPOINT_TYPES.contains(type)) {
def.setStartupOrder(internalCount--);
} else {
def.setStartupOrder(externalCount--);
}
}
final IContextStateManager stateManager = getStateManager(context);
if (stateManager.isContextStartable(context)) {
/*
@ -410,7 +450,7 @@ public class ContextManager implements ApplicationContextAware,
Pair<String, String> typeAndName = ContextData
.getEndpointTypeAndName(uri);
String type = typeAndName.getFirst();
if (!internalEndpointTypes.contains(type)) {
if (!INTERNAL_ENDPOINT_TYPES.contains(type)) {
try {
statusHandler.info("Stopping route ["
+ route.getId() + "]");
@ -531,7 +571,7 @@ public class ContextManager implements ApplicationContextAware,
}
/**
* Update all camel beans to have autoStartup to false and handles quart
* Update all camel beans to have autoStartup to false and handles quartz
* workaround when JMX is disabled.
*/
@Override
@ -539,8 +579,18 @@ public class ContextManager implements ApplicationContextAware,
ConfigurableListableBeanFactory beanFactory) throws BeansException {
for (CamelContext ctx : beanFactory.getBeansOfType(CamelContext.class)
.values()) {
/*
* set contexts to not auto start to enforce dependency order
* correctly.
*/
ctx.setAutoStartup(false);
/*
* Quartz work around to set management name.
*
* TODO: Remove with camel 2.12.3 upgrade:
* https://issues.apache.org/jira/browse/CAMEL-7132
*/
if ((ctx instanceof DefaultCamelContext)
&& (ctx.getManagementName() == null)) {
((DefaultCamelContext) ctx).setManagementName(ctx.getName());

View file

@ -51,6 +51,10 @@
<to uri="direct-vm:stageNotification"/>
</route>
<!--
This route should come after all other routes in this context
that send data to it for proper startup/shutdown order.
-->
<route id="notificationAggregation">
<from uri="direct-vm:stageNotification"/>
<bean ref="pluginNotifier" method="notify" />

View file

@ -86,7 +86,10 @@
<bean ref="modelsoundingPersistenceManager" method="run"/>
</route>
<!-- Copy of persist route without the log call -->
<!--
Copy of persist route without the log call.
This route must come after the timer route for proper startup/shutdown order.
-->
<route id="modelSoundingPersistIndexAlert">
<from uri="direct-vm:modelSoundingPersistIndexAlert"/>
<bean ref="persist" method="persist"/>