First pass at upgrading for v23.4.1 of AWIPS

- several small changes (whitespace/comment) for some python classes
- small changes for init.py that wasn't quite correct in v20
- updated thrift package
- updated a few classes for v23.4.1 changes:
  - QpidSubscriber.py
  - qpidingest.py
This commit is contained in:
Shay Carter 2024-04-01 13:25:43 -06:00
parent c492e79342
commit 6fc73fefc6
38 changed files with 195 additions and 134 deletions

View file

@ -33,34 +33,35 @@
# Jul 23, 2019 7724 mrichardson Upgrade Qpid to Qpid Proton
# Nov 04, 2019 7724 tgurney Fix topic creation
# Jun 24, 2020 8187 randerso Added qpid connection_id
# Feb 10. 2022 8747 dgilling Cleanup hardcoded passwords.
# Aug 17. 2023 2036091 dgilling Fix QPID connection issues with qpid-proton version
# 0.38.
#
from __future__ import print_function
import logging
import os
import os.path
import pwd
import socket
import zlib
from ssl import SSLContext, PROTOCOL_TLS
from proton import SSLDomain
from proton.handlers import MessagingHandler
from proton.reactor import Container
logging.basicConfig(level=logging.INFO, datefmt='%H:%M:%S',
format='[%(process)s] %(asctime)s %(levelname)s: %(message)s')
log = logging.getLogger('QpidSubscriber')
SSL_PASSWORD = 'password'
QPID_USERNAME = 'guest'
QPID_PASSWORD = 'guest'
class QpidSubscriber(MessagingHandler):
def __init__(self, host='127.0.0.1', port=5672, decompress=False, ssl=None, program="QpidSubscriber"):
super(QpidSubscriber, self).__init__(auto_accept=True)
super().__init__(auto_accept=True)
#__init__ should only handle setting up properties;
# any connection and subscription actions should be handled
# by the reactor functions
@ -68,8 +69,6 @@ class QpidSubscriber(MessagingHandler):
self.queues = {}
self.scheme = 'amqp'
self.rest_scheme = 'https'
self.ssl_context = None
self.host = host
self.port = port
self.decompress = decompress
@ -88,14 +87,13 @@ class QpidSubscriber(MessagingHandler):
certfile = os.path.join(certdbloc, certname + ".crt")
certkey = os.path.join(certdbloc, certname + ".key")
if ssl or (ssl is None and os.path.isfile(certfile) and os.path.isfile(certkey)):
cacert = os.path.join(certdbloc, "root.crt")
if ssl or (ssl is None and os.path.isfile(certfile) and os.path.isfile(certkey) and os.path.isfile(cacert)):
self.scheme = "amqps"
self.rest_scheme = 'https'
self.ssl_context = SSLContext(PROTOCOL_TLS)
self.ssl_context.load_cert_chain(certfile, certkey)
self.cert_file = certfile
self.cert_key = certkey
self.url = '{}://{}:{}@{}:{}'.format(self.scheme, QPID_USERNAME, QPID_PASSWORD, self.host, self.port)
self.ca_cert = cacert
self.url = f"{self.scheme}://{QPID_USERNAME}@{self.host}:{self.port}"
self.clientID = ":".join([
socket.gethostname(),
pwuid.pw_name,
@ -109,10 +107,8 @@ class QpidSubscriber(MessagingHandler):
Container(self).run()
def on_start(self, event):
'''
# if the queue is edex.alerts, set decompress to true always for now to
# maintain compatibility with existing python scripts.
'''
if self.topicName == 'edex.alerts':
self.decompress = True
@ -120,9 +116,11 @@ class QpidSubscriber(MessagingHandler):
queueName = 'amq.topic/' + self.topicName
self.ssl_domain = None
if self.scheme == "amqps" and self.cert_file and self.cert_key:
if self.scheme == "amqps" and self.cert_file and self.cert_key and self.ca_cert:
self.ssl_domain = SSLDomain(mode=SSLDomain.MODE_CLIENT)
self.ssl_domain.set_credentials(self.cert_file, self.cert_key, SSL_PASSWORD)
self.ssl_domain.set_trusted_ca_db(self.ca_cert)
self.ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
self.ssl_domain.set_credentials(self.cert_file, self.cert_key, None)
event.container.container_id = self.clientID
self.conn = event.container.connect(self.url, ssl_domain=self.ssl_domain)

View file

@ -97,4 +97,4 @@ def __cloneRequest(request):
levels = request.getLevels(),
locationNames = request.getLocationNames(),
envelope = request.getEnvelope(),
**request.getIdentifiers())
**request.getIdentifiers())

View file

@ -206,4 +206,4 @@ class DataQueue(object):
return self
def __exit__(self, *unused):
self.close()
self.close()

View file

@ -29,4 +29,4 @@
# 08/10/17 5731 bsteffen Initial Creation.
__all__ = [
]
]

View file

@ -56,6 +56,10 @@
# QpidQueueManager
# Dec 12, 2019 7995 dgilling Revert interface changes from #7724.
# Jul 07, 2020 8187 randerso Added qpid connection_id
# Mar 08, 2021 7899 tbucher Updated JMS password to use new decryption class
# Apr 12, 2022 8677 tgurney Remove unnecessary password retrieval code
# Jun 28, 2023 2035875 dgilling Add additional CA verification to fix SSL/TLS
# connections to QPID.
#
#===============================================================================
@ -76,7 +80,9 @@ class QpidIngestException(Exception):
"""Exception subclass for broker communication exceptions."""
pass
class IngestViaQPID:
def __init__(self, host="localhost", port=5672, program="qpidingest"):
'''
Connect to QPID and make bindings to route message to external.dropbox queue
@ -87,20 +93,21 @@ class IngestViaQPID:
pwuid = pwd.getpwuid(os.getuid())
certdb = os.getenv("QPID_SSL_CERT_DB", os.path.join(pwuid.pw_dir, ".qpid"))
certname = os.getenv("QPID_SSL_CERT_NAME", "guest")
cert_password = os.getenv("QPID_SSL_CERT_PASSWORD", "password")
certfile = os.path.join(certdb, f"{certname}.crt")
keyfile = os.path.join(certdb, f"{certname}.key")
url = f"amqps://{host}:{port}"
ADDRESS = "external.dropbox"
ssl_domain = proton.SSLDomain(mode=proton.SSLDomain.MODE_CLIENT)
ssl_domain.set_credentials(certfile, keyfile, cert_password)
ssl_domain.set_trusted_ca_db(os.path.join(certdb, "root.crt"))
ssl_domain.set_peer_authentication(proton.SSLDomain.VERIFY_PEER)
ssl_domain.set_credentials(certfile, keyfile, None)
clientID = ":".join([
socket.gethostname(),
pwuid.pw_name,
program,
str(os.getpid()),
socket.gethostname(),
pwuid.pw_name,
program,
str(os.getpid()),
])
try:

View file

@ -45,4 +45,4 @@ import Record
avh = AlertVizHandler.AlertVizHandler(host=os.getenv("BROKER_ADDR","localhost"), port=9581, category='LOCAL', source='ANNOUNCER', level=logging.NOTSET)
record = Record.Record(10)
avh.emit(record)

View file

@ -1,4 +1,4 @@
#
##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#

View file

@ -1,4 +1,4 @@
#
##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#

View file

@ -29,6 +29,7 @@
# ------------ ---------- ----------- --------------------------
# 07/28/11 dgilling Initial Creation.
# 12/02/13 2537 bsteffen Serialize empty enum sets.
#
#
#

View file

@ -23,7 +23,8 @@
__all__ = [
'com',
'gov',
'java'
'java',
'org'
]

View file

@ -21,7 +21,7 @@
# File auto-generated by PythonFileGenerator
__all__ = [
'raytheon',
'raytheon'
]

View file

@ -1,4 +1,4 @@
name: python-awips-v20
name: python-awips-v23
channels:
- https://conda.anaconda.org/conda-forge
dependencies:
@ -23,4 +23,4 @@
- shapely
- six
- pip
- python-awips
# - python-awips

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
thrift/package-info.class Normal file

Binary file not shown.

View file

@ -21,7 +21,7 @@
The main idea of the server is to receive and send requests
only from the main thread.
The thread poool should be sized for concurrent tasks, not
The thread pool should be sized for concurrent tasks, not
maximum connections
"""
@ -151,6 +151,10 @@ class Connection(object):
while len(self._rbuf) >= self._reading.end:
if self._reading.is_header:
mlen, = struct.unpack('!i', self._rbuf[:4])
if mlen < 0:
logger.error('could not read the head from frame')
self.close()
break
self._reading = Message(self._reading.end, mlen, False)
self.status = WAIT_MESSAGE
else:
@ -249,6 +253,7 @@ class TNonblockingServer(object):
self._read, self._write = socket.socketpair()
self.prepared = False
self._stop = False
self.poll = select.poll() if hasattr(select, 'poll') else None
def setNumThreads(self, num):
"""Set the number of worker threads that should be created."""
@ -314,13 +319,53 @@ class TNonblockingServer(object):
else:
return select.select(readable, writable, readable) + (True,)
def _poll_select(self):
"""Does poll on open connections, if available."""
remaining = []
self.poll.register(self.socket.handle.fileno(), select.POLLIN | select.POLLRDNORM)
self.poll.register(self._read.fileno(), select.POLLIN | select.POLLRDNORM)
for i, connection in list(self.clients.items()):
if connection.is_readable():
self.poll.register(connection.fileno(), select.POLLIN | select.POLLRDNORM | select.POLLERR | select.POLLHUP | select.POLLNVAL)
if connection.remaining or connection.received:
remaining.append(connection.fileno())
if connection.is_writeable():
self.poll.register(connection.fileno(), select.POLLOUT | select.POLLWRNORM)
if connection.is_closed():
try:
self.poll.unregister(i)
except KeyError:
logger.debug("KeyError in unregistering connections...")
del self.clients[i]
if remaining:
return remaining, [], [], False
rlist = []
wlist = []
xlist = []
pollres = self.poll.poll()
for fd, event in pollres:
if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
xlist.append(fd)
elif event & (select.POLLOUT | select.POLLWRNORM):
wlist.append(fd)
elif event & (select.POLLIN | select.POLLRDNORM):
rlist.append(fd)
else: # should be impossible
logger.debug("reached an impossible state in _poll_select")
xlist.append(fd)
return rlist, wlist, xlist, True
def handle(self):
"""Handle requests.
WARNING! You must call prepare() BEFORE calling handle()
"""
assert self.prepared, "You have to call prepare before handle"
rset, wset, xset, selected = self._select()
rset, wset, xset, selected = self._select() if not self.poll else self._poll_select()
for readable in rset:
if readable == self._read.fileno():
# don't care i just need to clean readable flag
@ -339,6 +384,8 @@ class TNonblockingServer(object):
connection.read()
if connection.received:
connection.status = WAIT_PROCESS
if self.poll:
self.poll.unregister(connection.fileno())
msg = connection.received.popleft()
itransport = TTransport.TMemoryBuffer(msg.buffer, msg.offset)
otransport = TTransport.TMemoryBuffer()
@ -350,7 +397,6 @@ class TNonblockingServer(object):
self.clients[writeable].write()
for oob in xset:
self.clients[oob].close()
del self.clients[oob]
def close(self):
"""Closes the server."""

View file

@ -42,6 +42,11 @@ class TProcessPoolServer(TServer):
self.stopCondition = Condition()
self.postForkCallback = None
def __getstate__(self):
state = self.__dict__.copy()
state['workers'] = None
return state
def setPostForkCallback(self, callback):
if not callable(callback):
raise TypeError("This is not a callback!")

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View file

@ -91,6 +91,7 @@ class THttpClient(TTransportBase):
self.__http_response = None
self.__timeout = None
self.__custom_headers = None
self.headers = None
@staticmethod
def basic_proxy_auth_header(proxy):
@ -98,7 +99,7 @@ class THttpClient(TTransportBase):
return None
ap = "%s:%s" % (urllib.parse.unquote(proxy.username),
urllib.parse.unquote(proxy.password))
cr = base64.b64encode(ap).strip()
cr = base64.b64encode(ap.encode()).strip()
return "Basic " + cr
def using_proxy(self):
@ -175,6 +176,12 @@ class THttpClient(TTransportBase):
for key, val in six.iteritems(self.__custom_headers):
self.__http.putheader(key, val)
# Saves the cookie sent by the server in the previous response.
# HTTPConnection.putheader can only be called after a request has been
# started, and before it's been sent.
if self.headers and 'Set-Cookie' in self.headers:
self.__http.putheader('Cookie', self.headers['Set-Cookie'])
self.__http.endheaders()
# Write payload
@ -185,7 +192,3 @@ class THttpClient(TTransportBase):
self.code = self.__http_response.status
self.message = self.__http_response.reason
self.headers = self.__http_response.msg
# Saves the cookie sent by the server response
if 'Set-Cookie' in self.headers:
self.__http.putheader('Cookie', self.headers['Set-Cookie'])

View file

@ -94,6 +94,9 @@ class TSocket(TSocketBase):
if exc.errno in (errno.EWOULDBLOCK, errno.EAGAIN):
return True
return False
except ValueError:
# SSLSocket fails on recv with non-zero flags; fallback to the old behavior
return True
finally:
self.handle.settimeout(original_timeout)
@ -128,9 +131,9 @@ class TSocket(TSocketBase):
for family, socktype, _, _, sockaddr in addrs:
handle = self._do_open(family, socktype)
# TCP_KEEPALIVE
# TCP keep-alive
if self._socket_keepalive:
handle.setsockopt(socket.IPPROTO_TCP, socket.SO_KEEPALIVE, 1)
handle.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
handle.settimeout(self._timeout)
try:
@ -226,6 +229,7 @@ class TServerSocket(TSocketBase, TServerTransportBase):
os.unlink(res[4])
self.handle = socket.socket(res[0], res[1])
self.handle.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
self.handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(self.handle, 'settimeout'):
self.handle.settimeout(None)

View file

@ -398,6 +398,9 @@ class TSaslClientTransport(TTransportBase, CReadableTransport):
"Bad SASL negotiation status: %d (%s)"
% (status, challenge))
def isOpen(self):
return self.transport.isOpen()
def send_sasl_msg(self, status, body):
header = pack(">BI", status, len(body))
self.transport.write(header + body)