python-awips/awips/qpidingest.py
Shay Carter 6fc73fefc6 First pass at upgrading for v23.4.1 of AWIPS
- several small changes (whitespace/comment) for some python classes
- small changes for init.py that wasn't quite correct in v20
- updated thrift package
- updated a few classes for v23.4.1 changes:
  - QpidSubscriber.py
  - qpidingest.py
2024-04-01 13:25:43 -06:00

149 lines
6.5 KiB
Python

#===============================================================================
# qpidingest.py
#
# @author: Aaron Anderson
# @organization: NOAA/WDTB OU/CIMMS
# @version: 1.0 02/19/2010
# @requires: awips2-python and awips2-qpid-proton-python RPMs
#
# ***EDEX and QPID must be running for this module to work***
#
# DESCRIPTION:
# This module is used to connect to QPID and send messages to the external.dropbox queue
# which tells EDEX to ingest a data file from a specified path. This avoids having to copy
# a data file into an endpoint. Each message also contains a header which is used to determine
# which plugin should be used to decode the file. Each plugin has an xml file located in
# $EDEX_HOME/data/utility/edex_static/base/distribution that contains regular expressions
# that the header is compared to. When the header matches one of these regular expressions
# the file is decoded with that plugin. If you make changes to one of these xml files you
# must restart EDEX for the changes to take effect.
#
# NOTE: If the message is being sent but you do not see it being ingested in the EDEX log
# check the xml files to make sure the header you are passing matches one of the regular
# expressions. Beware of spaces, some regular expressions require spaces while others use
# a wildcard character so a space is optional. It seems you are better off having the space
# as this will be matched to both patterns. For the file in the example below,
# 20100218_185755_SAUS46KLOX.metar, I use SAUS46 KLOX as the header to make sure it matches.
#
#
# EXAMPLE:
# Simple example program:
#
#------------------------------------------------------------------------------
# import qpidingest
# #Tell EDEX to ingest a metar file from data_store. The filepath is
# #/data_store/20100218/metar/00/standard/20100218_005920_SAUS46KSEW.metar
#
# conn=qpidingest.IngestViaQPID() #defaults to localhost port 5672
#
# #If EDEX is not on the local machine you can make the connection as follows
# #conn=qpidingest.IngestViaQPID(host='<MACHINE NAME>',port=<PORT NUMBER>)
#
# conn.sendmessage('/data_store/20100218/metar/18/standard/20100218_185755_SAUS46KLOX.metar','SAUS46 KLOX')
# conn.close()
#-------------------------------------------------------------------------------
#
# SOFTWARE HISTORY
#
# Date Ticket# Engineer Description
# ------------- -------- ------------ ------------------------------------------
# Jun 13, 2013 16242 D. Friedman Add Qpid authentication info
# Mar 06, 2014 17907 D. Friedman Workaround for issue QPID-5569
# Feb 16, 2017 6084 bsteffen Support ssl connections
# Jun 14, 2019 7870 mrichardson MHS env workaround
# Jul 23, 2019 7724 mrichardson Upgrade Qpid to Qpid Proton
# Nov 06, 2019 7724 tgurney Remove the unnecessary
# QpidQueueManager
# Dec 12, 2019 7995 dgilling Revert interface changes from #7724.
# Jul 07, 2020 8187 randerso Added qpid connection_id
# Mar 08, 2021 7899 tbucher Updated JMS password to use new decryption class
# Apr 12, 2022 8677 tgurney Remove unnecessary password retrieval code
# Jun 28, 2023 2035875 dgilling Add additional CA verification to fix SSL/TLS
# connections to QPID.
#
#===============================================================================
import logging
import os
import pwd
import os.path
import socket
import proton
import proton.utils
import proton.reactor
log = logging.getLogger("qpidingest")
class QpidIngestException(Exception):
"""Exception subclass for broker communication exceptions."""
pass
class IngestViaQPID:
def __init__(self, host="localhost", port=5672, program="qpidingest"):
'''
Connect to QPID and make bindings to route message to external.dropbox queue
@param host: string hostname of computer running EDEX and QPID (default localhost)
@param port: integer port used to connect to QPID (default 5672)
'''
pwuid = pwd.getpwuid(os.getuid())
certdb = os.getenv("QPID_SSL_CERT_DB", os.path.join(pwuid.pw_dir, ".qpid"))
certname = os.getenv("QPID_SSL_CERT_NAME", "guest")
certfile = os.path.join(certdb, f"{certname}.crt")
keyfile = os.path.join(certdb, f"{certname}.key")
url = f"amqps://{host}:{port}"
ADDRESS = "external.dropbox"
ssl_domain = proton.SSLDomain(mode=proton.SSLDomain.MODE_CLIENT)
ssl_domain.set_trusted_ca_db(os.path.join(certdb, "root.crt"))
ssl_domain.set_peer_authentication(proton.SSLDomain.VERIFY_PEER)
ssl_domain.set_credentials(certfile, keyfile, None)
clientID = ":".join([
socket.gethostname(),
pwuid.pw_name,
program,
str(os.getpid()),
])
try:
container = proton.reactor.Container()
container.container_id = clientID
self._conn = proton.utils.BlockingConnection(url, ssl_domain=ssl_domain)
self._sender = self._conn.create_sender(ADDRESS)
log.debug("Connected to broker [%s], endpoint [%s].", url, ADDRESS)
except proton.ProtonException as e:
log.exception("Failed to connect to broker [%s].", url)
raise QpidIngestException("Failed to connect to broker [{}].".format(url)) from e
def sendmessage(self, filepath, header):
'''
This function sends a message to the external.dropbox queue providing the path
to the file to be ingested and a header to determine the plugin to be used to
decode the file.
@param filepath: string full path to file to be ingested
@param header: string header used to determine plugin decoder to use
'''
try:
self._sender.send(proton.Message(body=filepath, subject=header))
except proton.ProtonException as e:
log.exception("Failed to send file [%s] to broker.", filepath)
raise QpidIngestException("Failed to send file [{}] to broker.".format(filepath)) from e
def close(self):
'''
After all messages are sent call this function to close connection and make sure
there are no threads left open
'''
try:
self._sender.close()
self._conn.close()
log.debug("Disconnected from broker.")
except proton.ProtonException as e:
log.warning("Failed to disconnect from broker.", exc_info=True)
raise QpidIngestException("Failed to disconnect from broker.") from e