2015-06-12 11:57:06 -06:00
|
|
|
##
|
|
|
|
# This software was developed and / or modified by Raytheon Company,
|
|
|
|
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
2016-03-16 16:32:17 -05:00
|
|
|
#
|
2015-06-12 11:57:06 -06:00
|
|
|
# U.S. EXPORT CONTROLLED TECHNICAL DATA
|
|
|
|
# This software product contains export-restricted data whose
|
|
|
|
# export/transfer/disclosure is restricted by U.S. law. Dissemination
|
|
|
|
# to non-U.S. persons whether in the United States or abroad requires
|
|
|
|
# an export license or other authorization.
|
2016-03-16 16:32:17 -05:00
|
|
|
#
|
2015-06-12 11:57:06 -06:00
|
|
|
# Contractor Name: Raytheon Company
|
|
|
|
# Contractor Address: 6825 Pine Street, Suite 340
|
|
|
|
# Mail Stop B8
|
|
|
|
# Omaha, NE 68106
|
|
|
|
# 402.291.0100
|
2016-03-16 16:32:17 -05:00
|
|
|
#
|
2015-06-12 11:57:06 -06:00
|
|
|
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
|
|
|
# further licensing information.
|
|
|
|
##
|
|
|
|
|
|
|
|
#
|
|
|
|
# Provides a Python-based interface for subscribing to qpid queues and topics.
|
|
|
|
#
|
2016-03-16 16:32:17 -05:00
|
|
|
#
|
|
|
|
#
|
2015-06-12 11:57:06 -06:00
|
|
|
#    SOFTWARE HISTORY
#
#    Date          Ticket#     Engineer     Description
#    ------------  ----------  -----------  --------------------------
#    11/17/10                  njensen      Initial Creation.
#    08/15/13      2169        bkowal       Optionally gzip decompress any data that is read.
#    08/04/16      2416        tgurney      Add queueStarted property
|
2016-03-16 16:32:17 -05:00
|
|
|
#
|
2015-06-12 11:57:06 -06:00
|
|
|
#
|
|
|
|
|
|
|
|
import qpid
|
|
|
|
import zlib
|
|
|
|
|
|
|
|
from Queue import Empty
|
|
|
|
from qpid.exceptions import Closed
|
|
|
|
|
|
|
|
class QpidSubscriber:
    """Subscribe to a qpid topic and pass each received message body to a callback.

    The constructor opens the broker connection and a session.
    ``topicSubscribe`` then declares a private queue bound to the requested
    topic and runs the receive loop on the calling thread until
    ``subscribed`` becomes False (via ``close()`` or a closed session).

    Public attributes:
        host, port -- broker address given to the constructor.
        decompress -- when true, message bodies are gunzipped before being
                      handed to the callback.
        subscribed -- loop flag; the receive loop runs while it is true.
    """

    def __init__(self, host='127.0.0.1', port=5672, decompress=False):
        """Connect to the broker and start a uniquely named session.

        Args:
            host: broker host name or address.
            port: broker port.
            decompress: if true, gzip-decompress every message body read.
        """
        self.host = host
        self.port = port
        self.decompress = decompress
        sock = qpid.util.connect(host, port)
        self.__connection = qpid.connection.Connection(sock=sock, username='guest', password='guest')
        self.__connection.start()
        # The session name is a fresh UUID; topicSubscribe appends it to the
        # topic name to build the server-side queue name.
        self.__session = self.__connection.session(str(qpid.datatypes.uuid4()))
        self.subscribed = True
        self.__queueStarted = False

    def topicSubscribe(self, topicName, callback):
        """Bind a private queue to ``topicName`` and run the receive loop.

        This call does not return until ``subscribed`` is False.

        Args:
            topicName: binding key on the 'amq.topic' exchange.
            callback: callable invoked with the (optionally decompressed)
                body of each message.
        """
        # if the queue is edex.alerts, set decompress to true always for now to
        # maintain compatibility with existing python scripts.
        if topicName == 'edex.alerts':
            self.decompress = True

        # Single pre-formatted argument: prints the same text on Python 2 as
        # the former `print "...", self.host` statement and is valid Python 3.
        print("Establishing connection to broker on %s" % (self.host,))
        queueName = topicName + self.__session.name
        self.__session.queue_declare(
            queue=queueName,
            exclusive=True,
            auto_delete=True,
            arguments={'qpid.max_count': 100, 'qpid.policy_type': 'ring'})
        self.__session.exchange_bind(exchange='amq.topic', queue=queueName, binding_key=topicName)
        self.__innerSubscribe(queueName, callback)

    def __innerSubscribe(self, serverQueueName, callback):
        """Subscribe to ``serverQueueName`` and dispatch messages to ``callback``.

        Each message is accepted on the session before the callback runs.
        An exception raised by the callback (other than Empty/Closed)
        propagates to the caller.
        """
        local_queue_name = 'local_queue_' + serverQueueName
        queue = self.__session.incoming(local_queue_name)
        self.__session.message_subscribe(serverQueueName, destination=local_queue_name)
        queue.start()
        print("Connection complete to broker on %s" % (self.host,))
        self.__queueStarted = True

        while self.subscribed:
            try:
                # The timeout makes the loop re-check self.subscribed even
                # when no messages arrive.
                message = queue.get(timeout=10)
                content = message.body
                self.__session.message_accept(qpid.datatypes.RangedSet(message.id))
                if self.decompress:
                    print("Decompressing received content")
                    content = self._gunzip(content)
                callback(content)
            except Empty:
                # Nothing arrived within the timeout; poll again.
                pass
            except Closed:
                # close() clears self.subscribed, which ends the loop.
                self.close()

    @staticmethod
    def _gunzip(content):
        """Return ``content`` gzip-decompressed, or unchanged if that fails."""
        try:
            # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
            # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
            inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
            return inflater.decompress(content)
        except Exception:
            # decompression failed, return the original content
            return content

    def close(self):
        """Stop the receive loop and close the session (best effort)."""
        self.__queueStarted = False
        self.subscribed = False
        try:
            self.__session.close(timeout=10)
        except Exception:
            # Deliberately ignored: close() is also called when the session
            # has already been reported Closed, so a failure here is expected.
            pass

    @property
    def queueStarted(self):
        """True once the local queue has been started, False after close()."""
        return self.__queueStarted