Issue #2170: Update Plugin Notification Framework
Change-Id: I2ec99fadbe2f383bb0892c59601350395f1e5277 Former-commit-id:f2d99af20e
[formerly5905c5d08d
] [formerly2085a9cd33
] [formerlyf2d99af20e
[formerly5905c5d08d
] [formerly2085a9cd33
] [formerlyfbda83aeb9
[formerly2085a9cd33
[formerly 081a19d5cfca3d717a332357dbaead199767e0b8]]]] Former-commit-id:fbda83aeb9
Former-commit-id:14ed9331d3
[formerly670e63d0dd
] [formerly 92cc0eed5d5ca22d3b43329858c1619c9363e66a [formerlyf7f9a4e57c
]] Former-commit-id: 5edc348b297d22aeb20b7b1cbe81e90329464d25 [formerlyff5fbcf012
] Former-commit-id:90e23f5c6b
This commit is contained in:
parent
3804cdf794
commit
052119fffe
34 changed files with 1249 additions and 344 deletions
|
@ -30,16 +30,6 @@
|
|||
<constructor-arg ref="smartInitSrvCfg"/>
|
||||
</bean>
|
||||
|
||||
<bean factory-bean="pluginNotifier" factory-method="register">
|
||||
<constructor-arg value="warning" />
|
||||
<constructor-arg value="jms-warning:queue:edex.spcWatch" />
|
||||
</bean>
|
||||
|
||||
<bean factory-bean="pluginNotifier" factory-method="register">
|
||||
<constructor-arg value="warning" />
|
||||
<constructor-arg value="jms-warning:queue:edex.tpcWatch" />
|
||||
</bean>
|
||||
|
||||
<bean id="spcWatch" class="com.raytheon.edex.plugin.gfe.spc.SPCWatchSrv"/>
|
||||
<bean id="tpcWatch" class="com.raytheon.edex.plugin.gfe.tpc.TPCWatchSrv"/>
|
||||
<bean id="wclWatch" class="com.raytheon.edex.plugin.gfe.wcl.WCLWatchSrv"/>
|
||||
|
@ -48,7 +38,7 @@
|
|||
|
||||
<camelContext id="gfe-camel-spring" xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="errorHandler">
|
||||
<route id="SPCWatch">
|
||||
<from uri="jms-durable:queue:edex.spcWatch"/>
|
||||
<from uri="vm:edex.spcWatch"/>
|
||||
<doTry>
|
||||
<bean ref="spcWatch" method="handleSpcWatch"/>
|
||||
<doCatch>
|
||||
|
@ -60,7 +50,7 @@
|
|||
</route>
|
||||
|
||||
<route id="TPCWatch">
|
||||
<from uri="jms-durable:queue:edex.tpcWatch"/>
|
||||
<from uri="vm:edex.tpcWatch"/>
|
||||
<doTry>
|
||||
<bean ref="tpcWatch" method="handleTpcWatch"/>
|
||||
<doCatch>
|
||||
|
@ -82,6 +72,7 @@
|
|||
</route>
|
||||
|
||||
<route id="gfeIngestNotification">
|
||||
<!-- Data from plugin notification -->
|
||||
<from
|
||||
uri="jms-durable:queue:gfeDataURINotification"/>
|
||||
<doTry>
|
||||
|
@ -141,18 +132,6 @@
|
|||
</route>
|
||||
|
||||
<!-- Convert the topic into a queue so only one consumer gets each message and we still have competing consumers. -->
|
||||
<route id="gfeDataURINotificationQueueRoute">
|
||||
<from
|
||||
uri="jms-generic:topic:edex.alerts"/>
|
||||
<doTry>
|
||||
<to uri="jms-durable:queue:gfeDataURINotification"/>
|
||||
<doCatch>
|
||||
<exception>java.lang.Throwable</exception>
|
||||
<to
|
||||
uri="log:ifpServer?level=ERROR"/>
|
||||
</doCatch>
|
||||
</doTry>
|
||||
</route>
|
||||
</camelContext>
|
||||
|
||||
<bean factory-bean="clusteredCamelContextMgr" factory-method="register">
|
||||
|
|
|
@ -62,7 +62,8 @@ public class SPCWatchSrv {
|
|||
|
||||
protected transient Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
public void handleSpcWatch(PluginDataObject[] pdos) throws EdexException {
|
||||
public void handleSpcWatch(List<PluginDataObject> pdos)
|
||||
throws EdexException {
|
||||
// create the appropriate SPC notification, returns null if not
|
||||
// needed.
|
||||
EnvProperties env = PropertiesFactory.getInstance().getEnvProperties();
|
||||
|
|
|
@ -92,7 +92,8 @@ public class TPCWatchSrv {
|
|||
|
||||
protected transient Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
public void handleTpcWatch(PluginDataObject[] pdos) throws EdexException {
|
||||
public void handleTpcWatch(List<PluginDataObject> pdos)
|
||||
throws EdexException {
|
||||
|
||||
EnvProperties env = PropertiesFactory.getInstance().getEnvProperties();
|
||||
String primarySite = env.getEnvValue("SITENAME");
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
<pluginNotificationList>
|
||||
<!-- TODO: Run time generate to only listen to the models/satellites GFE is interested in -->
|
||||
<pluginNotification>
|
||||
<endpointName>gfeDataURINotification</endpointName>
|
||||
<format>DATAURI</format>
|
||||
<endpointType>QUEUE</endpointType>
|
||||
<durable>true</durable>
|
||||
<timeToLive>600000</timeToLive>
|
||||
<metadataMap>
|
||||
<mapping key="pluginName">
|
||||
<constraint constraintValue="grid" constraintType="EQUALS"/>
|
||||
</mapping>
|
||||
</metadataMap>
|
||||
<metadataMap>
|
||||
<mapping key="pluginName">
|
||||
<constraint constraintValue="satellite" constraintType="EQUALS"/>
|
||||
</mapping>
|
||||
</metadataMap>
|
||||
</pluginNotification>
|
||||
</pluginNotificationList>
|
|
@ -0,0 +1,22 @@
|
|||
<pluginNotificationList>
|
||||
<pluginNotification>
|
||||
<endpointName>gfe.spcWatch</endpointName>
|
||||
<format>PDO</format>
|
||||
<endpointType>VM</endpointType>
|
||||
<metadataMap>
|
||||
<mapping key="pluginName">
|
||||
<constraint constraintValue="warning" constraintType="EQUALS"/>
|
||||
</mapping>
|
||||
</metadataMap>
|
||||
</pluginNotification>
|
||||
<pluginNotification>
|
||||
<endpointName>gfe.tpcWatch</endpointName>
|
||||
<format>PDO</format>
|
||||
<endpointType>VM</endpointType>
|
||||
<metadataMap>
|
||||
<mapping key="pluginName">
|
||||
<constraint constraintValue="warning" constraintType="EQUALS"/>
|
||||
</mapping>
|
||||
</metadataMap>
|
||||
</pluginNotification>
|
||||
</pluginNotificationList>
|
|
@ -227,7 +227,7 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
|
|||
updateExistingRecord(record, assembledRecord, thinned, dao);
|
||||
}
|
||||
EDEXUtil.getMessageProducer().sendAsync("notificationAggregation",
|
||||
new String[] { assembledRecord.getDataURI() });
|
||||
record);
|
||||
}
|
||||
|
||||
private GridRecord createAssembledRecord(GridRecord record,
|
||||
|
|
|
@ -74,8 +74,7 @@
|
|||
<from uri="direct-vm:modelSoundingPersistIndexAlert"/>
|
||||
<bean ref="persist" method="persist"/>
|
||||
<bean ref="index" method="index"/>
|
||||
<bean ref="toDataURI" method="toDataURI"/>
|
||||
<to uri="vm:stageNotification"/>
|
||||
<to uri="direct-vm:stageNotification"/>
|
||||
</route>
|
||||
</camelContext>
|
||||
</beans>
|
||||
|
|
|
@ -65,9 +65,9 @@
|
|||
<bean ref="stringToFile" />
|
||||
<bean ref="warningDecoder" method="decode" />
|
||||
<bean ref="index" method="index" />
|
||||
<bean ref="processUtil" method="log" />
|
||||
<multicast parallelProcessing="false">
|
||||
<to uri="direct-vm:warningIngestAlert" />
|
||||
<bean ref="pluginNotifier" method="send"/>
|
||||
<filter>
|
||||
<method bean="vtecFilter" method="hasVTEC" />
|
||||
<to uri="jms-warning:queue:activeTablePending"/>
|
||||
|
@ -83,9 +83,7 @@
|
|||
|
||||
<route id="warningIngestAlert">
|
||||
<from uri="direct-vm:warningIngestAlert" />
|
||||
<bean ref="toDataURI" method="toDataURI" />
|
||||
<bean ref="processUtil" method="log" />
|
||||
<to uri="vm:stageNotification" />
|
||||
<to uri="direct-vm:stageNotification" />
|
||||
</route>
|
||||
</camelContext>
|
||||
</beans>
|
|
@ -35,7 +35,6 @@
|
|||
value="com.raytheon.uf.common.dataplugin.warning.WarningRecord" />
|
||||
</bean>
|
||||
<bean id="practiceProductOfftimeHandler" class="com.raytheon.uf.edex.activetable.PracticeProductOfftimeHandler"/>
|
||||
<bean id="uriAggregator" class="com.raytheon.uf.edex.ingest.notification.DataUriAggregator" />
|
||||
<bean id="toDataURI" class="com.raytheon.uf.edex.ingest.notification.ToDataURI" />
|
||||
<camelContext id="activetable-camel"
|
||||
xmlns="http://camel.apache.org/schema/spring"
|
||||
|
@ -54,9 +53,8 @@
|
|||
<filter>
|
||||
<method bean="vtecFilter" method="hasVTEC" />
|
||||
<bean ref="activeTableSrv" method="practiceVtecArrived" />
|
||||
<bean ref="toDataURI" method="toDataURI"/>
|
||||
<bean ref="uriAggregator" method="addDataUris" />
|
||||
<bean ref="uriAggregator" method="sendPracticeQueuedUris" />
|
||||
<bean ref="toDataURI" method="toPracticeNotificationMsg"/>
|
||||
<bean ref="serializationUtil" method="transformToThrift" />
|
||||
<to uri="jms-generic:topic:edex.alerts.practicewarning?timeToLive=60000&deliveryPersistent=false"/>
|
||||
</filter>
|
||||
<doCatch>
|
||||
|
|
|
@ -2,4 +2,5 @@ source.. = src/
|
|||
output.. = bin/
|
||||
bin.includes = META-INF/,\
|
||||
.,\
|
||||
res/
|
||||
res/,\
|
||||
utility/
|
||||
|
|
|
@ -16,32 +16,15 @@
|
|||
autoStartup="false">
|
||||
<route id="cpgProcessAlerts">
|
||||
<from uri="direct-vm:processCPGAlerts"/>
|
||||
<bean ref="toDataURI" method="toDataURI"/>
|
||||
<to uri="vm:stageNotification"/>
|
||||
<to uri="direct-vm:stageNotification"/>
|
||||
</route>
|
||||
</camelContext>
|
||||
<camelContext id="clusteredCpgSrvRoutes"
|
||||
xmlns="http://camel.apache.org/schema/spring"
|
||||
errorHandlerRef="errorHandler"
|
||||
autoStartup="false">
|
||||
<!-- Need to reroute to a queue to allow for multiple jms consumers -->
|
||||
<route id="cpgsrvTopicToQueueRoute">
|
||||
<from uri="jms-generic:topic:edex.alerts"/>
|
||||
<!-- technically with qpid should be able to make this a durable subscription and not need to forward to another queue -->
|
||||
<doTry>
|
||||
<multicast>
|
||||
<to uri="jms-durable:queue:cpgsrvFiltering"/>
|
||||
<to uri="jms-durable:queue:scanCpgsrvFiltering"/>
|
||||
<to uri="jms-durable:queue:ffmpCpgsrvFiltering"/>
|
||||
</multicast>
|
||||
<doCatch>
|
||||
<exception>java.lang.Throwable</exception>
|
||||
<to uri="log:cpgSrv?level=ERROR"/>
|
||||
</doCatch>
|
||||
</doTry>
|
||||
</route>
|
||||
<route id="cpgsrvListenerRoute">
|
||||
<!-- technically with qpid should be able to make this a durable subscription and not need to forward to another queue -->
|
||||
<!-- Data from plugin notification -->
|
||||
<from uri="jms-durable:queue:cpgsrvFiltering?concurrentConsumers=5"/>
|
||||
<doTry>
|
||||
<pipeline>
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
<pluginNotificationList>
|
||||
<!-- TODO: Run time generate to only listen to the radars/models DAT is interested in -->
|
||||
<pluginNotification>
|
||||
<endpointName>cpgsrvFiltering</endpointName>
|
||||
<format>DATAURI</format>
|
||||
<endpointType>QUEUE</endpointType>
|
||||
<durable>true</durable>
|
||||
<timeToLive>120000</timeToLive>
|
||||
<metadataMap>
|
||||
<mapping key="pluginName">
|
||||
<constraint constraintValue="grid, radar, qpf, obs, sfcobs, ldadmesonet, satellite" constraintType="IN"/>
|
||||
</mapping>
|
||||
</metadataMap>
|
||||
</pluginNotification>
|
||||
</pluginNotificationList>
|
|
@ -88,8 +88,7 @@
|
|||
|
||||
<route id="dataDeliveryNotify">
|
||||
<from uri="direct-vm:dataDeliveryNotify" />
|
||||
<bean ref="toDataURI" method="toDataURI" />
|
||||
<to uri="vm:stageNotification" />
|
||||
<to uri="direct-vm:stageNotification" />
|
||||
</route>
|
||||
</camelContext>
|
||||
|
||||
|
|
|
@ -2,4 +2,5 @@ source.. = src/
|
|||
output.. = bin/
|
||||
bin.includes = META-INF/,\
|
||||
.,\
|
||||
res/
|
||||
res/,\
|
||||
utility/
|
|
@ -8,27 +8,20 @@
|
|||
factory-method="getInstance" />
|
||||
|
||||
<camelContext id="grid-staticdata-process" xmlns="http://camel.apache.org/schema/spring"
|
||||
errorHandlerRef="errorHandler" autoStartup="false">
|
||||
errorHandlerRef="errorHandler">
|
||||
|
||||
<!-- Begin Grid Process Route -->
|
||||
<route id="gridTopoNotificationRoute">
|
||||
<from uri="jms-generic:topic:edex.alerts" />
|
||||
<route id="gridStaticDataGenerationRoute">
|
||||
<!-- Fed from plugin notification -->
|
||||
<from uri="vm:grid-staticdata-generate" />
|
||||
<doTry>
|
||||
<bean ref="serializationUtil" method="transformFromThrift" />
|
||||
<bean ref="staticDataGenerator" method="processNotification"/>
|
||||
<bean ref="toDataURI" method="toDataURI"/>
|
||||
<to uri="vm:stageNotification"/>
|
||||
<to uri="direct-vm:stageNotification"/>
|
||||
<doCatch>
|
||||
<exception>java.lang.Throwable</exception>
|
||||
<to
|
||||
uri="log:grid-staticdata?level=ERROR" />
|
||||
<to uri="log:grid-staticdata?level=ERROR" />
|
||||
</doCatch>
|
||||
</doTry>
|
||||
</route>
|
||||
</camelContext>
|
||||
|
||||
<bean id="gridStaticDataProcessCamelRegistered" factory-bean="clusteredCamelContextMgr"
|
||||
factory-method="register">
|
||||
<constructor-arg ref="grid-staticdata-process" />
|
||||
</bean>
|
||||
</beans>
|
||||
|
|
|
@ -108,7 +108,7 @@ public class StaticDataGenerator {
|
|||
* whether static data has been created to avoid too many trips to db to
|
||||
* check.
|
||||
*/
|
||||
private Map<CacheKey, CacheKey> dataGeneratedCache = Collections
|
||||
private final Map<CacheKey, CacheKey> dataGeneratedCache = Collections
|
||||
.synchronizedMap(new LinkedHashMap<CacheKey, CacheKey>(
|
||||
(int) (CACHE_SIZE / 0.75f) + 1, 0.75f, true) {
|
||||
|
||||
|
@ -138,8 +138,9 @@ public class StaticDataGenerator {
|
|||
*/
|
||||
public GridRecord[] processNotification(DataURINotificationMessage msg)
|
||||
throws Exception {
|
||||
Set<GridRecord> staticRecords = new HashSet<GridRecord>();
|
||||
for (String dataURI : msg.getDataURIs()) {
|
||||
String[] uris = msg.getDataURIs();
|
||||
Set<GridRecord> staticRecords = new HashSet<GridRecord>(uris.length, 1);
|
||||
for (String dataURI : uris) {
|
||||
if (dataURI.startsWith("/grid/")) {
|
||||
try {
|
||||
GridRecord record = new GridRecord(dataURI);
|
||||
|
@ -376,8 +377,6 @@ public class StaticDataGenerator {
|
|||
staticRecord.setLevel(LevelFactory.getInstance().getLevel("Dflt", 0,
|
||||
"m"));
|
||||
staticRecord.setDataTime(dataTime);
|
||||
|
||||
staticRecord.constructDataURI();
|
||||
return staticRecord;
|
||||
|
||||
}
|
||||
|
@ -458,38 +457,47 @@ public class StaticDataGenerator {
|
|||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + coverageid;
|
||||
result = prime * result
|
||||
result = (prime * result) + coverageid;
|
||||
result = (prime * result)
|
||||
+ ((datasetid == null) ? 0 : datasetid.hashCode());
|
||||
result = prime * result + forecastTime;
|
||||
result = prime * result
|
||||
result = (prime * result) + forecastTime;
|
||||
result = (prime * result)
|
||||
+ ((refTime == null) ? 0 : refTime.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
if (this == obj) {
|
||||
return true;
|
||||
if (obj == null)
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
CacheKey other = (CacheKey) obj;
|
||||
if (coverageid != other.coverageid)
|
||||
if (coverageid != other.coverageid) {
|
||||
return false;
|
||||
}
|
||||
if (datasetid == null) {
|
||||
if (other.datasetid != null)
|
||||
if (other.datasetid != null) {
|
||||
return false;
|
||||
} else if (!datasetid.equals(other.datasetid))
|
||||
}
|
||||
} else if (!datasetid.equals(other.datasetid)) {
|
||||
return false;
|
||||
if (forecastTime != other.forecastTime)
|
||||
}
|
||||
if (forecastTime != other.forecastTime) {
|
||||
return false;
|
||||
}
|
||||
if (refTime == null) {
|
||||
if (other.refTime != null)
|
||||
if (other.refTime != null) {
|
||||
return false;
|
||||
} else if (!refTime.equals(other.refTime))
|
||||
}
|
||||
} else if (!refTime.equals(other.refTime)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
<pluginNotificationList>
|
||||
<pluginNotification>
|
||||
<endpointName>grid-staticdata-generate</endpointName>
|
||||
<endpointType>VM</endpointType>
|
||||
<format>DATAURI</format>
|
||||
<metadataMap>
|
||||
<mapping key="pluginName">
|
||||
<constraint constraintValue="grid" constraintType="EQUALS"/>
|
||||
</mapping>
|
||||
</metadataMap>
|
||||
</pluginNotification>
|
||||
</pluginNotificationList>
|
|
@ -5,6 +5,7 @@ Bundle-SymbolicName: com.raytheon.uf.edex.ingest
|
|||
Bundle-Version: 1.12.1174.qualifier
|
||||
Bundle-Vendor: RAYTHEON
|
||||
Require-Bundle: com.raytheon.edex.common,
|
||||
com.raytheon.uf.common.localization,
|
||||
com.google.guava,
|
||||
org.apache.camel,
|
||||
org.apache.commons.lang,
|
||||
|
|
|
@ -2,4 +2,5 @@ source.. = src/
|
|||
output.. = bin/
|
||||
bin.includes = META-INF/,\
|
||||
.,\
|
||||
res/
|
||||
res/,\
|
||||
utility/
|
||||
|
|
Binary file not shown.
|
@ -6,10 +6,6 @@
|
|||
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
|
||||
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
|
||||
|
||||
|
||||
<bean id="uriAggregator" class="com.raytheon.uf.edex.ingest.notification.DataUriAggregator" />
|
||||
<bean id="toDataURI" class="com.raytheon.uf.edex.ingest.notification.ToDataURI" />
|
||||
|
||||
<bean id="persist" class="com.raytheon.uf.edex.ingest.PersistSrv" factory-method="getInstance"/>
|
||||
<bean id="index" class="com.raytheon.uf.edex.ingest.IndexSrv"/>
|
||||
|
||||
|
@ -21,7 +17,6 @@
|
|||
</bean>
|
||||
|
||||
<camelContext id="persist-camel" xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="errorHandler">
|
||||
|
||||
<!-- Generic persist and indexing
|
||||
Intended for routes that need persisting to HDF5,
|
||||
Indexing but no alert processing
|
||||
|
@ -42,8 +37,7 @@
|
|||
<bean ref="persist" method="persist"/>
|
||||
<bean ref="index" method="index"/>
|
||||
<bean ref="processUtil" method="log"/>
|
||||
<bean ref="toDataURI" method="toDataURI"/>
|
||||
<to uri="vm:stageNotification"/>
|
||||
<to uri="direct-vm:stageNotification"/>
|
||||
</route>
|
||||
|
||||
<!-- Generic index and alert route
|
||||
|
@ -53,22 +47,17 @@
|
|||
<from uri="direct-vm:indexAlert"/>
|
||||
<bean ref="index" method="index"/>
|
||||
<bean ref="processUtil" method="log"/>
|
||||
<bean ref="toDataURI" method="toDataURI"/>
|
||||
<to uri="vm:stageNotification"/>
|
||||
<to uri="direct-vm:stageNotification"/>
|
||||
</route>
|
||||
|
||||
<route id="notificationAggregation">
|
||||
<from uri="vm:stageNotification"/>
|
||||
<bean ref="uriAggregator" method="addDataUris" />
|
||||
<from uri="direct-vm:stageNotification"/>
|
||||
<bean ref="pluginNotifier" method="notify" />
|
||||
</route>
|
||||
|
||||
<route id="notificationTimer">
|
||||
<from uri="timer://notificationTimer?fixedRate=true&period=1000" />
|
||||
<filter>
|
||||
<method bean="uriAggregator" method="hasUris" />
|
||||
<bean ref="uriAggregator" method="sendQueuedUris" />
|
||||
<to uri="jms-generic:topic:edex.alerts?timeToLive=60000"/>
|
||||
</filter>
|
||||
<bean ref="pluginNotifier" method="sendQueuedNotifications" />
|
||||
</route>
|
||||
</camelContext>
|
||||
</beans>
|
||||
|
|
|
@ -1,146 +0,0 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.edex.ingest.notification;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import com.raytheon.uf.common.serialization.ISerializableObject;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
import com.raytheon.uf.common.serialization.SerializationUtil;
|
||||
import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage;
|
||||
import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMessage;
|
||||
import com.raytheon.uf.common.util.ByteArrayOutputStreamPool;
|
||||
|
||||
/**
|
||||
* Combines multiple messages of URIs into a single message that can be
|
||||
* triggered
|
||||
*
|
||||
* <pre>
|
||||
* SOFTWARE HISTORY
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Dec 10, 2008 njensen Initial creation
|
||||
* Feb 15, 2013 1638 mschenke Moved DataURINotificationMessage to uf.common.dataplugin
|
||||
* Aug 16, 2013 2169 bkowal gzip data uris
|
||||
* </pre>
|
||||
*
|
||||
* @author njensen
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
public class DataUriAggregator {
|
||||
|
||||
private List<String> dataUris = new ArrayList<String>();
|
||||
|
||||
/**
|
||||
* Add data uris to the queue
|
||||
*
|
||||
* @param uris
|
||||
*/
|
||||
public void addDataUris(String[] uris) {
|
||||
synchronized (this) {
|
||||
for (String uri : uris) {
|
||||
if (uri != null) {
|
||||
dataUris.add(uri);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Filters quartz messages to only send a message if there are URIs to send.
|
||||
*
|
||||
* @param obj
|
||||
* @return
|
||||
*/
|
||||
public boolean hasUris(Object obj) {
|
||||
synchronized (this) {
|
||||
return dataUris.size() > 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send off the currently queued uris
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public byte[] sendQueuedUris() throws SerializationException {
|
||||
DataURINotificationMessage msg = null;
|
||||
synchronized (this) {
|
||||
String[] uris = dataUris.toArray(new String[dataUris.size()]);
|
||||
dataUris.clear();
|
||||
msg = new DataURINotificationMessage();
|
||||
msg.setDataURIs(uris);
|
||||
}
|
||||
|
||||
return this.encodeMessage(msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send off the currently queued uris
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public byte[] sendPracticeQueuedUris() throws SerializationException {
|
||||
PracticeDataURINotificationMessage msg = null;
|
||||
synchronized (this) {
|
||||
String[] uris = dataUris.toArray(new String[dataUris.size()]);
|
||||
dataUris.clear();
|
||||
msg = new PracticeDataURINotificationMessage();
|
||||
msg.setDataURIs(uris);
|
||||
}
|
||||
|
||||
return this.encodeMessage(msg);
|
||||
}
|
||||
|
||||
public byte[] encodeMessage(ISerializableObject msg)
|
||||
throws SerializationException {
|
||||
ByteArrayOutputStream baos = ByteArrayOutputStreamPool.getInstance()
|
||||
.getStream();
|
||||
GZIPOutputStream gzippedURIs = null;
|
||||
|
||||
try {
|
||||
gzippedURIs = new GZIPOutputStream(baos);
|
||||
} catch (IOException e) {
|
||||
throw new SerializationException(
|
||||
"Failed to prepare the gzipped data stream!", e);
|
||||
}
|
||||
|
||||
SerializationUtil.transformToThriftUsingStream(msg, gzippedURIs);
|
||||
try {
|
||||
gzippedURIs.finish();
|
||||
gzippedURIs.flush();
|
||||
return baos.toByteArray();
|
||||
} catch (IOException e) {
|
||||
throw new SerializationException(
|
||||
"Failed to write the gzipped data stream!", e);
|
||||
} finally {
|
||||
try {
|
||||
gzippedURIs.close();
|
||||
} catch (IOException e) {
|
||||
// ignore, we no longer need the stream
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
*/
|
||||
package com.raytheon.uf.edex.ingest.notification;
|
||||
|
||||
/**
|
||||
* Thrown if a configuration is invalid.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Nov 19, 2013 2170 rjpeter Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author rjpeter
|
||||
* @version 1.0
|
||||
*/
|
||||
public class InvalidNotificationConfigException extends Exception {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public InvalidNotificationConfigException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public InvalidNotificationConfigException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public InvalidNotificationConfigException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public InvalidNotificationConfigException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
|
@ -19,24 +19,47 @@
|
|||
**/
|
||||
package com.raytheon.uf.edex.ingest.notification;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.Message;
|
||||
import org.apache.camel.RecipientList;
|
||||
import javax.xml.bind.JAXBException;
|
||||
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
||||
import com.raytheon.uf.common.dataplugin.PluginException;
|
||||
import com.raytheon.uf.common.dataplugin.annotations.DataURIUtil;
|
||||
import com.raytheon.uf.common.dataquery.DecisionTree;
|
||||
import com.raytheon.uf.common.dataquery.requests.RequestConstraint;
|
||||
import com.raytheon.uf.common.localization.IPathManager;
|
||||
import com.raytheon.uf.common.localization.LocalizationFile;
|
||||
import com.raytheon.uf.common.localization.PathManagerFactory;
|
||||
import com.raytheon.uf.common.localization.exception.LocalizationException;
|
||||
import com.raytheon.uf.common.serialization.JAXBManager;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.PerformanceStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.edex.core.EDEXUtil;
|
||||
import com.raytheon.uf.common.time.util.ITimer;
|
||||
import com.raytheon.uf.common.time.util.TimeUtil;
|
||||
import com.raytheon.uf.edex.core.EdexException;
|
||||
import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig.EndpointType;
|
||||
import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig.NotifyFormat;
|
||||
import com.raytheon.uf.edex.ingest.notification.router.DataUriRouter;
|
||||
import com.raytheon.uf.edex.ingest.notification.router.INotificationRouter;
|
||||
import com.raytheon.uf.edex.ingest.notification.router.PdoRouter;
|
||||
|
||||
/**
|
||||
* Plugins can register routes with this and then be generically fired to. Helps
|
||||
* to reduce dependencies as we no longer need to call a route directly.
|
||||
* to reduce dependencies as we no longer need to call a route directly. All
|
||||
* registration must occur before messages are being picked up. Otherwise
|
||||
* concurrency problems may occur.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
|
@ -44,8 +67,9 @@ import com.raytheon.uf.edex.core.EdexException;
|
|||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jun 13, 2013 mnash Initial creation
|
||||
*
|
||||
* Jun 13, 2013 mnash Initial creation.
|
||||
* Nov 19, 2013 2170 rjpeter Add plugin contributed config files, filtering,
|
||||
* and support for pdo vs datauri.
|
||||
* </pre>
|
||||
*
|
||||
* @author mnash
|
||||
|
@ -53,55 +77,311 @@ import com.raytheon.uf.edex.core.EdexException;
|
|||
*/
|
||||
|
||||
public class PluginNotifier {
|
||||
|
||||
private static final IUFStatusHandler theHandler = UFStatus
|
||||
.getHandler(PluginNotifier.class);
|
||||
|
||||
private static final Multimap<String, String> routes = ArrayListMultimap
|
||||
.create();
|
||||
private final IPerformanceStatusHandler perfLog = PerformanceStatus
|
||||
.getHandler("Notification:");
|
||||
|
||||
private static final PluginNotifier distributor = new PluginNotifier();
|
||||
|
||||
private PluginNotifier() {
|
||||
}
|
||||
|
||||
public PluginNotifier getInstance() {
|
||||
return distributor;
|
||||
}
|
||||
private static final int DEFAULT_TIME_TO_LIVE = 300000;
|
||||
|
||||
/**
|
||||
* Normally called from Spring, this allows a set of commands to be
|
||||
* registered for a specific plugin.
|
||||
*
|
||||
* @param name
|
||||
* @param command
|
||||
* @return
|
||||
* Decision tree for plugin notification.
|
||||
*/
|
||||
public PluginNotifier register(String name, String command) {
|
||||
routes.put(name, command);
|
||||
return this;
|
||||
}
|
||||
private final DecisionTree<INotificationRouter> tree = new DecisionTree<INotificationRouter>();
|
||||
|
||||
private final List<INotificationRouter> receiveAllRoutes = new LinkedList<INotificationRouter>();
|
||||
|
||||
private final List<INotificationRouter> filteredRoutes = new LinkedList<INotificationRouter>();
|
||||
|
||||
/**
|
||||
* Takes the pluginName, which is what was registered for, and sends to all
|
||||
* the routes that are registered as that.
|
||||
*
|
||||
* @param exchange
|
||||
* @return
|
||||
* Set of loaded names. Used for duplicate detection.
|
||||
*/
|
||||
@RecipientList
|
||||
public List<String> send(Exchange exchange) {
|
||||
Message in = exchange.getIn();
|
||||
String name = (String) in.getHeader("pluginName");
|
||||
List<String> list = new ArrayList<String>();
|
||||
for (String route : routes.get(name)) {
|
||||
private final Set<String> loadedNames = new HashSet<String>();
|
||||
|
||||
public PluginNotifier() throws JAXBException {
|
||||
loadConfigurations();
|
||||
}
|
||||
|
||||
private synchronized void loadConfigurations() throws JAXBException {
|
||||
JAXBManager mgr = new JAXBManager(PluginNotifierConfigList.class,
|
||||
PluginNotifierConfig.class);
|
||||
|
||||
IPathManager pathMgr = PathManagerFactory.getPathManager();
|
||||
LocalizationFile[] files = pathMgr.listStaticFiles("notification",
|
||||
new String[] { ".xml" }, false, true);
|
||||
for (LocalizationFile lf : files) {
|
||||
try {
|
||||
EDEXUtil.getMessageProducer().sendAsyncUri(route, in.getBody());
|
||||
} catch (EdexException e) {
|
||||
File f = lf.getFile(true);
|
||||
if (f.length() > 0) {
|
||||
// empty files may be used to override base files to remove
|
||||
// functionality
|
||||
InputStream is = null;
|
||||
|
||||
try {
|
||||
is = lf.openInputStream();
|
||||
|
||||
PluginNotifierConfigList confList = (PluginNotifierConfigList) mgr
|
||||
.unmarshalFromInputStream(is);
|
||||
List<PluginNotifierConfig> configs = confList
|
||||
.getNotificationConfigs();
|
||||
if ((configs != null) && !configs.isEmpty()) {
|
||||
for (PluginNotifierConfig conf : configs) {
|
||||
register(conf, false);
|
||||
}
|
||||
}
|
||||
} catch (SerializationException e) {
|
||||
theHandler.handle(Priority.PROBLEM,
|
||||
"Unable to send message to " + route, e);
|
||||
"Unable to deserialize " + f.getPath(), e);
|
||||
} catch (InvalidNotificationConfigException e) {
|
||||
theHandler.handle(
|
||||
Priority.PROBLEM,
|
||||
"Unable to load plugin configuration "
|
||||
+ f.getPath(), e);
|
||||
} finally {
|
||||
if (is != null) {
|
||||
try {
|
||||
is.close();
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (LocalizationException e) {
|
||||
theHandler.handle(Priority.PROBLEM,
|
||||
"Error occurred accessing file: " + lf, e);
|
||||
}
|
||||
}
|
||||
|
||||
rebuildTree();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the given PluginNotifierConfig.
|
||||
*
|
||||
* @param config
|
||||
* @return
|
||||
*/
|
||||
public synchronized void register(PluginNotifierConfig config)
|
||||
throws InvalidNotificationConfigException {
|
||||
register(config, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the given PluginNotifierConfig.
|
||||
*
|
||||
* @param config
|
||||
* @param rebuildTree
|
||||
* Whether or not to rebuild the internal tree. If many things
|
||||
* are being registered can improve performance to only build
|
||||
* tree once at the end.
|
||||
* @return
|
||||
*/
|
||||
public synchronized void register(PluginNotifierConfig config,
|
||||
boolean rebuildTree) throws InvalidNotificationConfigException {
|
||||
validate(config);
|
||||
|
||||
INotificationRouter router = null;
|
||||
switch (config.getFormat()) {
|
||||
case DATAURI:
|
||||
router = new DataUriRouter(config);
|
||||
break;
|
||||
case PDO:
|
||||
router = new PdoRouter(config);
|
||||
break;
|
||||
default:
|
||||
throw new InvalidNotificationConfigException(
|
||||
"No INotificationRouter registered for format: "
|
||||
+ config.getFormat());
|
||||
}
|
||||
|
||||
Map<String, RequestConstraint>[] metadataMaps = config.getMetadataMap();
|
||||
boolean receiveAll = (metadataMaps == null)
|
||||
|| (metadataMaps.length == 0)
|
||||
|| ((metadataMaps.length == 1) && ((metadataMaps[0] == null) || metadataMaps[0]
|
||||
.isEmpty()));
|
||||
|
||||
if (receiveAll) {
|
||||
// null or empty constraint map implies receive all data
|
||||
receiveAllRoutes.add(router);
|
||||
} else {
|
||||
filteredRoutes.add(router);
|
||||
for (Map<String, RequestConstraint> metadataMap : metadataMaps) {
|
||||
tree.insertCriteria(metadataMap, router);
|
||||
}
|
||||
|
||||
if (rebuildTree) {
|
||||
tree.rebuildTree();
|
||||
}
|
||||
}
|
||||
|
||||
loadedNames.add(config.getEndpointName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate the passed config
|
||||
*
|
||||
* @param config
|
||||
* @return
|
||||
*/
|
||||
private void validate(PluginNotifierConfig config)
|
||||
throws InvalidNotificationConfigException {
|
||||
String endpoint = config.getEndpointName();
|
||||
if ((endpoint == null) || (endpoint.trim().length() == 0)) {
|
||||
throw new InvalidNotificationConfigException(
|
||||
"endpointName is required");
|
||||
}
|
||||
|
||||
if (loadedNames.contains(endpoint)) {
|
||||
throw new InvalidNotificationConfigException("PluginConfiguration "
|
||||
+ endpoint + ": endpointName is already in use");
|
||||
}
|
||||
|
||||
EndpointType type = config.getEndpointType();
|
||||
if (type == null) {
|
||||
StringBuilder msg = new StringBuilder(180);
|
||||
msg.append("PluginConfiguration ")
|
||||
.append(endpoint)
|
||||
.append(": missing required field endpointType. Valid values for ")
|
||||
.append(NotifyFormat.PDO).append(" format are ")
|
||||
.append(EndpointType.DIRECTVM).append(" and ")
|
||||
.append(EndpointType.VM).append(". Valid values for ")
|
||||
.append(NotifyFormat.DATAURI).append(" format are ")
|
||||
.append(EndpointType.DIRECTVM).append(", ")
|
||||
.append(EndpointType.VM).append(", ")
|
||||
.append(EndpointType.QUEUE).append(", and ")
|
||||
.append(EndpointType.TOPIC);
|
||||
throw new InvalidNotificationConfigException(msg.toString());
|
||||
}
|
||||
|
||||
NotifyFormat format = config.getFormat();
|
||||
if (NotifyFormat.PDO.equals(format)
|
||||
&& (EndpointType.QUEUE.equals(type) || EndpointType.TOPIC
|
||||
.equals(type))) {
|
||||
StringBuilder msg = new StringBuilder(120);
|
||||
msg.append("PluginConfiguration ").append(endpoint)
|
||||
.append(": endpointType ").append(type)
|
||||
.append(" is invalid for format ").append(format)
|
||||
.append(". Valid values for ").append(NotifyFormat.PDO)
|
||||
.append(" format are ").append(EndpointType.DIRECTVM)
|
||||
.append(" and ").append(EndpointType.VM);
|
||||
throw new InvalidNotificationConfigException(msg.toString());
|
||||
}
|
||||
|
||||
if (!EndpointType.QUEUE.equals(type) && config.isDurable()) {
|
||||
theHandler.warn("PluginConfiguration: " + endpoint
|
||||
+ " durable setting only valid on QUEUE type endpoints");
|
||||
}
|
||||
|
||||
if ((EndpointType.QUEUE.equals(type) || EndpointType.TOPIC.equals(type))
|
||||
&& (config.getTimeToLive() < 0)) {
|
||||
theHandler
|
||||
.warn("PluginConfiguration: "
|
||||
+ endpoint
|
||||
+ " has invalid time to live. Time to live for JMS endpoints must be 0 or greater. Setting to default of: "
|
||||
+ DEFAULT_TIME_TO_LIVE + " ms");
|
||||
config.setTimeToLive(DEFAULT_TIME_TO_LIVE);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Rebuild the tree based on all register'd configurations.
|
||||
*/
|
||||
public void rebuildTree() {
|
||||
tree.rebuildTree();
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks the pdo's against the registered routes. Data will then be
|
||||
* transformed and queued or sent immediately depending on configuration.
|
||||
*
|
||||
* @param pdos
|
||||
* @return
|
||||
*/
|
||||
public void notify(PluginDataObject... pdos) {
|
||||
if ((pdos != null) && (pdos.length > 0)) {
|
||||
ITimer timer = TimeUtil.getTimer();
|
||||
timer.start();
|
||||
if (!receiveAllRoutes.isEmpty()) {
|
||||
for (PluginDataObject pdo : pdos) {
|
||||
for (INotificationRouter router : receiveAllRoutes) {
|
||||
router.process(pdo);
|
||||
}
|
||||
}
|
||||
|
||||
for (INotificationRouter router : receiveAllRoutes) {
|
||||
try {
|
||||
router.sendImmediateData();
|
||||
} catch (EdexException e) {
|
||||
theHandler.handle(
|
||||
Priority.PROBLEM,
|
||||
"Unable to send notification data to "
|
||||
+ router.getRoute(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!filteredRoutes.isEmpty()) {
|
||||
Set<INotificationRouter> routesWithData = new HashSet<INotificationRouter>();
|
||||
for (PluginDataObject pdo : pdos) {
|
||||
try {
|
||||
List<INotificationRouter> routers = tree
|
||||
.searchTree(DataURIUtil.createDataURIMap(pdo));
|
||||
for (INotificationRouter router : routers) {
|
||||
router.process(pdo);
|
||||
routesWithData.add(router);
|
||||
}
|
||||
} catch (PluginException e) {
|
||||
theHandler.handle(Priority.PROBLEM,
|
||||
e.getLocalizedMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
for (INotificationRouter router : routesWithData) {
|
||||
try {
|
||||
router.sendImmediateData();
|
||||
} catch (EdexException e) {
|
||||
theHandler.handle(
|
||||
Priority.PROBLEM,
|
||||
"Unable to send notification data to "
|
||||
+ router.getRoute(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
timer.stop();
|
||||
perfLog.logDuration("Processed " + pdos.length + " pdos",
|
||||
timer.getElapsedTime());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the queued notifications.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public void sendQueuedNotifications() {
|
||||
for (INotificationRouter router : receiveAllRoutes) {
|
||||
try {
|
||||
router.sendQueuedData();
|
||||
} catch (EdexException e) {
|
||||
theHandler.handle(
|
||||
Priority.PROBLEM,
|
||||
"Unable to send notification data to "
|
||||
+ router.getRoute(), e);
|
||||
}
|
||||
}
|
||||
|
||||
for (INotificationRouter router : filteredRoutes) {
|
||||
try {
|
||||
router.sendQueuedData();
|
||||
} catch (EdexException e) {
|
||||
theHandler.handle(
|
||||
Priority.PROBLEM,
|
||||
"Unable to send notification data to "
|
||||
+ router.getRoute(), e);
|
||||
}
|
||||
}
|
||||
return list;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,224 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.edex.ingest.notification;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
|
||||
|
||||
import com.raytheon.uf.common.dataquery.requests.RequestConstraint;
|
||||
import com.raytheon.uf.common.dataquery.requests.RequestableMetadataMarshaller;
|
||||
|
||||
/**
|
||||
* Configuration object for plugin notification. An empty or null metadataMap
|
||||
* implies it should receive all data.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Nov 19, 2013 2170 rjpeter Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author rjpeter
|
||||
* @version 1.0
|
||||
*/
|
||||
@XmlAccessorType(XmlAccessType.NONE)
|
||||
public class PluginNotifierConfig {
|
||||
|
||||
public static enum EndpointType {
|
||||
QUEUE, TOPIC, VM, DIRECTVM
|
||||
}
|
||||
|
||||
public static enum NotifyFormat {
|
||||
DATAURI, PDO
|
||||
}
|
||||
|
||||
private static final String vmPrefix = "vm:";
|
||||
|
||||
private static final String directvmPrefix = "direct-vm:";
|
||||
|
||||
private static final String persistentJmsPrefix = "jms-durable:";
|
||||
|
||||
private static final String transientJmsPrefix = "jms-generic:";
|
||||
|
||||
@XmlElement
|
||||
protected String endpointName;
|
||||
|
||||
@XmlElement
|
||||
protected EndpointType endpointType;
|
||||
|
||||
@XmlElement(required = true)
|
||||
protected NotifyFormat format;
|
||||
|
||||
/**
|
||||
* Time to live for JMS type endpoints in milliseconds.
|
||||
*/
|
||||
@XmlElement
|
||||
protected int timeToLive = -1;
|
||||
|
||||
/**
|
||||
* If the JMS message is being sent to a durable endpoint.
|
||||
*/
|
||||
@XmlElement
|
||||
protected boolean durable;
|
||||
|
||||
/**
|
||||
* the metadata criteria to retrieve the resource
|
||||
*/
|
||||
@XmlElement
|
||||
@XmlJavaTypeAdapter(value = RequestableMetadataMarshaller.class)
|
||||
protected HashMap<String, RequestConstraint>[] metadataMap;
|
||||
|
||||
protected transient String endpointUri;
|
||||
|
||||
public String getEndpointName() {
|
||||
return endpointName;
|
||||
}
|
||||
|
||||
public void setEndpointName(String endpointName) {
|
||||
this.endpointName = endpointName;
|
||||
}
|
||||
|
||||
public EndpointType getEndpointType() {
|
||||
return endpointType;
|
||||
}
|
||||
|
||||
public void setEndpointType(EndpointType endpointType) {
|
||||
this.endpointType = endpointType;
|
||||
}
|
||||
|
||||
public NotifyFormat getFormat() {
|
||||
return format;
|
||||
}
|
||||
|
||||
public void setFormat(NotifyFormat format) {
|
||||
this.format = format;
|
||||
}
|
||||
|
||||
public int getTimeToLive() {
|
||||
return timeToLive;
|
||||
}
|
||||
|
||||
public void setTimeToLive(int timeToLive) {
|
||||
this.timeToLive = timeToLive;
|
||||
}
|
||||
|
||||
public boolean isDurable() {
|
||||
return durable;
|
||||
}
|
||||
|
||||
public void setDurable(boolean durable) {
|
||||
this.durable = durable;
|
||||
}
|
||||
|
||||
public HashMap<String, RequestConstraint>[] getMetadataMap() {
|
||||
return metadataMap;
|
||||
}
|
||||
|
||||
public void setMetadataMap(HashMap<String, RequestConstraint>[] metadataMap) {
|
||||
this.metadataMap = metadataMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates and returns the endpoint uri for this configuration.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public String getEndpointUri() {
|
||||
if (endpointUri == null) {
|
||||
StringBuilder builder = new StringBuilder(64);
|
||||
switch (endpointType) {
|
||||
case DIRECTVM:
|
||||
builder.append(directvmPrefix);
|
||||
case VM:
|
||||
builder.append(vmPrefix);
|
||||
break;
|
||||
case QUEUE:
|
||||
case TOPIC:
|
||||
if (durable) {
|
||||
builder.append(persistentJmsPrefix);
|
||||
} else {
|
||||
builder.append(transientJmsPrefix);
|
||||
}
|
||||
|
||||
builder.append(endpointType.name().toLowerCase()).append(':');
|
||||
break;
|
||||
}
|
||||
|
||||
builder.append(endpointName);
|
||||
|
||||
// for jms endpoints add time to live field
|
||||
if ((timeToLive > 0)
|
||||
&& (EndpointType.QUEUE.equals(endpointType) || EndpointType.TOPIC
|
||||
.equals(endpointType))) {
|
||||
// append time to live in milliseconds
|
||||
builder.append("?timeToLive=").append(timeToLive);
|
||||
}
|
||||
|
||||
endpointUri = builder.toString();
|
||||
}
|
||||
|
||||
return endpointUri;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = (prime * result)
|
||||
+ ((endpointName == null) ? 0 : endpointName.hashCode());
|
||||
result = (prime * result)
|
||||
+ ((endpointType == null) ? 0 : endpointType.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
PluginNotifierConfig other = (PluginNotifierConfig) obj;
|
||||
if (endpointName == null) {
|
||||
if (other.endpointName != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!endpointName.equals(other.endpointName)) {
|
||||
return false;
|
||||
}
|
||||
if (endpointType != other.endpointType) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
*/
|
||||
package com.raytheon.uf.edex.ingest.notification;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlElements;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
/**
|
||||
* List of Plugin Notification Configurations.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Nov 19, 2013 2170 rjpeter Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author rjpeter
|
||||
* @version 1.0
|
||||
*/
|
||||
@XmlRootElement(name = "pluginNotificationList")
|
||||
@XmlAccessorType(XmlAccessType.NONE)
|
||||
public class PluginNotifierConfigList {
|
||||
@XmlElements({ @XmlElement(name = "pluginNotification") })
|
||||
private List<PluginNotifierConfig> notificationConfigs;
|
||||
|
||||
public List<PluginNotifierConfig> getNotificationConfigs() {
|
||||
return notificationConfigs;
|
||||
}
|
||||
|
||||
public void setNotificationConfigs(
|
||||
List<PluginNotifierConfig> notificationConfigs) {
|
||||
this.notificationConfigs = notificationConfigs;
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
package com.raytheon.uf.edex.ingest.notification;
|
||||
|
||||
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
||||
import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMessage;
|
||||
|
||||
/**
|
||||
* Converts PluginDataObjects or arrays of PluginDataObjects into their dataURIs
|
||||
|
@ -30,7 +31,7 @@ import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
|||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Nov 25, 2008 chammack Initial creation
|
||||
*
|
||||
* Nov 19, 2013 2170 rjpeter Added toPracticeNotificationMsg.
|
||||
* </pre>
|
||||
*
|
||||
* @author chammack
|
||||
|
@ -51,4 +52,12 @@ public class ToDataURI {
|
|||
public String toDataURI(PluginDataObject pdo) {
|
||||
return pdo.getDataURI();
|
||||
}
|
||||
|
||||
public PracticeDataURINotificationMessage toPracticeNotificationMsg(
|
||||
PluginDataObject[] pdos) {
|
||||
String[] uris = toDataURI(pdos);
|
||||
PracticeDataURINotificationMessage rval = new PracticeDataURINotificationMessage();
|
||||
rval.setDataURIs(uris);
|
||||
return rval;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,186 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.edex.ingest.notification.router;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
|
||||
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
||||
import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
import com.raytheon.uf.common.serialization.SerializationUtil;
|
||||
import com.raytheon.uf.common.util.ByteArrayOutputStreamPool;
|
||||
import com.raytheon.uf.edex.core.EDEXUtil;
|
||||
import com.raytheon.uf.edex.core.EdexException;
|
||||
import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig;
|
||||
import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig.EndpointType;
|
||||
|
||||
/**
|
||||
* Routes DataUri Notifications to a destination uri. Notification msg will be
|
||||
* sent immediately to the routes for inner jvm calls, routes over jms will be
|
||||
* queued and sent in gzipped batches.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Nov 19, 2013 2170 rjpeter Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author rjpeter
|
||||
* @version 1.0
|
||||
*/
|
||||
public class DataUriRouter implements INotificationRouter {
|
||||
|
||||
/**
|
||||
* Buffer size.
|
||||
*/
|
||||
private static final int GZIP_BUFFER_SIZE = 4096;
|
||||
|
||||
/**
|
||||
* Data URIs that have not been sent.
|
||||
*/
|
||||
private final ConcurrentLinkedQueue<String> uris = new ConcurrentLinkedQueue<String>();
|
||||
|
||||
/**
|
||||
* Flag if this route stays in the jvm.
|
||||
*/
|
||||
private final boolean isInternal;
|
||||
|
||||
/**
|
||||
* The destination URI for this router.
|
||||
*/
|
||||
private final String route;
|
||||
|
||||
/**
|
||||
*
|
||||
* @param config
|
||||
*/
|
||||
public DataUriRouter(PluginNotifierConfig config) {
|
||||
EndpointType type = config.getEndpointType();
|
||||
isInternal = EndpointType.DIRECTVM.equals(type)
|
||||
|| EndpointType.VM.equals(type);
|
||||
route = config.getEndpointUri();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRoute() {
|
||||
return route;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(PluginDataObject pdo) {
|
||||
uris.add(pdo.getDataURI());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a DataURINotificationMessage.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
protected synchronized DataURINotificationMessage createMessage() {
|
||||
DataURINotificationMessage msg = null;
|
||||
// this is the only point that uris is reduced, safe to grab current
|
||||
// size and dequeue that many items
|
||||
int size = uris.size();
|
||||
if (size > 0) {
|
||||
String[] data = new String[size];
|
||||
for (int i = 0; i < size; i++) {
|
||||
data[i] = uris.poll();
|
||||
}
|
||||
|
||||
msg = new DataURINotificationMessage();
|
||||
msg.setDataURIs(data);
|
||||
}
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendImmediateData() throws EdexException {
|
||||
// if sending inside jvm, create message and send immediately as the
|
||||
// memory object
|
||||
if (isInternal) {
|
||||
DataURINotificationMessage msg = createMessage();
|
||||
if (msg != null) {
|
||||
EDEXUtil.getMessageProducer().sendAsyncUri(route, msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendQueuedData() throws EdexException {
|
||||
if (!isInternal) {
|
||||
// if sending outside the jvm, create message, serialize, gzip, and
|
||||
// then send
|
||||
DataURINotificationMessage msg = createMessage();
|
||||
if (msg != null) {
|
||||
EDEXUtil.getMessageProducer().sendAsyncUri(route,
|
||||
encodeMessage(msg));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Thrift encodes an object and then gzip's the binary data. Should only be
|
||||
* used for sending data outside the jvm.
|
||||
*
|
||||
* @param msg
|
||||
* @return
|
||||
* @throws SerializationException
|
||||
*/
|
||||
private byte[] encodeMessage(DataURINotificationMessage msg)
|
||||
throws EdexException {
|
||||
ByteArrayOutputStream baos = ByteArrayOutputStreamPool.getInstance()
|
||||
.getStream();
|
||||
GZIPOutputStream gzippedURIs = null;
|
||||
|
||||
try {
|
||||
gzippedURIs = new GZIPOutputStream(baos, GZIP_BUFFER_SIZE, true);
|
||||
} catch (IOException e) {
|
||||
throw new EdexException(
|
||||
"Failed to prepare the gzipped data stream", e);
|
||||
}
|
||||
|
||||
try {
|
||||
SerializationUtil.transformToThriftUsingStream(msg, gzippedURIs);
|
||||
gzippedURIs.finish();
|
||||
gzippedURIs.flush();
|
||||
return baos.toByteArray();
|
||||
} catch (IOException e) {
|
||||
throw new EdexException("Failed to write the gzipped data stream",
|
||||
e);
|
||||
} catch (SerializationException e) {
|
||||
throw new EdexException(
|
||||
"Failed to serialize DataURINotificationMessage", e);
|
||||
} finally {
|
||||
try {
|
||||
gzippedURIs.close();
|
||||
} catch (IOException e) {
|
||||
// ignore, we no longer need the stream
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.edex.ingest.notification.router;
|
||||
|
||||
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
||||
import com.raytheon.uf.edex.core.EdexException;
|
||||
|
||||
/**
|
||||
* Notification router for a specific destination and format.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Nov 19, 2013 2170 rjpeter Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author rjpeter
|
||||
* @version 1.0
|
||||
*/
|
||||
public interface INotificationRouter {
|
||||
/**
|
||||
* The destination for this router.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public String getRoute();
|
||||
|
||||
/**
|
||||
* Process the pdo into notification format.
|
||||
*
|
||||
* @param pdo
|
||||
*/
|
||||
public void process(PluginDataObject pdo);
|
||||
|
||||
/**
|
||||
* Send data that should be sent immediately to route. This is mainly used
|
||||
* for PDO data that we don't want to build up and send as a bulk message.
|
||||
* Future enhancement could allow for priority data such as warnings and
|
||||
* radar data to always be notified immediately instead of a timer driver
|
||||
* basis.
|
||||
*
|
||||
* @throws EdexException
|
||||
*/
|
||||
public void sendImmediateData() throws EdexException;
|
||||
|
||||
/**
|
||||
* Send any queued data to route. Generally used for data uri notifications
|
||||
* being sent over JMS to allow for better bundling. The data is queued in
|
||||
* memory and sent out as a bundled message. The interval is defined by the
|
||||
* notification timer in persist-ingest.xml.
|
||||
*
|
||||
* @throws EdexException
|
||||
*/
|
||||
public void sendQueuedData() throws EdexException;
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.edex.ingest.notification.router;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
||||
import com.raytheon.uf.edex.core.EDEXUtil;
|
||||
import com.raytheon.uf.edex.core.EdexException;
|
||||
import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig;
|
||||
|
||||
/**
|
||||
* Routes pdos to a destination. Should only be used for destination inside the
|
||||
* jvm.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Nov 19, 2013 2170 rjpeter Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author rjpeter
|
||||
* @version 1.0
|
||||
*/
|
||||
public class PdoRouter implements INotificationRouter {
|
||||
|
||||
/**
|
||||
* Destination URI for this router.
|
||||
*/
|
||||
private final String route;
|
||||
|
||||
/**
|
||||
* PDO's in use by current thread. Since all PDO data is sent immediately
|
||||
* handles concurrency via thread local instance.
|
||||
*/
|
||||
private final ThreadLocal<List<PluginDataObject>> myPdos = new ThreadLocal<List<PluginDataObject>>() {
|
||||
|
||||
@Override
|
||||
protected List<PluginDataObject> initialValue() {
|
||||
return new LinkedList<PluginDataObject>();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
public PdoRouter(PluginNotifierConfig config) {
|
||||
this.route = config.getEndpointUri();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRoute() {
|
||||
return route;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(PluginDataObject pdo) {
|
||||
myPdos.get().add(pdo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendImmediateData() throws EdexException {
|
||||
List<PluginDataObject> pdos = myPdos.get();
|
||||
myPdos.remove();
|
||||
if (pdos.size() > 0) {
|
||||
EDEXUtil.getMessageProducer().sendAsyncUri(route, pdos);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendQueuedData() throws EdexException {
|
||||
// NOOP all data sent immediately
|
||||
}
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
<pluginNotificationList>
|
||||
<pluginNotification>
|
||||
<endpointName>edex.alerts</endpointName>
|
||||
<format>DATAURI</format>
|
||||
<endpointType>TOPIC</endpointType>
|
||||
<durable>false</durable>
|
||||
<timeToLive>120000</timeToLive>
|
||||
<metadataMap>
|
||||
</metadataMap>
|
||||
</pluginNotification>
|
||||
</pluginNotificationList>
|
|
@ -12,7 +12,8 @@
|
|||
errorHandlerRef="errorHandler"
|
||||
autoStartup="false">
|
||||
<route id="satPreIngestRoute">
|
||||
<from uri="jms-generic:topic:edex.alerts"/>
|
||||
<!-- Fed via notification -->
|
||||
<from uri="jms-generic:queue:satPreFilter"/>
|
||||
<doTry>
|
||||
<pipeline>
|
||||
<bean ref="serializationUtil" method="transformFromThrift" />
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
<pluginNotificationList>
|
||||
<pluginNotification>
|
||||
<endpointName>satPreFilter</endpointName>
|
||||
<format>DATAURI</format>
|
||||
<endpointType>QUEUE</endpointType>
|
||||
<durable>true</durable>
|
||||
<timeToLive>600000</timeToLive>
|
||||
<metadataMap>
|
||||
<mapping key="pluginName">
|
||||
<constraint constraintValue="grid" constraintType="EQUALS"/>
|
||||
</mapping>
|
||||
<mapping key="info.datasetId">
|
||||
<constraint constraintValue="AUTOSPE" constraintType="EQUALS"/>
|
||||
</mapping>
|
||||
</metadataMap>
|
||||
</pluginNotification>
|
||||
</pluginNotificationList>
|
|
@ -0,0 +1,15 @@
|
|||
<pluginNotificationList>
|
||||
<!-- TODO: Run time generate to only listen to the specific data scan is interested in -->
|
||||
<pluginNotification>
|
||||
<endpointName>scanCpgsrvFiltering</endpointName>
|
||||
<format>DATAURI</format>
|
||||
<endpointType>QUEUE</endpointType>
|
||||
<durable>true</durable>
|
||||
<timeToLive>120000</timeToLive>
|
||||
<metadataMap>
|
||||
<mapping key="pluginName">
|
||||
<constraint constraintValue="grid, radar, bufrua, binlightning" constraintType="IN"/>
|
||||
</mapping>
|
||||
</metadataMap>
|
||||
</pluginNotification>
|
||||
</pluginNotificationList>
|
Loading…
Add table
Reference in a new issue