Issue #2169 - gzip data uris; entities that receive data uris will unzip them before running thrift conversions.

- Amend: updates for 32-bit edex_com
- Amend: do not synchronize when building the message
- Amend: use the byte array pool
- Amend: default to data decompression when the queue is edex.alerts

Change-Id: I79a1f3d3d698c67019a4aa33e667fef99f6ebc17

Former-commit-id: 120aa0d6ee671e20734f8efb7bad89ff1b145629
This commit is contained in:
Bryan Kowal 2013-08-16 11:13:43 -05:00
parent 2a296b5890
commit 2e1a8eb2b3
11 changed files with 135 additions and 16 deletions

View file

@ -43,6 +43,7 @@ import com.raytheon.uf.viz.core.VizApp;
* Nov 2, 2011 #7391 bkowal Ensure that the generated WsId is properly formatted to be * Nov 2, 2011 #7391 bkowal Ensure that the generated WsId is properly formatted to be
* included in a url. * included in a url.
* May 09, 2013 1814 rjpeter Updated prefetch to 10. * May 09, 2013 1814 rjpeter Updated prefetch to 10.
* Aug 16, 2013 2169 bkowal CAVE will now synchronously acknowledge messages.
* </pre> * </pre>
* *
* @author chammack * @author chammack
@ -82,7 +83,7 @@ public class JMSConnection {
"UTF-8") "UTF-8")
+ "/edex?brokerlist='" + "/edex?brokerlist='"
+ this.jndiProviderUrl + this.jndiProviderUrl
+ "?connecttimeout='5000'&heartbeat='0''&maxprefetch='10'&sync_publish='all'&failover='nofailover'"); + "?connecttimeout='5000'&heartbeat='0''&maxprefetch='10'&sync_publish='all'&failover='nofailover'&sync_ack='true'");
} catch (URLSyntaxException e) { } catch (URLSyntaxException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();

View file

@ -32,6 +32,7 @@ import com.raytheon.uf.common.serialization.DynamicSerializationManager;
import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType; import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType;
import com.raytheon.uf.common.serialization.SerializationUtil; import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.viz.core.exception.VizException; import com.raytheon.uf.viz.core.exception.VizException;
import com.raytheon.uf.common.util.DataUnzipper;
/** /**
* Encapsulation object for notification messages * Encapsulation object for notification messages
@ -44,6 +45,7 @@ import com.raytheon.uf.viz.core.exception.VizException;
* Oct 4, 2010 7193 cjeanbap Added a new method, isNotExpired(). * Oct 4, 2010 7193 cjeanbap Added a new method, isNotExpired().
* Feb 1, 2011 7193 cjeanbap Added a new method, getPublishedTime(). * Feb 1, 2011 7193 cjeanbap Added a new method, getPublishedTime().
* Aug 6, 2013 2228 njensen Use deserialize(byte[]) * Aug 6, 2013 2228 njensen Use deserialize(byte[])
* Aug 16, 2013 2169 bkowal Unzip any gzipped information
* </pre> * </pre>
* *
* @author chammack * @author chammack
@ -98,7 +100,9 @@ public class NotificationMessage {
throw new NotificationException( throw new NotificationException(
"Message payload terminated early. Expected: " "Message payload terminated early. Expected: "
+ length + ". Got: " + readLength); + length + ". Got: " + readLength);
if (DataUnzipper.isGzipped(data)) {
data = new DataUnzipper().gunzip(data);
}
this.unmarshalledObject = DynamicSerializationManager this.unmarshalledObject = DynamicSerializationManager
.getManager(SerializationType.Thrift) .getManager(SerializationType.Thrift)
.deserialize(data); .deserialize(data);

View file

@ -20,6 +20,7 @@
package com.raytheon.uf.common.serialization; package com.raytheon.uf.common.serialization;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List; import java.util.List;
@ -29,6 +30,7 @@ import javax.xml.bind.JAXBException;
import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType; import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType;
import com.raytheon.uf.common.util.ServiceLoaderUtil; import com.raytheon.uf.common.util.ServiceLoaderUtil;
import com.raytheon.uf.common.util.DataUnzipper;
/** /**
* Provides utilities for serialization support * Provides utilities for serialization support
@ -46,6 +48,7 @@ import com.raytheon.uf.common.util.ServiceLoaderUtil;
* Mar 21, 2013 1794 djohnson ServiceLoaderUtil now requires the requesting class. * Mar 21, 2013 1794 djohnson ServiceLoaderUtil now requires the requesting class.
* May 01, 2013 1968 djohnson Prevent deadlock due to SerializableManager threads needing to serialize things. * May 01, 2013 1968 djohnson Prevent deadlock due to SerializableManager threads needing to serialize things.
* Aug 06, 2013 2228 njensen More efficient transformFromThrift(Class, byte[]) * Aug 06, 2013 2228 njensen More efficient transformFromThrift(Class, byte[])
* Aug 13, 2013 2169 bkowal Unzip any gzipped data before applying thrift transformations
* *
* </pre> * </pre>
* *
@ -333,6 +336,14 @@ public final class SerializationUtil {
@Deprecated @Deprecated
public static Object transformFromThrift(byte[] bytes) public static Object transformFromThrift(byte[] bytes)
throws SerializationException { throws SerializationException {
try {
if (DataUnzipper.isGzipped(bytes)) {
return transformFromThrift(Object.class,
new DataUnzipper().gunzip(bytes));
}
} catch (IOException e) {
throw new SerializationException("GZip analysis failed!", e);
}
return transformFromThrift(Object.class, bytes); return transformFromThrift(Object.class, bytes);
} }

View file

@ -37,6 +37,8 @@ import java.util.zip.ZipInputStream;
* Date Ticket# Engineer Description * Date Ticket# Engineer Description
* ------------ ---------- ----------- -------------------------- * ------------ ---------- ----------- --------------------------
* Jul 17, 2012 mschenke Initial creation * Jul 17, 2012 mschenke Initial creation
* Aug 16, 2013 2169 bkowal Added a method to determine if data is gzipped
* using the magic number verification.
* *
* </pre> * </pre>
* *
@ -45,6 +47,21 @@ import java.util.zip.ZipInputStream;
*/ */
public class DataUnzipper { public class DataUnzipper {
public static boolean isGzipped(byte[] data) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
int b = bais.read();
if (b == -1) {
throw new IOException("Unexpected end of input stream encountered!");
}
int x = bais.read();
if (x == -1) {
throw new IOException("Unexpected end of input stream encountered!");
}
int magic = ((int) x << 8) | b;
return magic == GZIPInputStream.GZIP_MAGIC;
}
/** /**
* Uses ZLIB decompression to unzip the data * Uses ZLIB decompression to unzip the data

View file

@ -57,7 +57,6 @@
<bean ref="toDataURI" method="toDataURI"/> <bean ref="toDataURI" method="toDataURI"/>
<bean ref="uriAggregator" method="addDataUris" /> <bean ref="uriAggregator" method="addDataUris" />
<bean ref="uriAggregator" method="sendPracticeQueuedUris" /> <bean ref="uriAggregator" method="sendPracticeQueuedUris" />
<bean ref="serializationUtil" method="transformToThrift" />
<to uri="jms-generic:topic:edex.alerts.practicewarning?timeToLive=60000&amp;deliveryPersistent=false"/> <to uri="jms-generic:topic:edex.alerts.practicewarning?timeToLive=60000&amp;deliveryPersistent=false"/>
</filter> </filter>
<doCatch> <doCatch>

View file

@ -63,11 +63,10 @@
</route> </route>
<route id="notificationTimer"> <route id="notificationTimer">
<from uri="timer://notificationTimer?fixedRate=true&amp;period=5000" /> <from uri="timer://notificationTimer?fixedRate=true&amp;period=1000" />
<filter> <filter>
<method bean="uriAggregator" method="hasUris" /> <method bean="uriAggregator" method="hasUris" />
<bean ref="uriAggregator" method="sendQueuedUris" /> <bean ref="uriAggregator" method="sendQueuedUris" />
<bean ref="serializationUtil" method="transformToThrift" />
<to uri="jms-generic:topic:edex.alerts?timeToLive=60000&amp;deliveryPersistent=false"/> <to uri="jms-generic:topic:edex.alerts?timeToLive=60000&amp;deliveryPersistent=false"/>
</filter> </filter>
</route> </route>

View file

@ -21,9 +21,16 @@ package com.raytheon.uf.edex.ingest.notification;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.zip.GZIPOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import com.raytheon.uf.common.serialization.ISerializableObject;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage; import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage;
import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMessage; import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMessage;
import com.raytheon.uf.common.util.ByteArrayOutputStreamPool;
/** /**
* Combines multiple messages of URIs into a single message that can be * Combines multiple messages of URIs into a single message that can be
@ -35,6 +42,7 @@ import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMess
* ------------ ---------- ----------- -------------------------- * ------------ ---------- ----------- --------------------------
* Dec 10, 2008 njensen Initial creation * Dec 10, 2008 njensen Initial creation
* Feb 15, 2013 1638 mschenke Moved DataURINotificationMessage to uf.common.dataplugin * Feb 15, 2013 1638 mschenke Moved DataURINotificationMessage to uf.common.dataplugin
* Aug 16, 2013 2169 bkowal gzip data uris
* </pre> * </pre>
* *
* @author njensen * @author njensen
@ -77,14 +85,16 @@ public class DataUriAggregator {
* *
* @return * @return
*/ */
public DataURINotificationMessage sendQueuedUris() { public byte[] sendQueuedUris() throws SerializationException {
DataURINotificationMessage msg = null;
synchronized (this) { synchronized (this) {
String[] uris = dataUris.toArray(new String[dataUris.size()]); String[] uris = dataUris.toArray(new String[dataUris.size()]);
dataUris.clear(); dataUris.clear();
DataURINotificationMessage msg = new DataURINotificationMessage(); msg = new DataURINotificationMessage();
msg.setDataURIs(uris); msg.setDataURIs(uris);
return msg;
} }
return this.encodeMessage(msg);
} }
/** /**
@ -92,13 +102,45 @@ public class DataUriAggregator {
* *
* @return * @return
*/ */
public PracticeDataURINotificationMessage sendPracticeQueuedUris() { public byte[] sendPracticeQueuedUris() throws SerializationException {
PracticeDataURINotificationMessage msg = null;
synchronized (this) { synchronized (this) {
String[] uris = dataUris.toArray(new String[dataUris.size()]); String[] uris = dataUris.toArray(new String[dataUris.size()]);
dataUris.clear(); dataUris.clear();
PracticeDataURINotificationMessage msg = new PracticeDataURINotificationMessage(); msg = new PracticeDataURINotificationMessage();
msg.setDataURIs(uris); msg.setDataURIs(uris);
return msg; }
return this.encodeMessage(msg);
}
public byte[] encodeMessage(ISerializableObject msg)
throws SerializationException {
ByteArrayOutputStream baos = ByteArrayOutputStreamPool.getInstance()
.getStream();
GZIPOutputStream gzippedURIs = null;
try {
gzippedURIs = new GZIPOutputStream(baos);
} catch (IOException e) {
throw new SerializationException(
"Failed to prepare the gzipped data stream!", e);
}
SerializationUtil.transformToThriftUsingStream(msg, gzippedURIs);
try {
gzippedURIs.finish();
gzippedURIs.flush();
return baos.toByteArray();
} catch (IOException e) {
throw new SerializationException(
"Failed to write the gzipped data stream!", e);
} finally {
try {
gzippedURIs.close();
} catch (IOException e) {
// ignore, we no longer need the stream
}
} }
} }
} }

View file

@ -60,6 +60,7 @@
<option defaultValue="true" id="gnu.cpp.link.so.release.option.shared.393797976" name="Shared (-shared)" superClass="gnu.cpp.link.so.release.option.shared" valueType="boolean"/> <option defaultValue="true" id="gnu.cpp.link.so.release.option.shared.393797976" name="Shared (-shared)" superClass="gnu.cpp.link.so.release.option.shared" valueType="boolean"/>
<option id="gnu.cpp.link.option.libs.60193779" name="Libraries (-l)" superClass="gnu.cpp.link.option.libs" valueType="libs"> <option id="gnu.cpp.link.option.libs.60193779" name="Libraries (-l)" superClass="gnu.cpp.link.option.libs" valueType="libs">
<listOptionValue builtIn="false" value="thrift"/> <listOptionValue builtIn="false" value="thrift"/>
<listOptionValue builtIn="false" value="boost_iostreams"/>
<listOptionValue builtIn="false" value="uuid"/> <listOptionValue builtIn="false" value="uuid"/>
<listOptionValue builtIn="false" value="qpidcommon"/> <listOptionValue builtIn="false" value="qpidcommon"/>
<listOptionValue builtIn="false" value="qpidtypes"/> <listOptionValue builtIn="false" value="qpidtypes"/>
@ -157,6 +158,7 @@
<option defaultValue="true" id="gnu.cpp.link.so.release.option.shared.340284127" name="Shared (-shared)" superClass="gnu.cpp.link.so.release.option.shared" valueType="boolean"/> <option defaultValue="true" id="gnu.cpp.link.so.release.option.shared.340284127" name="Shared (-shared)" superClass="gnu.cpp.link.so.release.option.shared" valueType="boolean"/>
<option id="gnu.cpp.link.option.libs.1646606439" name="Libraries (-l)" superClass="gnu.cpp.link.option.libs" valueType="libs"> <option id="gnu.cpp.link.option.libs.1646606439" name="Libraries (-l)" superClass="gnu.cpp.link.option.libs" valueType="libs">
<listOptionValue builtIn="false" value="thrift"/> <listOptionValue builtIn="false" value="thrift"/>
<listOptionValue builtIn="false" value="boost_iostreams"/>
<listOptionValue builtIn="false" value="uuid"/> <listOptionValue builtIn="false" value="uuid"/>
<listOptionValue builtIn="false" value="qpidcommon"/> <listOptionValue builtIn="false" value="qpidcommon"/>
<listOptionValue builtIn="false" value="qpidtypes"/> <listOptionValue builtIn="false" value="qpidtypes"/>

View file

@ -29,6 +29,7 @@
* ------------ ---------- ----------- -------------------------- * ------------ ---------- ----------- --------------------------
* 11/2/09 3375 brockwoo Initial Creation * 11/2/09 3375 brockwoo Initial Creation
* 08/13/13 2257 bkowal Update for qpid 0.18. * 08/13/13 2257 bkowal Update for qpid 0.18.
* 08/15/13 2169 bkowal Qpid messages are now decompressed
* *
* </pre> * </pre>
* *
@ -37,7 +38,11 @@
*/ */
#include <iostream> #include <iostream>
#include <fstream>
#include <sstream> #include <sstream>
#include <boost/iostreams/filtering_streambuf.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <uuid/uuid.h> #include <uuid/uuid.h>
#include <qpid/messaging/Connection.h> #include <qpid/messaging/Connection.h>
#include <qpid/messaging/Duration.h> #include <qpid/messaging/Duration.h>
@ -103,7 +108,29 @@ void EdexNotification::listen() {
if (result) { if (result) {
this->session.acknowledge(message); this->session.acknowledge(message);
std::string output = message.getContent(); std::string output = message.getContent();
std::stringstream compressedData(output);
std::stringstream decompressedData;
/* decompress the data. */
try
{
boost::iostreams::filtering_streambuf<boost::iostreams::input> in;
in.push(boost::iostreams::gzip_decompressor());
in.push(compressedData);
boost::iostreams::copy(in, decompressedData);
}
catch (const boost::iostreams::gzip_error& error)
{
std::cout << "ERROR: Failed to decompress gzip data - "
<< error.error() << std::endl;
this->cleanup();
throw;
}
output = decompressedData.str();
uint8_t * data = (uint8_t *) output.c_str(); uint8_t * data = (uint8_t *) output.c_str();
TMemoryBuffer * buffer = new TMemoryBuffer(data, TMemoryBuffer * buffer = new TMemoryBuffer(data,
output.length(), output.length(),
apache::thrift::transport::TMemoryBuffer::COPY); apache::thrift::transport::TMemoryBuffer::COPY);

View file

@ -28,20 +28,22 @@
# Date Ticket# Engineer Description # Date Ticket# Engineer Description
# ------------ ---------- ----------- -------------------------- # ------------ ---------- ----------- --------------------------
# 11/17/10 njensen Initial Creation. # 11/17/10 njensen Initial Creation.
# # 08/15/13 2169 bkowal Optionally gzip decompress any data that is read.
# #
# #
import qpid import qpid
import zlib
from Queue import Empty from Queue import Empty
from qpid.exceptions import Closed from qpid.exceptions import Closed
class QpidSubscriber: class QpidSubscriber:
def __init__(self, host='127.0.0.1', port=5672): def __init__(self, host='127.0.0.1', port=5672, decompress=False):
self.host = host self.host = host
self.port = port self.port = port
self.decompress = decompress;
socket = qpid.util.connect(host, port) socket = qpid.util.connect(host, port)
self.__connection = qpid.connection.Connection(sock=socket, username='guest', password='guest') self.__connection = qpid.connection.Connection(sock=socket, username='guest', password='guest')
self.__connection.start() self.__connection.start()
@ -49,6 +51,11 @@ class QpidSubscriber:
self.subscribed = True self.subscribed = True
def topicSubscribe(self, topicName, callback): def topicSubscribe(self, topicName, callback):
# if the queue is edex.alerts, set decompress to true always for now to
# maintain compatibility with existing python scripts.
if (topicName == 'edex.alerts'):
self.decompress = True
print "Establishing connection to broker on", self.host print "Establishing connection to broker on", self.host
queueName = topicName + self.__session.name queueName = topicName + self.__session.name
self.__session.queue_declare(queue=queueName, exclusive=True, auto_delete=True, arguments={'qpid.max_count':100, 'qpid.policy_type':'ring'}) self.__session.queue_declare(queue=queueName, exclusive=True, auto_delete=True, arguments={'qpid.max_count':100, 'qpid.policy_type':'ring'})
@ -66,7 +73,16 @@ class QpidSubscriber:
try: try:
message = queue.get(timeout=10) message = queue.get(timeout=10)
content = message.body content = message.body
self.__session.message_accept(qpid.datatypes.RangedSet(message.id)) self.__session.message_accept(qpid.datatypes.RangedSet(message.id))
if (self.decompress):
print "Decompressing received content"
try:
# http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
d = zlib.decompressobj(16+zlib.MAX_WBITS)
content = d.decompress(content)
except:
# decompression failed, return the original content
pass
callback(content) callback(content)
except Empty: except Empty:
pass pass

View file

@ -28,6 +28,7 @@
# Date Ticket# Engineer Description # Date Ticket# Engineer Description
# ------------ ---------- ----------- -------------------------- # ------------ ---------- ----------- --------------------------
# 03/09/11 njensen Initial Creation. # 03/09/11 njensen Initial Creation.
# 08/15/13 2169 bkowal Decompress data read from the queue
# #
# #
# #
@ -52,7 +53,7 @@ class ListenThread(threading.Thread):
def run(self): def run(self):
from ufpy import QpidSubscriber from ufpy import QpidSubscriber
self.qs = QpidSubscriber.QpidSubscriber(self.hostname, self.portNumber) self.qs = QpidSubscriber.QpidSubscriber(self.hostname, self.portNumber, True)
self.qs.topicSubscribe(self.topicName, self.receivedMessage) self.qs.topicSubscribe(self.topicName, self.receivedMessage)
def receivedMessage(self, msg): def receivedMessage(self, msg):