mirror of
https://github.com/Unidata/python-awips.git
synced 2025-02-23 14:57:56 -05:00
- code brought over from the following raytheon repos and directories: - awips2 repo: - awips2/pythonPackages - awips2/edexOsgi/com.raytheon.uf.common.alertviz/pythonPackages - awips2/edexOsgi/com.raytheon.uf.common.mpe/pythonPackages - awips2/edexOsgi/com.raytheon.uf.common.dataplugin.text/pythonPackages - awips2/edexOsgi/com.raytheon.uf.common.dataplugin.grid/pythonPackages - awips2/edexOsgi/com.raytheon.uf.common.activetable/pythonPackages - awips2/edexOsgi/com.raytheon.uf.common.management/pythonPackages - awips2/edexOsgi/com.raytheon.uf.common.dataplugin.gfe/pythonPackages - awips2/edexOsgi/com.raytheon.uf.common.dataplugin.radar/pythonPackages - awips2/edexOsgi/com.raytheon.uf.common.site/pythonPackages - awips2-core repo: - awips2-core/common/com.raytheon.uf.common.auth/pythonPackages - awips2-core/common/com.raytheon.uf.common.message/pythonPackages - awips2-core/common/com.raytheon.uf.common.localization/pythonPackages - awips2-core/common/com.raytheon.uf.common.datastorage/pythonPackages - awips2-core/common/com.raytheon.uf.common.pointdata/pythonPackages - awips2-core/common/com.raythoen.uf.common.pypies/pythonPackages - awips2-core/common/com.raytheon.uf.common.dataaccess/pythonPackages - awips2-core/common/com.raytheon.uf.common.dataplugin.level/pythonPackages - awips2-core/common/com.raytheon.uf.common.serialization/pythonPackages - awips2-core/common/com.raytheon.uf.common.time/pythonPackages - awips2-core/common/com.raytheon.uf.common.dataplugin/pythonPackages - awips2-core/common/com.raytheon.uf.common.dataquery/pythonPackages - awips2-rpm repo: had to untar and unzip the thirft repo, then go into /lib/py and run `python setup.py build` and then copy in from the build/ subdirectory -foss/thrift-0.14.1/packaged/thrift-0.14.1/lib/py/build/lib.macosx-10.9-x86_64-cpython-38/thrift
170 lines
5.9 KiB
Python
170 lines
5.9 KiB
Python
##
|
|
# This software was developed and / or modified by Raytheon Company,
|
|
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
|
#
|
|
# U.S. EXPORT CONTROLLED TECHNICAL DATA
|
|
# This software product contains export-restricted data whose
|
|
# export/transfer/disclosure is restricted by U.S. law. Dissemination
|
|
# to non-U.S. persons whether in the United States or abroad requires
|
|
# an export license or other authorization.
|
|
#
|
|
# Contractor Name: Raytheon Company
|
|
# Contractor Address: 6825 Pine Street, Suite 340
|
|
# Mail Stop B8
|
|
# Omaha, NE 68106
|
|
# 402.291.0100
|
|
#
|
|
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
|
# further licensing information.
|
|
##
|
|
|
|
#
|
|
# Provides a Python-based interface for subscribing to qpid queues and topics.
|
|
#
|
|
# SOFTWARE HISTORY
|
|
#
|
|
# Date Ticket# Engineer Description
|
|
# ------------- -------- ------------ --------------------------------------------
|
|
# Nov 17, 2010 njensen Initial Creation.
|
|
# Aug 15, 2013 2169 bkowal Optionally gzip decompress any data that is read.
|
|
# Aug 04, 2016 2416 tgurney Add queueStarted property
|
|
# Feb 16, 2017 6084 bsteffen Support ssl connections
|
|
# Sep 07, 2017 6175 tgurney Remove "decompressing" log message
|
|
# Jul 23, 2019 7724 mrichardson Upgrade Qpid to Qpid Proton
|
|
# Nov 04, 2019 7724 tgurney Fix topic creation
|
|
# Jun 24, 2020 8187 randerso Added qpid connection_id
|
|
#
|
|
|
|
from __future__ import print_function
|
|
import logging
|
|
|
|
import os
|
|
import os.path
|
|
import pwd
|
|
import socket
|
|
import zlib
|
|
from ssl import SSLContext, PROTOCOL_TLS
|
|
|
|
from proton import SSLDomain
|
|
from proton.handlers import MessagingHandler
|
|
from proton.reactor import Container
|
|
|
|
logging.basicConfig(level=logging.INFO, datefmt='%H:%M:%S',
|
|
format='[%(process)s] %(asctime)s %(levelname)s: %(message)s')
|
|
log = logging.getLogger('QpidSubscriber')
|
|
|
|
SSL_PASSWORD = 'password'
|
|
QPID_USERNAME = 'guest'
|
|
QPID_PASSWORD = 'guest'
|
|
|
|
class QpidSubscriber(MessagingHandler):
|
|
|
|
def __init__(self, host='127.0.0.1', port=5672, decompress=False, ssl=None, program="QpidSubscriber"):
|
|
super(QpidSubscriber, self).__init__(auto_accept=True)
|
|
#__init__ should only handle setting up properties;
|
|
# any connection and subscription actions should be handled
|
|
# by the reactor functions
|
|
|
|
self.queues = {}
|
|
|
|
self.scheme = 'amqp'
|
|
self.rest_scheme = 'https'
|
|
self.ssl_context = None
|
|
self.host = host
|
|
self.port = port
|
|
self.decompress = decompress
|
|
self.__queueStarted = False
|
|
self.__subscribed = False
|
|
|
|
pwuid = pwd.getpwuid(os.getuid())
|
|
if "QPID_SSL_CERT_DB" in os.environ:
|
|
certdbloc = os.environ["QPID_SSL_CERT_DB"]
|
|
else:
|
|
certdbloc = pwuid.pw_dir + "/.qpid/"
|
|
if "QPID_SSL_CERT_NAME" in os.environ:
|
|
certname = os.environ["QPID_SSL_CERT_NAME"]
|
|
else:
|
|
certname = QPID_USERNAME
|
|
|
|
certfile = os.path.join(certdbloc, certname + ".crt")
|
|
certkey = os.path.join(certdbloc, certname + ".key")
|
|
if ssl or (ssl is None and os.path.isfile(certfile) and os.path.isfile(certkey)):
|
|
self.scheme = "amqps"
|
|
self.rest_scheme = 'https'
|
|
self.ssl_context = SSLContext(PROTOCOL_TLS)
|
|
self.ssl_context.load_cert_chain(certfile, certkey)
|
|
self.cert_file = certfile
|
|
self.cert_key = certkey
|
|
self.url = '{}://{}:{}@{}:{}'.format(self.scheme, QPID_USERNAME, QPID_PASSWORD, self.host, self.port)
|
|
self.clientID = ":".join([
|
|
socket.gethostname(),
|
|
pwuid.pw_name,
|
|
program,
|
|
str(os.getpid()),
|
|
])
|
|
|
|
def topicSubscribe(self, topicName, callback):
|
|
self.topicName = topicName
|
|
self.callback = callback
|
|
Container(self).run()
|
|
|
|
def on_start(self, event):
|
|
'''
|
|
# if the queue is edex.alerts, set decompress to true always for now to
|
|
# maintain compatibility with existing python scripts.
|
|
'''
|
|
if self.topicName == 'edex.alerts':
|
|
self.decompress = True
|
|
|
|
self.container = event.container
|
|
queueName = 'amq.topic/' + self.topicName
|
|
|
|
self.ssl_domain = None
|
|
if self.scheme == "amqps" and self.cert_file and self.cert_key:
|
|
self.ssl_domain = SSLDomain(mode=SSLDomain.MODE_CLIENT)
|
|
self.ssl_domain.set_credentials(self.cert_file, self.cert_key, SSL_PASSWORD)
|
|
|
|
event.container.container_id = self.clientID
|
|
self.conn = event.container.connect(self.url, ssl_domain=self.ssl_domain)
|
|
self.receiver = event.container.create_receiver(self.conn, queueName)
|
|
self.__queueStarted = True
|
|
self.__subscribed = True
|
|
|
|
def on_message(self, event):
|
|
message = event.message
|
|
content = message.body
|
|
self.process_message(content)
|
|
if not self.__subscribed:
|
|
self.close()
|
|
|
|
def process_message(self, content):
|
|
if (self.decompress):
|
|
try:
|
|
# http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
|
|
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
|
|
content = d.decompress(content)
|
|
except Exception:
|
|
# decompression failed, return the original content
|
|
pass
|
|
self.callback(content)
|
|
|
|
def close(self):
|
|
self.__queueStarted = False
|
|
self.unsubscribe()
|
|
try:
|
|
self.receiver.close()
|
|
self.conn.close()
|
|
except:
|
|
# already closed
|
|
pass
|
|
|
|
@property
|
|
def queueStarted(self):
|
|
return self.__queueStarted
|
|
|
|
@property
|
|
def subscribed(self):
|
|
return self.__subscribed
|
|
|
|
def unsubscribe(self):
|
|
self.__subscribed = False
|