Issue #2169 - gzip data uris; entities that receive data uris will unzip them before running thrift conversions.

- Amend: updates for 32-bit edex_com
- Amend: do not synchronize when building the message
- Amend: use the byte array pool
- Amend: default to data decompression when the queue is edex.alerts

Change-Id: I79a1f3d3d698c67019a4aa33e667fef99f6ebc17

Former-commit-id: 3eaacdb522 [formerly 3eaacdb522 [formerly 120aa0d6ee671e20734f8efb7bad89ff1b145629]]
Former-commit-id: 2e1a8eb2b3
Former-commit-id: 5c0af47d6d
This commit is contained in:
Bryan Kowal 2013-08-16 11:13:43 -05:00
parent 5dd5e0bf30
commit 6f246409fd
11 changed files with 135 additions and 16 deletions

View file

@ -43,6 +43,7 @@ import com.raytheon.uf.viz.core.VizApp;
* Nov 2, 2011 #7391 bkowal Ensure that the generated WsId is properly formatted to be
* included in a url.
* May 09, 2013 1814 rjpeter Updated prefetch to 10.
* Aug 16, 2013 2169 bkowal CAVE will now synchronously acknowledge messages.
* </pre>
*
* @author chammack
@ -82,7 +83,7 @@ public class JMSConnection {
"UTF-8")
+ "/edex?brokerlist='"
+ this.jndiProviderUrl
+ "?connecttimeout='5000'&heartbeat='0''&maxprefetch='10'&sync_publish='all'&failover='nofailover'");
+ "?connecttimeout='5000'&heartbeat='0''&maxprefetch='10'&sync_publish='all'&failover='nofailover'&sync_ack='true'");
} catch (URLSyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();

View file

@ -32,6 +32,7 @@ import com.raytheon.uf.common.serialization.DynamicSerializationManager;
import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.viz.core.exception.VizException;
import com.raytheon.uf.common.util.DataUnzipper;
/**
* Encapsulation object for notification messages
@ -44,6 +45,7 @@ import com.raytheon.uf.viz.core.exception.VizException;
* Oct 4, 2010 7193 cjeanbap Added a new method, isNotExpired().
* Feb 1, 2011 7193 cjeanbap Added a new method, getPublishedTime().
* Aug 6, 2013 2228 njensen Use deserialize(byte[])
* Aug 16, 2013 2169 bkowal Unzip any gzipped information
* </pre>
*
* @author chammack
@ -98,7 +100,9 @@ public class NotificationMessage {
throw new NotificationException(
"Message payload terminated early. Expected: "
+ length + ". Got: " + readLength);
if (DataUnzipper.isGzipped(data)) {
data = new DataUnzipper().gunzip(data);
}
this.unmarshalledObject = DynamicSerializationManager
.getManager(SerializationType.Thrift)
.deserialize(data);

View file

@ -20,6 +20,7 @@
package com.raytheon.uf.common.serialization;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
@ -29,6 +30,7 @@ import javax.xml.bind.JAXBException;
import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType;
import com.raytheon.uf.common.util.ServiceLoaderUtil;
import com.raytheon.uf.common.util.DataUnzipper;
/**
* Provides utilities for serialization support
@ -46,6 +48,7 @@ import com.raytheon.uf.common.util.ServiceLoaderUtil;
* Mar 21, 2013 1794 djohnson ServiceLoaderUtil now requires the requesting class.
* May 01, 2013 1968 djohnson Prevent deadlock due to SerializableManager threads needing to serialize things.
* Aug 06, 2013 2228 njensen More efficient transformFromThrift(Class, byte[])
* Aug 13, 2013 2169 bkowal Unzip any gzipped data before applying thrift transformations
*
* </pre>
*
@ -333,6 +336,14 @@ public final class SerializationUtil {
@Deprecated
public static Object transformFromThrift(byte[] bytes)
throws SerializationException {
try {
if (DataUnzipper.isGzipped(bytes)) {
return transformFromThrift(Object.class,
new DataUnzipper().gunzip(bytes));
}
} catch (IOException e) {
throw new SerializationException("GZip analysis failed!", e);
}
return transformFromThrift(Object.class, bytes);
}

View file

@ -37,6 +37,8 @@ import java.util.zip.ZipInputStream;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jul 17, 2012 mschenke Initial creation
* Aug 16, 2013 2169 bkowal Added a method to determine if data is gzipped
* using the magic number verification.
*
* </pre>
*
@ -45,6 +47,21 @@ import java.util.zip.ZipInputStream;
*/
public class DataUnzipper {
public static boolean isGzipped(byte[] data) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
int b = bais.read();
if (b == -1) {
throw new IOException("Unexpected end of input stream encountered!");
}
int x = bais.read();
if (x == -1) {
throw new IOException("Unexpected end of input stream encountered!");
}
int magic = ((int) x << 8) | b;
return magic == GZIPInputStream.GZIP_MAGIC;
}
/**
* Uses ZLIB decompression to unzip the data

View file

@ -57,7 +57,6 @@
<bean ref="toDataURI" method="toDataURI"/>
<bean ref="uriAggregator" method="addDataUris" />
<bean ref="uriAggregator" method="sendPracticeQueuedUris" />
<bean ref="serializationUtil" method="transformToThrift" />
<to uri="jms-generic:topic:edex.alerts.practicewarning?timeToLive=60000&amp;deliveryPersistent=false"/>
</filter>
<doCatch>

View file

@ -63,11 +63,10 @@
</route>
<route id="notificationTimer">
<from uri="timer://notificationTimer?fixedRate=true&amp;period=5000" />
<from uri="timer://notificationTimer?fixedRate=true&amp;period=1000" />
<filter>
<method bean="uriAggregator" method="hasUris" />
<bean ref="uriAggregator" method="sendQueuedUris" />
<bean ref="serializationUtil" method="transformToThrift" />
<to uri="jms-generic:topic:edex.alerts?timeToLive=60000&amp;deliveryPersistent=false"/>
</filter>
</route>

View file

@ -21,9 +21,16 @@ package com.raytheon.uf.edex.ingest.notification;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import com.raytheon.uf.common.serialization.ISerializableObject;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage;
import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMessage;
import com.raytheon.uf.common.util.ByteArrayOutputStreamPool;
/**
* Combines multiple messages of URIs into a single message that can be
@ -35,6 +42,7 @@ import com.raytheon.uf.common.dataplugin.message.PracticeDataURINotificationMess
* ------------ ---------- ----------- --------------------------
* Dec 10, 2008 njensen Initial creation
* Feb 15, 2013 1638 mschenke Moved DataURINotificationMessage to uf.common.dataplugin
* Aug 16, 2013 2169 bkowal gzip data uris
* </pre>
*
* @author njensen
@ -77,14 +85,16 @@ public class DataUriAggregator {
*
* @return
*/
public DataURINotificationMessage sendQueuedUris() {
public byte[] sendQueuedUris() throws SerializationException {
DataURINotificationMessage msg = null;
synchronized (this) {
String[] uris = dataUris.toArray(new String[dataUris.size()]);
dataUris.clear();
DataURINotificationMessage msg = new DataURINotificationMessage();
msg = new DataURINotificationMessage();
msg.setDataURIs(uris);
return msg;
}
return this.encodeMessage(msg);
}
/**
@ -92,13 +102,45 @@ public class DataUriAggregator {
*
* @return
*/
public PracticeDataURINotificationMessage sendPracticeQueuedUris() {
public byte[] sendPracticeQueuedUris() throws SerializationException {
PracticeDataURINotificationMessage msg = null;
synchronized (this) {
String[] uris = dataUris.toArray(new String[dataUris.size()]);
dataUris.clear();
PracticeDataURINotificationMessage msg = new PracticeDataURINotificationMessage();
msg = new PracticeDataURINotificationMessage();
msg.setDataURIs(uris);
return msg;
}
return this.encodeMessage(msg);
}
public byte[] encodeMessage(ISerializableObject msg)
throws SerializationException {
ByteArrayOutputStream baos = ByteArrayOutputStreamPool.getInstance()
.getStream();
GZIPOutputStream gzippedURIs = null;
try {
gzippedURIs = new GZIPOutputStream(baos);
} catch (IOException e) {
throw new SerializationException(
"Failed to prepare the gzipped data stream!", e);
}
SerializationUtil.transformToThriftUsingStream(msg, gzippedURIs);
try {
gzippedURIs.finish();
gzippedURIs.flush();
return baos.toByteArray();
} catch (IOException e) {
throw new SerializationException(
"Failed to write the gzipped data stream!", e);
} finally {
try {
gzippedURIs.close();
} catch (IOException e) {
// ignore, we no longer need the stream
}
}
}
}
}

View file

@ -60,6 +60,7 @@
<option defaultValue="true" id="gnu.cpp.link.so.release.option.shared.393797976" name="Shared (-shared)" superClass="gnu.cpp.link.so.release.option.shared" valueType="boolean"/>
<option id="gnu.cpp.link.option.libs.60193779" name="Libraries (-l)" superClass="gnu.cpp.link.option.libs" valueType="libs">
<listOptionValue builtIn="false" value="thrift"/>
<listOptionValue builtIn="false" value="boost_iostreams"/>
<listOptionValue builtIn="false" value="uuid"/>
<listOptionValue builtIn="false" value="qpidcommon"/>
<listOptionValue builtIn="false" value="qpidtypes"/>
@ -157,6 +158,7 @@
<option defaultValue="true" id="gnu.cpp.link.so.release.option.shared.340284127" name="Shared (-shared)" superClass="gnu.cpp.link.so.release.option.shared" valueType="boolean"/>
<option id="gnu.cpp.link.option.libs.1646606439" name="Libraries (-l)" superClass="gnu.cpp.link.option.libs" valueType="libs">
<listOptionValue builtIn="false" value="thrift"/>
<listOptionValue builtIn="false" value="boost_iostreams"/>
<listOptionValue builtIn="false" value="uuid"/>
<listOptionValue builtIn="false" value="qpidcommon"/>
<listOptionValue builtIn="false" value="qpidtypes"/>

View file

@ -29,6 +29,7 @@
* ------------ ---------- ----------- --------------------------
* 11/2/09 3375 brockwoo Initial Creation
* 08/13/13 2257 bkowal Update for qpid 0.18.
* 08/15/13 2169 bkowal Qpid messages are now decompressed
*
* </pre>
*
@ -37,7 +38,11 @@
*/
#include <iostream>
#include <fstream>
#include <sstream>
#include <boost/iostreams/filtering_streambuf.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <uuid/uuid.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Duration.h>
@ -103,7 +108,29 @@ void EdexNotification::listen() {
if (result) {
this->session.acknowledge(message);
std::string output = message.getContent();
std::stringstream compressedData(output);
std::stringstream decompressedData;
/* decompress the data. */
try
{
boost::iostreams::filtering_streambuf<boost::iostreams::input> in;
in.push(boost::iostreams::gzip_decompressor());
in.push(compressedData);
boost::iostreams::copy(in, decompressedData);
}
catch (const boost::iostreams::gzip_error& error)
{
std::cout << "ERROR: Failed to decompress gzip data - "
<< error.error() << std::endl;
this->cleanup();
throw;
}
output = decompressedData.str();
uint8_t * data = (uint8_t *) output.c_str();
TMemoryBuffer * buffer = new TMemoryBuffer(data,
output.length(),
apache::thrift::transport::TMemoryBuffer::COPY);

View file

@ -28,20 +28,22 @@
# Date Ticket# Engineer Description
# ------------ ---------- ----------- --------------------------
# 11/17/10 njensen Initial Creation.
#
# 08/15/13 2169 bkowal Optionally gzip decompress any data that is read.
#
#
import qpid
import zlib
from Queue import Empty
from qpid.exceptions import Closed
class QpidSubscriber:
def __init__(self, host='127.0.0.1', port=5672):
def __init__(self, host='127.0.0.1', port=5672, decompress=False):
self.host = host
self.port = port
self.decompress = decompress;
socket = qpid.util.connect(host, port)
self.__connection = qpid.connection.Connection(sock=socket, username='guest', password='guest')
self.__connection.start()
@ -49,6 +51,11 @@ class QpidSubscriber:
self.subscribed = True
def topicSubscribe(self, topicName, callback):
# if the queue is edex.alerts, set decompress to true always for now to
# maintain compatibility with existing python scripts.
if (topicName == 'edex.alerts'):
self.decompress = True
print "Establishing connection to broker on", self.host
queueName = topicName + self.__session.name
self.__session.queue_declare(queue=queueName, exclusive=True, auto_delete=True, arguments={'qpid.max_count':100, 'qpid.policy_type':'ring'})
@ -66,7 +73,16 @@ class QpidSubscriber:
try:
message = queue.get(timeout=10)
content = message.body
self.__session.message_accept(qpid.datatypes.RangedSet(message.id))
self.__session.message_accept(qpid.datatypes.RangedSet(message.id))
if (self.decompress):
print "Decompressing received content"
try:
# http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
d = zlib.decompressobj(16+zlib.MAX_WBITS)
content = d.decompress(content)
except:
# decompression failed, return the original content
pass
callback(content)
except Empty:
pass

View file

@ -28,6 +28,7 @@
# Date Ticket# Engineer Description
# ------------ ---------- ----------- --------------------------
# 03/09/11 njensen Initial Creation.
# 08/15/13 2169 bkowal Decompress data read from the queue
#
#
#
@ -52,7 +53,7 @@ class ListenThread(threading.Thread):
def run(self):
from ufpy import QpidSubscriber
self.qs = QpidSubscriber.QpidSubscriber(self.hostname, self.portNumber)
self.qs = QpidSubscriber.QpidSubscriber(self.hostname, self.portNumber, True)
self.qs.topicSubscribe(self.topicName, self.receivedMessage)
def receivedMessage(self, msg):