python-awips/awips/QpidSubscriber.py
Shay Carter 6fc73fefc6 First pass at upgrading for v23.4.1 of AWIPS
- several small changes (whitespace/comment) for some python classes
- small changes for init.py that wasn't quite correct in v20
- updated thrift package
- updated a few classes for v23.4.1 changes:
  - QpidSubscriber.py
  - qpidingest.py
2024-04-01 13:25:43 -06:00

168 lines
5.9 KiB
Python

##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
#
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
#
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
#
# Provides a Python-based interface for subscribing to qpid queues and topics.
#
# SOFTWARE HISTORY
#
# Date Ticket# Engineer Description
# ------------- -------- ------------ --------------------------------------------
# Nov 17, 2010 njensen Initial Creation.
# Aug 15, 2013 2169 bkowal Optionally gzip decompress any data that is read.
# Aug 04, 2016 2416 tgurney Add queueStarted property
# Feb 16, 2017 6084 bsteffen Support ssl connections
# Sep 07, 2017 6175 tgurney Remove "decompressing" log message
# Jul 23, 2019 7724 mrichardson Upgrade Qpid to Qpid Proton
# Nov 04, 2019 7724 tgurney Fix topic creation
# Jun 24, 2020 8187 randerso Added qpid connection_id
# Feb 10. 2022 8747 dgilling Cleanup hardcoded passwords.
# Aug 17. 2023 2036091 dgilling Fix QPID connection issues with qpid-proton version
# 0.38.
#
from __future__ import print_function
import logging
import os
import os.path
import pwd
import socket
import zlib
from proton import SSLDomain
from proton.handlers import MessagingHandler
from proton.reactor import Container
logging.basicConfig(level=logging.INFO, datefmt='%H:%M:%S',
format='[%(process)s] %(asctime)s %(levelname)s: %(message)s')
log = logging.getLogger('QpidSubscriber')
QPID_USERNAME = 'guest'
class QpidSubscriber(MessagingHandler):
def __init__(self, host='127.0.0.1', port=5672, decompress=False, ssl=None, program="QpidSubscriber"):
super().__init__(auto_accept=True)
#__init__ should only handle setting up properties;
# any connection and subscription actions should be handled
# by the reactor functions
self.queues = {}
self.scheme = 'amqp'
self.host = host
self.port = port
self.decompress = decompress
self.__queueStarted = False
self.__subscribed = False
pwuid = pwd.getpwuid(os.getuid())
if "QPID_SSL_CERT_DB" in os.environ:
certdbloc = os.environ["QPID_SSL_CERT_DB"]
else:
certdbloc = pwuid.pw_dir + "/.qpid/"
if "QPID_SSL_CERT_NAME" in os.environ:
certname = os.environ["QPID_SSL_CERT_NAME"]
else:
certname = QPID_USERNAME
certfile = os.path.join(certdbloc, certname + ".crt")
certkey = os.path.join(certdbloc, certname + ".key")
cacert = os.path.join(certdbloc, "root.crt")
if ssl or (ssl is None and os.path.isfile(certfile) and os.path.isfile(certkey) and os.path.isfile(cacert)):
self.scheme = "amqps"
self.cert_file = certfile
self.cert_key = certkey
self.ca_cert = cacert
self.url = f"{self.scheme}://{QPID_USERNAME}@{self.host}:{self.port}"
self.clientID = ":".join([
socket.gethostname(),
pwuid.pw_name,
program,
str(os.getpid()),
])
def topicSubscribe(self, topicName, callback):
self.topicName = topicName
self.callback = callback
Container(self).run()
def on_start(self, event):
# if the queue is edex.alerts, set decompress to true always for now to
# maintain compatibility with existing python scripts.
if self.topicName == 'edex.alerts':
self.decompress = True
self.container = event.container
queueName = 'amq.topic/' + self.topicName
self.ssl_domain = None
if self.scheme == "amqps" and self.cert_file and self.cert_key and self.ca_cert:
self.ssl_domain = SSLDomain(mode=SSLDomain.MODE_CLIENT)
self.ssl_domain.set_trusted_ca_db(self.ca_cert)
self.ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
self.ssl_domain.set_credentials(self.cert_file, self.cert_key, None)
event.container.container_id = self.clientID
self.conn = event.container.connect(self.url, ssl_domain=self.ssl_domain)
self.receiver = event.container.create_receiver(self.conn, queueName)
self.__queueStarted = True
self.__subscribed = True
def on_message(self, event):
message = event.message
content = message.body
self.process_message(content)
if not self.__subscribed:
self.close()
def process_message(self, content):
if (self.decompress):
try:
# http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
content = d.decompress(content)
except Exception:
# decompression failed, return the original content
pass
self.callback(content)
def close(self):
self.__queueStarted = False
self.unsubscribe()
try:
self.receiver.close()
self.conn.close()
except:
# already closed
pass
@property
def queueStarted(self):
return self.__queueStarted
@property
def subscribed(self):
return self.__subscribed
def unsubscribe(self):
self.__subscribed = False