Issue #2170: Update Plugin Notification Framework

Change-Id: I2ec99fadbe2f383bb0892c59601350395f1e5277

Former-commit-id: 2085a9cd33 [formerly 081a19d5cfca3d717a332357dbaead199767e0b8]
Former-commit-id: fbda83aeb9
This commit is contained in:
Richard Peter 2013-11-19 16:58:45 -06:00
parent a5cf904e1f
commit f7f9a4e57c
34 changed files with 1249 additions and 344 deletions

View file

@ -30,16 +30,6 @@
<constructor-arg ref="smartInitSrvCfg"/>
</bean>
<bean factory-bean="pluginNotifier" factory-method="register">
<constructor-arg value="warning" />
<constructor-arg value="jms-warning:queue:edex.spcWatch" />
</bean>
<bean factory-bean="pluginNotifier" factory-method="register">
<constructor-arg value="warning" />
<constructor-arg value="jms-warning:queue:edex.tpcWatch" />
</bean>
<bean id="spcWatch" class="com.raytheon.edex.plugin.gfe.spc.SPCWatchSrv"/>
<bean id="tpcWatch" class="com.raytheon.edex.plugin.gfe.tpc.TPCWatchSrv"/>
<bean id="wclWatch" class="com.raytheon.edex.plugin.gfe.wcl.WCLWatchSrv"/>
@ -48,7 +38,7 @@
<camelContext id="gfe-camel-spring" xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="errorHandler">
<route id="SPCWatch">
<from uri="jms-durable:queue:edex.spcWatch"/>
<from uri="vm:edex.spcWatch"/>
<doTry>
<bean ref="spcWatch" method="handleSpcWatch"/>
<doCatch>
@ -60,7 +50,7 @@
</route>
<route id="TPCWatch">
<from uri="jms-durable:queue:edex.tpcWatch"/>
<from uri="vm:edex.tpcWatch"/>
<doTry>
<bean ref="tpcWatch" method="handleTpcWatch"/>
<doCatch>
@ -82,6 +72,7 @@
</route>
<route id="gfeIngestNotification">
<!-- Data from plugin notification -->
<from
uri="jms-durable:queue:gfeDataURINotification"/>
<doTry>
@ -141,18 +132,6 @@
</route>
<!-- Convert the topic into a queue so only one consumer gets each message and we still have competing consumers. -->
<route id="gfeDataURINotificationQueueRoute">
<from
uri="jms-generic:topic:edex.alerts"/>
<doTry>
<to uri="jms-durable:queue:gfeDataURINotification"/>
<doCatch>
<exception>java.lang.Throwable</exception>
<to
uri="log:ifpServer?level=ERROR"/>
</doCatch>
</doTry>
</route>
</camelContext>
<bean factory-bean="clusteredCamelContextMgr" factory-method="register">

View file

@ -62,7 +62,8 @@ public class SPCWatchSrv {
protected transient Log logger = LogFactory.getLog(getClass());
public void handleSpcWatch(PluginDataObject[] pdos) throws EdexException {
public void handleSpcWatch(List<PluginDataObject> pdos)
throws EdexException {
// create the appropriate SPC notification, returns null if not
// needed.
EnvProperties env = PropertiesFactory.getInstance().getEnvProperties();

View file

@ -92,7 +92,8 @@ public class TPCWatchSrv {
protected transient Log logger = LogFactory.getLog(getClass());
public void handleTpcWatch(PluginDataObject[] pdos) throws EdexException {
public void handleTpcWatch(List<PluginDataObject> pdos)
throws EdexException {
EnvProperties env = PropertiesFactory.getInstance().getEnvProperties();
String primarySite = env.getEnvValue("SITENAME");

View file

@ -0,0 +1,20 @@
<pluginNotificationList>
<!-- TODO: Run time generate to only listen to the models/satellites GFE is interested in -->
<pluginNotification>
<endpointName>gfeDataURINotification</endpointName>
<format>DATAURI</format>
<endpointType>QUEUE</endpointType>
<durable>true</durable>
<timeToLive>600000</timeToLive>
<metadataMap>
<mapping key="pluginName">
<constraint constraintValue="grid" constraintType="EQUALS"/>
</mapping>
</metadataMap>
<metadataMap>
<mapping key="pluginName">
<constraint constraintValue="satellite" constraintType="EQUALS"/>
</mapping>
</metadataMap>
</pluginNotification>
</pluginNotificationList>

View file

@ -0,0 +1,22 @@
<pluginNotificationList>
<pluginNotification>
<endpointName>gfe.spcWatch</endpointName>
<format>PDO</format>
<endpointType>VM</endpointType>
<metadataMap>
<mapping key="pluginName">
<constraint constraintValue="warning" constraintType="EQUALS"/>
</mapping>
</metadataMap>
</pluginNotification>
<pluginNotification>
<endpointName>gfe.tpcWatch</endpointName>
<format>PDO</format>
<endpointType>VM</endpointType>
<metadataMap>
<mapping key="pluginName">
<constraint constraintValue="warning" constraintType="EQUALS"/>
</mapping>
</metadataMap>
</pluginNotification>
</pluginNotificationList>

View file

@ -227,7 +227,7 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
updateExistingRecord(record, assembledRecord, thinned, dao);
}
EDEXUtil.getMessageProducer().sendAsync("notificationAggregation",
new String[] { assembledRecord.getDataURI() });
record);
}
private GridRecord createAssembledRecord(GridRecord record,

View file

@ -74,8 +74,7 @@
<from uri="direct-vm:modelSoundingPersistIndexAlert"/>
<bean ref="persist" method="persist"/>
<bean ref="index" method="index"/>
<bean ref="toDataURI" method="toDataURI"/>
<to uri="vm:stageNotification"/>
<to uri="direct-vm:stageNotification"/>
</route>
</camelContext>
</beans>

View file

@ -65,9 +65,9 @@
<bean ref="stringToFile" />
<bean ref="warningDecoder" method="decode" />
<bean ref="index" method="index" />
<bean ref="processUtil" method="log" />
<multicast parallelProcessing="false">
<to uri="direct-vm:warningIngestAlert" />
<bean ref="pluginNotifier" method="send"/>
<filter>
<method bean="vtecFilter" method="hasVTEC" />
<to uri="jms-warning:queue:activeTablePending"/>
@ -83,9 +83,7 @@
<route id="warningIngestAlert">
<from uri="direct-vm:warningIngestAlert" />
<bean ref="toDataURI" method="toDataURI" />
<bean ref="processUtil" method="log" />
<to uri="vm:stageNotification" />
<to uri="direct-vm:stageNotification" />
</route>
</camelContext>
</beans>

View file

@ -35,7 +35,6 @@
value="com.raytheon.uf.common.dataplugin.warning.WarningRecord" />
</bean>
<bean id="practiceProductOfftimeHandler" class="com.raytheon.uf.edex.activetable.PracticeProductOfftimeHandler"/>
<bean id="uriAggregator" class="com.raytheon.uf.edex.ingest.notification.DataUriAggregator" />
<bean id="toDataURI" class="com.raytheon.uf.edex.ingest.notification.ToDataURI" />
<camelContext id="activetable-camel"
xmlns="http://camel.apache.org/schema/spring"
@ -54,9 +53,8 @@
<filter>
<method bean="vtecFilter" method="hasVTEC" />
<bean ref="activeTableSrv" method="practiceVtecArrived" />
<bean ref="toDataURI" method="toDataURI"/>
<bean ref="uriAggregator" method="addDataUris" />
<bean ref="uriAggregator" method="sendPracticeQueuedUris" />
<bean ref="toDataURI" method="toPracticeNotificationMsg"/>
<bean ref="serializationUtil" method="transformToThrift" />
<to uri="jms-generic:topic:edex.alerts.practicewarning?timeToLive=60000&amp;deliveryPersistent=false"/>
</filter>
<doCatch>

View file

@ -2,4 +2,5 @@ source.. = src/
output.. = bin/
bin.includes = META-INF/,\
.,\
res/
res/,\
utility/

View file

@ -16,32 +16,15 @@
autoStartup="false">
<route id="cpgProcessAlerts">
<from uri="direct-vm:processCPGAlerts"/>
<bean ref="toDataURI" method="toDataURI"/>
<to uri="vm:stageNotification"/>
<to uri="direct-vm:stageNotification"/>
</route>
</camelContext>
<camelContext id="clusteredCpgSrvRoutes"
xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler"
autoStartup="false">
<!-- Need to reroute to a queue to allow for multiple jms consumers -->
<route id="cpgsrvTopicToQueueRoute">
<from uri="jms-generic:topic:edex.alerts"/>
<!-- technically with qpid should be able to make this a durable subscription and not need to forward to another queue -->
<doTry>
<multicast>
<to uri="jms-durable:queue:cpgsrvFiltering"/>
<to uri="jms-durable:queue:scanCpgsrvFiltering"/>
<to uri="jms-durable:queue:ffmpCpgsrvFiltering"/>
</multicast>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:cpgSrv?level=ERROR"/>
</doCatch>
</doTry>
</route>
<route id="cpgsrvListenerRoute">
<!-- technically with qpid should be able to make this a durable subscription and not need to forward to another queue -->
<!-- Data from plugin notification -->
<from uri="jms-durable:queue:cpgsrvFiltering?concurrentConsumers=5"/>
<doTry>
<pipeline>

View file

@ -0,0 +1,15 @@
<pluginNotificationList>
<!-- TODO: Run time generate to only listen to the radars/models DAT is interested in -->
<pluginNotification>
<endpointName>cpgsrvFiltering</endpointName>
<format>DATAURI</format>
<endpointType>QUEUE</endpointType>
<durable>true</durable>
<timeToLive>120000</timeToLive>
<metadataMap>
<mapping key="pluginName">
<constraint constraintValue="grid, radar, qpf, obs, sfcobs, ldadmesonet, satellite" constraintType="IN"/>
</mapping>
</metadataMap>
</pluginNotification>
</pluginNotificationList>

View file

@ -88,8 +88,7 @@
<route id="dataDeliveryNotify">
<from uri="direct-vm:dataDeliveryNotify" />
<bean ref="toDataURI" method="toDataURI" />
<to uri="vm:stageNotification" />
<to uri="direct-vm:stageNotification" />
</route>
</camelContext>

View file

@ -2,4 +2,5 @@ source.. = src/
output.. = bin/
bin.includes = META-INF/,\
.,\
res/
res/,\
utility/

View file

@ -8,27 +8,20 @@
factory-method="getInstance" />
<camelContext id="grid-staticdata-process" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler" autoStartup="false">
errorHandlerRef="errorHandler">
<!-- Begin Grid Process Route -->
<route id="gridTopoNotificationRoute">
<from uri="jms-generic:topic:edex.alerts" />
<route id="gridStaticDataGenerationRoute">
<!-- Fed from plugin notification -->
<from uri="vm:grid-staticdata-generate" />
<doTry>
<bean ref="serializationUtil" method="transformFromThrift" />
<bean ref="staticDataGenerator" method="processNotification"/>
<bean ref="toDataURI" method="toDataURI"/>
<to uri="vm:stageNotification"/>
<to uri="direct-vm:stageNotification"/>
<doCatch>
<exception>java.lang.Throwable</exception>
<to
uri="log:grid-staticdata?level=ERROR" />
<to uri="log:grid-staticdata?level=ERROR" />
</doCatch>
</doTry>
</route>
</camelContext>
<bean id="gridStaticDataProcessCamelRegistered" factory-bean="clusteredCamelContextMgr"
factory-method="register">
<constructor-arg ref="grid-staticdata-process" />
</bean>
</beans>

View file

@ -108,7 +108,7 @@ public class StaticDataGenerator {
* whether static data has been created to avoid too many trips to db to
* check.
*/
private Map<CacheKey, CacheKey> dataGeneratedCache = Collections
private final Map<CacheKey, CacheKey> dataGeneratedCache = Collections
.synchronizedMap(new LinkedHashMap<CacheKey, CacheKey>(
(int) (CACHE_SIZE / 0.75f) + 1, 0.75f, true) {
@ -138,8 +138,9 @@ public class StaticDataGenerator {
*/
public GridRecord[] processNotification(DataURINotificationMessage msg)
throws Exception {
Set<GridRecord> staticRecords = new HashSet<GridRecord>();
for (String dataURI : msg.getDataURIs()) {
String[] uris = msg.getDataURIs();
Set<GridRecord> staticRecords = new HashSet<GridRecord>(uris.length, 1);
for (String dataURI : uris) {
if (dataURI.startsWith("/grid/")) {
try {
GridRecord record = new GridRecord(dataURI);
@ -376,8 +377,6 @@ public class StaticDataGenerator {
staticRecord.setLevel(LevelFactory.getInstance().getLevel("Dflt", 0,
"m"));
staticRecord.setDataTime(dataTime);
staticRecord.constructDataURI();
return staticRecord;
}
@ -458,38 +457,47 @@ public class StaticDataGenerator {
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + coverageid;
result = prime * result
result = (prime * result) + coverageid;
result = (prime * result)
+ ((datasetid == null) ? 0 : datasetid.hashCode());
result = prime * result + forecastTime;
result = prime * result
result = (prime * result) + forecastTime;
result = (prime * result)
+ ((refTime == null) ? 0 : refTime.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
if (obj == null)
}
if (obj == null) {
return false;
if (getClass() != obj.getClass())
}
if (getClass() != obj.getClass()) {
return false;
}
CacheKey other = (CacheKey) obj;
if (coverageid != other.coverageid)
if (coverageid != other.coverageid) {
return false;
}
if (datasetid == null) {
if (other.datasetid != null)
if (other.datasetid != null) {
return false;
} else if (!datasetid.equals(other.datasetid))
}
} else if (!datasetid.equals(other.datasetid)) {
return false;
if (forecastTime != other.forecastTime)
}
if (forecastTime != other.forecastTime) {
return false;
}
if (refTime == null) {
if (other.refTime != null)
if (other.refTime != null) {
return false;
} else if (!refTime.equals(other.refTime))
}
} else if (!refTime.equals(other.refTime)) {
return false;
}
return true;
}

View file

@ -0,0 +1,12 @@
<pluginNotificationList>
<pluginNotification>
<endpointName>grid-staticdata-generate</endpointName>
<endpointType>VM</endpointType>
<format>DATAURI</format>
<metadataMap>
<mapping key="pluginName">
<constraint constraintValue="grid" constraintType="EQUALS"/>
</mapping>
</metadataMap>
</pluginNotification>
</pluginNotificationList>

View file

@ -5,6 +5,7 @@ Bundle-SymbolicName: com.raytheon.uf.edex.ingest
Bundle-Version: 1.12.1174.qualifier
Bundle-Vendor: RAYTHEON
Require-Bundle: com.raytheon.edex.common,
com.raytheon.uf.common.localization,
com.google.guava,
org.apache.camel,
org.apache.commons.lang,

View file

@ -2,4 +2,5 @@ source.. = src/
output.. = bin/
bin.includes = META-INF/,\
.,\
res/
res/,\
utility/

View file

@ -6,10 +6,6 @@
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<bean id="uriAggregator" class="com.raytheon.uf.edex.ingest.notification.DataUriAggregator" />
<bean id="toDataURI" class="com.raytheon.uf.edex.ingest.notification.ToDataURI" />
<bean id="persist" class="com.raytheon.uf.edex.ingest.PersistSrv" factory-method="getInstance"/>
<bean id="index" class="com.raytheon.uf.edex.ingest.IndexSrv"/>
@ -21,7 +17,6 @@
</bean>
<camelContext id="persist-camel" xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="errorHandler">
<!-- Generic persist and indexing
Intended for routes that need persisting to HDF5,
Indexing but no alert processing
@ -42,8 +37,7 @@
<bean ref="persist" method="persist"/>
<bean ref="index" method="index"/>
<bean ref="processUtil" method="log"/>
<bean ref="toDataURI" method="toDataURI"/>
<to uri="vm:stageNotification"/>
<to uri="direct-vm:stageNotification"/>
</route>
<!-- Generic index and alert route
@ -53,22 +47,17 @@
<from uri="direct-vm:indexAlert"/>
<bean ref="index" method="index"/>
<bean ref="processUtil" method="log"/>
<bean ref="toDataURI" method="toDataURI"/>
<to uri="vm:stageNotification"/>
<to uri="direct-vm:stageNotification"/>
</route>
<route id="notificationAggregation">
<from uri="vm:stageNotification"/>
<bean ref="uriAggregator" method="addDataUris" />
<from uri="direct-vm:stageNotification"/>
<bean ref="pluginNotifier" method="notify" />
</route>
<route id="notificationTimer">
<from uri="timer://notificationTimer?fixedRate=true&amp;period=1000" />
<filter>
<method bean="uriAggregator" method="hasUris" />
<bean ref="uriAggregator" method="sendQueuedUris" />
<to uri="jms-generic:topic:edex.alerts?timeToLive=60000"/>
</filter>
<bean ref="pluginNotifier" method="sendQueuedNotifications" />
</route>
</camelContext>
</beans>

View file

@ -1,146 +0,0 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.ingest.notification;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import com.raytheon.uf.common.serialization.ISerializableObject;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage;
import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMessage;
import com.raytheon.uf.common.util.ByteArrayOutputStreamPool;
/**
* Combines multiple messages of URIs into a single message that can be
* triggered
*
* <pre>
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Dec 10, 2008 njensen Initial creation
* Feb 15, 2013 1638 mschenke Moved DataURINotificationMessage to uf.common.dataplugin
* Aug 16, 2013 2169 bkowal gzip data uris
* </pre>
*
* @author njensen
* @version 1.0
*/
public class DataUriAggregator {
private List<String> dataUris = new ArrayList<String>();
/**
* Add data uris to the queue
*
* @param uris
*/
public void addDataUris(String[] uris) {
synchronized (this) {
for (String uri : uris) {
if (uri != null) {
dataUris.add(uri);
}
}
}
}
/**
* Filters quartz messages to only send a message if there are URIs to send.
*
* @param obj
* @return
*/
public boolean hasUris(Object obj) {
synchronized (this) {
return dataUris.size() > 0;
}
}
/**
* Send off the currently queued uris
*
* @return
*/
public byte[] sendQueuedUris() throws SerializationException {
DataURINotificationMessage msg = null;
synchronized (this) {
String[] uris = dataUris.toArray(new String[dataUris.size()]);
dataUris.clear();
msg = new DataURINotificationMessage();
msg.setDataURIs(uris);
}
return this.encodeMessage(msg);
}
/**
* Send off the currently queued uris
*
* @return
*/
public byte[] sendPracticeQueuedUris() throws SerializationException {
PracticeDataURINotificationMessage msg = null;
synchronized (this) {
String[] uris = dataUris.toArray(new String[dataUris.size()]);
dataUris.clear();
msg = new PracticeDataURINotificationMessage();
msg.setDataURIs(uris);
}
return this.encodeMessage(msg);
}
public byte[] encodeMessage(ISerializableObject msg)
throws SerializationException {
ByteArrayOutputStream baos = ByteArrayOutputStreamPool.getInstance()
.getStream();
GZIPOutputStream gzippedURIs = null;
try {
gzippedURIs = new GZIPOutputStream(baos);
} catch (IOException e) {
throw new SerializationException(
"Failed to prepare the gzipped data stream!", e);
}
SerializationUtil.transformToThriftUsingStream(msg, gzippedURIs);
try {
gzippedURIs.finish();
gzippedURIs.flush();
return baos.toByteArray();
} catch (IOException e) {
throw new SerializationException(
"Failed to write the gzipped data stream!", e);
} finally {
try {
gzippedURIs.close();
} catch (IOException e) {
// ignore, we no longer need the stream
}
}
}
}

View file

@ -0,0 +1,57 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
*/
package com.raytheon.uf.edex.ingest.notification;
/**
* Thrown if a configuration is invalid.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 19, 2013 2170 rjpeter Initial creation
*
* </pre>
*
* @author rjpeter
* @version 1.0
*/
public class InvalidNotificationConfigException extends Exception {
private static final long serialVersionUID = 1L;
public InvalidNotificationConfigException() {
super();
}
public InvalidNotificationConfigException(String message, Throwable cause) {
super(message, cause);
}
public InvalidNotificationConfigException(String message) {
super(message);
}
public InvalidNotificationConfigException(Throwable cause) {
super(cause);
}
}

View file

@ -19,24 +19,47 @@
**/
package com.raytheon.uf.edex.ingest.notification;
import java.util.ArrayList;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.RecipientList;
import javax.xml.bind.JAXBException;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.dataplugin.PluginException;
import com.raytheon.uf.common.dataplugin.annotations.DataURIUtil;
import com.raytheon.uf.common.dataquery.DecisionTree;
import com.raytheon.uf.common.dataquery.requests.RequestConstraint;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationFile;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.localization.exception.LocalizationException;
import com.raytheon.uf.common.serialization.JAXBManager;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.PerformanceStatus;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.common.time.util.ITimer;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig.EndpointType;
import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig.NotifyFormat;
import com.raytheon.uf.edex.ingest.notification.router.DataUriRouter;
import com.raytheon.uf.edex.ingest.notification.router.INotificationRouter;
import com.raytheon.uf.edex.ingest.notification.router.PdoRouter;
/**
* Plugins can register routes with this and then be generically fired to. Helps
* to reduce dependencies as we no longer need to call a route directly.
* to reduce dependencies as we no longer need to call a route directly. All
* registration must occur before messages are being picked up. Otherwise
* concurrency problems may occur.
*
* <pre>
*
@ -44,8 +67,9 @@ import com.raytheon.uf.edex.core.EdexException;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jun 13, 2013 mnash Initial creation
*
* Jun 13, 2013 mnash Initial creation.
* Nov 19, 2013 2170 rjpeter Add plugin contributed config files, filtering,
* and support for pdo vs datauri.
* </pre>
*
* @author mnash
@ -53,55 +77,311 @@ import com.raytheon.uf.edex.core.EdexException;
*/
public class PluginNotifier {
private static final IUFStatusHandler theHandler = UFStatus
.getHandler(PluginNotifier.class);
private static final Multimap<String, String> routes = ArrayListMultimap
.create();
private final IPerformanceStatusHandler perfLog = PerformanceStatus
.getHandler("Notification:");
private static final PluginNotifier distributor = new PluginNotifier();
private PluginNotifier() {
}
public PluginNotifier getInstance() {
return distributor;
}
private static final int DEFAULT_TIME_TO_LIVE = 300000;
/**
* Normally called from Spring, this allows a set of commands to be
* registered for a specific plugin.
*
* @param name
* @param command
* @return
* Decision tree for plugin notification.
*/
public PluginNotifier register(String name, String command) {
routes.put(name, command);
return this;
}
private final DecisionTree<INotificationRouter> tree = new DecisionTree<INotificationRouter>();
private final List<INotificationRouter> receiveAllRoutes = new LinkedList<INotificationRouter>();
private final List<INotificationRouter> filteredRoutes = new LinkedList<INotificationRouter>();
/**
* Takes the pluginName, which is what was registered for, and sends to all
* the routes that are registered as that.
*
* @param exchange
* @return
* Set of loaded names. Used for duplicate detection.
*/
@RecipientList
public List<String> send(Exchange exchange) {
Message in = exchange.getIn();
String name = (String) in.getHeader("pluginName");
List<String> list = new ArrayList<String>();
for (String route : routes.get(name)) {
private final Set<String> loadedNames = new HashSet<String>();
public PluginNotifier() throws JAXBException {
loadConfigurations();
}
private synchronized void loadConfigurations() throws JAXBException {
JAXBManager mgr = new JAXBManager(PluginNotifierConfigList.class,
PluginNotifierConfig.class);
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationFile[] files = pathMgr.listStaticFiles("notification",
new String[] { ".xml" }, false, true);
for (LocalizationFile lf : files) {
try {
EDEXUtil.getMessageProducer().sendAsyncUri(route, in.getBody());
} catch (EdexException e) {
File f = lf.getFile(true);
if (f.length() > 0) {
// empty files may be used to override base files to remove
// functionality
InputStream is = null;
try {
is = lf.openInputStream();
PluginNotifierConfigList confList = (PluginNotifierConfigList) mgr
.unmarshalFromInputStream(is);
List<PluginNotifierConfig> configs = confList
.getNotificationConfigs();
if ((configs != null) && !configs.isEmpty()) {
for (PluginNotifierConfig conf : configs) {
register(conf, false);
}
}
} catch (SerializationException e) {
theHandler.handle(Priority.PROBLEM,
"Unable to send message to " + route, e);
"Unable to deserialize " + f.getPath(), e);
} catch (InvalidNotificationConfigException e) {
theHandler.handle(
Priority.PROBLEM,
"Unable to load plugin configuration "
+ f.getPath(), e);
} finally {
if (is != null) {
try {
is.close();
} catch (IOException e) {
// ignore
}
}
}
}
} catch (LocalizationException e) {
theHandler.handle(Priority.PROBLEM,
"Error occurred accessing file: " + lf, e);
}
}
rebuildTree();
}
/**
* Register the given PluginNotifierConfig.
*
* @param config
* @return
*/
public synchronized void register(PluginNotifierConfig config)
throws InvalidNotificationConfigException {
register(config, true);
}
/**
* Register the given PluginNotifierConfig.
*
* @param config
* @param rebuildTree
* Whether or not to rebuild the internal tree. If many things
* are being registered can improve performance to only build
* tree once at the end.
* @return
*/
public synchronized void register(PluginNotifierConfig config,
boolean rebuildTree) throws InvalidNotificationConfigException {
validate(config);
INotificationRouter router = null;
switch (config.getFormat()) {
case DATAURI:
router = new DataUriRouter(config);
break;
case PDO:
router = new PdoRouter(config);
break;
default:
throw new InvalidNotificationConfigException(
"No INotificationRouter registered for format: "
+ config.getFormat());
}
Map<String, RequestConstraint>[] metadataMaps = config.getMetadataMap();
boolean receiveAll = (metadataMaps == null)
|| (metadataMaps.length == 0)
|| ((metadataMaps.length == 1) && ((metadataMaps[0] == null) || metadataMaps[0]
.isEmpty()));
if (receiveAll) {
// null or empty constraint map implies receive all data
receiveAllRoutes.add(router);
} else {
filteredRoutes.add(router);
for (Map<String, RequestConstraint> metadataMap : metadataMaps) {
tree.insertCriteria(metadataMap, router);
}
if (rebuildTree) {
tree.rebuildTree();
}
}
loadedNames.add(config.getEndpointName());
}
/**
* Validate the passed config
*
* @param config
* @return
*/
private void validate(PluginNotifierConfig config)
throws InvalidNotificationConfigException {
String endpoint = config.getEndpointName();
if ((endpoint == null) || (endpoint.trim().length() == 0)) {
throw new InvalidNotificationConfigException(
"endpointName is required");
}
if (loadedNames.contains(endpoint)) {
throw new InvalidNotificationConfigException("PluginConfiguration "
+ endpoint + ": endpointName is already in use");
}
EndpointType type = config.getEndpointType();
if (type == null) {
StringBuilder msg = new StringBuilder(180);
msg.append("PluginConfiguration ")
.append(endpoint)
.append(": missing required field endpointType. Valid values for ")
.append(NotifyFormat.PDO).append(" format are ")
.append(EndpointType.DIRECTVM).append(" and ")
.append(EndpointType.VM).append(". Valid values for ")
.append(NotifyFormat.DATAURI).append(" format are ")
.append(EndpointType.DIRECTVM).append(", ")
.append(EndpointType.VM).append(", ")
.append(EndpointType.QUEUE).append(", and ")
.append(EndpointType.TOPIC);
throw new InvalidNotificationConfigException(msg.toString());
}
NotifyFormat format = config.getFormat();
if (NotifyFormat.PDO.equals(format)
&& (EndpointType.QUEUE.equals(type) || EndpointType.TOPIC
.equals(type))) {
StringBuilder msg = new StringBuilder(120);
msg.append("PluginConfiguration ").append(endpoint)
.append(": endpointType ").append(type)
.append(" is invalid for format ").append(format)
.append(". Valid values for ").append(NotifyFormat.PDO)
.append(" format are ").append(EndpointType.DIRECTVM)
.append(" and ").append(EndpointType.VM);
throw new InvalidNotificationConfigException(msg.toString());
}
if (!EndpointType.QUEUE.equals(type) && config.isDurable()) {
theHandler.warn("PluginConfiguration: " + endpoint
+ " durable setting only valid on QUEUE type endpoints");
}
if ((EndpointType.QUEUE.equals(type) || EndpointType.TOPIC.equals(type))
&& (config.getTimeToLive() < 0)) {
theHandler
.warn("PluginConfiguration: "
+ endpoint
+ " has invalid time to live. Time to live for JMS endpoints must be 0 or greater. Setting to default of: "
+ DEFAULT_TIME_TO_LIVE + " ms");
config.setTimeToLive(DEFAULT_TIME_TO_LIVE);
}
}
/**
* Rebuild the tree based on all register'd configurations.
*/
public void rebuildTree() {
tree.rebuildTree();
}
/**
* Checks the pdo's against the registered routes. Data will then be
* transformed and queued or sent immediately depending on configuration.
*
* @param pdos
* @return
*/
public void notify(PluginDataObject... pdos) {
if ((pdos != null) && (pdos.length > 0)) {
ITimer timer = TimeUtil.getTimer();
timer.start();
if (!receiveAllRoutes.isEmpty()) {
for (PluginDataObject pdo : pdos) {
for (INotificationRouter router : receiveAllRoutes) {
router.process(pdo);
}
}
for (INotificationRouter router : receiveAllRoutes) {
try {
router.sendImmediateData();
} catch (EdexException e) {
theHandler.handle(
Priority.PROBLEM,
"Unable to send notification data to "
+ router.getRoute(), e);
}
}
}
if (!filteredRoutes.isEmpty()) {
Set<INotificationRouter> routesWithData = new HashSet<INotificationRouter>();
for (PluginDataObject pdo : pdos) {
try {
List<INotificationRouter> routers = tree
.searchTree(DataURIUtil.createDataURIMap(pdo));
for (INotificationRouter router : routers) {
router.process(pdo);
routesWithData.add(router);
}
} catch (PluginException e) {
theHandler.handle(Priority.PROBLEM,
e.getLocalizedMessage(), e);
}
}
for (INotificationRouter router : routesWithData) {
try {
router.sendImmediateData();
} catch (EdexException e) {
theHandler.handle(
Priority.PROBLEM,
"Unable to send notification data to "
+ router.getRoute(), e);
}
}
}
timer.stop();
perfLog.logDuration("Processed " + pdos.length + " pdos",
timer.getElapsedTime());
}
}
/**
* Send the queued notifications.
*
* @return
*/
public void sendQueuedNotifications() {
for (INotificationRouter router : receiveAllRoutes) {
try {
router.sendQueuedData();
} catch (EdexException e) {
theHandler.handle(
Priority.PROBLEM,
"Unable to send notification data to "
+ router.getRoute(), e);
}
}
for (INotificationRouter router : filteredRoutes) {
try {
router.sendQueuedData();
} catch (EdexException e) {
theHandler.handle(
Priority.PROBLEM,
"Unable to send notification data to "
+ router.getRoute(), e);
}
}
return list;
}
}

View file

@ -0,0 +1,224 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.ingest.notification;
import java.util.HashMap;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import com.raytheon.uf.common.dataquery.requests.RequestConstraint;
import com.raytheon.uf.common.dataquery.requests.RequestableMetadataMarshaller;
/**
* Configuration object for plugin notification. An empty or null metadataMap
* implies it should receive all data.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 19, 2013 2170 rjpeter Initial creation
*
* </pre>
*
* @author rjpeter
* @version 1.0
*/
@XmlAccessorType(XmlAccessType.NONE)
public class PluginNotifierConfig {
public static enum EndpointType {
QUEUE, TOPIC, VM, DIRECTVM
}
public static enum NotifyFormat {
DATAURI, PDO
}
private static final String vmPrefix = "vm:";
private static final String directvmPrefix = "direct-vm:";
private static final String persistentJmsPrefix = "jms-durable:";
private static final String transientJmsPrefix = "jms-generic:";
@XmlElement
protected String endpointName;
@XmlElement
protected EndpointType endpointType;
@XmlElement(required = true)
protected NotifyFormat format;
/**
* Time to live for JMS type endpoints in milliseconds.
*/
@XmlElement
protected int timeToLive = -1;
/**
* If the JMS message is being sent to a durable endpoint.
*/
@XmlElement
protected boolean durable;
/**
* the metadata criteria to retrieve the resource
*/
@XmlElement
@XmlJavaTypeAdapter(value = RequestableMetadataMarshaller.class)
protected HashMap<String, RequestConstraint>[] metadataMap;
protected transient String endpointUri;
public String getEndpointName() {
return endpointName;
}
public void setEndpointName(String endpointName) {
this.endpointName = endpointName;
}
public EndpointType getEndpointType() {
return endpointType;
}
public void setEndpointType(EndpointType endpointType) {
this.endpointType = endpointType;
}
public NotifyFormat getFormat() {
return format;
}
public void setFormat(NotifyFormat format) {
this.format = format;
}
public int getTimeToLive() {
return timeToLive;
}
public void setTimeToLive(int timeToLive) {
this.timeToLive = timeToLive;
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public HashMap<String, RequestConstraint>[] getMetadataMap() {
return metadataMap;
}
public void setMetadataMap(HashMap<String, RequestConstraint>[] metadataMap) {
this.metadataMap = metadataMap;
}
/**
* Generates and returns the endpoint uri for this configuration.
*
* @return
*/
public String getEndpointUri() {
if (endpointUri == null) {
StringBuilder builder = new StringBuilder(64);
switch (endpointType) {
case DIRECTVM:
builder.append(directvmPrefix);
case VM:
builder.append(vmPrefix);
break;
case QUEUE:
case TOPIC:
if (durable) {
builder.append(persistentJmsPrefix);
} else {
builder.append(transientJmsPrefix);
}
builder.append(endpointType.name().toLowerCase()).append(':');
break;
}
builder.append(endpointName);
// for jms endpoints add time to live field
if ((timeToLive > 0)
&& (EndpointType.QUEUE.equals(endpointType) || EndpointType.TOPIC
.equals(endpointType))) {
// append time to live in milliseconds
builder.append("?timeToLive=").append(timeToLive);
}
endpointUri = builder.toString();
}
return endpointUri;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = (prime * result)
+ ((endpointName == null) ? 0 : endpointName.hashCode());
result = (prime * result)
+ ((endpointType == null) ? 0 : endpointType.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
PluginNotifierConfig other = (PluginNotifierConfig) obj;
if (endpointName == null) {
if (other.endpointName != null) {
return false;
}
} else if (!endpointName.equals(other.endpointName)) {
return false;
}
if (endpointType != other.endpointType) {
return false;
}
return true;
}
}

View file

@ -0,0 +1,60 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
*/
package com.raytheon.uf.edex.ingest.notification;
import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElements;
import javax.xml.bind.annotation.XmlRootElement;
/**
* List of Plugin Notification Configurations.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 19, 2013 2170 rjpeter Initial creation
*
* </pre>
*
* @author rjpeter
* @version 1.0
*/
@XmlRootElement(name = "pluginNotificationList")
@XmlAccessorType(XmlAccessType.NONE)
public class PluginNotifierConfigList {
@XmlElements({ @XmlElement(name = "pluginNotification") })
private List<PluginNotifierConfig> notificationConfigs;
public List<PluginNotifierConfig> getNotificationConfigs() {
return notificationConfigs;
}
public void setNotificationConfigs(
List<PluginNotifierConfig> notificationConfigs) {
this.notificationConfigs = notificationConfigs;
}
}

View file

@ -20,6 +20,7 @@
package com.raytheon.uf.edex.ingest.notification;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMessage;
/**
* Converts PluginDataObjects or arrays of PluginDataObjects into their dataURIs
@ -30,7 +31,7 @@ import com.raytheon.uf.common.dataplugin.PluginDataObject;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 25, 2008 chammack Initial creation
*
* Nov 19, 2013 2170 rjpeter Added toPracticeNotificationMsg.
* </pre>
*
* @author chammack
@ -51,4 +52,12 @@ public class ToDataURI {
public String toDataURI(PluginDataObject pdo) {
return pdo.getDataURI();
}
public PracticeDataURINotificationMessage toPracticeNotificationMsg(
PluginDataObject[] pdos) {
String[] uris = toDataURI(pdos);
PracticeDataURINotificationMessage rval = new PracticeDataURINotificationMessage();
rval.setDataURIs(uris);
return rval;
}
}

View file

@ -0,0 +1,186 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.ingest.notification.router;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.zip.GZIPOutputStream;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.util.ByteArrayOutputStreamPool;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig;
import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig.EndpointType;
/**
* Routes DataUri Notifications to a destination uri. Notification msg will be
* sent immediately to the routes for inner jvm calls, routes over jms will be
* queued and sent in gzipped batches.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 19, 2013 2170 rjpeter Initial creation
*
* </pre>
*
* @author rjpeter
* @version 1.0
*/
public class DataUriRouter implements INotificationRouter {
/**
* Buffer size.
*/
private static final int GZIP_BUFFER_SIZE = 4096;
/**
* Data URIs that have not been sent.
*/
private final ConcurrentLinkedQueue<String> uris = new ConcurrentLinkedQueue<String>();
/**
* Flag if this route stays in the jvm.
*/
private final boolean isInternal;
/**
* The destination URI for this router.
*/
private final String route;
/**
*
* @param config
*/
public DataUriRouter(PluginNotifierConfig config) {
EndpointType type = config.getEndpointType();
isInternal = EndpointType.DIRECTVM.equals(type)
|| EndpointType.VM.equals(type);
route = config.getEndpointUri();
}
@Override
public String getRoute() {
return route;
}
@Override
public void process(PluginDataObject pdo) {
uris.add(pdo.getDataURI());
}
/**
* Creates a DataURINotificationMessage.
*
* @return
*/
protected synchronized DataURINotificationMessage createMessage() {
DataURINotificationMessage msg = null;
// this is the only point that uris is reduced, safe to grab current
// size and dequeue that many items
int size = uris.size();
if (size > 0) {
String[] data = new String[size];
for (int i = 0; i < size; i++) {
data[i] = uris.poll();
}
msg = new DataURINotificationMessage();
msg.setDataURIs(data);
}
return msg;
}
@Override
public void sendImmediateData() throws EdexException {
// if sending inside jvm, create message and send immediately as the
// memory object
if (isInternal) {
DataURINotificationMessage msg = createMessage();
if (msg != null) {
EDEXUtil.getMessageProducer().sendAsyncUri(route, msg);
}
}
}
@Override
public void sendQueuedData() throws EdexException {
if (!isInternal) {
// if sending outside the jvm, create message, serialize, gzip, and
// then send
DataURINotificationMessage msg = createMessage();
if (msg != null) {
EDEXUtil.getMessageProducer().sendAsyncUri(route,
encodeMessage(msg));
}
}
}
/**
* Thrift encodes an object and then gzip's the binary data. Should only be
* used for sending data outside the jvm.
*
* @param msg
* @return
* @throws SerializationException
*/
private byte[] encodeMessage(DataURINotificationMessage msg)
throws EdexException {
ByteArrayOutputStream baos = ByteArrayOutputStreamPool.getInstance()
.getStream();
GZIPOutputStream gzippedURIs = null;
try {
gzippedURIs = new GZIPOutputStream(baos, GZIP_BUFFER_SIZE, true);
} catch (IOException e) {
throw new EdexException(
"Failed to prepare the gzipped data stream", e);
}
try {
SerializationUtil.transformToThriftUsingStream(msg, gzippedURIs);
gzippedURIs.finish();
gzippedURIs.flush();
return baos.toByteArray();
} catch (IOException e) {
throw new EdexException("Failed to write the gzipped data stream",
e);
} catch (SerializationException e) {
throw new EdexException(
"Failed to serialize DataURINotificationMessage", e);
} finally {
try {
gzippedURIs.close();
} catch (IOException e) {
// ignore, we no longer need the stream
}
}
}
}

View file

@ -0,0 +1,76 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.ingest.notification.router;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.edex.core.EdexException;
/**
* Notification router for a specific destination and format.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 19, 2013 2170 rjpeter Initial creation
*
* </pre>
*
* @author rjpeter
* @version 1.0
*/
public interface INotificationRouter {
/**
* The destination for this router.
*
* @return
*/
public String getRoute();
/**
* Process the pdo into notification format.
*
* @param pdo
*/
public void process(PluginDataObject pdo);
/**
* Send data that should be sent immediately to route. This is mainly used
* for PDO data that we don't want to build up and send as a bulk message.
* Future enhancement could allow for priority data such as warnings and
* radar data to always be notified immediately instead of a timer driver
* basis.
*
* @throws EdexException
*/
public void sendImmediateData() throws EdexException;
/**
* Send any queued data to route. Generally used for data uri notifications
* being sent over JMS to allow for better bundling. The data is queued in
* memory and sent out as a bundled message. The interval is defined by the
* notification timer in persist-ingest.xml.
*
* @throws EdexException
*/
public void sendQueuedData() throws EdexException;
}

View file

@ -0,0 +1,94 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.ingest.notification.router;
import java.util.LinkedList;
import java.util.List;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig;
/**
* Routes pdos to a destination. Should only be used for destination inside the
* jvm.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 19, 2013 2170 rjpeter Initial creation
*
* </pre>
*
* @author rjpeter
* @version 1.0
*/
public class PdoRouter implements INotificationRouter {
/**
* Destination URI for this router.
*/
private final String route;
/**
* PDO's in use by current thread. Since all PDO data is sent immediately
* handles concurrency via thread local instance.
*/
private final ThreadLocal<List<PluginDataObject>> myPdos = new ThreadLocal<List<PluginDataObject>>() {
@Override
protected List<PluginDataObject> initialValue() {
return new LinkedList<PluginDataObject>();
}
};
public PdoRouter(PluginNotifierConfig config) {
this.route = config.getEndpointUri();
}
@Override
public String getRoute() {
return route;
}
@Override
public void process(PluginDataObject pdo) {
myPdos.get().add(pdo);
}
@Override
public void sendImmediateData() throws EdexException {
List<PluginDataObject> pdos = myPdos.get();
myPdos.remove();
if (pdos.size() > 0) {
EDEXUtil.getMessageProducer().sendAsyncUri(route, pdos);
}
}
@Override
public void sendQueuedData() throws EdexException {
// NOOP all data sent immediately
}
}

View file

@ -0,0 +1,11 @@
<pluginNotificationList>
<pluginNotification>
<endpointName>edex.alerts</endpointName>
<format>DATAURI</format>
<endpointType>TOPIC</endpointType>
<durable>false</durable>
<timeToLive>120000</timeToLive>
<metadataMap>
</metadataMap>
</pluginNotification>
</pluginNotificationList>

View file

@ -12,7 +12,8 @@
errorHandlerRef="errorHandler"
autoStartup="false">
<route id="satPreIngestRoute">
<from uri="jms-generic:topic:edex.alerts"/>
<!-- Fed via notification -->
<from uri="jms-generic:queue:satPreFilter"/>
<doTry>
<pipeline>
<bean ref="serializationUtil" method="transformFromThrift" />

View file

@ -0,0 +1,17 @@
<pluginNotificationList>
<pluginNotification>
<endpointName>satPreFilter</endpointName>
<format>DATAURI</format>
<endpointType>QUEUE</endpointType>
<durable>true</durable>
<timeToLive>600000</timeToLive>
<metadataMap>
<mapping key="pluginName">
<constraint constraintValue="grid" constraintType="EQUALS"/>
</mapping>
<mapping key="info.datasetId">
<constraint constraintValue="AUTOSPE" constraintType="EQUALS"/>
</mapping>
</metadataMap>
</pluginNotification>
</pluginNotificationList>

View file

@ -0,0 +1,15 @@
<pluginNotificationList>
<!-- TODO: Run time generate to only listen to the specific data scan is interested in -->
<pluginNotification>
<endpointName>scanCpgsrvFiltering</endpointName>
<format>DATAURI</format>
<endpointType>QUEUE</endpointType>
<durable>true</durable>
<timeToLive>120000</timeToLive>
<metadataMap>
<mapping key="pluginName">
<constraint constraintValue="grid, radar, bufrua, binlightning" constraintType="IN"/>
</mapping>
</metadataMap>
</pluginNotification>
</pluginNotificationList>