Issue #2327: Manual ingest now routes by plugin

Change-Id: I7627049a4881ecaa572c0f4e86d31092ce79871e

Former-commit-id: 311a74bbb65dc1fd8337ba9b004ec977da24a5f8
This commit is contained in:
Richard Peter 2013-09-10 17:00:02 -05:00
parent 604ff6a0dd
commit c64e1ecae3
13 changed files with 465 additions and 241 deletions

View file

@ -14,6 +14,11 @@
<constructor-arg value="jms-dist:queue:Ingest.Text?destinationResolver=#qpidDurableResolver" />
</bean>
<bean factory-bean="manualProc"
factory-method="registerSecondaryPlugin">
<constructor-arg value="text" />
</bean>
<bean id="textHandleoupDistRegistry" factory-bean="handleoupDistributionSrv"
factory-method="register">
<constructor-arg value="text" />

View file

@ -37,8 +37,8 @@ import java.util.regex.Pattern;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jul 27, 2012 mschenke Initial creation
*
* Jul 27, 2012 mschenke Initial creation
* Sep 09, 2013 2327 rjpeter Updated to allow pattern to be used again. Added capture for DTG
* </pre>
*
* @author mschenke
@ -46,10 +46,9 @@ import java.util.regex.Pattern;
*/
public class WMOHeaderFinder {
private static Pattern WMOPATTERN = Pattern
.compile("([A-Z]{3}[A-Z0-9](\\d{0,2}|[A-Z]{0,2}) [A-Z0-9 ]{4} "
+ "\\d{6}[^\\r\\n]*)[\\r\\n]+");
private static final Pattern WMOPATTERN = Pattern
.compile("([A-Z]{3}[A-Z0-9](?:\\d{0,2}|[A-Z]{0,2}) [A-Z0-9 ]{4} "
+ "(\\d{6})[^\\r\\n]*)[\\r\\n]*");
/**
* Finds and returns the WMO header on the {@link File}
@ -99,4 +98,21 @@ public class WMOHeaderFinder {
in.close();
}
}
/**
* Returns the Date Time Group associated with a WMO Header
*
* @param header
* @return
*/
public static String findDtg(String header) {
String dtg = null;
Matcher matcher = WMOPATTERN.matcher(header);
if (matcher.matches()) {
dtg = matcher.group(2);
}
return dtg;
}
}

View file

@ -10,3 +10,4 @@ Require-Bundle: com.raytheon.uf.common.serialization,
com.raytheon.uf.common.status;bundle-version="1.12.1112"
Import-Package: org.apache.camel,
org.apache.commons.logging
Export-Package: com.raytheon.uf.edex.distribution

View file

@ -3,6 +3,7 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="distributionPatterns" class="com.raytheon.uf.edex.distribution.DistributionPatterns" factory-method="getInstance"/>
<bean id="distributionSrv" class="com.raytheon.uf.edex.distribution.DistributionSrv" />
<bean id="handleoupDistributionSrv" class="com.raytheon.uf.edex.distribution.DistributionSrv" />
<bean id="radarserverDistributionSrv" class="com.raytheon.uf.edex.distribution.DistributionSrv" />
@ -64,9 +65,7 @@
<route id="refreshDistributionPatterns">
<from uri="refreshDistributionCron" />
<doTry>
<bean ref="distributionSrv" method="refresh" />
<bean ref="handleoupDistributionSrv" method="refresh" />
<bean ref="radarserverDistributionSrv" method="refresh" />
<bean ref="distributionPatterns" method="refresh" />
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:refreshDistribution?level=ERROR"/>

View file

@ -0,0 +1,215 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.distribution;
import java.io.File;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
import com.raytheon.uf.common.localization.LocalizationFile;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
/**
* Container for the various Distribution patterns used by plugins.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Sep 6, 2013 2327 rjpeter Initial creation
*
* </pre>
*
* @author rjpeter
* @version 1.0
*/
public class DistributionPatterns {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(DistributionPatterns.class);
private static final DistributionPatterns instance = new DistributionPatterns();
/**
* Used to track file modified time to determine if a pattern set needs to
* be reloaded.
*/
private final ConcurrentMap<String, Long> modifiedTimes = new ConcurrentHashMap<String, Long>();
/**
* Patterns for the various plugins.
*/
private final ConcurrentMap<String, RequestPatterns> patterns = new ConcurrentHashMap<String, RequestPatterns>();
/**
* Returns the singleton instance.
*
* @return
*/
public static DistributionPatterns getInstance() {
return instance;
}
private DistributionPatterns() {
refresh();
}
/**
* Loads patterns from a distribution file for the specified plugin.
*
* @param file
* The file containing the ingest patterns
* @throws DistributionException
* If the modelFile cannot be deserialized
*/
private RequestPatterns loadPatterns(File file)
throws DistributionException {
RequestPatterns patternSet = null;
try {
patternSet = SerializationUtil.jaxbUnmarshalFromXmlFile(
RequestPatterns.class, file.getPath());
} catch (Exception e) {
throw new DistributionException("File " + file.getAbsolutePath()
+ " could not be unmarshalled.", e);
}
patternSet.compilePatterns();
return patternSet;
}
/**
* Lists the files in the distribution directory
*
* @return An array of the files in the distribution directory
*/
private Collection<File> getDistributionFiles() {
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationFile[] files = pathMgr.listFiles(
pathMgr.getLocalSearchHierarchy(LocalizationType.EDEX_STATIC),
"distribution", new String[] { ".xml" }, true, true);
Map<String, File> distFiles = new HashMap<String, File>();
for (LocalizationFile file : files) {
if (distFiles.containsKey(file.getName()) == false) {
distFiles.put(file.getName(), file.getFile());
}
}
return distFiles.values();
}
/**
* Refreshes the distribution patterns if a plugin's distribution pattern
* file has been modified. This method is executed via a quartz timer every
* five seconds
*/
public void refresh() {
for (File file : getDistributionFiles()) {
String fileName = file.getName();
Long modTime = modifiedTimes.get(fileName);
if ((modTime == null)
|| (modTime.longValue() != file.lastModified())) {
// getDistributionFiles only returns files ending in .xml
int index = fileName.lastIndexOf(".");
String plugin = null;
if (index > 0) {
plugin = fileName.substring(0, index);
} else {
plugin = fileName;
}
try {
if (patterns.containsKey(plugin)) {
statusHandler
.info("Change to distribution file detected. "
+ fileName
+ " has been modified. Reloading distribution patterns");
}
patterns.put(plugin, loadPatterns(file));
modifiedTimes.put(fileName, file.lastModified());
} catch (DistributionException e) {
statusHandler.error(
"Error reloading distribution patterns from file: "
+ fileName, e);
}
}
}
}
/**
* Returns a list of plugins that are interested in the given header.
*
* @param header
* @return
*/
public List<String> getMatchingPlugins(String header) {
List<String> plugins = new LinkedList<String>();
for (Map.Entry<String, RequestPatterns> entry : patterns.entrySet()) {
if (entry.getValue().isDesiredHeader(header)) {
plugins.add(entry.getKey());
}
}
return plugins;
}
/**
* Returns a list of plugins that are interested in the given header.
*
* @param header
* @param pluginsToCheck
* @return
*/
public List<String> getMatchingPlugins(String header,
Collection<String> pluginsToCheck) {
List<String> plugins = new LinkedList<String>();
for (String plugin : pluginsToCheck) {
RequestPatterns pattern = patterns.get(plugin);
if ((pattern != null) && pattern.isDesiredHeader(header)) {
plugins.add(plugin);
}
}
return plugins;
}
/**
* Returns true if there are patterns registered for the given plugin, false
* otherwise.
*
* @param pluginName
* @return
*/
public boolean hasPatternsForPlugin(String pluginName) {
return patterns.containsKey(pluginName);
}
}

View file

@ -20,11 +20,8 @@
package com.raytheon.uf.edex.distribution;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -34,16 +31,6 @@ import org.apache.camel.RecipientList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
import com.raytheon.uf.common.localization.LocalizationFile;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
/**
* The purpose of this bean is to load a series of XML files from localization
* for each plugin registering itself with this bean and route messages based on
@ -55,108 +42,27 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Oct 16, 2009 brockwoo Initial creation
* Oct 16, 2009 brockwoo Initial creation
* 6/8/2010 4647 bphillip Added automatic pattern refreshing
* 09/01/2010 4293 cjeanbap Logging of unknown Weather Products.
* Feb 27, 2013 1638 mschenke Cleaned up localization code to fix null pointer
* Feb 27, 2013 1638 mschenke Cleaned up localization code to fix null pointer
* when no distribution files present
* Mar 19, 2013 1794 djohnson PatternWrapper is immutable, add toString() to it for debugging.
* Aug 19, 2013 2257 bkowal edexBridge to qpid 0.18 upgrade
*
* Sep 06, 2013 2327 rjpeter Updated to use DistributionPatterns.
* </pre>
*
* @author brockwoo
* @version 1.0
*/
public class DistributionSrv {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(DistributionSrv.class);
private static final String HEADER_QPID_SUBJECT = "qpid.subject";
private static class PatternWrapper {
private final String plugin;
protected Log logger = LogFactory.getLog("Ingest");
private final RequestPatterns patterns;
protected Log routeFailedLogger = LogFactory.getLog("RouteFailedLog");
private final String route;
private final String displayString;
private PatternWrapper(String plugin, String route,
RequestPatterns patterns) {
this.plugin = plugin;
this.route = route;
this.patterns = patterns;
this.displayString = createDisplayString();
}
private String createDisplayString() {
StringBuilder sb = new StringBuilder();
sb.append("plugin=").append(plugin).append(", ");
sb.append("route=").append(route).append(", ");
sb.append("patterns=").append(patterns);
return sb.toString();
}
@Override
public String toString() {
return displayString;
}
}
protected transient Log logger = LogFactory.getLog("Ingest");
protected transient Log routeFailedLogger = LogFactory
.getLog("RouteFailedLog");
private final List<PatternWrapper> pluginPatterns = new ArrayList<PatternWrapper>(
100);
private final ConcurrentMap<String, PatternWrapper> patternMap = new ConcurrentHashMap<String, PatternWrapper>();
private final ConcurrentMap<String, Long> modifiedTimes = new ConcurrentHashMap<String, Long>();
public DistributionSrv() {
for (File file : getDistributionFiles()) {
modifiedTimes.put(file.getName(), file.lastModified());
}
}
/**
* Refreshes the distribution patterns if a plugin's distribution pattern
* file has been modified. This method is executed via a quartz timer every
* five seconds
*/
public synchronized void refresh() {
for (File file : getDistributionFiles()) {
if (!file.getName().endsWith("~")
&& modifiedTimes.containsKey(file.getName())
&& (modifiedTimes.get(file.getName()) < file.lastModified())) {
String plugin = file.getName().replace(".xml", "");
PatternWrapper wrapper = patternMap.get(plugin);
if (wrapper != null) {
try {
statusHandler
.handle(Priority.EVENTA,
"Change to distribution file detected. "
+ file.getName()
+ " has been modified. Reloading distribution patterns");
wrapper = new PatternWrapper(wrapper.plugin,
wrapper.route, loadPatterns(file, plugin));
patternMap.put(plugin, wrapper);
modifiedTimes.put(file.getName(), file.lastModified());
} catch (DistributionException e) {
statusHandler.handle(Priority.PROBLEM,
"Error reloading distribution patterns from file: "
+ file.getName(), e);
}
}
}
}
}
private final ConcurrentMap<String, String> pluginRoutes = new ConcurrentHashMap<String, String>();
/**
* Allows a plugin to register itself with this bean. Note: if the plugin
@ -167,49 +73,18 @@ public class DistributionSrv {
* @param destination
* a destination to send this message to
* @return an instance of this bean
* @throws EdexException
* @throws DistributionException
*/
public DistributionSrv register(String pluginName, String destination)
throws DistributionException {
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationContext commonStaticBase = pathMgr.getContext(
LocalizationContext.LocalizationType.EDEX_STATIC,
LocalizationContext.LocalizationLevel.BASE);
LocalizationContext siteStaticBase = pathMgr.getContext(
LocalizationContext.LocalizationType.EDEX_STATIC,
LocalizationContext.LocalizationLevel.SITE);
String path = "";
String sitePath = "";
try {
path = pathMgr.getFile(commonStaticBase,
"distribution" + File.separator + pluginName + ".xml")
.getCanonicalPath();
sitePath = pathMgr.getFile(siteStaticBase,
"distribution" + File.separator + pluginName + ".xml")
.getCanonicalPath();
} catch (IOException e) {
if (!DistributionPatterns.getInstance()
.hasPatternsForPlugin(pluginName)) {
throw new DistributionException(
"Plugin "
+ pluginName
+ " does not have an accompanying patterns file in localization.");
}
File modelFile = new File(path);
File siteModelFile = new File(sitePath);
RequestPatterns patterns = null;
if (siteModelFile.exists()) {
patterns = loadPatterns(siteModelFile, pluginName);
} else if (modelFile.exists()) {
patterns = loadPatterns(modelFile, pluginName);
} else {
patterns = new RequestPatterns();
}
PatternWrapper wrapper = new PatternWrapper(pluginName, destination,
patterns);
patternMap.put(wrapper.plugin, wrapper);
pluginPatterns.add(wrapper);
pluginRoutes.put(pluginName, destination);
return this;
}
@ -223,8 +98,6 @@ public class DistributionSrv {
*/
@RecipientList
public List<String> route(Exchange exchange) {
StringBuilder pluginNames = new StringBuilder();
List<String> dest = new ArrayList<String>();
Message in = exchange.getIn();
// determine if the header is in the qpid subject field?
String header = (String) in.getHeader(HEADER_QPID_SUBJECT);
@ -254,14 +127,22 @@ public class DistributionSrv {
// No header entry so will try and use the filename instead
header = (String) exchange.getIn().getBody();
}
for (PatternWrapper wrapper : pluginPatterns) {
if (wrapper.patterns.isDesiredHeader(header)) {
List<String> plugins = DistributionPatterns.getInstance()
.getMatchingPlugins(header, pluginRoutes.keySet());
List<String> routes = new ArrayList<String>(plugins.size());
StringBuilder pluginNames = new StringBuilder(plugins.size() * 8);
for (String plugin : plugins) {
String route = pluginRoutes.get(plugin);
if (route != null) {
if (pluginNames.length() != 0) {
pluginNames.append(",");
}
pluginNames.append(wrapper.plugin);
dest.add(wrapper.route);
pluginNames.append(plugin);
routes.add(route);
unroutedFlag = false;
} else if (logger.isDebugEnabled()) {
logger.debug("No route registered for plugin: " + plugin);
}
}
@ -270,53 +151,8 @@ public class DistributionSrv {
// using warn instead of error; app can continue
routeFailedLogger.warn(header);
}
in.setHeader("pluginName", pluginNames.toString());
return dest;
}
/**
* Loads patterns from a distribution file for the specified plugin
*
* @param modelFile
* The file containing the ingest patterns
* @param pluginName
* The plugin name associated with the ingest patterns
* @throws DistributionException
* If the modelFile cannot be deserialized
*/
private RequestPatterns loadPatterns(File modelFile, String pluginName)
throws DistributionException {
RequestPatterns patternSet = null;
try {
patternSet = SerializationUtil.jaxbUnmarshalFromXmlFile(
RequestPatterns.class, modelFile.getPath());
} catch (Exception e) {
throw new DistributionException("File "
+ modelFile.getAbsolutePath()
+ " could not be unmarshalled.", e);
}
patternSet.compilePatterns();
return patternSet;
}
/**
* Lists the files in the distribution directory
*
* @return An array of the files in the distribution directory
*/
private File[] getDistributionFiles() {
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationFile[] files = pathMgr.listFiles(
pathMgr.getLocalSearchHierarchy(LocalizationType.EDEX_STATIC),
"distribution", null, true, true);
Map<String, File> distFiles = new HashMap<String, File>();
for (LocalizationFile file : files) {
if (distFiles.containsKey(file.getName()) == false) {
distFiles.put(file.getName(), file.getFile());
}
}
return distFiles.values().toArray(new File[0]);
return routes;
}
}

View file

@ -51,7 +51,7 @@ import com.raytheon.uf.common.serialization.ISerializableObject;
* May 16, 2011 7317 cjeanbap Added try-catch statement
* for PatternSyntaxException.
* Mar 19, 2013 1794 djohnson Add toString() for debugging.
*
* Sep 10, 2013 2327 rjpeter Sized ArrayList declarations.
* </pre>
*
* @author brockwoo
@ -60,22 +60,22 @@ import com.raytheon.uf.common.serialization.ISerializableObject;
@XmlRootElement(name = "requestPatterns")
@XmlAccessorType(XmlAccessType.NONE)
public class RequestPatterns implements ISerializableObject{
public class RequestPatterns implements ISerializableObject {
/**
* List of patterns requested by a plugin.
*/
@XmlElements( { @XmlElement(name = "regex", type = String.class) })
private List<String> patterns = new ArrayList<String>();
private final List<Pattern> compiledPatterns = new ArrayList<Pattern>();
protected transient Log patternFailedLogger = LogFactory.getLog("PatternFailedLog");
@XmlElements({ @XmlElement(name = "regex", type = String.class) })
private List<String> patterns = new ArrayList<String>(0);
private List<Pattern> compiledPatterns = new ArrayList<Pattern>(0);
protected Log patternFailedLogger = LogFactory.getLog("PatternFailedLog");
/**
* Creates a new instance of the container.
*/
public RequestPatterns(){
public RequestPatterns() {
}
/**
@ -90,27 +90,30 @@ public class RequestPatterns implements ISerializableObject{
/**
* Sets the list of regex strings for this container.
*
* @param patterns an arraylist of regex strings
* @param patterns
* an arraylist of regex strings
*/
public void setPatterns(List<String> patterns) {
this.patterns = patterns;
}
/**
* Inserts a single string into the list.
*
* @param pattern The regex string to insert
* @param pattern
* The regex string to insert
*/
public void setPattern(String pattern) {
this.patterns.add(pattern);
}
/**
* Will compile the strings into Pattern objects.
*
*/
public void compilePatterns(){
for(String pattern : patterns) {
public void compilePatterns() {
compiledPatterns = new ArrayList<Pattern>(patterns.size());
for (String pattern : patterns) {
try {
compiledPatterns.add(Pattern.compile(pattern));
} catch (PatternSyntaxException e) {
@ -121,19 +124,19 @@ public class RequestPatterns implements ISerializableObject{
}
}
}
/**
* Takes a string and compares against the patterns in this
* container. The first one that matches breaks the search and
* returns true.
* Takes a string and compares against the patterns in this container. The
* first one that matches breaks the search and returns true.
*
* @param header The string to search for
* @param header
* The string to search for
* @return a boolean indicating success
*/
public boolean isDesiredHeader(String header) {
boolean isFound = false;
for(Pattern headerPattern : compiledPatterns) {
if(headerPattern.matcher(header).find()) {
for (Pattern headerPattern : compiledPatterns) {
if (headerPattern.matcher(header).find()) {
isFound = true;
break;
}

View file

@ -18,6 +18,11 @@
<constructor-arg value="jms-dist:queue:Ingest.dpa?destinationResolver=#qpidDurableResolver"/>
</bean>
<bean factory-bean="manualProc"
factory-method="registerSecondaryPlugin">
<constructor-arg value="dpa" />
</bean>
<camelContext id="dpa-camel"
xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler">

View file

@ -11,13 +11,18 @@
<constructor-arg value="dhr" />
<constructor-arg value="jms-dist:queue:Ingest.dhr?destinationResolver=#qpidDurableResolver" />
</bean>
<bean id="dhrRadarServerDistRegistry" factory-bean="radarserverDistributionSrv"
factory-method="register">
<constructor-arg value="dhr" />
<constructor-arg value="jms-dist:queue:Ingest.dhr?destinationResolver=#qpidDurableResolver" />
</bean>
<bean factory-bean="manualProc"
factory-method="registerSecondaryPlugin">
<constructor-arg value="dhr" />
</bean>
<camelContext id="nonClusteredDHRroutes" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler">
<!-- Begin non-clustered dhr Routes -->

View file

@ -8,5 +8,8 @@ Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Require-Bundle: org.apache.camel;bundle-version="1.0.0",
org.springframework;bundle-version="2.5.6",
com.raytheon.edex.common;bundle-version="1.11.17",
com.raytheon.uf.common.status
com.raytheon.uf.common.status,
com.raytheon.uf.edex.decodertools,
com.raytheon.uf.edex.distribution,
org.apache.commons.io
Import-Package: com.raytheon.uf.edex.site.ingest

View file

@ -21,18 +21,27 @@ package com.raytheon.uf.edex.plugin.manualIngest;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TimeZone;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.springframework.util.FileCopyUtils;
import org.apache.commons.io.FileUtils;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.SimulatedTime;
import com.raytheon.uf.common.util.header.WMOHeaderFinder;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.core.props.PropertiesFactory;
import com.raytheon.uf.edex.decodertools.time.TimeTools;
import com.raytheon.uf.edex.distribution.DistributionPatterns;
/**
* A bean based on FileToString that will take a message generated from a file
@ -45,8 +54,8 @@ import com.raytheon.uf.edex.core.props.PropertiesFactory;
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Oct 28, 2009 brockwoo Initial creation
*
* Oct 28, 2009 brockwoo Initial creation
* Sep 03, 2013 2327 rjpeter Added directory routing by plugin and date of product.
* </pre>
*
* @author brockwoo
@ -65,6 +74,29 @@ public class MessageGenerator implements Processor {
private String ingestRoute = null;
private final ThreadLocal<SimpleDateFormat> sdfs = new ThreadLocal<SimpleDateFormat>() {
/*
* (non-Javadoc)
*
* @see java.lang.ThreadLocal#initialValue()
*/
@Override
protected SimpleDateFormat initialValue() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"
+ File.separatorChar + "HH");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
return sdf;
}
};
/**
* Set of plugins that are not the primary decoder of the data. These are
* secondary or additional information such as text, dhr, dpa, etc.
*/
private final Set<String> secondaryPlugins = new HashSet<String>();
public static MessageGenerator getInstance() {
return instance;
}
@ -77,6 +109,19 @@ public class MessageGenerator implements Processor {
this.ingestRoute = ingestRoute;
}
/**
* Register a secondary plugin, i.e. not the primary decoder of the data.
* These are plugins that provide data in a different format oradditional
* information such as text, dhr, dpa, etc.
*
* @param plugin
* @return
*/
public MessageGenerator registerSecondaryPlugin(String plugin) {
secondaryPlugins.add(plugin);
return this;
}
/*
* (non-Javadoc)
*
@ -86,14 +131,12 @@ public class MessageGenerator implements Processor {
public void process(Exchange arg0) throws Exception {
File file = (File) arg0.getIn().getBody();
if (file != null) {
String fileName = file.getName();
String messageHeader = WMOHeaderFinder.find(file);
if (messageHeader == null) {
messageHeader = fileName;
messageHeader = file.getName();
} else {
messageHeader = messageHeader.trim();
}
arg0.getIn().setBody(file.toString());
arg0.getIn().setHeader("header", messageHeader);
arg0.getIn().setHeader("enqueueTime", System.currentTimeMillis());
@ -103,21 +146,87 @@ public class MessageGenerator implements Processor {
}
}
public File copyFileToArchive(File inFile) {
String path = DIR + File.separator;
/**
* Copies the specified file to the archive directory.
*
* @param inFile
* @return
* @throws IOException
*/
public File copyFileToArchive(File inFile) throws IOException {
StringBuilder path = new StringBuilder(inFile.getPath().length());
path.append(DIR).append(File.separatorChar);
// find header and determine file date
Date fileTime = null;
String header = WMOHeaderFinder.find(inFile);
if (header == null) {
header = inFile.getName();
} else {
header = header.trim();
try {
String dtg = WMOHeaderFinder.findDtg(header);
Calendar headerTime = TimeTools.findCurrentTime(dtg,
inFile.getName());
if (headerTime != null) {
fileTime = headerTime.getTime();
}
} catch (Exception e) {
statusHandler.error("Exception occurred parsing WMO Header", e);
}
}
// determine the plugin
List<String> plugins = DistributionPatterns.getInstance()
.getMatchingPlugins(header);
int numPlugins = plugins.size();
if (numPlugins == 1) {
path.append(plugins.get(0)).append(File.separatorChar);
} else if (numPlugins > 1) {
if (plugins.size() <= secondaryPlugins.size()) {
// check for a non secondary plugin,
String plugin = null;
for (String pluginToCheck : plugins) {
if (!secondaryPlugins.contains(pluginToCheck)) {
plugin = pluginToCheck;
break;
}
}
if (plugin == null) {
// didn't find a non secondary plugin, just grab first
// plugin
plugin = plugins.get(0);
}
path.append(plugin).append(File.separatorChar);
} else {
// remove secondary and grab first one
plugins.removeAll(secondaryPlugins);
path.append(plugins.get(0)).append(File.separatorChar);
}
} else {
path.append("unknown").append(File.separatorChar);
}
// append YYYYMMDD/HH
if (fileTime == null) {
// default to current time
fileTime = SimulatedTime.getSystemTime().getTime();
}
path.append(sdfs.get().format(fileTime)).append(File.separatorChar);
// Determine the sub-directory
String inputPath = inFile.getParent();
// Split on the manual directory to get the sub-directory
String[] parts = inputPath.split("manual");
File dir = null;
if (parts.length > 1) {
dir = new File(path + parts[1]);
} else {
dir = new File(path);
path.append(parts[1]);
}
File dir = new File(path.toString());
if (!dir.exists()) {
dir.mkdirs();
}
@ -125,7 +234,7 @@ public class MessageGenerator implements Processor {
File newFile = new File(dir, inFile.getName());
try {
FileCopyUtils.copy(inFile, newFile);
FileUtils.copyFile(inFile, newFile);
statusHandler.handle(Priority.INFO,
"DataManual: " + inFile.getAbsolutePath());
} catch (IOException e) {
@ -137,7 +246,14 @@ public class MessageGenerator implements Processor {
return newFile;
}
public File moveFileToArchive(File inFile) {
/**
* Moves the specified file to the archive directory.
*
* @param inFile
* @return
* @throws IOException
*/
public File moveFileToArchive(File inFile) throws IOException {
File newFile = copyFileToArchive(inFile);
if (newFile != null) {
inFile.delete();
@ -145,10 +261,25 @@ public class MessageGenerator implements Processor {
return newFile;
}
/**
* Copies a file to the archive directory and sends the path to the manual
* ingest route.
*
* @param inFile
* @return
*/
public boolean sendFileToIngest(String inFile) {
return sendFileToIngest(inFile, ingestRoute);
}
/**
* Copies a file to the archive directory and sends the path to the
* specified route.
*
* @param inFile
* @param route
* @return
*/
public boolean sendFileToIngest(String inFile, String route) {
boolean rval = true;
@ -156,7 +287,7 @@ public class MessageGenerator implements Processor {
File archiveFile = copyFileToArchive(new File(inFile));
EDEXUtil.getMessageProducer().sendAsync(route,
archiveFile.getAbsolutePath());
} catch (EdexException e) {
} catch (Exception e) {
rval = false;
statusHandler.handle(Priority.ERROR, "Failed to insert file ["
+ inFile + "] into ingest stream", e);

View file

@ -15,6 +15,11 @@
<constructor-arg value="nctext" />
<constructor-arg value="jms-dist:queue:ingest.nctext" />
</bean>
<bean factory-bean="manualProc"
factory-method="registerSecondaryPlugin">
<constructor-arg value="nctext" />
</bean>
<bean id="nctextCamelRegistered" factory-bean="contextManager"
factory-method="register" depends-on="persistCamelRegistered">

View file

@ -18,7 +18,7 @@
<!-- class=" com.raytheon.uf.edex.distribution.DistributionSrv" -->
<bean id="stormTrackDistRegistry" factory-bean="distributionSrv"
factory-method="register">
<constructor-arg value="stormTrack" />
<constructor-arg value="stormtrack" />
<!-- <constructor-arg value="jms-generic:queue:Ingest.AlphaNumeric" /> -->
<constructor-arg value="jms-dist:queue:Ingest.stormTrack" />
</bean>