();
-
- /**
- * Add data uris to the queue
- *
- * @param uris
- */
- public void addDataUris(String[] uris) {
- synchronized (this) {
- for (String uri : uris) {
- if (uri != null) {
- dataUris.add(uri);
- }
- }
- }
- }
-
- /**
- * Filters quartz messages to only send a message if there are URIs to send.
- *
- * @param obj
- * @return
- */
- public boolean hasUris(Object obj) {
- synchronized (this) {
- return dataUris.size() > 0;
- }
- }
-
- /**
- * Send off the currently queued uris
- *
- * @return
- */
- public byte[] sendQueuedUris() throws SerializationException {
- DataURINotificationMessage msg = null;
- synchronized (this) {
- String[] uris = dataUris.toArray(new String[dataUris.size()]);
- dataUris.clear();
- msg = new DataURINotificationMessage();
- msg.setDataURIs(uris);
- }
-
- return this.encodeMessage(msg);
- }
-
- /**
- * Send off the currently queued uris
- *
- * @return
- */
- public byte[] sendPracticeQueuedUris() throws SerializationException {
- PracticeDataURINotificationMessage msg = null;
- synchronized (this) {
- String[] uris = dataUris.toArray(new String[dataUris.size()]);
- dataUris.clear();
- msg = new PracticeDataURINotificationMessage();
- msg.setDataURIs(uris);
- }
-
- return this.encodeMessage(msg);
- }
-
- public byte[] encodeMessage(ISerializableObject msg)
- throws SerializationException {
- ByteArrayOutputStream baos = ByteArrayOutputStreamPool.getInstance()
- .getStream();
- GZIPOutputStream gzippedURIs = null;
-
- try {
- gzippedURIs = new GZIPOutputStream(baos);
- } catch (IOException e) {
- throw new SerializationException(
- "Failed to prepare the gzipped data stream!", e);
- }
-
- SerializationUtil.transformToThriftUsingStream(msg, gzippedURIs);
- try {
- gzippedURIs.finish();
- gzippedURIs.flush();
- return baos.toByteArray();
- } catch (IOException e) {
- throw new SerializationException(
- "Failed to write the gzipped data stream!", e);
- } finally {
- try {
- gzippedURIs.close();
- } catch (IOException e) {
- // ignore, we no longer need the stream
- }
- }
- }
-}
\ No newline at end of file
diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/InvalidNotificationConfigException.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/InvalidNotificationConfigException.java
new file mode 100644
index 0000000000..e027ca6325
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/InvalidNotificationConfigException.java
@@ -0,0 +1,57 @@
+/**
+ * This software was developed and / or modified by Raytheon Company,
+ * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
+ *
+ * U.S. EXPORT CONTROLLED TECHNICAL DATA
+ * This software product contains export-restricted data whose
+ * export/transfer/disclosure is restricted by U.S. law. Dissemination
+ * to non-U.S. persons whether in the United States or abroad requires
+ * an export license or other authorization.
+ *
+ * Contractor Name: Raytheon Company
+ * Contractor Address: 6825 Pine Street, Suite 340
+ * Mail Stop B8
+ * Omaha, NE 68106
+ * 402.291.0100
+ *
+ * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
+ * further licensing information.
+ */
+package com.raytheon.uf.edex.ingest.notification;
+
+/**
+ * Thrown if a configuration is invalid.
+ *
+ *
+ *
+ * SOFTWARE HISTORY
+ *
+ * Date Ticket# Engineer Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 19, 2013 2170 rjpeter Initial creation
+ *
+ *
+ *
+ * @author rjpeter
+ * @version 1.0
+ */
+public class InvalidNotificationConfigException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public InvalidNotificationConfigException() {
+ super();
+ }
+
+ public InvalidNotificationConfigException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public InvalidNotificationConfigException(String message) {
+ super(message);
+ }
+
+ public InvalidNotificationConfigException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifier.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifier.java
index aa03dd085b..be3b6b381d 100644
--- a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifier.java
+++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifier.java
@@ -19,24 +19,47 @@
**/
package com.raytheon.uf.edex.ingest.notification;
-import java.util.ArrayList;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.RecipientList;
+import javax.xml.bind.JAXBException;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
+import com.raytheon.uf.common.dataplugin.PluginDataObject;
+import com.raytheon.uf.common.dataplugin.PluginException;
+import com.raytheon.uf.common.dataplugin.annotations.DataURIUtil;
+import com.raytheon.uf.common.dataquery.DecisionTree;
+import com.raytheon.uf.common.dataquery.requests.RequestConstraint;
+import com.raytheon.uf.common.localization.IPathManager;
+import com.raytheon.uf.common.localization.LocalizationFile;
+import com.raytheon.uf.common.localization.PathManagerFactory;
+import com.raytheon.uf.common.localization.exception.LocalizationException;
+import com.raytheon.uf.common.serialization.JAXBManager;
+import com.raytheon.uf.common.serialization.SerializationException;
+import com.raytheon.uf.common.status.IPerformanceStatusHandler;
import com.raytheon.uf.common.status.IUFStatusHandler;
+import com.raytheon.uf.common.status.PerformanceStatus;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
-import com.raytheon.uf.edex.core.EDEXUtil;
+import com.raytheon.uf.common.time.util.ITimer;
+import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.core.EdexException;
+import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig.EndpointType;
+import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig.NotifyFormat;
+import com.raytheon.uf.edex.ingest.notification.router.DataUriRouter;
+import com.raytheon.uf.edex.ingest.notification.router.INotificationRouter;
+import com.raytheon.uf.edex.ingest.notification.router.PdoRouter;
/**
* Plugins can register routes with this and then be generically fired to. Helps
- * to reduce dependencies as we no longer need to call a route directly.
+ * to reduce dependencies as we no longer need to call a route directly. All
+ * registration must occur before messages are being picked up. Otherwise
+ * concurrency problems may occur.
*
*
*
@@ -44,8 +67,9 @@ import com.raytheon.uf.edex.core.EdexException;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
- * Jun 13, 2013 mnash Initial creation
- *
+ * Jun 13, 2013 mnash Initial creation.
+ * Nov 19, 2013 2170 rjpeter Add plugin contributed config files, filtering,
+ * and support for pdo vs datauri.
*
*
* @author mnash
@@ -53,55 +77,311 @@ import com.raytheon.uf.edex.core.EdexException;
*/
public class PluginNotifier {
-
private static final IUFStatusHandler theHandler = UFStatus
.getHandler(PluginNotifier.class);
- private static final Multimap routes = ArrayListMultimap
- .create();
+ private final IPerformanceStatusHandler perfLog = PerformanceStatus
+ .getHandler("Notification:");
- private static final PluginNotifier distributor = new PluginNotifier();
-
- private PluginNotifier() {
- }
-
- public PluginNotifier getInstance() {
- return distributor;
- }
+ private static final int DEFAULT_TIME_TO_LIVE = 300000;
/**
- * Normally called from Spring, this allows a set of commands to be
- * registered for a specific plugin.
- *
- * @param name
- * @param command
- * @return
+ * Decision tree for plugin notification.
*/
- public PluginNotifier register(String name, String command) {
- routes.put(name, command);
- return this;
- }
+ private final DecisionTree tree = new DecisionTree();
+
+ private final List receiveAllRoutes = new LinkedList();
+
+ private final List filteredRoutes = new LinkedList();
/**
- * Takes the pluginName, which is what was registered for, and sends to all
- * the routes that are registered as that.
- *
- * @param exchange
- * @return
+ * Set of loaded names. Used for duplicate detection.
*/
- @RecipientList
- public List send(Exchange exchange) {
- Message in = exchange.getIn();
- String name = (String) in.getHeader("pluginName");
- List list = new ArrayList();
- for (String route : routes.get(name)) {
+ private final Set loadedNames = new HashSet();
+
+ public PluginNotifier() throws JAXBException {
+ loadConfigurations();
+ }
+
+ private synchronized void loadConfigurations() throws JAXBException {
+ JAXBManager mgr = new JAXBManager(PluginNotifierConfigList.class,
+ PluginNotifierConfig.class);
+
+ IPathManager pathMgr = PathManagerFactory.getPathManager();
+ LocalizationFile[] files = pathMgr.listStaticFiles("notification",
+ new String[] { ".xml" }, false, true);
+ for (LocalizationFile lf : files) {
try {
- EDEXUtil.getMessageProducer().sendAsyncUri(route, in.getBody());
- } catch (EdexException e) {
+ File f = lf.getFile(true);
+ if (f.length() > 0) {
+ // empty files may be used to override base files to remove
+ // functionality
+ InputStream is = null;
+
+ try {
+ is = lf.openInputStream();
+
+ PluginNotifierConfigList confList = (PluginNotifierConfigList) mgr
+ .unmarshalFromInputStream(is);
+ List configs = confList
+ .getNotificationConfigs();
+ if ((configs != null) && !configs.isEmpty()) {
+ for (PluginNotifierConfig conf : configs) {
+ register(conf, false);
+ }
+ }
+ } catch (SerializationException e) {
+ theHandler.handle(Priority.PROBLEM,
+ "Unable to deserialize " + f.getPath(), e);
+ } catch (InvalidNotificationConfigException e) {
+ theHandler.handle(
+ Priority.PROBLEM,
+ "Unable to load plugin configuration "
+ + f.getPath(), e);
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ }
+ } catch (LocalizationException e) {
theHandler.handle(Priority.PROBLEM,
- "Unable to send message to " + route, e);
+ "Error occurred accessing file: " + lf, e);
+ }
+ }
+
+ rebuildTree();
+
+ }
+
+ /**
+ * Register the given PluginNotifierConfig.
+ *
+ * @param config
+ * @return
+ */
+ public synchronized void register(PluginNotifierConfig config)
+ throws InvalidNotificationConfigException {
+ register(config, true);
+ }
+
+ /**
+ * Register the given PluginNotifierConfig.
+ *
+ * @param config
+ * @param rebuildTree
+ * Whether or not to rebuild the internal tree. If many things
+ * are being registered can improve performance to only build
+ * tree once at the end.
+ * @return
+ */
+ public synchronized void register(PluginNotifierConfig config,
+ boolean rebuildTree) throws InvalidNotificationConfigException {
+ validate(config);
+
+ INotificationRouter router = null;
+ switch (config.getFormat()) {
+ case DATAURI:
+ router = new DataUriRouter(config);
+ break;
+ case PDO:
+ router = new PdoRouter(config);
+ break;
+ default:
+ throw new InvalidNotificationConfigException(
+ "No INotificationRouter registered for format: "
+ + config.getFormat());
+ }
+
+ Map[] metadataMaps = config.getMetadataMap();
+ boolean receiveAll = (metadataMaps == null)
+ || (metadataMaps.length == 0)
+ || ((metadataMaps.length == 1) && ((metadataMaps[0] == null) || metadataMaps[0]
+ .isEmpty()));
+
+ if (receiveAll) {
+ // null or empty constraint map implies receive all data
+ receiveAllRoutes.add(router);
+ } else {
+ filteredRoutes.add(router);
+ for (Map metadataMap : metadataMaps) {
+ tree.insertCriteria(metadataMap, router);
+ }
+
+ if (rebuildTree) {
+ tree.rebuildTree();
+ }
+ }
+
+ loadedNames.add(config.getEndpointName());
+ }
+
+ /**
+ * Validate the passed config
+ *
+ * @param config
+ * @return
+ */
+ private void validate(PluginNotifierConfig config)
+ throws InvalidNotificationConfigException {
+ String endpoint = config.getEndpointName();
+ if ((endpoint == null) || (endpoint.trim().length() == 0)) {
+ throw new InvalidNotificationConfigException(
+ "endpointName is required");
+ }
+
+ if (loadedNames.contains(endpoint)) {
+ throw new InvalidNotificationConfigException("PluginConfiguration "
+ + endpoint + ": endpointName is already in use");
+ }
+
+ EndpointType type = config.getEndpointType();
+ if (type == null) {
+ StringBuilder msg = new StringBuilder(180);
+ msg.append("PluginConfiguration ")
+ .append(endpoint)
+ .append(": missing required field endpointType. Valid values for ")
+ .append(NotifyFormat.PDO).append(" format are ")
+ .append(EndpointType.DIRECTVM).append(" and ")
+ .append(EndpointType.VM).append(". Valid values for ")
+ .append(NotifyFormat.DATAURI).append(" format are ")
+ .append(EndpointType.DIRECTVM).append(", ")
+ .append(EndpointType.VM).append(", ")
+ .append(EndpointType.QUEUE).append(", and ")
+ .append(EndpointType.TOPIC);
+ throw new InvalidNotificationConfigException(msg.toString());
+ }
+
+ NotifyFormat format = config.getFormat();
+ if (NotifyFormat.PDO.equals(format)
+ && (EndpointType.QUEUE.equals(type) || EndpointType.TOPIC
+ .equals(type))) {
+ StringBuilder msg = new StringBuilder(120);
+ msg.append("PluginConfiguration ").append(endpoint)
+ .append(": endpointType ").append(type)
+ .append(" is invalid for format ").append(format)
+ .append(". Valid values for ").append(NotifyFormat.PDO)
+ .append(" format are ").append(EndpointType.DIRECTVM)
+ .append(" and ").append(EndpointType.VM);
+ throw new InvalidNotificationConfigException(msg.toString());
+ }
+
+ if (!EndpointType.QUEUE.equals(type) && config.isDurable()) {
+ theHandler.warn("PluginConfiguration: " + endpoint
+ + " durable setting only valid on QUEUE type endpoints");
+ }
+
+ if ((EndpointType.QUEUE.equals(type) || EndpointType.TOPIC.equals(type))
+ && (config.getTimeToLive() < 0)) {
+ theHandler
+ .warn("PluginConfiguration: "
+ + endpoint
+ + " has invalid time to live. Time to live for JMS endpoints must be 0 or greater. Setting to default of: "
+ + DEFAULT_TIME_TO_LIVE + " ms");
+ config.setTimeToLive(DEFAULT_TIME_TO_LIVE);
+ }
+ }
+
+ /**
+ * Rebuild the tree based on all register'd configurations.
+ */
+ public void rebuildTree() {
+ tree.rebuildTree();
+ }
+
+ /**
+ * Checks the pdo's against the registered routes. Data will then be
+ * transformed and queued or sent immediately depending on configuration.
+ *
+ * @param pdos
+ * @return
+ */
+ public void notify(PluginDataObject... pdos) {
+ if ((pdos != null) && (pdos.length > 0)) {
+ ITimer timer = TimeUtil.getTimer();
+ timer.start();
+ if (!receiveAllRoutes.isEmpty()) {
+ for (PluginDataObject pdo : pdos) {
+ for (INotificationRouter router : receiveAllRoutes) {
+ router.process(pdo);
+ }
+ }
+
+ for (INotificationRouter router : receiveAllRoutes) {
+ try {
+ router.sendImmediateData();
+ } catch (EdexException e) {
+ theHandler.handle(
+ Priority.PROBLEM,
+ "Unable to send notification data to "
+ + router.getRoute(), e);
+ }
+ }
+ }
+
+ if (!filteredRoutes.isEmpty()) {
+ Set routesWithData = new HashSet();
+ for (PluginDataObject pdo : pdos) {
+ try {
+ List routers = tree
+ .searchTree(DataURIUtil.createDataURIMap(pdo));
+ for (INotificationRouter router : routers) {
+ router.process(pdo);
+ routesWithData.add(router);
+ }
+ } catch (PluginException e) {
+ theHandler.handle(Priority.PROBLEM,
+ e.getLocalizedMessage(), e);
+ }
+ }
+
+ for (INotificationRouter router : routesWithData) {
+ try {
+ router.sendImmediateData();
+ } catch (EdexException e) {
+ theHandler.handle(
+ Priority.PROBLEM,
+ "Unable to send notification data to "
+ + router.getRoute(), e);
+ }
+ }
+ }
+ timer.stop();
+ perfLog.logDuration("Processed " + pdos.length + " pdos",
+ timer.getElapsedTime());
+ }
+ }
+
+ /**
+ * Send the queued notifications.
+ *
+ * @return
+ */
+ public void sendQueuedNotifications() {
+ for (INotificationRouter router : receiveAllRoutes) {
+ try {
+ router.sendQueuedData();
+ } catch (EdexException e) {
+ theHandler.handle(
+ Priority.PROBLEM,
+ "Unable to send notification data to "
+ + router.getRoute(), e);
+ }
+ }
+
+ for (INotificationRouter router : filteredRoutes) {
+ try {
+ router.sendQueuedData();
+ } catch (EdexException e) {
+ theHandler.handle(
+ Priority.PROBLEM,
+ "Unable to send notification data to "
+ + router.getRoute(), e);
}
}
- return list;
}
}
diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifierConfig.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifierConfig.java
new file mode 100644
index 0000000000..ba2097bc2d
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifierConfig.java
@@ -0,0 +1,224 @@
+/**
+ * This software was developed and / or modified by Raytheon Company,
+ * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
+ *
+ * U.S. EXPORT CONTROLLED TECHNICAL DATA
+ * This software product contains export-restricted data whose
+ * export/transfer/disclosure is restricted by U.S. law. Dissemination
+ * to non-U.S. persons whether in the United States or abroad requires
+ * an export license or other authorization.
+ *
+ * Contractor Name: Raytheon Company
+ * Contractor Address: 6825 Pine Street, Suite 340
+ * Mail Stop B8
+ * Omaha, NE 68106
+ * 402.291.0100
+ *
+ * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
+ * further licensing information.
+ **/
+package com.raytheon.uf.edex.ingest.notification;
+
+import java.util.HashMap;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import com.raytheon.uf.common.dataquery.requests.RequestConstraint;
+import com.raytheon.uf.common.dataquery.requests.RequestableMetadataMarshaller;
+
+/**
+ * Configuration object for plugin notification. An empty or null metadataMap
+ * implies it should receive all data.
+ *
+ *
+ *
+ * SOFTWARE HISTORY
+ *
+ * Date Ticket# Engineer Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 19, 2013 2170 rjpeter Initial creation
+ *
+ *
+ *
+ * @author rjpeter
+ * @version 1.0
+ */
+@XmlAccessorType(XmlAccessType.NONE)
+public class PluginNotifierConfig {
+
+ public static enum EndpointType {
+ QUEUE, TOPIC, VM, DIRECTVM
+ }
+
+ public static enum NotifyFormat {
+ DATAURI, PDO
+ }
+
+ private static final String vmPrefix = "vm:";
+
+ private static final String directvmPrefix = "direct-vm:";
+
+ private static final String persistentJmsPrefix = "jms-durable:";
+
+ private static final String transientJmsPrefix = "jms-generic:";
+
+ @XmlElement
+ protected String endpointName;
+
+ @XmlElement
+ protected EndpointType endpointType;
+
+ @XmlElement(required = true)
+ protected NotifyFormat format;
+
+ /**
+ * Time to live for JMS type endpoints in milliseconds.
+ */
+ @XmlElement
+ protected int timeToLive = -1;
+
+ /**
+ * If the JMS message is being sent to a durable endpoint.
+ */
+ @XmlElement
+ protected boolean durable;
+
+ /**
+ * the metadata criteria to retrieve the resource
+ */
+ @XmlElement
+ @XmlJavaTypeAdapter(value = RequestableMetadataMarshaller.class)
+ protected HashMap[] metadataMap;
+
+ protected transient String endpointUri;
+
+ public String getEndpointName() {
+ return endpointName;
+ }
+
+ public void setEndpointName(String endpointName) {
+ this.endpointName = endpointName;
+ }
+
+ public EndpointType getEndpointType() {
+ return endpointType;
+ }
+
+ public void setEndpointType(EndpointType endpointType) {
+ this.endpointType = endpointType;
+ }
+
+ public NotifyFormat getFormat() {
+ return format;
+ }
+
+ public void setFormat(NotifyFormat format) {
+ this.format = format;
+ }
+
+ public int getTimeToLive() {
+ return timeToLive;
+ }
+
+ public void setTimeToLive(int timeToLive) {
+ this.timeToLive = timeToLive;
+ }
+
+ public boolean isDurable() {
+ return durable;
+ }
+
+ public void setDurable(boolean durable) {
+ this.durable = durable;
+ }
+
+ public HashMap[] getMetadataMap() {
+ return metadataMap;
+ }
+
+ public void setMetadataMap(HashMap[] metadataMap) {
+ this.metadataMap = metadataMap;
+ }
+
+ /**
+ * Generates and returns the endpoint uri for this configuration.
+ *
+ * @return
+ */
+ public String getEndpointUri() {
+ if (endpointUri == null) {
+ StringBuilder builder = new StringBuilder(64);
+ switch (endpointType) {
+ case DIRECTVM:
+ builder.append(directvmPrefix);
+ case VM:
+ builder.append(vmPrefix);
+ break;
+ case QUEUE:
+ case TOPIC:
+ if (durable) {
+ builder.append(persistentJmsPrefix);
+ } else {
+ builder.append(transientJmsPrefix);
+ }
+
+ builder.append(endpointType.name().toLowerCase()).append(':');
+ break;
+ }
+
+ builder.append(endpointName);
+
+ // for jms endpoints add time to live field
+ if ((timeToLive > 0)
+ && (EndpointType.QUEUE.equals(endpointType) || EndpointType.TOPIC
+ .equals(endpointType))) {
+ // append time to live in milliseconds
+ builder.append("?timeToLive=").append(timeToLive);
+ }
+
+ endpointUri = builder.toString();
+ }
+
+ return endpointUri;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = (prime * result)
+ + ((endpointName == null) ? 0 : endpointName.hashCode());
+ result = (prime * result)
+ + ((endpointType == null) ? 0 : endpointType.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ PluginNotifierConfig other = (PluginNotifierConfig) obj;
+ if (endpointName == null) {
+ if (other.endpointName != null) {
+ return false;
+ }
+ } else if (!endpointName.equals(other.endpointName)) {
+ return false;
+ }
+ if (endpointType != other.endpointType) {
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifierConfigList.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifierConfigList.java
new file mode 100644
index 0000000000..c7c130baae
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/PluginNotifierConfigList.java
@@ -0,0 +1,60 @@
+/**
+ * This software was developed and / or modified by Raytheon Company,
+ * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
+ *
+ * U.S. EXPORT CONTROLLED TECHNICAL DATA
+ * This software product contains export-restricted data whose
+ * export/transfer/disclosure is restricted by U.S. law. Dissemination
+ * to non-U.S. persons whether in the United States or abroad requires
+ * an export license or other authorization.
+ *
+ * Contractor Name: Raytheon Company
+ * Contractor Address: 6825 Pine Street, Suite 340
+ * Mail Stop B8
+ * Omaha, NE 68106
+ * 402.291.0100
+ *
+ * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
+ * further licensing information.
+ */
+package com.raytheon.uf.edex.ingest.notification;
+
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElements;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * List of Plugin Notification Configurations.
+ *
+ *
+ *
+ * SOFTWARE HISTORY
+ *
+ * Date Ticket# Engineer Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 19, 2013 2170 rjpeter Initial creation
+ *
+ *
+ *
+ * @author rjpeter
+ * @version 1.0
+ */
+@XmlRootElement(name = "pluginNotificationList")
+@XmlAccessorType(XmlAccessType.NONE)
+public class PluginNotifierConfigList {
+ @XmlElements({ @XmlElement(name = "pluginNotification") })
+ private List notificationConfigs;
+
+ public List getNotificationConfigs() {
+ return notificationConfigs;
+ }
+
+ public void setNotificationConfigs(
+ List notificationConfigs) {
+ this.notificationConfigs = notificationConfigs;
+ }
+}
diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/ToDataURI.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/ToDataURI.java
index 8261d6b071..04f5fc96d6 100644
--- a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/ToDataURI.java
+++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/ToDataURI.java
@@ -20,6 +20,7 @@
package com.raytheon.uf.edex.ingest.notification;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
+import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMessage;
/**
* Converts PluginDataObjects or arrays of PluginDataObjects into their dataURIs
@@ -29,8 +30,8 @@ import com.raytheon.uf.common.dataplugin.PluginDataObject;
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
- * Nov 25, 2008 chammack Initial creation
- *
+ * Nov 25, 2008 chammack Initial creation
+ * Nov 19, 2013 2170 rjpeter Added toPracticeNotificationMsg.
*
*
* @author chammack
@@ -51,4 +52,12 @@ public class ToDataURI {
public String toDataURI(PluginDataObject pdo) {
return pdo.getDataURI();
}
+
+ public PracticeDataURINotificationMessage toPracticeNotificationMsg(
+ PluginDataObject[] pdos) {
+ String[] uris = toDataURI(pdos);
+ PracticeDataURINotificationMessage rval = new PracticeDataURINotificationMessage();
+ rval.setDataURIs(uris);
+ return rval;
+ }
}
diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/DataUriRouter.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/DataUriRouter.java
new file mode 100644
index 0000000000..d76e6176e7
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/DataUriRouter.java
@@ -0,0 +1,186 @@
+/**
+ * This software was developed and / or modified by Raytheon Company,
+ * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
+ *
+ * U.S. EXPORT CONTROLLED TECHNICAL DATA
+ * This software product contains export-restricted data whose
+ * export/transfer/disclosure is restricted by U.S. law. Dissemination
+ * to non-U.S. persons whether in the United States or abroad requires
+ * an export license or other authorization.
+ *
+ * Contractor Name: Raytheon Company
+ * Contractor Address: 6825 Pine Street, Suite 340
+ * Mail Stop B8
+ * Omaha, NE 68106
+ * 402.291.0100
+ *
+ * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
+ * further licensing information.
+ **/
+package com.raytheon.uf.edex.ingest.notification.router;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.zip.GZIPOutputStream;
+
+import com.raytheon.uf.common.dataplugin.PluginDataObject;
+import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage;
+import com.raytheon.uf.common.serialization.SerializationException;
+import com.raytheon.uf.common.serialization.SerializationUtil;
+import com.raytheon.uf.common.util.ByteArrayOutputStreamPool;
+import com.raytheon.uf.edex.core.EDEXUtil;
+import com.raytheon.uf.edex.core.EdexException;
+import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig;
+import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig.EndpointType;
+
+/**
+ * Routes DataUri Notifications to a destination uri. Notification msg will be
+ * sent immediately to the routes for inner jvm calls, routes over jms will be
+ * queued and sent in gzipped batches.
+ *
+ *
+ *
+ * SOFTWARE HISTORY
+ *
+ * Date Ticket# Engineer Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 19, 2013 2170 rjpeter Initial creation
+ *
+ *
+ *
+ * @author rjpeter
+ * @version 1.0
+ */
+public class DataUriRouter implements INotificationRouter {
+
+ /**
+ * Buffer size.
+ */
+ private static final int GZIP_BUFFER_SIZE = 4096;
+
+ /**
+ * Data URIs that have not been sent.
+ */
+ private final ConcurrentLinkedQueue uris = new ConcurrentLinkedQueue();
+
+ /**
+ * Flag if this route stays in the jvm.
+ */
+ private final boolean isInternal;
+
+ /**
+ * The destination URI for this router.
+ */
+ private final String route;
+
+ /**
+ *
+ * @param config
+ */
+ public DataUriRouter(PluginNotifierConfig config) {
+ EndpointType type = config.getEndpointType();
+ isInternal = EndpointType.DIRECTVM.equals(type)
+ || EndpointType.VM.equals(type);
+ route = config.getEndpointUri();
+ }
+
+ @Override
+ public String getRoute() {
+ return route;
+ }
+
+ @Override
+ public void process(PluginDataObject pdo) {
+ uris.add(pdo.getDataURI());
+ }
+
+ /**
+ * Creates a DataURINotificationMessage.
+ *
+ * @return
+ */
+ protected synchronized DataURINotificationMessage createMessage() {
+ DataURINotificationMessage msg = null;
+ // this is the only point that uris is reduced, safe to grab current
+ // size and dequeue that many items
+ int size = uris.size();
+ if (size > 0) {
+ String[] data = new String[size];
+ for (int i = 0; i < size; i++) {
+ data[i] = uris.poll();
+ }
+
+ msg = new DataURINotificationMessage();
+ msg.setDataURIs(data);
+ }
+
+ return msg;
+ }
+
+ @Override
+ public void sendImmediateData() throws EdexException {
+ // if sending inside jvm, create message and send immediately as the
+ // memory object
+ if (isInternal) {
+ DataURINotificationMessage msg = createMessage();
+ if (msg != null) {
+ EDEXUtil.getMessageProducer().sendAsyncUri(route, msg);
+ }
+ }
+ }
+
+ @Override
+ public void sendQueuedData() throws EdexException {
+ if (!isInternal) {
+ // if sending outside the jvm, create message, serialize, gzip, and
+ // then send
+ DataURINotificationMessage msg = createMessage();
+ if (msg != null) {
+ EDEXUtil.getMessageProducer().sendAsyncUri(route,
+ encodeMessage(msg));
+ }
+ }
+ }
+
+ /**
+ * Thrift encodes an object and then gzip's the binary data. Should only be
+ * used for sending data outside the jvm.
+ *
+ * @param msg
+ * @return
+ * @throws SerializationException
+ */
+ private byte[] encodeMessage(DataURINotificationMessage msg)
+ throws EdexException {
+ ByteArrayOutputStream baos = ByteArrayOutputStreamPool.getInstance()
+ .getStream();
+ GZIPOutputStream gzippedURIs = null;
+
+ try {
+ gzippedURIs = new GZIPOutputStream(baos, GZIP_BUFFER_SIZE, true);
+ } catch (IOException e) {
+ throw new EdexException(
+ "Failed to prepare the gzipped data stream", e);
+ }
+
+ try {
+ SerializationUtil.transformToThriftUsingStream(msg, gzippedURIs);
+ gzippedURIs.finish();
+ gzippedURIs.flush();
+ return baos.toByteArray();
+ } catch (IOException e) {
+ throw new EdexException("Failed to write the gzipped data stream",
+ e);
+ } catch (SerializationException e) {
+ throw new EdexException(
+ "Failed to serialize DataURINotificationMessage", e);
+ } finally {
+ try {
+ gzippedURIs.close();
+ } catch (IOException e) {
+ // ignore, we no longer need the stream
+ }
+ }
+ }
+}
diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/INotificationRouter.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/INotificationRouter.java
new file mode 100644
index 0000000000..19e46ffec9
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/INotificationRouter.java
@@ -0,0 +1,76 @@
+/**
+ * This software was developed and / or modified by Raytheon Company,
+ * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
+ *
+ * U.S. EXPORT CONTROLLED TECHNICAL DATA
+ * This software product contains export-restricted data whose
+ * export/transfer/disclosure is restricted by U.S. law. Dissemination
+ * to non-U.S. persons whether in the United States or abroad requires
+ * an export license or other authorization.
+ *
+ * Contractor Name: Raytheon Company
+ * Contractor Address: 6825 Pine Street, Suite 340
+ * Mail Stop B8
+ * Omaha, NE 68106
+ * 402.291.0100
+ *
+ * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
+ * further licensing information.
+ **/
+package com.raytheon.uf.edex.ingest.notification.router;
+
+import com.raytheon.uf.common.dataplugin.PluginDataObject;
+import com.raytheon.uf.edex.core.EdexException;
+
+/**
+ * Notification router for a specific destination and format.
+ *
+ *
+ *
+ * SOFTWARE HISTORY
+ *
+ * Date Ticket# Engineer Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 19, 2013 2170 rjpeter Initial creation
+ *
+ *
+ *
+ * @author rjpeter
+ * @version 1.0
+ */
+public interface INotificationRouter {
+ /**
+ * The destination for this router.
+ *
+ * @return
+ */
+ public String getRoute();
+
+ /**
+ * Process the pdo into notification format.
+ *
+ * @param pdo
+ */
+ public void process(PluginDataObject pdo);
+
+ /**
+ * Send data that should be sent immediately to route. This is mainly used
+ * for PDO data that we don't want to build up and send as a bulk message.
+ * Future enhancement could allow for priority data such as warnings and
+ * radar data to always be notified immediately instead of a timer driver
+ * basis.
+ *
+ * @throws EdexException
+ */
+ public void sendImmediateData() throws EdexException;
+
+ /**
+ * Send any queued data to route. Generally used for data uri notifications
+ * being sent over JMS to allow for better bundling. The data is queued in
+ * memory and sent out as a bundled message. The interval is defined by the
+ * notification timer in persist-ingest.xml.
+ *
+ * @throws EdexException
+ */
+ public void sendQueuedData() throws EdexException;
+}
diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/PdoRouter.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/PdoRouter.java
new file mode 100644
index 0000000000..3a6d941fc8
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/router/PdoRouter.java
@@ -0,0 +1,94 @@
+/**
+ * This software was developed and / or modified by Raytheon Company,
+ * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
+ *
+ * U.S. EXPORT CONTROLLED TECHNICAL DATA
+ * This software product contains export-restricted data whose
+ * export/transfer/disclosure is restricted by U.S. law. Dissemination
+ * to non-U.S. persons whether in the United States or abroad requires
+ * an export license or other authorization.
+ *
+ * Contractor Name: Raytheon Company
+ * Contractor Address: 6825 Pine Street, Suite 340
+ * Mail Stop B8
+ * Omaha, NE 68106
+ * 402.291.0100
+ *
+ * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
+ * further licensing information.
+ **/
+package com.raytheon.uf.edex.ingest.notification.router;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import com.raytheon.uf.common.dataplugin.PluginDataObject;
+import com.raytheon.uf.edex.core.EDEXUtil;
+import com.raytheon.uf.edex.core.EdexException;
+import com.raytheon.uf.edex.ingest.notification.PluginNotifierConfig;
+
+/**
+ * Routes pdos to a destination. Should only be used for destination inside the
+ * jvm.
+ *
+ *
+ *
+ * SOFTWARE HISTORY
+ *
+ * Date Ticket# Engineer Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 19, 2013 2170 rjpeter Initial creation
+ *
+ *
+ *
+ * @author rjpeter
+ * @version 1.0
+ */
+public class PdoRouter implements INotificationRouter {
+
+ /**
+ * Destination URI for this router.
+ */
+ private final String route;
+
+ /**
+ * PDO's in use by current thread. Since all PDO data is sent immediately
+ * handles concurrency via thread local instance.
+ */
+ private final ThreadLocal> myPdos = new ThreadLocal>() {
+
+ @Override
+ protected List initialValue() {
+ return new LinkedList();
+ }
+
+ };
+
+ public PdoRouter(PluginNotifierConfig config) {
+ this.route = config.getEndpointUri();
+ }
+
+ @Override
+ public String getRoute() {
+ return route;
+ }
+
+ @Override
+ public void process(PluginDataObject pdo) {
+ myPdos.get().add(pdo);
+ }
+
+ @Override
+ public void sendImmediateData() throws EdexException {
+ List pdos = myPdos.get();
+ myPdos.remove();
+ if (pdos.size() > 0) {
+ EDEXUtil.getMessageProducer().sendAsyncUri(route, pdos);
+ }
+ }
+
+ @Override
+ public void sendQueuedData() throws EdexException {
+ // NOOP all data sent immediately
+ }
+}
diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/utility/common_static/base/notification/edex-alerts.xml b/edexOsgi/com.raytheon.uf.edex.ingest/utility/common_static/base/notification/edex-alerts.xml
new file mode 100644
index 0000000000..80c7e73127
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.ingest/utility/common_static/base/notification/edex-alerts.xml
@@ -0,0 +1,11 @@
+
+
+ edex.alerts
+ DATAURI
+ TOPIC
+ false
+ 120000
+
+
+
+
diff --git a/edexOsgi/com.raytheon.uf.edex.ohd/res/spring/satpre-spring.xml b/edexOsgi/com.raytheon.uf.edex.ohd/res/spring/satpre-spring.xml
index cd47724608..41b88edc54 100644
--- a/edexOsgi/com.raytheon.uf.edex.ohd/res/spring/satpre-spring.xml
+++ b/edexOsgi/com.raytheon.uf.edex.ohd/res/spring/satpre-spring.xml
@@ -12,7 +12,8 @@
errorHandlerRef="errorHandler"
autoStartup="false">
-
+
+
diff --git a/edexOsgi/com.raytheon.uf.edex.ohd/utility/common_static/base/notification/satpre-ohd.xml b/edexOsgi/com.raytheon.uf.edex.ohd/utility/common_static/base/notification/satpre-ohd.xml
new file mode 100644
index 0000000000..b62aa9f838
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.ohd/utility/common_static/base/notification/satpre-ohd.xml
@@ -0,0 +1,17 @@
+
+
+ satPreFilter
+ DATAURI
+ QUEUE
+ true
+ 600000
+
+
+
+
+
+
+
+
+
+
diff --git a/edexOsgi/com.raytheon.uf.edex.plugin.scan/utility/common_static/base/notification/scan.xml b/edexOsgi/com.raytheon.uf.edex.plugin.scan/utility/common_static/base/notification/scan.xml
new file mode 100644
index 0000000000..c5cd721ab2
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.plugin.scan/utility/common_static/base/notification/scan.xml
@@ -0,0 +1,15 @@
+
+
+
+ scanCpgsrvFiltering
+ DATAURI
+ QUEUE
+ true
+ 120000
+
+
+
+
+
+
+