python-awips/awips/QpidSubscriber.py

123 lines
4.7 KiB
Python
Raw Permalink Normal View History

2015-06-12 11:57:06 -06:00
##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
2016-03-16 16:32:17 -05:00
#
2015-06-12 11:57:06 -06:00
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
2016-03-16 16:32:17 -05:00
#
2015-06-12 11:57:06 -06:00
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
2016-03-16 16:32:17 -05:00
#
2015-06-12 11:57:06 -06:00
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
#
# Provides a Python-based interface for subscribing to qpid queues and topics.
#
2016-03-16 16:32:17 -05:00
#
#
2015-06-12 11:57:06 -06:00
# SOFTWARE HISTORY
2016-03-16 16:32:17 -05:00
#
2015-06-12 11:57:06 -06:00
# Date Ticket# Engineer Description
# ------------ ---------- ----------- --------------------------
# 11/17/10 njensen Initial Creation.
# 08/15/13 2169 bkowal Optionally gzip decompress any data that is read.
# 08/04/16 2416 tgurney Add queueStarted property
2018-06-19 09:18:30 -06:00
# 02/16/17 6084 bsteffen Support ssl connections
# 09/07/17 6175 tgurney Remove "decompressing" log message
2016-03-16 16:32:17 -05:00
#
2015-06-12 11:57:06 -06:00
#
2018-06-19 09:18:30 -06:00
import os
import os.path
2015-06-12 11:57:06 -06:00
import qpid
import zlib
from Queue import Empty
from qpid.exceptions import Closed
class QpidSubscriber:
2016-03-16 16:32:17 -05:00
2018-06-19 09:18:30 -06:00
def __init__(self, host='127.0.0.1', port=5672, decompress=False, ssl=None):
2015-06-12 11:57:06 -06:00
self.host = host
self.port = port
self.decompress = decompress;
socket = qpid.util.connect(host, port)
2018-06-19 09:18:30 -06:00
if "QPID_SSL_CERT_DB" in os.environ:
certdb = os.environ["QPID_SSL_CERT_DB"]
else:
certdb = os.path.expanduser("~/.qpid/")
if "QPID_SSL_CERT_NAME" in os.environ:
certname = os.environ["QPID_SSL_CERT_NAME"]
else:
certname = "guest"
certfile = os.path.join(certdb, certname + ".crt")
if ssl or (ssl is None and os.path.exists(certfile)):
keyfile = os.path.join(certdb, certname + ".key")
trustfile = os.path.join(certdb, "root.crt")
socket = qpid.util.ssl(socket, keyfile=keyfile, certfile=certfile, ca_certs=trustfile)
2015-06-12 11:57:06 -06:00
self.__connection = qpid.connection.Connection(sock=socket, username='guest', password='guest')
self.__connection.start()
self.__session = self.__connection.session(str(qpid.datatypes.uuid4()))
self.subscribed = True
self.__queueStarted = False
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def topicSubscribe(self, topicName, callback):
# if the queue is edex.alerts, set decompress to true always for now to
# maintain compatibility with existing python scripts.
if (topicName == 'edex.alerts'):
2016-03-16 16:32:17 -05:00
self.decompress = True
2015-06-12 11:57:06 -06:00
print "Establishing connection to broker on", self.host
queueName = topicName + self.__session.name
self.__session.queue_declare(queue=queueName, exclusive=True, auto_delete=True, arguments={'qpid.max_count':100, 'qpid.policy_type':'ring'})
self.__session.exchange_bind(exchange='amq.topic', queue=queueName, binding_key=topicName)
2016-03-16 16:32:17 -05:00
self.__innerSubscribe(queueName, callback)
2015-06-12 11:57:06 -06:00
def __innerSubscribe(self, serverQueueName, callback):
2016-03-16 16:32:17 -05:00
local_queue_name = 'local_queue_' + serverQueueName
2015-06-12 11:57:06 -06:00
queue = self.__session.incoming(local_queue_name)
self.__session.message_subscribe(serverQueueName, destination=local_queue_name)
queue.start()
print "Connection complete to broker on", self.host
self.__queueStarted = True
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
while self.subscribed:
try:
message = queue.get(timeout=10)
content = message.body
self.__session.message_accept(qpid.datatypes.RangedSet(message.id))
if (self.decompress):
try:
# http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
d = zlib.decompressobj(16+zlib.MAX_WBITS)
content = d.decompress(content)
except Exception:
2015-06-12 11:57:06 -06:00
# decompression failed, return the original content
2016-03-16 16:32:17 -05:00
pass
2015-06-12 11:57:06 -06:00
callback(content)
except Empty:
pass
except Closed:
self.close()
def close(self):
self.__queueStarted = False
2015-06-12 11:57:06 -06:00
self.subscribed = False
try:
self.__session.close(timeout=10)
except Exception:
2015-06-12 11:57:06 -06:00
pass
@property
def queueStarted(self):
return self.__queueStarted
2018-06-19 09:18:30 -06:00