Issue #2327: Manual ingest now routes by plugin

Change-Id: I7627049a4881ecaa572c0f4e86d31092ce79871e

Former-commit-id: 93c51b648c [formerly c64e1ecae3] [formerly 7dd9dc40fd [formerly 311a74bbb65dc1fd8337ba9b004ec977da24a5f8]]
Former-commit-id: 7dd9dc40fd
Former-commit-id: e1573d6012
This commit is contained in:
Richard Peter 2013-09-10 17:00:02 -05:00
parent b1b5569fea
commit 59cc0d8c69
13 changed files with 465 additions and 241 deletions

View file

@ -14,6 +14,11 @@
<constructor-arg value="jms-dist:queue:Ingest.Text?destinationResolver=#qpidDurableResolver" /> <constructor-arg value="jms-dist:queue:Ingest.Text?destinationResolver=#qpidDurableResolver" />
</bean> </bean>
<bean factory-bean="manualProc"
factory-method="registerSecondaryPlugin">
<constructor-arg value="text" />
</bean>
<bean id="textHandleoupDistRegistry" factory-bean="handleoupDistributionSrv" <bean id="textHandleoupDistRegistry" factory-bean="handleoupDistributionSrv"
factory-method="register"> factory-method="register">
<constructor-arg value="text" /> <constructor-arg value="text" />

View file

@ -38,7 +38,7 @@ import java.util.regex.Pattern;
* Date Ticket# Engineer Description * Date Ticket# Engineer Description
* ------------ ---------- ----------- -------------------------- * ------------ ---------- ----------- --------------------------
* Jul 27, 2012 mschenke Initial creation * Jul 27, 2012 mschenke Initial creation
* * Sep 09, 2013 2327 rjpeter Updated to allow pattern to be used again. Added capture for DTG
* </pre> * </pre>
* *
* @author mschenke * @author mschenke
@ -46,10 +46,9 @@ import java.util.regex.Pattern;
*/ */
public class WMOHeaderFinder { public class WMOHeaderFinder {
private static final Pattern WMOPATTERN = Pattern
private static Pattern WMOPATTERN = Pattern .compile("([A-Z]{3}[A-Z0-9](?:\\d{0,2}|[A-Z]{0,2}) [A-Z0-9 ]{4} "
.compile("([A-Z]{3}[A-Z0-9](\\d{0,2}|[A-Z]{0,2}) [A-Z0-9 ]{4} " + "(\\d{6})[^\\r\\n]*)[\\r\\n]*");
+ "\\d{6}[^\\r\\n]*)[\\r\\n]+");
/** /**
* Finds and returns the WMO header on the {@link File} * Finds and returns the WMO header on the {@link File}
@ -99,4 +98,21 @@ public class WMOHeaderFinder {
in.close(); in.close();
} }
} }
/**
* Returns the Date Time Group associated with a WMO Header
*
* @param header
* @return
*/
public static String findDtg(String header) {
String dtg = null;
Matcher matcher = WMOPATTERN.matcher(header);
if (matcher.matches()) {
dtg = matcher.group(2);
}
return dtg;
}
} }

View file

@ -10,3 +10,4 @@ Require-Bundle: com.raytheon.uf.common.serialization,
com.raytheon.uf.common.status;bundle-version="1.12.1112" com.raytheon.uf.common.status;bundle-version="1.12.1112"
Import-Package: org.apache.camel, Import-Package: org.apache.camel,
org.apache.commons.logging org.apache.commons.logging
Export-Package: com.raytheon.uf.edex.distribution

View file

@ -3,6 +3,7 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="distributionPatterns" class="com.raytheon.uf.edex.distribution.DistributionPatterns" factory-method="getInstance"/>
<bean id="distributionSrv" class="com.raytheon.uf.edex.distribution.DistributionSrv" /> <bean id="distributionSrv" class="com.raytheon.uf.edex.distribution.DistributionSrv" />
<bean id="handleoupDistributionSrv" class="com.raytheon.uf.edex.distribution.DistributionSrv" /> <bean id="handleoupDistributionSrv" class="com.raytheon.uf.edex.distribution.DistributionSrv" />
<bean id="radarserverDistributionSrv" class="com.raytheon.uf.edex.distribution.DistributionSrv" /> <bean id="radarserverDistributionSrv" class="com.raytheon.uf.edex.distribution.DistributionSrv" />
@ -64,9 +65,7 @@
<route id="refreshDistributionPatterns"> <route id="refreshDistributionPatterns">
<from uri="refreshDistributionCron" /> <from uri="refreshDistributionCron" />
<doTry> <doTry>
<bean ref="distributionSrv" method="refresh" /> <bean ref="distributionPatterns" method="refresh" />
<bean ref="handleoupDistributionSrv" method="refresh" />
<bean ref="radarserverDistributionSrv" method="refresh" />
<doCatch> <doCatch>
<exception>java.lang.Throwable</exception> <exception>java.lang.Throwable</exception>
<to uri="log:refreshDistribution?level=ERROR"/> <to uri="log:refreshDistribution?level=ERROR"/>

View file

@ -0,0 +1,215 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.distribution;
import java.io.File;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
import com.raytheon.uf.common.localization.LocalizationFile;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
/**
* Container for the various Distribution patterns used by plugins.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Sep 6, 2013 2327 rjpeter Initial creation
*
* </pre>
*
* @author rjpeter
* @version 1.0
*/
public class DistributionPatterns {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(DistributionPatterns.class);
private static final DistributionPatterns instance = new DistributionPatterns();
/**
* Used to track file modified time to determine if a pattern set needs to
* be reloaded.
*/
private final ConcurrentMap<String, Long> modifiedTimes = new ConcurrentHashMap<String, Long>();
/**
* Patterns for the various plugins.
*/
private final ConcurrentMap<String, RequestPatterns> patterns = new ConcurrentHashMap<String, RequestPatterns>();
/**
* Returns the singleton instance.
*
* @return
*/
public static DistributionPatterns getInstance() {
return instance;
}
private DistributionPatterns() {
refresh();
}
/**
* Loads patterns from a distribution file for the specified plugin.
*
* @param file
* The file containing the ingest patterns
* @throws DistributionException
* If the modelFile cannot be deserialized
*/
private RequestPatterns loadPatterns(File file)
throws DistributionException {
RequestPatterns patternSet = null;
try {
patternSet = SerializationUtil.jaxbUnmarshalFromXmlFile(
RequestPatterns.class, file.getPath());
} catch (Exception e) {
throw new DistributionException("File " + file.getAbsolutePath()
+ " could not be unmarshalled.", e);
}
patternSet.compilePatterns();
return patternSet;
}
/**
* Lists the files in the distribution directory
*
* @return An array of the files in the distribution directory
*/
private Collection<File> getDistributionFiles() {
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationFile[] files = pathMgr.listFiles(
pathMgr.getLocalSearchHierarchy(LocalizationType.EDEX_STATIC),
"distribution", new String[] { ".xml" }, true, true);
Map<String, File> distFiles = new HashMap<String, File>();
for (LocalizationFile file : files) {
if (distFiles.containsKey(file.getName()) == false) {
distFiles.put(file.getName(), file.getFile());
}
}
return distFiles.values();
}
/**
* Refreshes the distribution patterns if a plugin's distribution pattern
* file has been modified. This method is executed via a quartz timer every
* five seconds
*/
public void refresh() {
for (File file : getDistributionFiles()) {
String fileName = file.getName();
Long modTime = modifiedTimes.get(fileName);
if ((modTime == null)
|| (modTime.longValue() != file.lastModified())) {
// getDistributionFiles only returns files ending in .xml
int index = fileName.lastIndexOf(".");
String plugin = null;
if (index > 0) {
plugin = fileName.substring(0, index);
} else {
plugin = fileName;
}
try {
if (patterns.containsKey(plugin)) {
statusHandler
.info("Change to distribution file detected. "
+ fileName
+ " has been modified. Reloading distribution patterns");
}
patterns.put(plugin, loadPatterns(file));
modifiedTimes.put(fileName, file.lastModified());
} catch (DistributionException e) {
statusHandler.error(
"Error reloading distribution patterns from file: "
+ fileName, e);
}
}
}
}
/**
* Returns a list of plugins that are interested in the given header.
*
* @param header
* @return
*/
public List<String> getMatchingPlugins(String header) {
List<String> plugins = new LinkedList<String>();
for (Map.Entry<String, RequestPatterns> entry : patterns.entrySet()) {
if (entry.getValue().isDesiredHeader(header)) {
plugins.add(entry.getKey());
}
}
return plugins;
}
/**
* Returns a list of plugins that are interested in the given header.
*
* @param header
* @param pluginsToCheck
* @return
*/
public List<String> getMatchingPlugins(String header,
Collection<String> pluginsToCheck) {
List<String> plugins = new LinkedList<String>();
for (String plugin : pluginsToCheck) {
RequestPatterns pattern = patterns.get(plugin);
if ((pattern != null) && pattern.isDesiredHeader(header)) {
plugins.add(plugin);
}
}
return plugins;
}
/**
* Returns true if there are patterns registered for the given plugin, false
* otherwise.
*
* @param pluginName
* @return
*/
public boolean hasPatternsForPlugin(String pluginName) {
return patterns.containsKey(pluginName);
}
}

View file

@ -20,11 +20,8 @@
package com.raytheon.uf.edex.distribution; package com.raytheon.uf.edex.distribution;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -34,16 +31,6 @@ import org.apache.camel.RecipientList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
import com.raytheon.uf.common.localization.LocalizationFile;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
/** /**
* The purpose of this bean is to load a series of XML files from localization * The purpose of this bean is to load a series of XML files from localization
* for each plugin registering itself with this bean and route messages based on * for each plugin registering itself with this bean and route messages based on
@ -62,101 +49,20 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* when no distribution files present * when no distribution files present
* Mar 19, 2013 1794 djohnson PatternWrapper is immutable, add toString() to it for debugging. * Mar 19, 2013 1794 djohnson PatternWrapper is immutable, add toString() to it for debugging.
* Aug 19, 2013 2257 bkowal edexBridge to qpid 0.18 upgrade * Aug 19, 2013 2257 bkowal edexBridge to qpid 0.18 upgrade
* * Sep 06, 2013 2327 rjpeter Updated to use DistributionPatterns.
* </pre> * </pre>
* *
* @author brockwoo * @author brockwoo
* @version 1.0 * @version 1.0
*/ */
public class DistributionSrv { public class DistributionSrv {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(DistributionSrv.class);
private static final String HEADER_QPID_SUBJECT = "qpid.subject"; private static final String HEADER_QPID_SUBJECT = "qpid.subject";
private static class PatternWrapper { protected Log logger = LogFactory.getLog("Ingest");
private final String plugin;
private final RequestPatterns patterns; protected Log routeFailedLogger = LogFactory.getLog("RouteFailedLog");
private final String route; private final ConcurrentMap<String, String> pluginRoutes = new ConcurrentHashMap<String, String>();
private final String displayString;
private PatternWrapper(String plugin, String route,
RequestPatterns patterns) {
this.plugin = plugin;
this.route = route;
this.patterns = patterns;
this.displayString = createDisplayString();
}
private String createDisplayString() {
StringBuilder sb = new StringBuilder();
sb.append("plugin=").append(plugin).append(", ");
sb.append("route=").append(route).append(", ");
sb.append("patterns=").append(patterns);
return sb.toString();
}
@Override
public String toString() {
return displayString;
}
}
protected transient Log logger = LogFactory.getLog("Ingest");
protected transient Log routeFailedLogger = LogFactory
.getLog("RouteFailedLog");
private final List<PatternWrapper> pluginPatterns = new ArrayList<PatternWrapper>(
100);
private final ConcurrentMap<String, PatternWrapper> patternMap = new ConcurrentHashMap<String, PatternWrapper>();
private final ConcurrentMap<String, Long> modifiedTimes = new ConcurrentHashMap<String, Long>();
public DistributionSrv() {
for (File file : getDistributionFiles()) {
modifiedTimes.put(file.getName(), file.lastModified());
}
}
/**
* Refreshes the distribution patterns if a plugin's distribution pattern
* file has been modified. This method is executed via a quartz timer every
* five seconds
*/
public synchronized void refresh() {
for (File file : getDistributionFiles()) {
if (!file.getName().endsWith("~")
&& modifiedTimes.containsKey(file.getName())
&& (modifiedTimes.get(file.getName()) < file.lastModified())) {
String plugin = file.getName().replace(".xml", "");
PatternWrapper wrapper = patternMap.get(plugin);
if (wrapper != null) {
try {
statusHandler
.handle(Priority.EVENTA,
"Change to distribution file detected. "
+ file.getName()
+ " has been modified. Reloading distribution patterns");
wrapper = new PatternWrapper(wrapper.plugin,
wrapper.route, loadPatterns(file, plugin));
patternMap.put(plugin, wrapper);
modifiedTimes.put(file.getName(), file.lastModified());
} catch (DistributionException e) {
statusHandler.handle(Priority.PROBLEM,
"Error reloading distribution patterns from file: "
+ file.getName(), e);
}
}
}
}
}
/** /**
* Allows a plugin to register itself with this bean. Note: if the plugin * Allows a plugin to register itself with this bean. Note: if the plugin
@ -167,49 +73,18 @@ public class DistributionSrv {
* @param destination * @param destination
* a destination to send this message to * a destination to send this message to
* @return an instance of this bean * @return an instance of this bean
* @throws EdexException * @throws DistributionException
*/ */
public DistributionSrv register(String pluginName, String destination) public DistributionSrv register(String pluginName, String destination)
throws DistributionException { throws DistributionException {
IPathManager pathMgr = PathManagerFactory.getPathManager(); if (!DistributionPatterns.getInstance()
LocalizationContext commonStaticBase = pathMgr.getContext( .hasPatternsForPlugin(pluginName)) {
LocalizationContext.LocalizationType.EDEX_STATIC,
LocalizationContext.LocalizationLevel.BASE);
LocalizationContext siteStaticBase = pathMgr.getContext(
LocalizationContext.LocalizationType.EDEX_STATIC,
LocalizationContext.LocalizationLevel.SITE);
String path = "";
String sitePath = "";
try {
path = pathMgr.getFile(commonStaticBase,
"distribution" + File.separator + pluginName + ".xml")
.getCanonicalPath();
sitePath = pathMgr.getFile(siteStaticBase,
"distribution" + File.separator + pluginName + ".xml")
.getCanonicalPath();
} catch (IOException e) {
throw new DistributionException( throw new DistributionException(
"Plugin " "Plugin "
+ pluginName + pluginName
+ " does not have an accompanying patterns file in localization."); + " does not have an accompanying patterns file in localization.");
} }
pluginRoutes.put(pluginName, destination);
File modelFile = new File(path);
File siteModelFile = new File(sitePath);
RequestPatterns patterns = null;
if (siteModelFile.exists()) {
patterns = loadPatterns(siteModelFile, pluginName);
} else if (modelFile.exists()) {
patterns = loadPatterns(modelFile, pluginName);
} else {
patterns = new RequestPatterns();
}
PatternWrapper wrapper = new PatternWrapper(pluginName, destination,
patterns);
patternMap.put(wrapper.plugin, wrapper);
pluginPatterns.add(wrapper);
return this; return this;
} }
@ -223,8 +98,6 @@ public class DistributionSrv {
*/ */
@RecipientList @RecipientList
public List<String> route(Exchange exchange) { public List<String> route(Exchange exchange) {
StringBuilder pluginNames = new StringBuilder();
List<String> dest = new ArrayList<String>();
Message in = exchange.getIn(); Message in = exchange.getIn();
// determine if the header is in the qpid subject field? // determine if the header is in the qpid subject field?
String header = (String) in.getHeader(HEADER_QPID_SUBJECT); String header = (String) in.getHeader(HEADER_QPID_SUBJECT);
@ -254,14 +127,22 @@ public class DistributionSrv {
// No header entry so will try and use the filename instead // No header entry so will try and use the filename instead
header = (String) exchange.getIn().getBody(); header = (String) exchange.getIn().getBody();
} }
for (PatternWrapper wrapper : pluginPatterns) {
if (wrapper.patterns.isDesiredHeader(header)) { List<String> plugins = DistributionPatterns.getInstance()
.getMatchingPlugins(header, pluginRoutes.keySet());
List<String> routes = new ArrayList<String>(plugins.size());
StringBuilder pluginNames = new StringBuilder(plugins.size() * 8);
for (String plugin : plugins) {
String route = pluginRoutes.get(plugin);
if (route != null) {
if (pluginNames.length() != 0) { if (pluginNames.length() != 0) {
pluginNames.append(","); pluginNames.append(",");
} }
pluginNames.append(wrapper.plugin); pluginNames.append(plugin);
dest.add(wrapper.route); routes.add(route);
unroutedFlag = false; unroutedFlag = false;
} else if (logger.isDebugEnabled()) {
logger.debug("No route registered for plugin: " + plugin);
} }
} }
@ -270,53 +151,8 @@ public class DistributionSrv {
// using warn instead of error; app can continue // using warn instead of error; app can continue
routeFailedLogger.warn(header); routeFailedLogger.warn(header);
} }
in.setHeader("pluginName", pluginNames.toString()); in.setHeader("pluginName", pluginNames.toString());
return dest; return routes;
}
/**
* Loads patterns from a distribution file for the specified plugin
*
* @param modelFile
* The file containing the ingest patterns
* @param pluginName
* The plugin name associated with the ingest patterns
* @throws DistributionException
* If the modelFile cannot be deserialized
*/
private RequestPatterns loadPatterns(File modelFile, String pluginName)
throws DistributionException {
RequestPatterns patternSet = null;
try {
patternSet = SerializationUtil.jaxbUnmarshalFromXmlFile(
RequestPatterns.class, modelFile.getPath());
} catch (Exception e) {
throw new DistributionException("File "
+ modelFile.getAbsolutePath()
+ " could not be unmarshalled.", e);
}
patternSet.compilePatterns();
return patternSet;
}
/**
* Lists the files in the distribution directory
*
* @return An array of the files in the distribution directory
*/
private File[] getDistributionFiles() {
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationFile[] files = pathMgr.listFiles(
pathMgr.getLocalSearchHierarchy(LocalizationType.EDEX_STATIC),
"distribution", null, true, true);
Map<String, File> distFiles = new HashMap<String, File>();
for (LocalizationFile file : files) {
if (distFiles.containsKey(file.getName()) == false) {
distFiles.put(file.getName(), file.getFile());
}
}
return distFiles.values().toArray(new File[0]);
} }
} }

View file

@ -51,7 +51,7 @@ import com.raytheon.uf.common.serialization.ISerializableObject;
* May 16, 2011 7317 cjeanbap Added try-catch statement * May 16, 2011 7317 cjeanbap Added try-catch statement
* for PatternSyntaxException. * for PatternSyntaxException.
* Mar 19, 2013 1794 djohnson Add toString() for debugging. * Mar 19, 2013 1794 djohnson Add toString() for debugging.
* * Sep 10, 2013 2327 rjpeter Sized ArrayList declarations.
* </pre> * </pre>
* *
* @author brockwoo * @author brockwoo
@ -60,22 +60,22 @@ import com.raytheon.uf.common.serialization.ISerializableObject;
@XmlRootElement(name = "requestPatterns") @XmlRootElement(name = "requestPatterns")
@XmlAccessorType(XmlAccessType.NONE) @XmlAccessorType(XmlAccessType.NONE)
public class RequestPatterns implements ISerializableObject{ public class RequestPatterns implements ISerializableObject {
/** /**
* List of patterns requested by a plugin. * List of patterns requested by a plugin.
*/ */
@XmlElements( { @XmlElement(name = "regex", type = String.class) }) @XmlElements({ @XmlElement(name = "regex", type = String.class) })
private List<String> patterns = new ArrayList<String>(); private List<String> patterns = new ArrayList<String>(0);
private final List<Pattern> compiledPatterns = new ArrayList<Pattern>(); private List<Pattern> compiledPatterns = new ArrayList<Pattern>(0);
protected transient Log patternFailedLogger = LogFactory.getLog("PatternFailedLog"); protected Log patternFailedLogger = LogFactory.getLog("PatternFailedLog");
/** /**
* Creates a new instance of the container. * Creates a new instance of the container.
*/ */
public RequestPatterns(){ public RequestPatterns() {
} }
/** /**
@ -90,7 +90,8 @@ public class RequestPatterns implements ISerializableObject{
/** /**
* Sets the list of regex strings for this container. * Sets the list of regex strings for this container.
* *
* @param patterns an arraylist of regex strings * @param patterns
* an arraylist of regex strings
*/ */
public void setPatterns(List<String> patterns) { public void setPatterns(List<String> patterns) {
this.patterns = patterns; this.patterns = patterns;
@ -99,7 +100,8 @@ public class RequestPatterns implements ISerializableObject{
/** /**
* Inserts a single string into the list. * Inserts a single string into the list.
* *
* @param pattern The regex string to insert * @param pattern
* The regex string to insert
*/ */
public void setPattern(String pattern) { public void setPattern(String pattern) {
this.patterns.add(pattern); this.patterns.add(pattern);
@ -109,8 +111,9 @@ public class RequestPatterns implements ISerializableObject{
* Will compile the strings into Pattern objects. * Will compile the strings into Pattern objects.
* *
*/ */
public void compilePatterns(){ public void compilePatterns() {
for(String pattern : patterns) { compiledPatterns = new ArrayList<Pattern>(patterns.size());
for (String pattern : patterns) {
try { try {
compiledPatterns.add(Pattern.compile(pattern)); compiledPatterns.add(Pattern.compile(pattern));
} catch (PatternSyntaxException e) { } catch (PatternSyntaxException e) {
@ -123,17 +126,17 @@ public class RequestPatterns implements ISerializableObject{
} }
/** /**
* Takes a string and compares against the patterns in this * Takes a string and compares against the patterns in this container. The
* container. The first one that matches breaks the search and * first one that matches breaks the search and returns true.
* returns true.
* *
* @param header The string to search for * @param header
* The string to search for
* @return a boolean indicating success * @return a boolean indicating success
*/ */
public boolean isDesiredHeader(String header) { public boolean isDesiredHeader(String header) {
boolean isFound = false; boolean isFound = false;
for(Pattern headerPattern : compiledPatterns) { for (Pattern headerPattern : compiledPatterns) {
if(headerPattern.matcher(header).find()) { if (headerPattern.matcher(header).find()) {
isFound = true; isFound = true;
break; break;
} }

View file

@ -18,6 +18,11 @@
<constructor-arg value="jms-dist:queue:Ingest.dpa?destinationResolver=#qpidDurableResolver"/> <constructor-arg value="jms-dist:queue:Ingest.dpa?destinationResolver=#qpidDurableResolver"/>
</bean> </bean>
<bean factory-bean="manualProc"
factory-method="registerSecondaryPlugin">
<constructor-arg value="dpa" />
</bean>
<camelContext id="dpa-camel" <camelContext id="dpa-camel"
xmlns="http://camel.apache.org/schema/spring" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler"> errorHandlerRef="errorHandler">

View file

@ -18,6 +18,11 @@
<constructor-arg value="jms-dist:queue:Ingest.dhr?destinationResolver=#qpidDurableResolver" /> <constructor-arg value="jms-dist:queue:Ingest.dhr?destinationResolver=#qpidDurableResolver" />
</bean> </bean>
<bean factory-bean="manualProc"
factory-method="registerSecondaryPlugin">
<constructor-arg value="dhr" />
</bean>
<camelContext id="nonClusteredDHRroutes" xmlns="http://camel.apache.org/schema/spring" <camelContext id="nonClusteredDHRroutes" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler"> errorHandlerRef="errorHandler">
<!-- Begin non-clustered dhr Routes --> <!-- Begin non-clustered dhr Routes -->

View file

@ -8,5 +8,8 @@ Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Require-Bundle: org.apache.camel;bundle-version="1.0.0", Require-Bundle: org.apache.camel;bundle-version="1.0.0",
org.springframework;bundle-version="2.5.6", org.springframework;bundle-version="2.5.6",
com.raytheon.edex.common;bundle-version="1.11.17", com.raytheon.edex.common;bundle-version="1.11.17",
com.raytheon.uf.common.status com.raytheon.uf.common.status,
com.raytheon.uf.edex.decodertools,
com.raytheon.uf.edex.distribution,
org.apache.commons.io
Import-Package: com.raytheon.uf.edex.site.ingest Import-Package: com.raytheon.uf.edex.site.ingest

View file

@ -21,18 +21,27 @@ package com.raytheon.uf.edex.plugin.manualIngest;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TimeZone;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
import org.apache.camel.Processor; import org.apache.camel.Processor;
import org.springframework.util.FileCopyUtils; import org.apache.commons.io.FileUtils;
import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority; import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.SimulatedTime;
import com.raytheon.uf.common.util.header.WMOHeaderFinder; import com.raytheon.uf.common.util.header.WMOHeaderFinder;
import com.raytheon.uf.edex.core.EDEXUtil; import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.core.props.PropertiesFactory; import com.raytheon.uf.edex.core.props.PropertiesFactory;
import com.raytheon.uf.edex.decodertools.time.TimeTools;
import com.raytheon.uf.edex.distribution.DistributionPatterns;
/** /**
* A bean based on FileToString that will take a message generated from a file * A bean based on FileToString that will take a message generated from a file
@ -46,7 +55,7 @@ import com.raytheon.uf.edex.core.props.PropertiesFactory;
* Date Ticket# Engineer Description * Date Ticket# Engineer Description
* ------------ ---------- ----------- -------------------------- * ------------ ---------- ----------- --------------------------
* Oct 28, 2009 brockwoo Initial creation * Oct 28, 2009 brockwoo Initial creation
* * Sep 03, 2013 2327 rjpeter Added directory routing by plugin and date of product.
* </pre> * </pre>
* *
* @author brockwoo * @author brockwoo
@ -65,6 +74,29 @@ public class MessageGenerator implements Processor {
private String ingestRoute = null; private String ingestRoute = null;
private final ThreadLocal<SimpleDateFormat> sdfs = new ThreadLocal<SimpleDateFormat>() {
/*
* (non-Javadoc)
*
* @see java.lang.ThreadLocal#initialValue()
*/
@Override
protected SimpleDateFormat initialValue() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"
+ File.separatorChar + "HH");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
return sdf;
}
};
/**
* Set of plugins that are not the primary decoder of the data. These are
* secondary or additional information such as text, dhr, dpa, etc.
*/
private final Set<String> secondaryPlugins = new HashSet<String>();
public static MessageGenerator getInstance() { public static MessageGenerator getInstance() {
return instance; return instance;
} }
@ -77,6 +109,19 @@ public class MessageGenerator implements Processor {
this.ingestRoute = ingestRoute; this.ingestRoute = ingestRoute;
} }
/**
* Register a secondary plugin, i.e. not the primary decoder of the data.
* These are plugins that provide data in a different format oradditional
* information such as text, dhr, dpa, etc.
*
* @param plugin
* @return
*/
public MessageGenerator registerSecondaryPlugin(String plugin) {
secondaryPlugins.add(plugin);
return this;
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* *
@ -86,14 +131,12 @@ public class MessageGenerator implements Processor {
public void process(Exchange arg0) throws Exception { public void process(Exchange arg0) throws Exception {
File file = (File) arg0.getIn().getBody(); File file = (File) arg0.getIn().getBody();
if (file != null) { if (file != null) {
String fileName = file.getName();
String messageHeader = WMOHeaderFinder.find(file); String messageHeader = WMOHeaderFinder.find(file);
if (messageHeader == null) { if (messageHeader == null) {
messageHeader = fileName; messageHeader = file.getName();
} else { } else {
messageHeader = messageHeader.trim(); messageHeader = messageHeader.trim();
} }
arg0.getIn().setBody(file.toString()); arg0.getIn().setBody(file.toString());
arg0.getIn().setHeader("header", messageHeader); arg0.getIn().setHeader("header", messageHeader);
arg0.getIn().setHeader("enqueueTime", System.currentTimeMillis()); arg0.getIn().setHeader("enqueueTime", System.currentTimeMillis());
@ -103,21 +146,87 @@ public class MessageGenerator implements Processor {
} }
} }
public File copyFileToArchive(File inFile) { /**
String path = DIR + File.separator; * Copies the specified file to the archive directory.
*
* @param inFile
* @return
* @throws IOException
*/
public File copyFileToArchive(File inFile) throws IOException {
StringBuilder path = new StringBuilder(inFile.getPath().length());
path.append(DIR).append(File.separatorChar);
// find header and determine file date
Date fileTime = null;
String header = WMOHeaderFinder.find(inFile);
if (header == null) {
header = inFile.getName();
} else {
header = header.trim();
try {
String dtg = WMOHeaderFinder.findDtg(header);
Calendar headerTime = TimeTools.findCurrentTime(dtg,
inFile.getName());
if (headerTime != null) {
fileTime = headerTime.getTime();
}
} catch (Exception e) {
statusHandler.error("Exception occurred parsing WMO Header", e);
}
}
// determine the plugin
List<String> plugins = DistributionPatterns.getInstance()
.getMatchingPlugins(header);
int numPlugins = plugins.size();
if (numPlugins == 1) {
path.append(plugins.get(0)).append(File.separatorChar);
} else if (numPlugins > 1) {
if (plugins.size() <= secondaryPlugins.size()) {
// check for a non secondary plugin,
String plugin = null;
for (String pluginToCheck : plugins) {
if (!secondaryPlugins.contains(pluginToCheck)) {
plugin = pluginToCheck;
break;
}
}
if (plugin == null) {
// didn't find a non secondary plugin, just grab first
// plugin
plugin = plugins.get(0);
}
path.append(plugin).append(File.separatorChar);
} else {
// remove secondary and grab first one
plugins.removeAll(secondaryPlugins);
path.append(plugins.get(0)).append(File.separatorChar);
}
} else {
path.append("unknown").append(File.separatorChar);
}
// append YYYYMMDD/HH
if (fileTime == null) {
// default to current time
fileTime = SimulatedTime.getSystemTime().getTime();
}
path.append(sdfs.get().format(fileTime)).append(File.separatorChar);
// Determine the sub-directory // Determine the sub-directory
String inputPath = inFile.getParent(); String inputPath = inFile.getParent();
// Split on the manual directory to get the sub-directory // Split on the manual directory to get the sub-directory
String[] parts = inputPath.split("manual"); String[] parts = inputPath.split("manual");
File dir = null;
if (parts.length > 1) { if (parts.length > 1) {
dir = new File(path + parts[1]); path.append(parts[1]);
} else {
dir = new File(path);
} }
File dir = new File(path.toString());
if (!dir.exists()) { if (!dir.exists()) {
dir.mkdirs(); dir.mkdirs();
} }
@ -125,7 +234,7 @@ public class MessageGenerator implements Processor {
File newFile = new File(dir, inFile.getName()); File newFile = new File(dir, inFile.getName());
try { try {
FileCopyUtils.copy(inFile, newFile); FileUtils.copyFile(inFile, newFile);
statusHandler.handle(Priority.INFO, statusHandler.handle(Priority.INFO,
"DataManual: " + inFile.getAbsolutePath()); "DataManual: " + inFile.getAbsolutePath());
} catch (IOException e) { } catch (IOException e) {
@ -137,7 +246,14 @@ public class MessageGenerator implements Processor {
return newFile; return newFile;
} }
public File moveFileToArchive(File inFile) { /**
* Moves the specified file to the archive directory.
*
* @param inFile
* @return
* @throws IOException
*/
public File moveFileToArchive(File inFile) throws IOException {
File newFile = copyFileToArchive(inFile); File newFile = copyFileToArchive(inFile);
if (newFile != null) { if (newFile != null) {
inFile.delete(); inFile.delete();
@ -145,10 +261,25 @@ public class MessageGenerator implements Processor {
return newFile; return newFile;
} }
/**
* Copies a file to the archive directory and sends the path to the manual
* ingest route.
*
* @param inFile
* @return
*/
public boolean sendFileToIngest(String inFile) { public boolean sendFileToIngest(String inFile) {
return sendFileToIngest(inFile, ingestRoute); return sendFileToIngest(inFile, ingestRoute);
} }
/**
* Copies a file to the archive directory and sends the path to the
* specified route.
*
* @param inFile
* @param route
* @return
*/
public boolean sendFileToIngest(String inFile, String route) { public boolean sendFileToIngest(String inFile, String route) {
boolean rval = true; boolean rval = true;
@ -156,7 +287,7 @@ public class MessageGenerator implements Processor {
File archiveFile = copyFileToArchive(new File(inFile)); File archiveFile = copyFileToArchive(new File(inFile));
EDEXUtil.getMessageProducer().sendAsync(route, EDEXUtil.getMessageProducer().sendAsync(route,
archiveFile.getAbsolutePath()); archiveFile.getAbsolutePath());
} catch (EdexException e) { } catch (Exception e) {
rval = false; rval = false;
statusHandler.handle(Priority.ERROR, "Failed to insert file [" statusHandler.handle(Priority.ERROR, "Failed to insert file ["
+ inFile + "] into ingest stream", e); + inFile + "] into ingest stream", e);

View file

@ -16,6 +16,11 @@
<constructor-arg value="jms-dist:queue:ingest.nctext" /> <constructor-arg value="jms-dist:queue:ingest.nctext" />
</bean> </bean>
<bean factory-bean="manualProc"
factory-method="registerSecondaryPlugin">
<constructor-arg value="nctext" />
</bean>
<bean id="nctextCamelRegistered" factory-bean="contextManager" <bean id="nctextCamelRegistered" factory-bean="contextManager"
factory-method="register" depends-on="persistCamelRegistered"> factory-method="register" depends-on="persistCamelRegistered">
<constructor-arg ref="nctext-camel"/> <constructor-arg ref="nctext-camel"/>

View file

@ -18,7 +18,7 @@
<!-- class=" com.raytheon.uf.edex.distribution.DistributionSrv" --> <!-- class=" com.raytheon.uf.edex.distribution.DistributionSrv" -->
<bean id="stormTrackDistRegistry" factory-bean="distributionSrv" <bean id="stormTrackDistRegistry" factory-bean="distributionSrv"
factory-method="register"> factory-method="register">
<constructor-arg value="stormTrack" /> <constructor-arg value="stormtrack" />
<!-- <constructor-arg value="jms-generic:queue:Ingest.AlphaNumeric" /> --> <!-- <constructor-arg value="jms-generic:queue:Ingest.AlphaNumeric" /> -->
<constructor-arg value="jms-dist:queue:Ingest.stormTrack" /> <constructor-arg value="jms-dist:queue:Ingest.stormTrack" />
</bean> </bean>