Issue #2726: Autoset route startup order.
Change-Id: I1f4165b6eab12207cf6b6d31964e8170ebe1bddc Former-commit-id:4dc91f494f
[formerly 9b9d2b475f4777febf125e4013cba0d971eaf8e8] Former-commit-id:b148d11ec9
This commit is contained in:
parent
550668cbf7
commit
8ae9cfba55
4 changed files with 101 additions and 42 deletions
|
@ -86,8 +86,35 @@
|
|||
-->
|
||||
|
||||
<!--
|
||||
Text routes
|
||||
Text routes.
|
||||
If an internal route (see ContextManager.INTERNAL_ENDPOINT_TYPES) is being sent data from another internal
|
||||
route in the same context it needs to come after the route that sends it data for proper startup/shutdown order.
|
||||
-->
|
||||
<route id="textUndecodedIngestRoute">
|
||||
<from uri="jms-durable:queue:Ingest.Text?concurrentConsumers=2" />
|
||||
<setHeader headerName="pluginName">
|
||||
<constant>text</constant>
|
||||
</setHeader>
|
||||
<doTry>
|
||||
<pipeline>
|
||||
<bean ref="stringToFile" />
|
||||
<bean ref="textDecoder" method="decodeFile" />
|
||||
<bean ref="processUtil" method="log"/>
|
||||
<multicast>
|
||||
<!-- This route needs to go to Mark's -->
|
||||
<to uri="direct:textToWatchWarn"/>
|
||||
<to uri="direct:textSerializeRoute" />
|
||||
<to uri="direct:textPurgeAccumulate" />
|
||||
<to uri="vm:autoFaxRoute" />
|
||||
</multicast>
|
||||
</pipeline>
|
||||
<doCatch>
|
||||
<exception>java.lang.Throwable</exception>
|
||||
<to uri="log:text?level=INFO"/>
|
||||
</doCatch>
|
||||
</doTry>
|
||||
</route>
|
||||
|
||||
<route id="textDirectDecodedIngestRoute">
|
||||
<from uri="direct-vm:textDirectDecodedIngestRoute" />
|
||||
<setHeader headerName="pluginName">
|
||||
|
@ -134,31 +161,6 @@
|
|||
</doTry>
|
||||
</route>
|
||||
|
||||
<route id="textUndecodedIngestRoute">
|
||||
<from uri="jms-durable:queue:Ingest.Text?concurrentConsumers=2" />
|
||||
<setHeader headerName="pluginName">
|
||||
<constant>text</constant>
|
||||
</setHeader>
|
||||
<doTry>
|
||||
<pipeline>
|
||||
<bean ref="stringToFile" />
|
||||
<bean ref="textDecoder" method="decodeFile" />
|
||||
<bean ref="processUtil" method="log"/>
|
||||
<multicast>
|
||||
<!-- This route needs to go to Mark's -->
|
||||
<to uri="direct:textToWatchWarn"/>
|
||||
<to uri="direct:textSerializeRoute" />
|
||||
<to uri="direct:textPurgeAccumulate" />
|
||||
<to uri="vm:autoFaxRoute" />
|
||||
</multicast>
|
||||
</pipeline>
|
||||
<doCatch>
|
||||
<exception>java.lang.Throwable</exception>
|
||||
<to uri="log:text?level=INFO"/>
|
||||
</doCatch>
|
||||
</doTry>
|
||||
</route>
|
||||
|
||||
<route id="textToWatchWarnRoute">
|
||||
<from uri="direct:textToWatchWarn" />
|
||||
<bean ref="textDecoder" method="transformToProductIds" />
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package com.raytheon.uf.edex.esb.camel.context;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
|
@ -38,6 +39,7 @@ import javax.naming.ConfigurationException;
|
|||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.Route;
|
||||
import org.apache.camel.impl.DefaultCamelContext;
|
||||
import org.apache.camel.model.RouteDefinition;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
|
||||
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
|
||||
|
@ -78,6 +80,21 @@ public class ContextManager implements ApplicationContextAware,
|
|||
|
||||
private static ContextManager instance = new ContextManager();
|
||||
|
||||
/**
|
||||
* Endpoint types that are internal only. Mainly used at shutdown time to
|
||||
* designate routes that shouldn't be shutdown immediately.
|
||||
*/
|
||||
private static final Set<String> INTERNAL_ENDPOINT_TYPES;
|
||||
|
||||
static {
|
||||
HashSet<String> set = new HashSet<String>(
|
||||
ContextDependencyMapping.DEPENDENCY_ENDPOINT_TYPES);
|
||||
set.add("timer");
|
||||
set.add("quartz");
|
||||
set.add("direct");
|
||||
INTERNAL_ENDPOINT_TYPES = Collections.unmodifiableSet(set);
|
||||
}
|
||||
|
||||
/**
|
||||
* Service used for start up and shut down threading.
|
||||
*/
|
||||
|
@ -102,12 +119,6 @@ public class ContextManager implements ApplicationContextAware,
|
|||
*/
|
||||
private ApplicationContext springCtx = null;
|
||||
|
||||
/**
|
||||
* Endpoint types that are internal only. Mainly used at shutdown time to
|
||||
* designate routes that shouldn't be shutdown immediately.
|
||||
*/
|
||||
private final Set<String> internalEndpointTypes = new HashSet<String>();
|
||||
|
||||
/**
|
||||
* Map of context processors that have been registered for a given context.
|
||||
* Used to allow contexts to do custom worn on startup/shutdown.
|
||||
|
@ -147,15 +158,19 @@ public class ContextManager implements ApplicationContextAware,
|
|||
}
|
||||
|
||||
/**
|
||||
* Private constructor. Sets up internal types for prioritized stopping of
|
||||
* routes on shutdown.
|
||||
* Private constructor.
|
||||
*/
|
||||
private ContextManager() {
|
||||
internalEndpointTypes
|
||||
.addAll(ContextDependencyMapping.DEPENDENCY_ENDPOINT_TYPES);
|
||||
internalEndpointTypes.add("timer");
|
||||
internalEndpointTypes.add("quartz");
|
||||
internalEndpointTypes.add("direct");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a set of endpoint types that are considered internal for routing
|
||||
* purposes.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public Set<String> getInternalEndpointTypes() {
|
||||
return INTERNAL_ENDPOINT_TYPES;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -242,6 +257,31 @@ public class ContextManager implements ApplicationContextAware,
|
|||
List<Future<Pair<CamelContext, Boolean>>> callbacks = new LinkedList<Future<Pair<CamelContext, Boolean>>>();
|
||||
|
||||
for (final CamelContext context : cxtData.getContexts()) {
|
||||
/*
|
||||
* Enforce startup order so that internal endpoints start first
|
||||
* and shutdown last. Each route must have a unique number under
|
||||
* 1000. Camel documentation doesn't state if numbers can be
|
||||
* negative or not. Order is reverse of how they are found in
|
||||
* the file with internal types going first followed by external
|
||||
* types.
|
||||
*/
|
||||
int externalCount = 999;
|
||||
int internalCount = externalCount - context.getRoutes().size();
|
||||
|
||||
for (Route route : context.getRoutes()) {
|
||||
String uri = route.getEndpoint().getEndpointUri();
|
||||
Pair<String, String> typeAndName = ContextData
|
||||
.getEndpointTypeAndName(uri);
|
||||
String type = typeAndName.getFirst();
|
||||
RouteDefinition def = route.getRouteContext().getRoute();
|
||||
|
||||
if (INTERNAL_ENDPOINT_TYPES.contains(type)) {
|
||||
def.setStartupOrder(internalCount--);
|
||||
} else {
|
||||
def.setStartupOrder(externalCount--);
|
||||
}
|
||||
}
|
||||
|
||||
final IContextStateManager stateManager = getStateManager(context);
|
||||
if (stateManager.isContextStartable(context)) {
|
||||
/*
|
||||
|
@ -410,7 +450,7 @@ public class ContextManager implements ApplicationContextAware,
|
|||
Pair<String, String> typeAndName = ContextData
|
||||
.getEndpointTypeAndName(uri);
|
||||
String type = typeAndName.getFirst();
|
||||
if (!internalEndpointTypes.contains(type)) {
|
||||
if (!INTERNAL_ENDPOINT_TYPES.contains(type)) {
|
||||
try {
|
||||
statusHandler.info("Stopping route ["
|
||||
+ route.getId() + "]");
|
||||
|
@ -531,7 +571,7 @@ public class ContextManager implements ApplicationContextAware,
|
|||
}
|
||||
|
||||
/**
|
||||
* Update all camel beans to have autoStartup to false and handles quart
|
||||
* Update all camel beans to have autoStartup to false and handles quartz
|
||||
* workaround when JMX is disabled.
|
||||
*/
|
||||
@Override
|
||||
|
@ -539,8 +579,18 @@ public class ContextManager implements ApplicationContextAware,
|
|||
ConfigurableListableBeanFactory beanFactory) throws BeansException {
|
||||
for (CamelContext ctx : beanFactory.getBeansOfType(CamelContext.class)
|
||||
.values()) {
|
||||
/*
|
||||
* set contexts to not auto start to enforce dependency order
|
||||
* correctly.
|
||||
*/
|
||||
ctx.setAutoStartup(false);
|
||||
|
||||
/*
|
||||
* Quartz work around to set management name.
|
||||
*
|
||||
* TODO: Remove with camel 2.12.3 upgrade:
|
||||
* https://issues.apache.org/jira/browse/CAMEL-7132
|
||||
*/
|
||||
if ((ctx instanceof DefaultCamelContext)
|
||||
&& (ctx.getManagementName() == null)) {
|
||||
((DefaultCamelContext) ctx).setManagementName(ctx.getName());
|
||||
|
|
|
@ -51,6 +51,10 @@
|
|||
<to uri="direct-vm:stageNotification"/>
|
||||
</route>
|
||||
|
||||
<!--
|
||||
This route should come after all other routes in this context
|
||||
that send data to it for proper startup/shutdown order.
|
||||
-->
|
||||
<route id="notificationAggregation">
|
||||
<from uri="direct-vm:stageNotification"/>
|
||||
<bean ref="pluginNotifier" method="notify" />
|
||||
|
|
|
@ -86,7 +86,10 @@
|
|||
<bean ref="modelsoundingPersistenceManager" method="run"/>
|
||||
</route>
|
||||
|
||||
<!-- Copy of persist route without the log call -->
|
||||
<!--
|
||||
Copy of persist route without the log call.
|
||||
This route must come after the timer route for proper startup/shutdown order.
|
||||
-->
|
||||
<route id="modelSoundingPersistIndexAlert">
|
||||
<from uri="direct-vm:modelSoundingPersistIndexAlert"/>
|
||||
<bean ref="persist" method="persist"/>
|
||||
|
|
Loading…
Add table
Reference in a new issue