From 291d80cee1ce62e50513e4890759f7c4f72c3108 Mon Sep 17 00:00:00 2001 From: Bryan Kowal Date: Fri, 16 Aug 2013 11:13:43 -0500 Subject: [PATCH] Issue #2169 - gzip data uris; entities that receive data uris will unzip them before running thrift conversions. - Amend: updates for 32-bit edex_com - Amend: do not synchronize when building the message - Amend: use the byte array pool - Amend: default to data decompression when the queue is edex.alerts Change-Id: I79a1f3d3d698c67019a4aa33e667fef99f6ebc17 Former-commit-id: 2890b19c7cd88471f1adbc57886f988d22d0e370 [formerly 35cacacc4521c917b01ffb12f91b0289fa0a5e6b] [formerly 7e61461e86b2edbadcb134c09797d43f626385f3] [formerly 7e61461e86b2edbadcb134c09797d43f626385f3 [formerly 3eaacdb5222c51133d880f96eab011a123a6093e]] [formerly 2e1a8eb2b32684708d48ed09bea69ace7b678bcd [formerly 7e61461e86b2edbadcb134c09797d43f626385f3 [formerly 3eaacdb5222c51133d880f96eab011a123a6093e] [formerly 2e1a8eb2b32684708d48ed09bea69ace7b678bcd [formerly 120aa0d6ee671e20734f8efb7bad89ff1b145629]]]] Former-commit-id: 2e1a8eb2b32684708d48ed09bea69ace7b678bcd Former-commit-id: 909996295acd3b4e7ff8c53b54592dbbe8fd8131 [formerly f55be8607a629311fb3cfbbd46fe9a4e8526f65a] [formerly 18c9026455591ec9fa485702a17e736b05ede1c7 [formerly 5c0af47d6df3571556c7cf3c351e5dd2cfc68dbd]] Former-commit-id: 18c9026455591ec9fa485702a17e736b05ede1c7 Former-commit-id: 99e3667a78eae3fac223f2f5d0edd17155893806 --- .../uf/viz/core/comm/JMSConnection.java | 3 +- .../notification/NotificationMessage.java | 6 +- .../serialization/SerializationUtil.java | 11 ++++ .../raytheon/uf/common/util/DataUnzipper.java | 17 ++++++ .../res/spring/activetable-common.xml | 1 - .../res/spring/persist-ingest.xml | 3 +- .../notification/DataUriAggregator.java | 56 ++++++++++++++++--- nativeLib/edex_com/.cproject | 2 + nativeLib/edex_com/src/EdexNotification.cpp | 27 +++++++++ pythonPackages/ufpy/QpidSubscriber.py | 22 +++++++- .../ufpy/test/testQpidTimeToLive.py | 3 +- 11 files changed, 135 insertions(+), 16 deletions(-) diff --git a/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/comm/JMSConnection.java b/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/comm/JMSConnection.java index 7d06157abf..592b3f7984 100644 --- a/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/comm/JMSConnection.java +++ b/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/comm/JMSConnection.java @@ -43,6 +43,7 @@ import com.raytheon.uf.viz.core.VizApp; * Nov 2, 2011 #7391 bkowal Ensure that the generated WsId is properly formatted to be * included in a url. * May 09, 2013 1814 rjpeter Updated prefetch to 10. + * Aug 16, 2013 2169 bkowal CAVE will now synchronously acknowledge messages. * * * @author chammack @@ -82,7 +83,7 @@ public class JMSConnection { "UTF-8") + "/edex?brokerlist='" + this.jndiProviderUrl - + "?connecttimeout='5000'&heartbeat='0''&maxprefetch='10'&sync_publish='all'&failover='nofailover'"); + + "?connecttimeout='5000'&heartbeat='0''&maxprefetch='10'&sync_publish='all'&failover='nofailover'&sync_ack='true'"); } catch (URLSyntaxException e) { // TODO Auto-generated catch block e.printStackTrace(); diff --git a/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/notification/NotificationMessage.java b/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/notification/NotificationMessage.java index 3be680f02f..0122072ad3 100644 --- a/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/notification/NotificationMessage.java +++ b/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/notification/NotificationMessage.java @@ -32,6 +32,7 @@ import com.raytheon.uf.common.serialization.DynamicSerializationManager; import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType; import com.raytheon.uf.common.serialization.SerializationUtil; import com.raytheon.uf.viz.core.exception.VizException; +import com.raytheon.uf.common.util.DataUnzipper; /** * Encapsulation object for notification messages @@ -44,6 +45,7 @@ import com.raytheon.uf.viz.core.exception.VizException; * Oct 4, 2010 7193 cjeanbap Added a new method, isNotExpired(). * Feb 1, 2011 7193 cjeanbap Added a new method, getPublishedTime(). * Aug 6, 2013 2228 njensen Use deserialize(byte[]) + * Aug 16, 2013 2169 bkowal Unzip any gzipped information * * * @author chammack @@ -98,7 +100,9 @@ public class NotificationMessage { throw new NotificationException( "Message payload terminated early. Expected: " + length + ". Got: " + readLength); - + if (DataUnzipper.isGzipped(data)) { + data = new DataUnzipper().gunzip(data); + } this.unmarshalledObject = DynamicSerializationManager .getManager(SerializationType.Thrift) .deserialize(data); diff --git a/edexOsgi/com.raytheon.uf.common.serialization/src/com/raytheon/uf/common/serialization/SerializationUtil.java b/edexOsgi/com.raytheon.uf.common.serialization/src/com/raytheon/uf/common/serialization/SerializationUtil.java index 6cf3723140..abf1070224 100644 --- a/edexOsgi/com.raytheon.uf.common.serialization/src/com/raytheon/uf/common/serialization/SerializationUtil.java +++ b/edexOsgi/com.raytheon.uf.common.serialization/src/com/raytheon/uf/common/serialization/SerializationUtil.java @@ -20,6 +20,7 @@ package com.raytheon.uf.common.serialization; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.List; @@ -29,6 +30,7 @@ import javax.xml.bind.JAXBException; import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType; import com.raytheon.uf.common.util.ServiceLoaderUtil; +import com.raytheon.uf.common.util.DataUnzipper; /** * Provides utilities for serialization support @@ -46,6 +48,7 @@ import com.raytheon.uf.common.util.ServiceLoaderUtil; * Mar 21, 2013 1794 djohnson ServiceLoaderUtil now requires the requesting class. * May 01, 2013 1968 djohnson Prevent deadlock due to SerializableManager threads needing to serialize things. * Aug 06, 2013 2228 njensen More efficient transformFromThrift(Class, byte[]) + * Aug 13, 2013 2169 bkowal Unzip any gzipped data before applying thrift transformations * * * @@ -333,6 +336,14 @@ public final class SerializationUtil { @Deprecated public static Object transformFromThrift(byte[] bytes) throws SerializationException { + try { + if (DataUnzipper.isGzipped(bytes)) { + return transformFromThrift(Object.class, + new DataUnzipper().gunzip(bytes)); + } + } catch (IOException e) { + throw new SerializationException("GZip analysis failed!", e); + } return transformFromThrift(Object.class, bytes); } diff --git a/edexOsgi/com.raytheon.uf.common.util/src/com/raytheon/uf/common/util/DataUnzipper.java b/edexOsgi/com.raytheon.uf.common.util/src/com/raytheon/uf/common/util/DataUnzipper.java index 7438a23784..f094282848 100644 --- a/edexOsgi/com.raytheon.uf.common.util/src/com/raytheon/uf/common/util/DataUnzipper.java +++ b/edexOsgi/com.raytheon.uf.common.util/src/com/raytheon/uf/common/util/DataUnzipper.java @@ -37,6 +37,8 @@ import java.util.zip.ZipInputStream; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * Jul 17, 2012 mschenke Initial creation + * Aug 16, 2013 2169 bkowal Added a method to determine if data is gzipped + * using the magic number verification. * * * @@ -45,6 +47,21 @@ import java.util.zip.ZipInputStream; */ public class DataUnzipper { + public static boolean isGzipped(byte[] data) throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + + int b = bais.read(); + if (b == -1) { + throw new IOException("Unexpected end of input stream encountered!"); + } + int x = bais.read(); + if (x == -1) { + throw new IOException("Unexpected end of input stream encountered!"); + } + int magic = ((int) x << 8) | b; + + return magic == GZIPInputStream.GZIP_MAGIC; + } /** * Uses ZLIB decompression to unzip the data diff --git a/edexOsgi/com.raytheon.uf.edex.activetable/res/spring/activetable-common.xml b/edexOsgi/com.raytheon.uf.edex.activetable/res/spring/activetable-common.xml index 0cf40db440..ca80e04b60 100644 --- a/edexOsgi/com.raytheon.uf.edex.activetable/res/spring/activetable-common.xml +++ b/edexOsgi/com.raytheon.uf.edex.activetable/res/spring/activetable-common.xml @@ -57,7 +57,6 @@ - diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/res/spring/persist-ingest.xml b/edexOsgi/com.raytheon.uf.edex.ingest/res/spring/persist-ingest.xml index f0ba066b0e..609c73da8b 100644 --- a/edexOsgi/com.raytheon.uf.edex.ingest/res/spring/persist-ingest.xml +++ b/edexOsgi/com.raytheon.uf.edex.ingest/res/spring/persist-ingest.xml @@ -63,11 +63,10 @@ - + - diff --git a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/DataUriAggregator.java b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/DataUriAggregator.java index 75c7141183..6083fb3c2b 100644 --- a/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/DataUriAggregator.java +++ b/edexOsgi/com.raytheon.uf.edex.ingest/src/com/raytheon/uf/edex/ingest/notification/DataUriAggregator.java @@ -21,9 +21,16 @@ package com.raytheon.uf.edex.ingest.notification; import java.util.ArrayList; import java.util.List; +import java.util.zip.GZIPOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import com.raytheon.uf.common.serialization.ISerializableObject; +import com.raytheon.uf.common.serialization.SerializationException; +import com.raytheon.uf.common.serialization.SerializationUtil; import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage; import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMessage; +import com.raytheon.uf.common.util.ByteArrayOutputStreamPool; /** * Combines multiple messages of URIs into a single message that can be @@ -35,6 +42,7 @@ import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMess * ------------ ---------- ----------- -------------------------- * Dec 10, 2008 njensen Initial creation * Feb 15, 2013 1638 mschenke Moved DataURINotificationMessage to uf.common.dataplugin + * Aug 16, 2013 2169 bkowal gzip data uris * * * @author njensen @@ -77,14 +85,16 @@ public class DataUriAggregator { * * @return */ - public DataURINotificationMessage sendQueuedUris() { + public byte[] sendQueuedUris() throws SerializationException { + DataURINotificationMessage msg = null; synchronized (this) { String[] uris = dataUris.toArray(new String[dataUris.size()]); dataUris.clear(); - DataURINotificationMessage msg = new DataURINotificationMessage(); + msg = new DataURINotificationMessage(); msg.setDataURIs(uris); - return msg; } + + return this.encodeMessage(msg); } /** @@ -92,13 +102,45 @@ public class DataUriAggregator { * * @return */ - public PracticeDataURINotificationMessage sendPracticeQueuedUris() { + public byte[] sendPracticeQueuedUris() throws SerializationException { + PracticeDataURINotificationMessage msg = null; synchronized (this) { String[] uris = dataUris.toArray(new String[dataUris.size()]); dataUris.clear(); - PracticeDataURINotificationMessage msg = new PracticeDataURINotificationMessage(); + msg = new PracticeDataURINotificationMessage(); msg.setDataURIs(uris); - return msg; + } + + return this.encodeMessage(msg); + } + + public byte[] encodeMessage(ISerializableObject msg) + throws SerializationException { + ByteArrayOutputStream baos = ByteArrayOutputStreamPool.getInstance() + .getStream(); + GZIPOutputStream gzippedURIs = null; + + try { + gzippedURIs = new GZIPOutputStream(baos); + } catch (IOException e) { + throw new SerializationException( + "Failed to prepare the gzipped data stream!", e); + } + + SerializationUtil.transformToThriftUsingStream(msg, gzippedURIs); + try { + gzippedURIs.finish(); + gzippedURIs.flush(); + return baos.toByteArray(); + } catch (IOException e) { + throw new SerializationException( + "Failed to write the gzipped data stream!", e); + } finally { + try { + gzippedURIs.close(); + } catch (IOException e) { + // ignore, we no longer need the stream + } } } -} +} \ No newline at end of file diff --git a/nativeLib/edex_com/.cproject b/nativeLib/edex_com/.cproject index 250a367700..dd7ddd29eb 100644 --- a/nativeLib/edex_com/.cproject +++ b/nativeLib/edex_com/.cproject @@ -60,6 +60,7 @@