python-awips/ufpy/QpidSubscriber.py
Shay Carter 5f070d0655 Commit of all v20 ufpy code:
- code brought over from the following raytheon repos and directories:
  - awips2 repo:
    - awips2/pythonPackages
    - awips2/edexOsgi/com.raytheon.uf.common.alertviz/pythonPackages
    - awips2/edexOsgi/com.raytheon.uf.common.mpe/pythonPackages
    - awips2/edexOsgi/com.raytheon.uf.common.dataplugin.text/pythonPackages
    - awips2/edexOsgi/com.raytheon.uf.common.dataplugin.grid/pythonPackages
    - awips2/edexOsgi/com.raytheon.uf.common.activetable/pythonPackages
    - awips2/edexOsgi/com.raytheon.uf.common.management/pythonPackages
    - awips2/edexOsgi/com.raytheon.uf.common.dataplugin.gfe/pythonPackages
    - awips2/edexOsgi/com.raytheon.uf.common.dataplugin.radar/pythonPackages
    - awips2/edexOsgi/com.raytheon.uf.common.site/pythonPackages
  - awips2-core repo:
    - awips2-core/common/com.raytheon.uf.common.auth/pythonPackages
    - awips2-core/common/com.raytheon.uf.common.message/pythonPackages
    - awips2-core/common/com.raytheon.uf.common.localization/pythonPackages
    - awips2-core/common/com.raytheon.uf.common.datastorage/pythonPackages
    - awips2-core/common/com.raytheon.uf.common.pointdata/pythonPackages
    - awips2-core/common/com.raythoen.uf.common.pypies/pythonPackages
    - awips2-core/common/com.raytheon.uf.common.dataaccess/pythonPackages
    - awips2-core/common/com.raytheon.uf.common.dataplugin.level/pythonPackages
    - awips2-core/common/com.raytheon.uf.common.serialization/pythonPackages
    - awips2-core/common/com.raytheon.uf.common.time/pythonPackages
    - awips2-core/common/com.raytheon.uf.common.dataplugin/pythonPackages
    - awips2-core/common/com.raytheon.uf.common.dataquery/pythonPackages
  - awips2-rpm repo: had to untar and unzip the thirft repo, then go into /lib/py and run `python setup.py build` and then copy in from the build/ subdirectory
    -foss/thrift-0.14.1/packaged/thrift-0.14.1/lib/py/build/lib.macosx-10.9-x86_64-cpython-38/thrift
2023-09-12 13:38:19 -06:00

170 lines
5.9 KiB
Python

##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
#
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
#
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
#
# Provides a Python-based interface for subscribing to qpid queues and topics.
#
# SOFTWARE HISTORY
#
# Date Ticket# Engineer Description
# ------------- -------- ------------ --------------------------------------------
# Nov 17, 2010 njensen Initial Creation.
# Aug 15, 2013 2169 bkowal Optionally gzip decompress any data that is read.
# Aug 04, 2016 2416 tgurney Add queueStarted property
# Feb 16, 2017 6084 bsteffen Support ssl connections
# Sep 07, 2017 6175 tgurney Remove "decompressing" log message
# Jul 23, 2019 7724 mrichardson Upgrade Qpid to Qpid Proton
# Nov 04, 2019 7724 tgurney Fix topic creation
# Jun 24, 2020 8187 randerso Added qpid connection_id
#
from __future__ import print_function
import logging
import os
import os.path
import pwd
import socket
import zlib
from ssl import SSLContext, PROTOCOL_TLS
from proton import SSLDomain
from proton.handlers import MessagingHandler
from proton.reactor import Container
logging.basicConfig(level=logging.INFO, datefmt='%H:%M:%S',
format='[%(process)s] %(asctime)s %(levelname)s: %(message)s')
log = logging.getLogger('QpidSubscriber')
SSL_PASSWORD = 'password'
QPID_USERNAME = 'guest'
QPID_PASSWORD = 'guest'
class QpidSubscriber(MessagingHandler):
def __init__(self, host='127.0.0.1', port=5672, decompress=False, ssl=None, program="QpidSubscriber"):
super(QpidSubscriber, self).__init__(auto_accept=True)
#__init__ should only handle setting up properties;
# any connection and subscription actions should be handled
# by the reactor functions
self.queues = {}
self.scheme = 'amqp'
self.rest_scheme = 'https'
self.ssl_context = None
self.host = host
self.port = port
self.decompress = decompress
self.__queueStarted = False
self.__subscribed = False
pwuid = pwd.getpwuid(os.getuid())
if "QPID_SSL_CERT_DB" in os.environ:
certdbloc = os.environ["QPID_SSL_CERT_DB"]
else:
certdbloc = pwuid.pw_dir + "/.qpid/"
if "QPID_SSL_CERT_NAME" in os.environ:
certname = os.environ["QPID_SSL_CERT_NAME"]
else:
certname = QPID_USERNAME
certfile = os.path.join(certdbloc, certname + ".crt")
certkey = os.path.join(certdbloc, certname + ".key")
if ssl or (ssl is None and os.path.isfile(certfile) and os.path.isfile(certkey)):
self.scheme = "amqps"
self.rest_scheme = 'https'
self.ssl_context = SSLContext(PROTOCOL_TLS)
self.ssl_context.load_cert_chain(certfile, certkey)
self.cert_file = certfile
self.cert_key = certkey
self.url = '{}://{}:{}@{}:{}'.format(self.scheme, QPID_USERNAME, QPID_PASSWORD, self.host, self.port)
self.clientID = ":".join([
socket.gethostname(),
pwuid.pw_name,
program,
str(os.getpid()),
])
def topicSubscribe(self, topicName, callback):
self.topicName = topicName
self.callback = callback
Container(self).run()
def on_start(self, event):
'''
# if the queue is edex.alerts, set decompress to true always for now to
# maintain compatibility with existing python scripts.
'''
if self.topicName == 'edex.alerts':
self.decompress = True
self.container = event.container
queueName = 'amq.topic/' + self.topicName
self.ssl_domain = None
if self.scheme == "amqps" and self.cert_file and self.cert_key:
self.ssl_domain = SSLDomain(mode=SSLDomain.MODE_CLIENT)
self.ssl_domain.set_credentials(self.cert_file, self.cert_key, SSL_PASSWORD)
event.container.container_id = self.clientID
self.conn = event.container.connect(self.url, ssl_domain=self.ssl_domain)
self.receiver = event.container.create_receiver(self.conn, queueName)
self.__queueStarted = True
self.__subscribed = True
def on_message(self, event):
message = event.message
content = message.body
self.process_message(content)
if not self.__subscribed:
self.close()
def process_message(self, content):
if (self.decompress):
try:
# http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
content = d.decompress(content)
except Exception:
# decompression failed, return the original content
pass
self.callback(content)
def close(self):
self.__queueStarted = False
self.unsubscribe()
try:
self.receiver.close()
self.conn.close()
except:
# already closed
pass
@property
def queueStarted(self):
return self.__queueStarted
@property
def subscribed(self):
return self.__subscribed
def unsubscribe(self):
self.__subscribed = False