diff --git a/edexOsgi/com.raytheon.edex.plugin.gfe/res/spring/gfe-spring.xml b/edexOsgi/com.raytheon.edex.plugin.gfe/res/spring/gfe-spring.xml index 91920b91e7..e4b5bd8270 100644 --- a/edexOsgi/com.raytheon.edex.plugin.gfe/res/spring/gfe-spring.xml +++ b/edexOsgi/com.raytheon.edex.plugin.gfe/res/spring/gfe-spring.xml @@ -30,16 +30,6 @@ - - - - - - - - - - @@ -48,7 +38,7 @@ - + @@ -60,7 +50,7 @@ - + @@ -82,6 +72,7 @@ + @@ -141,18 +132,6 @@ - - - - - - java.lang.Throwable - - - - diff --git a/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/spc/SPCWatchSrv.java b/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/spc/SPCWatchSrv.java index db76ab5ccb..e174fe2531 100644 --- a/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/spc/SPCWatchSrv.java +++ b/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/spc/SPCWatchSrv.java @@ -62,7 +62,8 @@ public class SPCWatchSrv { protected transient Log logger = LogFactory.getLog(getClass()); - public void handleSpcWatch(PluginDataObject[] pdos) throws EdexException { + public void handleSpcWatch(List pdos) + throws EdexException { // create the appropriate SPC notification, returns null if not // needed. EnvProperties env = PropertiesFactory.getInstance().getEnvProperties(); diff --git a/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/tpc/TPCWatchSrv.java b/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/tpc/TPCWatchSrv.java index b2d4a282bc..907ccd9bbc 100644 --- a/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/tpc/TPCWatchSrv.java +++ b/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/tpc/TPCWatchSrv.java @@ -92,7 +92,8 @@ public class TPCWatchSrv { protected transient Log logger = LogFactory.getLog(getClass()); - public void handleTpcWatch(PluginDataObject[] pdos) throws EdexException { + public void handleTpcWatch(List pdos) + throws EdexException { EnvProperties env = PropertiesFactory.getInstance().getEnvProperties(); String primarySite = env.getEnvValue("SITENAME"); diff --git a/edexOsgi/com.raytheon.edex.plugin.gfe/utility/common_static/base/notification/gfe-smartinit.xml b/edexOsgi/com.raytheon.edex.plugin.gfe/utility/common_static/base/notification/gfe-smartinit.xml new file mode 100644 index 0000000000..3ab28d227b --- /dev/null +++ b/edexOsgi/com.raytheon.edex.plugin.gfe/utility/common_static/base/notification/gfe-smartinit.xml @@ -0,0 +1,20 @@ + + + + gfeDataURINotification + DATAURI + QUEUE + true + 600000 + + + + + + + + + + + + diff --git a/edexOsgi/com.raytheon.edex.plugin.gfe/utility/common_static/base/notification/gfe-watch.xml b/edexOsgi/com.raytheon.edex.plugin.gfe/utility/common_static/base/notification/gfe-watch.xml new file mode 100644 index 0000000000..18ab30e28c --- /dev/null +++ b/edexOsgi/com.raytheon.edex.plugin.gfe/utility/common_static/base/notification/gfe-watch.xml @@ -0,0 +1,22 @@ + + + gfe.spcWatch + PDO + VM + + + + + + + + gfe.tpcWatch + PDO + VM + + + + + + + diff --git a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/decoderpostprocessors/EnsembleGridAssembler.java b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/decoderpostprocessors/EnsembleGridAssembler.java index d4661b24c3..d463ff27d2 100644 --- a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/decoderpostprocessors/EnsembleGridAssembler.java +++ b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/decoderpostprocessors/EnsembleGridAssembler.java @@ -227,7 +227,7 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor { updateExistingRecord(record, assembledRecord, thinned, dao); } EDEXUtil.getMessageProducer().sendAsync("notificationAggregation", - new String[] { assembledRecord.getDataURI() }); + record); } private GridRecord createAssembledRecord(GridRecord record, diff --git a/edexOsgi/com.raytheon.edex.plugin.modelsounding/res/spring/modelsounding-ingest.xml b/edexOsgi/com.raytheon.edex.plugin.modelsounding/res/spring/modelsounding-ingest.xml index 5a664102f7..73770c7c57 100644 --- a/edexOsgi/com.raytheon.edex.plugin.modelsounding/res/spring/modelsounding-ingest.xml +++ b/edexOsgi/com.raytheon.edex.plugin.modelsounding/res/spring/modelsounding-ingest.xml @@ -74,8 +74,7 @@ - - + diff --git a/edexOsgi/com.raytheon.edex.plugin.warning/res/spring/warning-ingest.xml b/edexOsgi/com.raytheon.edex.plugin.warning/res/spring/warning-ingest.xml index 5e5693c152..bf213c9ea8 100644 --- a/edexOsgi/com.raytheon.edex.plugin.warning/res/spring/warning-ingest.xml +++ b/edexOsgi/com.raytheon.edex.plugin.warning/res/spring/warning-ingest.xml @@ -65,9 +65,9 @@ + - @@ -83,9 +83,7 @@ - - - + \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.activetable/res/spring/activetable-common.xml b/edexOsgi/com.raytheon.uf.edex.activetable/res/spring/activetable-common.xml index 2c57d6eb99..5e968850e9 100644 --- a/edexOsgi/com.raytheon.uf.edex.activetable/res/spring/activetable-common.xml +++ b/edexOsgi/com.raytheon.uf.edex.activetable/res/spring/activetable-common.xml @@ -34,9 +34,8 @@ - - - + + @@ -54,9 +53,8 @@ - - - + + diff --git a/edexOsgi/com.raytheon.uf.edex.cpgsrv/build.properties b/edexOsgi/com.raytheon.uf.edex.cpgsrv/build.properties index 5791d48d5f..73974cda80 100644 --- a/edexOsgi/com.raytheon.uf.edex.cpgsrv/build.properties +++ b/edexOsgi/com.raytheon.uf.edex.cpgsrv/build.properties @@ -2,4 +2,5 @@ source.. = src/ output.. = bin/ bin.includes = META-INF/,\ .,\ - res/ + res/,\ + utility/ diff --git a/edexOsgi/com.raytheon.uf.edex.cpgsrv/res/spring/cpgsrv-spring.xml b/edexOsgi/com.raytheon.uf.edex.cpgsrv/res/spring/cpgsrv-spring.xml index 69c5a9e7f6..c1dd697082 100644 --- a/edexOsgi/com.raytheon.uf.edex.cpgsrv/res/spring/cpgsrv-spring.xml +++ b/edexOsgi/com.raytheon.uf.edex.cpgsrv/res/spring/cpgsrv-spring.xml @@ -16,32 +16,15 @@ autoStartup="false"> - - + - - - - - - - - - - - - java.lang.Throwable - - - - - + @@ -59,4 +42,4 @@ factory-method="register"> - \ No newline at end of file + diff --git a/edexOsgi/com.raytheon.uf.edex.cpgsrv/utility/common_static/base/notification/cpgsrv.xml b/edexOsgi/com.raytheon.uf.edex.cpgsrv/utility/common_static/base/notification/cpgsrv.xml new file mode 100644 index 0000000000..8982a81178 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.cpgsrv/utility/common_static/base/notification/cpgsrv.xml @@ -0,0 +1,15 @@ + + + + cpgsrvFiltering + DATAURI + QUEUE + true + 120000 + + + + + + + diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery.xml b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery.xml index 07aa568450..e3909ac819 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery.xml +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.retrieval/res/spring/retrieval-datadelivery.xml @@ -88,8 +88,7 @@ - - + diff --git a/edexOsgi/com.raytheon.uf.edex.grid.staticdata/build.properties b/edexOsgi/com.raytheon.uf.edex.grid.staticdata/build.properties index 5791d48d5f..6200c0cd21 100644 --- a/edexOsgi/com.raytheon.uf.edex.grid.staticdata/build.properties +++ b/edexOsgi/com.raytheon.uf.edex.grid.staticdata/build.properties @@ -2,4 +2,5 @@ source.. = src/ output.. = bin/ bin.includes = META-INF/,\ .,\ - res/ + res/,\ + utility/ \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.grid.staticdata/res/spring/grid-staticdata-process.xml b/edexOsgi/com.raytheon.uf.edex.grid.staticdata/res/spring/grid-staticdata-process.xml index 8db9b3d210..1f72790c2b 100644 --- a/edexOsgi/com.raytheon.uf.edex.grid.staticdata/res/spring/grid-staticdata-process.xml +++ b/edexOsgi/com.raytheon.uf.edex.grid.staticdata/res/spring/grid-staticdata-process.xml @@ -1,34 +1,27 @@ - - - + + + - - - - - - - - - - java.lang.Throwable - - - - - - - - - + + + + + + + + + java.lang.Throwable + + + + + diff --git a/edexOsgi/com.raytheon.uf.edex.grid.staticdata/src/com/raytheon/uf/edex/grid/staticdata/StaticDataGenerator.java b/edexOsgi/com.raytheon.uf.edex.grid.staticdata/src/com/raytheon/uf/edex/grid/staticdata/StaticDataGenerator.java index e4df17246f..7bd4e1ec92 100644 --- a/edexOsgi/com.raytheon.uf.edex.grid.staticdata/src/com/raytheon/uf/edex/grid/staticdata/StaticDataGenerator.java +++ b/edexOsgi/com.raytheon.uf.edex.grid.staticdata/src/com/raytheon/uf/edex/grid/staticdata/StaticDataGenerator.java @@ -108,7 +108,7 @@ public class StaticDataGenerator { * whether static data has been created to avoid too many trips to db to * check. */ - private Map dataGeneratedCache = Collections + private final Map dataGeneratedCache = Collections .synchronizedMap(new LinkedHashMap( (int) (CACHE_SIZE / 0.75f) + 1, 0.75f, true) { @@ -138,8 +138,9 @@ public class StaticDataGenerator { */ public GridRecord[] processNotification(DataURINotificationMessage msg) throws Exception { - Set staticRecords = new HashSet(); - for (String dataURI : msg.getDataURIs()) { + String[] uris = msg.getDataURIs(); + Set staticRecords = new HashSet(uris.length, 1); + for (String dataURI : uris) { if (dataURI.startsWith("/grid/")) { try { GridRecord record = new GridRecord(dataURI); @@ -376,8 +377,6 @@ public class StaticDataGenerator { staticRecord.setLevel(LevelFactory.getInstance().getLevel("Dflt", 0, "m")); staticRecord.setDataTime(dataTime); - - staticRecord.constructDataURI(); return staticRecord; } @@ -458,38 +457,47 @@ public class StaticDataGenerator { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + coverageid; - result = prime * result + result = (prime * result) + coverageid; + result = (prime * result) + ((datasetid == null) ? 0 : datasetid.hashCode()); - result = prime * result + forecastTime; - result = prime * result + result = (prime * result) + forecastTime; + result = (prime * result) + ((refTime == null) ? 0 : refTime.hashCode()); return result; } @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } CacheKey other = (CacheKey) obj; - if (coverageid != other.coverageid) + if (coverageid != other.coverageid) { return false; + } if (datasetid == null) { - if (other.datasetid != null) + if (other.datasetid != null) { return false; - } else if (!datasetid.equals(other.datasetid)) + } + } else if (!datasetid.equals(other.datasetid)) { return false; - if (forecastTime != other.forecastTime) + } + if (forecastTime != other.forecastTime) { return false; + } if (refTime == null) { - if (other.refTime != null) + if (other.refTime != null) { return false; - } else if (!refTime.equals(other.refTime)) + } + } else if (!refTime.equals(other.refTime)) { return false; + } return true; } diff --git a/edexOsgi/com.raytheon.uf.edex.grid.staticdata/utility/common_static/base/notification/grid-staticdata.xml b/edexOsgi/com.raytheon.uf.edex.grid.staticdata/utility/common_static/base/notification/grid-staticdata.xml new file mode 100644 index 0000000000..854d43b8e0 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.grid.staticdata/utility/common_static/base/notification/grid-staticdata.xml @@ -0,0 +1,12 @@ + + + grid-staticdata-generate + VM + DATAURI + + + + + + + diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/META-INF/MANIFEST.MF b/edexOsgi/com.raytheon.uf.edex.ingest/META-INF/MANIFEST.MF index 964144d6b0..bd9f0e1e21 100644 --- a/edexOsgi/com.raytheon.uf.edex.ingest/META-INF/MANIFEST.MF +++ b/edexOsgi/com.raytheon.uf.edex.ingest/META-INF/MANIFEST.MF @@ -5,6 +5,7 @@ Bundle-SymbolicName: com.raytheon.uf.edex.ingest Bundle-Version: 1.12.1174.qualifier Bundle-Vendor: RAYTHEON Require-Bundle: com.raytheon.edex.common, + com.raytheon.uf.common.localization, com.google.guava, org.apache.camel, org.apache.commons.lang, diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/build.properties b/edexOsgi/com.raytheon.uf.edex.ingest/build.properties index 5791d48d5f..73974cda80 100644 --- a/edexOsgi/com.raytheon.uf.edex.ingest/build.properties +++ b/edexOsgi/com.raytheon.uf.edex.ingest/build.properties @@ -2,4 +2,5 @@ source.. = src/ output.. = bin/ bin.includes = META-INF/,\ .,\ - res/ + res/,\ + utility/ diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/res/spring/.persist-request.xml.swp b/edexOsgi/com.raytheon.uf.edex.ingest/res/spring/.persist-request.xml.swp deleted file mode 100644 index b12de2cc11..0000000000 Binary files a/edexOsgi/com.raytheon.uf.edex.ingest/res/spring/.persist-request.xml.swp and /dev/null differ diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/res/spring/persist-ingest.xml b/edexOsgi/com.raytheon.uf.edex.ingest/res/spring/persist-ingest.xml index 2fe1c903f2..0345344482 100644 --- a/edexOsgi/com.raytheon.uf.edex.ingest/res/spring/persist-ingest.xml +++ b/edexOsgi/com.raytheon.uf.edex.ingest/res/spring/persist-ingest.xml @@ -5,70 +5,59 @@ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> - - - - - + - - + + - - + factory-method="register"> + + - - - - - + + + - - - - + + - - + - + - - + - + - - + + - - - - - - + + diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/DataUriAggregator.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/DataUriAggregator.java deleted file mode 100644 index 6083fb3c2b..0000000000 --- a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/DataUriAggregator.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * This software was developed and / or modified by Raytheon Company, - * pursuant to Contract DG133W-05-CQ-1067 with the US Government. - * - * U.S. EXPORT CONTROLLED TECHNICAL DATA - * This software product contains export-restricted data whose - * export/transfer/disclosure is restricted by U.S. law. Dissemination - * to non-U.S. persons whether in the United States or abroad requires - * an export license or other authorization. - * - * Contractor Name: Raytheon Company - * Contractor Address: 6825 Pine Street, Suite 340 - * Mail Stop B8 - * Omaha, NE 68106 - * 402.291.0100 - * - * See the AWIPS II Master Rights File ("Master Rights File.pdf") for - * further licensing information. - **/ -package com.raytheon.uf.edex.ingest.notification; - -import java.util.ArrayList; -import java.util.List; -import java.util.zip.GZIPOutputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -import com.raytheon.uf.common.serialization.ISerializableObject; -import com.raytheon.uf.common.serialization.SerializationException; -import com.raytheon.uf.common.serialization.SerializationUtil; -import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage; -import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMessage; -import com.raytheon.uf.common.util.ByteArrayOutputStreamPool; - -/** - * Combines multiple messages of URIs into a single message that can be - * triggered - * - *
- * SOFTWARE HISTORY
- * Date         Ticket#    Engineer    Description
- * ------------ ---------- ----------- --------------------------
- * Dec 10, 2008            njensen     Initial creation
- * Feb 15, 2013 1638       mschenke    Moved DataURINotificationMessage to uf.common.dataplugin
- * Aug 16, 2013 2169       bkowal      gzip data uris
- * 
- * - * @author njensen - * @version 1.0 - */ - -public class DataUriAggregator { - - private List dataUris = new ArrayList(); - - /** - * Add data uris to the queue - * - * @param uris - */ - public void addDataUris(String[] uris) { - synchronized (this) { - for (String uri : uris) { - if (uri != null) { - dataUris.add(uri); - } - } - } - } - - /** - * Filters quartz messages to only send a message if there are URIs to send. - * - * @param obj - * @return - */ - public boolean hasUris(Object obj) { - synchronized (this) { - return dataUris.size() > 0; - } - } - - /** - * Send off the currently queued uris - * - * @return - */ - public byte[] sendQueuedUris() throws SerializationException { - DataURINotificationMessage msg = null; - synchronized (this) { - String[] uris = dataUris.toArray(new String[dataUris.size()]); - dataUris.clear(); - msg = new DataURINotificationMessage(); - msg.setDataURIs(uris); - } - - return this.encodeMessage(msg); - } - - /** - * Send off the currently queued uris - * - * @return - */ - public byte[] sendPracticeQueuedUris() throws SerializationException { - PracticeDataURINotificationMessage msg = null; - synchronized (this) { - String[] uris = dataUris.toArray(new String[dataUris.size()]); - dataUris.clear(); - msg = new PracticeDataURINotificationMessage(); - msg.setDataURIs(uris); - } - - return this.encodeMessage(msg); - } - - public byte[] encodeMessage(ISerializableObject msg) - throws SerializationException { - ByteArrayOutputStream baos = ByteArrayOutputStreamPool.getInstance() - .getStream(); - GZIPOutputStream gzippedURIs = null; - - try { - gzippedURIs = new GZIPOutputStream(baos); - } catch (IOException e) { - throw new SerializationException( - "Failed to prepare the gzipped data stream!", e); - } - - SerializationUtil.transformToThriftUsingStream(msg, gzippedURIs); - try { - gzippedURIs.finish(); - gzippedURIs.flush(); - return baos.toByteArray(); - } catch (IOException e) { - throw new SerializationException( - "Failed to write the gzipped data stream!", e); - } finally { - try { - gzippedURIs.close(); - } catch (IOException e) { - // ignore, we no longer need the stream - } - } - } -} \ No newline at end of file diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/InvalidNotificationConfigException.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/InvalidNotificationConfigException.java new file mode 100644 index 0000000000..e027ca6325 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/InvalidNotificationConfigException.java @@ -0,0 +1,57 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + */ +package com.raytheon.uf.edex.ingest.notification; + +/** + * Thrown if a configuration is invalid. + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 19, 2013 2170       rjpeter     Initial creation
+ * 
+ * 
+ * + * @author rjpeter + * @version 1.0 + */ +public class InvalidNotificationConfigException extends Exception { + + private static final long serialVersionUID = 1L; + + public InvalidNotificationConfigException() { + super(); + } + + public InvalidNotificationConfigException(String message, Throwable cause) { + super(message, cause); + } + + public InvalidNotificationConfigException(String message) { + super(message); + } + + public InvalidNotificationConfigException(Throwable cause) { + super(cause); + } +} diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifier.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifier.java index aa03dd085b..be3b6b381d 100644 --- a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifier.java +++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifier.java @@ -19,24 +19,47 @@ **/ package com.raytheon.uf.edex.ingest.notification; -import java.util.ArrayList; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Set; -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.RecipientList; +import javax.xml.bind.JAXBException; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; +import com.raytheon.uf.common.dataplugin.PluginDataObject; +import com.raytheon.uf.common.dataplugin.PluginException; +import com.raytheon.uf.common.dataplugin.annotations.DataURIUtil; +import com.raytheon.uf.common.dataquery.DecisionTree; +import com.raytheon.uf.common.dataquery.requests.RequestConstraint; +import com.raytheon.uf.common.localization.IPathManager; +import com.raytheon.uf.common.localization.LocalizationFile; +import com.raytheon.uf.common.localization.PathManagerFactory; +import com.raytheon.uf.common.localization.exception.LocalizationException; +import com.raytheon.uf.common.serialization.JAXBManager; +import com.raytheon.uf.common.serialization.SerializationException; +import com.raytheon.uf.common.status.IPerformanceStatusHandler; import com.raytheon.uf.common.status.IUFStatusHandler; +import com.raytheon.uf.common.status.PerformanceStatus; import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus.Priority; -import com.raytheon.uf.edex.core.EDEXUtil; +import com.raytheon.uf.common.time.util.ITimer; +import com.raytheon.uf.common.time.util.TimeUtil; import com.raytheon.uf.edex.core.EdexException; +import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig.EndpointType; +import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig.NotifyFormat; +import com.raytheon.uf.edex.ingest.notification.router.DataUriRouter; +import com.raytheon.uf.edex.ingest.notification.router.INotificationRouter; +import com.raytheon.uf.edex.ingest.notification.router.PdoRouter; /** * Plugins can register routes with this and then be generically fired to. Helps - * to reduce dependencies as we no longer need to call a route directly. + * to reduce dependencies as we no longer need to call a route directly. All + * registration must occur before messages are being picked up. Otherwise + * concurrency problems may occur. * *
  * 
@@ -44,8 +67,9 @@ import com.raytheon.uf.edex.core.EdexException;
  * 
  * Date         Ticket#    Engineer    Description
  * ------------ ---------- ----------- --------------------------
- * Jun 13, 2013            mnash     Initial creation
- * 
+ * Jun 13, 2013            mnash       Initial creation.
+ * Nov 19, 2013 2170       rjpeter     Add plugin contributed config files, filtering,
+ *                                     and support for pdo vs datauri.
  * 
* * @author mnash @@ -53,55 +77,311 @@ import com.raytheon.uf.edex.core.EdexException; */ public class PluginNotifier { - private static final IUFStatusHandler theHandler = UFStatus .getHandler(PluginNotifier.class); - private static final Multimap routes = ArrayListMultimap - .create(); + private final IPerformanceStatusHandler perfLog = PerformanceStatus + .getHandler("Notification:"); - private static final PluginNotifier distributor = new PluginNotifier(); - - private PluginNotifier() { - } - - public PluginNotifier getInstance() { - return distributor; - } + private static final int DEFAULT_TIME_TO_LIVE = 300000; /** - * Normally called from Spring, this allows a set of commands to be - * registered for a specific plugin. - * - * @param name - * @param command - * @return + * Decision tree for plugin notification. */ - public PluginNotifier register(String name, String command) { - routes.put(name, command); - return this; - } + private final DecisionTree tree = new DecisionTree(); + + private final List receiveAllRoutes = new LinkedList(); + + private final List filteredRoutes = new LinkedList(); /** - * Takes the pluginName, which is what was registered for, and sends to all - * the routes that are registered as that. - * - * @param exchange - * @return + * Set of loaded names. Used for duplicate detection. */ - @RecipientList - public List send(Exchange exchange) { - Message in = exchange.getIn(); - String name = (String) in.getHeader("pluginName"); - List list = new ArrayList(); - for (String route : routes.get(name)) { + private final Set loadedNames = new HashSet(); + + public PluginNotifier() throws JAXBException { + loadConfigurations(); + } + + private synchronized void loadConfigurations() throws JAXBException { + JAXBManager mgr = new JAXBManager(PluginNotifierConfigList.class, + PluginNotifierConfig.class); + + IPathManager pathMgr = PathManagerFactory.getPathManager(); + LocalizationFile[] files = pathMgr.listStaticFiles("notification", + new String[] { ".xml" }, false, true); + for (LocalizationFile lf : files) { try { - EDEXUtil.getMessageProducer().sendAsyncUri(route, in.getBody()); - } catch (EdexException e) { + File f = lf.getFile(true); + if (f.length() > 0) { + // empty files may be used to override base files to remove + // functionality + InputStream is = null; + + try { + is = lf.openInputStream(); + + PluginNotifierConfigList confList = (PluginNotifierConfigList) mgr + .unmarshalFromInputStream(is); + List configs = confList + .getNotificationConfigs(); + if ((configs != null) && !configs.isEmpty()) { + for (PluginNotifierConfig conf : configs) { + register(conf, false); + } + } + } catch (SerializationException e) { + theHandler.handle(Priority.PROBLEM, + "Unable to deserialize " + f.getPath(), e); + } catch (InvalidNotificationConfigException e) { + theHandler.handle( + Priority.PROBLEM, + "Unable to load plugin configuration " + + f.getPath(), e); + } finally { + if (is != null) { + try { + is.close(); + } catch (IOException e) { + // ignore + } + } + } + } + } catch (LocalizationException e) { theHandler.handle(Priority.PROBLEM, - "Unable to send message to " + route, e); + "Error occurred accessing file: " + lf, e); + } + } + + rebuildTree(); + + } + + /** + * Register the given PluginNotifierConfig. + * + * @param config + * @return + */ + public synchronized void register(PluginNotifierConfig config) + throws InvalidNotificationConfigException { + register(config, true); + } + + /** + * Register the given PluginNotifierConfig. + * + * @param config + * @param rebuildTree + * Whether or not to rebuild the internal tree. If many things + * are being registered can improve performance to only build + * tree once at the end. + * @return + */ + public synchronized void register(PluginNotifierConfig config, + boolean rebuildTree) throws InvalidNotificationConfigException { + validate(config); + + INotificationRouter router = null; + switch (config.getFormat()) { + case DATAURI: + router = new DataUriRouter(config); + break; + case PDO: + router = new PdoRouter(config); + break; + default: + throw new InvalidNotificationConfigException( + "No INotificationRouter registered for format: " + + config.getFormat()); + } + + Map[] metadataMaps = config.getMetadataMap(); + boolean receiveAll = (metadataMaps == null) + || (metadataMaps.length == 0) + || ((metadataMaps.length == 1) && ((metadataMaps[0] == null) || metadataMaps[0] + .isEmpty())); + + if (receiveAll) { + // null or empty constraint map implies receive all data + receiveAllRoutes.add(router); + } else { + filteredRoutes.add(router); + for (Map metadataMap : metadataMaps) { + tree.insertCriteria(metadataMap, router); + } + + if (rebuildTree) { + tree.rebuildTree(); + } + } + + loadedNames.add(config.getEndpointName()); + } + + /** + * Validate the passed config + * + * @param config + * @return + */ + private void validate(PluginNotifierConfig config) + throws InvalidNotificationConfigException { + String endpoint = config.getEndpointName(); + if ((endpoint == null) || (endpoint.trim().length() == 0)) { + throw new InvalidNotificationConfigException( + "endpointName is required"); + } + + if (loadedNames.contains(endpoint)) { + throw new InvalidNotificationConfigException("PluginConfiguration " + + endpoint + ": endpointName is already in use"); + } + + EndpointType type = config.getEndpointType(); + if (type == null) { + StringBuilder msg = new StringBuilder(180); + msg.append("PluginConfiguration ") + .append(endpoint) + .append(": missing required field endpointType. Valid values for ") + .append(NotifyFormat.PDO).append(" format are ") + .append(EndpointType.DIRECTVM).append(" and ") + .append(EndpointType.VM).append(". Valid values for ") + .append(NotifyFormat.DATAURI).append(" format are ") + .append(EndpointType.DIRECTVM).append(", ") + .append(EndpointType.VM).append(", ") + .append(EndpointType.QUEUE).append(", and ") + .append(EndpointType.TOPIC); + throw new InvalidNotificationConfigException(msg.toString()); + } + + NotifyFormat format = config.getFormat(); + if (NotifyFormat.PDO.equals(format) + && (EndpointType.QUEUE.equals(type) || EndpointType.TOPIC + .equals(type))) { + StringBuilder msg = new StringBuilder(120); + msg.append("PluginConfiguration ").append(endpoint) + .append(": endpointType ").append(type) + .append(" is invalid for format ").append(format) + .append(". Valid values for ").append(NotifyFormat.PDO) + .append(" format are ").append(EndpointType.DIRECTVM) + .append(" and ").append(EndpointType.VM); + throw new InvalidNotificationConfigException(msg.toString()); + } + + if (!EndpointType.QUEUE.equals(type) && config.isDurable()) { + theHandler.warn("PluginConfiguration: " + endpoint + + " durable setting only valid on QUEUE type endpoints"); + } + + if ((EndpointType.QUEUE.equals(type) || EndpointType.TOPIC.equals(type)) + && (config.getTimeToLive() < 0)) { + theHandler + .warn("PluginConfiguration: " + + endpoint + + " has invalid time to live. Time to live for JMS endpoints must be 0 or greater. Setting to default of: " + + DEFAULT_TIME_TO_LIVE + " ms"); + config.setTimeToLive(DEFAULT_TIME_TO_LIVE); + } + } + + /** + * Rebuild the tree based on all register'd configurations. + */ + public void rebuildTree() { + tree.rebuildTree(); + } + + /** + * Checks the pdo's against the registered routes. Data will then be + * transformed and queued or sent immediately depending on configuration. + * + * @param pdos + * @return + */ + public void notify(PluginDataObject... pdos) { + if ((pdos != null) && (pdos.length > 0)) { + ITimer timer = TimeUtil.getTimer(); + timer.start(); + if (!receiveAllRoutes.isEmpty()) { + for (PluginDataObject pdo : pdos) { + for (INotificationRouter router : receiveAllRoutes) { + router.process(pdo); + } + } + + for (INotificationRouter router : receiveAllRoutes) { + try { + router.sendImmediateData(); + } catch (EdexException e) { + theHandler.handle( + Priority.PROBLEM, + "Unable to send notification data to " + + router.getRoute(), e); + } + } + } + + if (!filteredRoutes.isEmpty()) { + Set routesWithData = new HashSet(); + for (PluginDataObject pdo : pdos) { + try { + List routers = tree + .searchTree(DataURIUtil.createDataURIMap(pdo)); + for (INotificationRouter router : routers) { + router.process(pdo); + routesWithData.add(router); + } + } catch (PluginException e) { + theHandler.handle(Priority.PROBLEM, + e.getLocalizedMessage(), e); + } + } + + for (INotificationRouter router : routesWithData) { + try { + router.sendImmediateData(); + } catch (EdexException e) { + theHandler.handle( + Priority.PROBLEM, + "Unable to send notification data to " + + router.getRoute(), e); + } + } + } + timer.stop(); + perfLog.logDuration("Processed " + pdos.length + " pdos", + timer.getElapsedTime()); + } + } + + /** + * Send the queued notifications. + * + * @return + */ + public void sendQueuedNotifications() { + for (INotificationRouter router : receiveAllRoutes) { + try { + router.sendQueuedData(); + } catch (EdexException e) { + theHandler.handle( + Priority.PROBLEM, + "Unable to send notification data to " + + router.getRoute(), e); + } + } + + for (INotificationRouter router : filteredRoutes) { + try { + router.sendQueuedData(); + } catch (EdexException e) { + theHandler.handle( + Priority.PROBLEM, + "Unable to send notification data to " + + router.getRoute(), e); } } - return list; } } diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifierConfig.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifierConfig.java new file mode 100644 index 0000000000..ba2097bc2d --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifierConfig.java @@ -0,0 +1,224 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.uf.edex.ingest.notification; + +import java.util.HashMap; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +import com.raytheon.uf.common.dataquery.requests.RequestConstraint; +import com.raytheon.uf.common.dataquery.requests.RequestableMetadataMarshaller; + +/** + * Configuration object for plugin notification. An empty or null metadataMap + * implies it should receive all data. + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 19, 2013 2170       rjpeter     Initial creation
+ * 
+ * 
+ * + * @author rjpeter + * @version 1.0 + */ +@XmlAccessorType(XmlAccessType.NONE) +public class PluginNotifierConfig { + + public static enum EndpointType { + QUEUE, TOPIC, VM, DIRECTVM + } + + public static enum NotifyFormat { + DATAURI, PDO + } + + private static final String vmPrefix = "vm:"; + + private static final String directvmPrefix = "direct-vm:"; + + private static final String persistentJmsPrefix = "jms-durable:"; + + private static final String transientJmsPrefix = "jms-generic:"; + + @XmlElement + protected String endpointName; + + @XmlElement + protected EndpointType endpointType; + + @XmlElement(required = true) + protected NotifyFormat format; + + /** + * Time to live for JMS type endpoints in milliseconds. + */ + @XmlElement + protected int timeToLive = -1; + + /** + * If the JMS message is being sent to a durable endpoint. + */ + @XmlElement + protected boolean durable; + + /** + * the metadata criteria to retrieve the resource + */ + @XmlElement + @XmlJavaTypeAdapter(value = RequestableMetadataMarshaller.class) + protected HashMap[] metadataMap; + + protected transient String endpointUri; + + public String getEndpointName() { + return endpointName; + } + + public void setEndpointName(String endpointName) { + this.endpointName = endpointName; + } + + public EndpointType getEndpointType() { + return endpointType; + } + + public void setEndpointType(EndpointType endpointType) { + this.endpointType = endpointType; + } + + public NotifyFormat getFormat() { + return format; + } + + public void setFormat(NotifyFormat format) { + this.format = format; + } + + public int getTimeToLive() { + return timeToLive; + } + + public void setTimeToLive(int timeToLive) { + this.timeToLive = timeToLive; + } + + public boolean isDurable() { + return durable; + } + + public void setDurable(boolean durable) { + this.durable = durable; + } + + public HashMap[] getMetadataMap() { + return metadataMap; + } + + public void setMetadataMap(HashMap[] metadataMap) { + this.metadataMap = metadataMap; + } + + /** + * Generates and returns the endpoint uri for this configuration. + * + * @return + */ + public String getEndpointUri() { + if (endpointUri == null) { + StringBuilder builder = new StringBuilder(64); + switch (endpointType) { + case DIRECTVM: + builder.append(directvmPrefix); + case VM: + builder.append(vmPrefix); + break; + case QUEUE: + case TOPIC: + if (durable) { + builder.append(persistentJmsPrefix); + } else { + builder.append(transientJmsPrefix); + } + + builder.append(endpointType.name().toLowerCase()).append(':'); + break; + } + + builder.append(endpointName); + + // for jms endpoints add time to live field + if ((timeToLive > 0) + && (EndpointType.QUEUE.equals(endpointType) || EndpointType.TOPIC + .equals(endpointType))) { + // append time to live in milliseconds + builder.append("?timeToLive=").append(timeToLive); + } + + endpointUri = builder.toString(); + } + + return endpointUri; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = (prime * result) + + ((endpointName == null) ? 0 : endpointName.hashCode()); + result = (prime * result) + + ((endpointType == null) ? 0 : endpointType.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + PluginNotifierConfig other = (PluginNotifierConfig) obj; + if (endpointName == null) { + if (other.endpointName != null) { + return false; + } + } else if (!endpointName.equals(other.endpointName)) { + return false; + } + if (endpointType != other.endpointType) { + return false; + } + return true; + } + +} diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifierConfigList.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifierConfigList.java new file mode 100644 index 0000000000..c7c130baae --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifierConfigList.java @@ -0,0 +1,60 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + */ +package com.raytheon.uf.edex.ingest.notification; + +import java.util.List; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElements; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * List of Plugin Notification Configurations. + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 19, 2013 2170      rjpeter     Initial creation
+ * 
+ * 
+ * + * @author rjpeter + * @version 1.0 + */ +@XmlRootElement(name = "pluginNotificationList") +@XmlAccessorType(XmlAccessType.NONE) +public class PluginNotifierConfigList { + @XmlElements({ @XmlElement(name = "pluginNotification") }) + private List notificationConfigs; + + public List getNotificationConfigs() { + return notificationConfigs; + } + + public void setNotificationConfigs( + List notificationConfigs) { + this.notificationConfigs = notificationConfigs; + } +} diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/ToDataURI.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/ToDataURI.java index 8261d6b071..04f5fc96d6 100644 --- a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/ToDataURI.java +++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/ToDataURI.java @@ -20,6 +20,7 @@ package com.raytheon.uf.edex.ingest.notification; import com.raytheon.uf.common.dataplugin.PluginDataObject; +import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMessage; /** * Converts PluginDataObjects or arrays of PluginDataObjects into their dataURIs @@ -29,8 +30,8 @@ import com.raytheon.uf.common.dataplugin.PluginDataObject; * SOFTWARE HISTORY * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- - * Nov 25, 2008 chammack Initial creation - * + * Nov 25, 2008 chammack Initial creation + * Nov 19, 2013 2170 rjpeter Added toPracticeNotificationMsg. * * * @author chammack @@ -51,4 +52,12 @@ public class ToDataURI { public String toDataURI(PluginDataObject pdo) { return pdo.getDataURI(); } + + public PracticeDataURINotificationMessage toPracticeNotificationMsg( + PluginDataObject[] pdos) { + String[] uris = toDataURI(pdos); + PracticeDataURINotificationMessage rval = new PracticeDataURINotificationMessage(); + rval.setDataURIs(uris); + return rval; + } } diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/DataUriRouter.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/DataUriRouter.java new file mode 100644 index 0000000000..d76e6176e7 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/DataUriRouter.java @@ -0,0 +1,186 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.uf.edex.ingest.notification.router; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.zip.GZIPOutputStream; + +import com.raytheon.uf.common.dataplugin.PluginDataObject; +import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage; +import com.raytheon.uf.common.serialization.SerializationException; +import com.raytheon.uf.common.serialization.SerializationUtil; +import com.raytheon.uf.common.util.ByteArrayOutputStreamPool; +import com.raytheon.uf.edex.core.EDEXUtil; +import com.raytheon.uf.edex.core.EdexException; +import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig; +import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig.EndpointType; + +/** + * Routes DataUri Notifications to a destination uri. Notification msg will be + * sent immediately to the routes for inner jvm calls, routes over jms will be + * queued and sent in gzipped batches. + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 19, 2013 2170       rjpeter     Initial creation
+ * 
+ * 
+ * + * @author rjpeter + * @version 1.0 + */ +public class DataUriRouter implements INotificationRouter { + + /** + * Buffer size. + */ + private static final int GZIP_BUFFER_SIZE = 4096; + + /** + * Data URIs that have not been sent. + */ + private final ConcurrentLinkedQueue uris = new ConcurrentLinkedQueue(); + + /** + * Flag if this route stays in the jvm. + */ + private final boolean isInternal; + + /** + * The destination URI for this router. + */ + private final String route; + + /** + * + * @param config + */ + public DataUriRouter(PluginNotifierConfig config) { + EndpointType type = config.getEndpointType(); + isInternal = EndpointType.DIRECTVM.equals(type) + || EndpointType.VM.equals(type); + route = config.getEndpointUri(); + } + + @Override + public String getRoute() { + return route; + } + + @Override + public void process(PluginDataObject pdo) { + uris.add(pdo.getDataURI()); + } + + /** + * Creates a DataURINotificationMessage. + * + * @return + */ + protected synchronized DataURINotificationMessage createMessage() { + DataURINotificationMessage msg = null; + // this is the only point that uris is reduced, safe to grab current + // size and dequeue that many items + int size = uris.size(); + if (size > 0) { + String[] data = new String[size]; + for (int i = 0; i < size; i++) { + data[i] = uris.poll(); + } + + msg = new DataURINotificationMessage(); + msg.setDataURIs(data); + } + + return msg; + } + + @Override + public void sendImmediateData() throws EdexException { + // if sending inside jvm, create message and send immediately as the + // memory object + if (isInternal) { + DataURINotificationMessage msg = createMessage(); + if (msg != null) { + EDEXUtil.getMessageProducer().sendAsyncUri(route, msg); + } + } + } + + @Override + public void sendQueuedData() throws EdexException { + if (!isInternal) { + // if sending outside the jvm, create message, serialize, gzip, and + // then send + DataURINotificationMessage msg = createMessage(); + if (msg != null) { + EDEXUtil.getMessageProducer().sendAsyncUri(route, + encodeMessage(msg)); + } + } + } + + /** + * Thrift encodes an object and then gzip's the binary data. Should only be + * used for sending data outside the jvm. + * + * @param msg + * @return + * @throws SerializationException + */ + private byte[] encodeMessage(DataURINotificationMessage msg) + throws EdexException { + ByteArrayOutputStream baos = ByteArrayOutputStreamPool.getInstance() + .getStream(); + GZIPOutputStream gzippedURIs = null; + + try { + gzippedURIs = new GZIPOutputStream(baos, GZIP_BUFFER_SIZE, true); + } catch (IOException e) { + throw new EdexException( + "Failed to prepare the gzipped data stream", e); + } + + try { + SerializationUtil.transformToThriftUsingStream(msg, gzippedURIs); + gzippedURIs.finish(); + gzippedURIs.flush(); + return baos.toByteArray(); + } catch (IOException e) { + throw new EdexException("Failed to write the gzipped data stream", + e); + } catch (SerializationException e) { + throw new EdexException( + "Failed to serialize DataURINotificationMessage", e); + } finally { + try { + gzippedURIs.close(); + } catch (IOException e) { + // ignore, we no longer need the stream + } + } + } +} diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/INotificationRouter.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/INotificationRouter.java new file mode 100644 index 0000000000..19e46ffec9 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/INotificationRouter.java @@ -0,0 +1,76 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.uf.edex.ingest.notification.router; + +import com.raytheon.uf.common.dataplugin.PluginDataObject; +import com.raytheon.uf.edex.core.EdexException; + +/** + * Notification router for a specific destination and format. + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 19, 2013 2170       rjpeter     Initial creation
+ * 
+ * 
+ * + * @author rjpeter + * @version 1.0 + */ +public interface INotificationRouter { + /** + * The destination for this router. + * + * @return + */ + public String getRoute(); + + /** + * Process the pdo into notification format. + * + * @param pdo + */ + public void process(PluginDataObject pdo); + + /** + * Send data that should be sent immediately to route. This is mainly used + * for PDO data that we don't want to build up and send as a bulk message. + * Future enhancement could allow for priority data such as warnings and + * radar data to always be notified immediately instead of a timer driver + * basis. + * + * @throws EdexException + */ + public void sendImmediateData() throws EdexException; + + /** + * Send any queued data to route. Generally used for data uri notifications + * being sent over JMS to allow for better bundling. The data is queued in + * memory and sent out as a bundled message. The interval is defined by the + * notification timer in persist-ingest.xml. + * + * @throws EdexException + */ + public void sendQueuedData() throws EdexException; +} diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/PdoRouter.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/PdoRouter.java new file mode 100644 index 0000000000..3a6d941fc8 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/PdoRouter.java @@ -0,0 +1,94 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.uf.edex.ingest.notification.router; + +import java.util.LinkedList; +import java.util.List; + +import com.raytheon.uf.common.dataplugin.PluginDataObject; +import com.raytheon.uf.edex.core.EDEXUtil; +import com.raytheon.uf.edex.core.EdexException; +import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig; + +/** + * Routes pdos to a destination. Should only be used for destination inside the + * jvm. + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 19, 2013 2170       rjpeter     Initial creation
+ * 
+ * 
+ * + * @author rjpeter + * @version 1.0 + */ +public class PdoRouter implements INotificationRouter { + + /** + * Destination URI for this router. + */ + private final String route; + + /** + * PDO's in use by current thread. Since all PDO data is sent immediately + * handles concurrency via thread local instance. + */ + private final ThreadLocal> myPdos = new ThreadLocal>() { + + @Override + protected List initialValue() { + return new LinkedList(); + } + + }; + + public PdoRouter(PluginNotifierConfig config) { + this.route = config.getEndpointUri(); + } + + @Override + public String getRoute() { + return route; + } + + @Override + public void process(PluginDataObject pdo) { + myPdos.get().add(pdo); + } + + @Override + public void sendImmediateData() throws EdexException { + List pdos = myPdos.get(); + myPdos.remove(); + if (pdos.size() > 0) { + EDEXUtil.getMessageProducer().sendAsyncUri(route, pdos); + } + } + + @Override + public void sendQueuedData() throws EdexException { + // NOOP all data sent immediately + } +} diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/utility/common_static/base/notification/edex-alerts.xml b/edexOsgi/com.raytheon.uf.edex.ingest/utility/common_static/base/notification/edex-alerts.xml new file mode 100644 index 0000000000..80c7e73127 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.ingest/utility/common_static/base/notification/edex-alerts.xml @@ -0,0 +1,11 @@ + + + edex.alerts + DATAURI + TOPIC + false + 120000 + + + + diff --git a/edexOsgi/com.raytheon.uf.edex.ohd/res/spring/satpre-spring.xml b/edexOsgi/com.raytheon.uf.edex.ohd/res/spring/satpre-spring.xml index cd47724608..41b88edc54 100644 --- a/edexOsgi/com.raytheon.uf.edex.ohd/res/spring/satpre-spring.xml +++ b/edexOsgi/com.raytheon.uf.edex.ohd/res/spring/satpre-spring.xml @@ -12,7 +12,8 @@ errorHandlerRef="errorHandler" autoStartup="false"> - + + diff --git a/edexOsgi/com.raytheon.uf.edex.ohd/utility/common_static/base/notification/satpre-ohd.xml b/edexOsgi/com.raytheon.uf.edex.ohd/utility/common_static/base/notification/satpre-ohd.xml new file mode 100644 index 0000000000..b62aa9f838 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.ohd/utility/common_static/base/notification/satpre-ohd.xml @@ -0,0 +1,17 @@ + + + satPreFilter + DATAURI + QUEUE + true + 600000 + + + + + + + + + + diff --git a/edexOsgi/com.raytheon.uf.edex.plugin.scan/utility/common_static/base/notification/scan.xml b/edexOsgi/com.raytheon.uf.edex.plugin.scan/utility/common_static/base/notification/scan.xml new file mode 100644 index 0000000000..c5cd721ab2 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.plugin.scan/utility/common_static/base/notification/scan.xml @@ -0,0 +1,15 @@ + + + + scanCpgsrvFiltering + DATAURI + QUEUE + true + 120000 + + + + + + +