diff --git a/awips/AlertVizHandler.py b/awips/AlertVizHandler.py deleted file mode 100644 index 8742eb0..0000000 --- a/awips/AlertVizHandler.py +++ /dev/null @@ -1,52 +0,0 @@ -## -## - - -# -# Pure python logging mechanism for logging to AlertViz from -# pure python (ie not JEP). DO NOT USE IN PYTHON CALLED -# FROM JAVA. -# -# -# SOFTWARE HISTORY -# -# Date Ticket# Engineer Description -# ------------ ---------- ----------- -------------------------- -# 08/18/10 njensen Initial Creation. -# -# -# - -import logging -from . import NotificationMessage - -class AlertVizHandler(logging.Handler): - - def __init__(self, host='localhost', port=61999, category='LOCAL', source='ANNOUNCER', level=logging.NOTSET): - logging.Handler.__init__(self, level) - self._category = category - self._host = host - self._port = port - self._source = source - - - def emit(self, record): - "Implements logging.Handler's interface. Record argument is a logging.LogRecord." - priority = None - if record.levelno >= 50: - priority = 'CRITICAL' - elif record.levelno >= 40: - priority = 'SIGNIFICANT' - elif record.levelno >= 30: - priority = 'PROBLEM' - elif record.levelno >= 20: - priority = 'EVENTA' - elif record.levelno >= 10: - priority = 'EVENTB' - else: - priority = 'VERBOSE' - - msg = self.format(record) - - notify = NotificationMessage.NotificationMessage(self._host, self._port, msg, priority, self._category, self._source) - notify.send() diff --git a/awips/ConfigFileUtil.py b/awips/ConfigFileUtil.py deleted file mode 100644 index 83c6733..0000000 --- a/awips/ConfigFileUtil.py +++ /dev/null @@ -1,39 +0,0 @@ -## -## - -# -# A set of utility functions for dealing with configuration files. -# -# -# -# SOFTWARE HISTORY -# -# Date Ticket# Engineer Description -# ------------ ---------- ----------- -------------------------- -# 09/27/10 dgilling Initial Creation. -# -# -# - - -def parseKeyValueFile(fileName): - propDict= dict() - - try: - propFile= open(fileName, "rU") - for propLine in propFile: - propDef= propLine.strip() - if len(propDef) == 0: - continue - if propDef[0] in ( '#' ): - continue - punctuation= [ propDef.find(c) for c in ':= ' ] + [ len(propDef) ] - found= min( [ pos for pos in punctuation if pos != -1 ] ) - name= propDef[:found].rstrip() - value= propDef[found:].lstrip(":= ").rstrip() - propDict[name]= value - propFile.close() - except: - pass - - return propDict \ No newline at end of file diff --git a/awips/DateTimeConverter.py b/awips/DateTimeConverter.py index dada99a..05d1357 100644 --- a/awips/DateTimeConverter.py +++ b/awips/DateTimeConverter.py @@ -1,6 +1,3 @@ -# # -# # - # # Functions for converting between the various "Java" dynamic serialize types # used by EDEX to the native python time datetime. diff --git a/awips/NotificationMessage.py b/awips/NotificationMessage.py deleted file mode 100755 index 5b6a975..0000000 --- a/awips/NotificationMessage.py +++ /dev/null @@ -1,160 +0,0 @@ -import ctypes -from . import stomp -import socket -import sys -import time -import threading -import xml.etree.ElementTree as ET - -from . import ThriftClient -from dynamicserialize.dstypes.com.raytheon.uf.common.alertviz import AlertVizRequest - -# -# Provides a capability of constructing notification messages and sending -# them to a STOMP data source. -# -# -# SOFTWARE HISTORY -# -# Date Ticket# Engineer Description -# ------------ ---------- ----------- -------------------------- -# 09/30/08 chammack Initial Creation. -# 11/03/10 5849 cjeanbap Moved to awips package from -# com.raytheon.uf.tools.cli -# 01/07/11 5645 cjeanbap Added audio file to Status Message. -# 05/27/11 3050 cjeanbap Added if-statement to check Priority -# value -# 07/27/15 4654 skorolev Added filters -# 11/11/15 5120 rferrel Cannot serialize empty filters. -# -class NotificationMessage: - - priorityMap = { - 0: 'CRITICAL', - 1: 'SIGNIFICANT', - 2: 'PROBLEM', - 3: 'EVENTA', - 4: 'EVENTB', - 5: 'VERBOSE'} - - def __init__(self, host='localhost', port=61999, message='', priority='PROBLEM', category="LOCAL", source="ANNOUNCER", audioFile="NONE", filters=None): - self.host = host - self.port = port - self.message = message - self.audioFile = audioFile - self.source = source - self.category = category - self.filters = filters - - priorityInt = None - - try: - priorityInt = int(priority) - except: - pass - - if priorityInt is None: - #UFStatus.java contains mapping of Priority to Logging level mapping - if priority == 'CRITICAL' or priority == 'FATAL': - priorityInt = int(0) - elif priority == 'SIGNIFICANT' or priority == 'ERROR': - priorityInt = int(1) - elif priority == 'PROBLEM' or priority == 'WARN': - priorityInt = int(2) - elif priority == 'EVENTA' or priority == 'INFO': - priorityInt = int(3) - elif priority == 'EVENTB': - priorityInt = int(4) - elif priority == 'VERBOSE' or priority == 'DEBUG': - priorityInt = int(5) - - if (priorityInt < 0 or priorityInt > 5): - print("Error occurred, supplied an invalid Priority value: " + str(priorityInt)) - print("Priority values are 0, 1, 2, 3, 4 and 5.") - sys.exit(1) - - if priorityInt is not None: - self.priority = self.priorityMap[priorityInt] - else: - self.priority = priority - - def connection_timeout(self, connection): - if (connection is not None and not connection.is_connected()): - print("Connection Retry Timeout") - for tid, tobj in list(threading._active.items()): - if tobj.name is "MainThread": - res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(SystemExit)) - if res != 0 and res != 1: - # problem, reset state - ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0) - - def send(self): - # depending on the value of the port number indicates the distribution - # of the message to AlertViz - # 9581 is global distribution thru ThriftClient to Edex - # 61999 is local distribution - if (int(self.port) == 61999): - # use stomp.py - conn = stomp.Connection(host_and_ports=[(self.host, 61999)]) - timeout = threading.Timer(5.0, self.connection_timeout, [conn]) - - try: - timeout.start(); - conn.start() - finally: - timeout.cancel() - - conn.connect() - - sm = ET.Element("statusMessage") - sm.set("machine", socket.gethostname()) - sm.set("priority", self.priority) - sm.set("category", self.category) - sm.set("sourceKey", self.source) - sm.set("audioFile", self.audioFile) - if self.filters is not None and len(self.filters) > 0: - sm.set("filters", self.filters) - msg = ET.SubElement(sm, "message") - msg.text = self.message - # details = ET.SubElement(sm, "details") - msg = ET.tostring(sm, "UTF-8") - - try : - conn.send(msg, destination='/queue/messages') - time.sleep(2) - finally: - conn.stop() - else: - # use ThriftClient - alertVizRequest = createRequest(self.message, self.priority, self.source, self.category, self.audioFile, self.filters) - thriftClient = ThriftClient.ThriftClient(self.host, self.port, "/services") - - serverResponse = None - try: - serverResponse = thriftClient.sendRequest(alertVizRequest) - except Exception as ex: - print("Caught exception submitting AlertVizRequest: ", str(ex)) - - if (serverResponse != "None"): - print("Error occurred submitting Notification Message to AlertViz receiver: ", serverResponse) - sys.exit(1) - else: - print("Response: " + str(serverResponse)) - -def createRequest(message, priority, source, category, audioFile, filters): - obj = AlertVizRequest() - - obj.setMachine(socket.gethostname()) - obj.setPriority(priority) - obj.setCategory(category) - obj.setSourceKey(source) - obj.setMessage(message) - if (audioFile is not None): - obj.setAudioFile(audioFile) - else: - obj.setAudioFile('\0') - obj.setFilters(filters) - return obj - -if __name__ == '__main__': - main() diff --git a/awips/QpidSubscriber.py b/awips/QpidSubscriber.py index 06c4153..9f7a675 100644 --- a/awips/QpidSubscriber.py +++ b/awips/QpidSubscriber.py @@ -1,6 +1,3 @@ -## -## - # # Provides a Python-based interface for subscribing to qpid queues and topics. # @@ -26,6 +23,7 @@ import zlib from Queue import Empty from qpid.exceptions import Closed + class QpidSubscriber: def __init__(self, host='127.0.0.1', port=5672, decompress=False, ssl=None): diff --git a/awips/__init__.py b/awips/__init__.py index 636de3c..cd02a75 100644 --- a/awips/__init__.py +++ b/awips/__init__.py @@ -1,7 +1,3 @@ -## -## - - # # __init__.py for awips package # diff --git a/awips/qpidingest.py b/awips/qpidingest.py index 6ed3f42..615ca10 100644 --- a/awips/qpidingest.py +++ b/awips/qpidingest.py @@ -1,4 +1,4 @@ -#=============================================================================== +# =============================================================================== # qpidingest.py # # @author: Aaron Anderson @@ -38,7 +38,7 @@ # EXAMPLE: # Simple example program: # -#------------------------------------------------------------------------------ +# ------------------------------------------------------------------------------ # import qpidingest # #Tell EDEX to ingest a metar file from data_store. The filepath is # #/data_store/20100218/metar/00/standard/20100218_005920_SAUS46KSEW.metar @@ -50,7 +50,7 @@ # # conn.sendmessage('/data_store/20100218/metar/18/standard/20100218_185755_SAUS46KLOX.metar','SAUS46 KLOX') # conn.close() -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- # # SOFTWARE HISTORY # @@ -61,7 +61,7 @@ # 03/06/2014 DR 17907 D. Friedman Workaround for issue QPID-5569 # 02/16/2017 DR 6084 bsteffen Support ssl connections # -#=============================================================================== +# =============================================================================== import os import os.path @@ -74,6 +74,7 @@ from qpid.datatypes import Message, uuid4 QPID_USERNAME = 'guest' QPID_PASSWORD = 'guest' + class IngestViaQPID: def __init__(self, host='localhost', port=5672, ssl=None): ''' diff --git a/awips/stomp.py b/awips/stomp.py deleted file mode 100644 index 2327e42..0000000 --- a/awips/stomp.py +++ /dev/null @@ -1,921 +0,0 @@ -from __future__ import print_function -import hashlib -import math -import random -import re -import socket -import sys -import threading -import time -import xml.dom.minidom -try: - from StringIO import StringIO -except ImportError: - from io import StringIO -from functools import reduce -try: - import _thread -except ImportError: - import thread as _thread - -"""Stomp Protocol Connectivity - - This provides basic connectivity to a message broker supporting the 'stomp' protocol. - At the moment ACK, SEND, SUBSCRIBE, UNSUBSCRIBE, BEGIN, ABORT, COMMIT, CONNECT and DISCONNECT operations - are supported. - - This changes the previous version which required a listener per subscription -- now a listener object - just calls the 'addlistener' method and will receive all messages sent in response to all/any subscriptions. - (The reason for the change is that the handling of an 'ack' becomes problematic unless the listener mechanism - is decoupled from subscriptions). - - Note that you must 'start' an instance of Connection to begin receiving messages. For example: - - conn = stomp.Connection([('localhost', 62003)], 'myuser', 'mypass') - conn.start() - - Meta-Data - --------- - Author: Jason R Briggs - License: http://www.apache.org/licenses/LICENSE-2.0 - Start Date: 2005/12/01 - Last Revision Date: $Date: 2008/09/11 00:16 $ - - Notes/Attribution - ----------------- - * uuid method courtesy of Carl Free Jr: - http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/213761 - * patch from Andreas Schobel - * patches from Julian Scheid of Rising Sun Pictures (http://open.rsp.com.au) - * patch from Fernando - * patches from Eugene Strulyov - - Updates - ------- - * 2007/03/31 : (Andreas Schobel) patch to fix newlines problem in ActiveMQ 4.1 - * 2007/09 : (JRB) updated to get stomp.py working in Jython as well as Python - * 2007/09/05 : (Julian Scheid) patch to allow sending custom headers - * 2007/09/18 : (JRB) changed code to use logging instead of just print. added logger for jython to work - * 2007/09/18 : (Julian Scheid) various updates, including: - - change incoming message handling so that callbacks are invoked on the listener not only for MESSAGE, but also for - CONNECTED, RECEIPT and ERROR frames. - - callbacks now get not only the payload but any headers specified by the server - - all outgoing messages now sent via a single method - - only one connection used - - change to use thread instead of threading - - sends performed on the calling thread - - receiver loop now deals with multiple messages in one received chunk of data - - added reconnection attempts and connection fail-over - - changed defaults for "user" and "passcode" to None instead of empty string (fixed transmission of those values) - - added readline support - * 2008/03/26 : (Fernando) added cStringIO for faster performance on large messages - * 2008/09/10 : (Eugene) remove lower() on headers to support case-sensitive header names - * 2008/09/11 : (JRB) fix incompatibilities with RabbitMQ, add wait for socket-connect - * 2008/10/28 : (Eugene) add jms map (from stomp1.1 ideas) - * 2008/11/25 : (Eugene) remove superfluous (incorrect) locking code - * 2009/02/05 : (JRB) remove code to replace underscores with dashes in header names (causes a problem in rabbit-mq) - * 2009/03/29 : (JRB) minor change to add logging config file - (JRB) minor change to add socket timeout, suggested by Israel - * 2009/04/01 : (Gavin) patch to change md5 to hashlib (for 2.6 compatibility) - * 2009/04/02 : (Fernando Ciciliati) fix overflow bug when waiting too long to connect to the broker - -""" - -# -# stomp.py version number -# -_version = 1.8 - - -def _uuid( *args ): - """ - uuid courtesy of Carl Free Jr: - (http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/213761) - """ - - t = int( time.time() * 1000 ) - r = int( random.random() * 100000000000000000 ) - - try: - a = socket.gethostbyname( socket.gethostname() ) - except: - # if we can't get a network address, just imagine one - a = random.random() * 100000000000000000 - data = str(t) + ' ' + str(r) + ' ' + str(a) + ' ' + str(args) - md5 = hashlib.md5() - md5.update(data) - data = md5.hexdigest() - return data - - -class DevNullLogger(object): - """ - dummy logging class for environments without the logging module - """ - def log(self, msg): - print(msg) - - def devnull(self, msg): - pass - - debug = devnull - info = devnull - warning = log - error = log - critical = log - exception = log - - def isEnabledFor(self, lvl): - return False - - -# -# add logging if available -# -try: - import logging - import logging.config - logging.config.fileConfig("stomp.log.conf") - log = logging.getLogger('root') -except: - log = DevNullLogger() - - -class ConnectionClosedException(Exception): - """ - Raised in the receiver thread when the connection has been closed - by the server. - """ - pass - - -class NotConnectedException(Exception): - """ - Raised by Connection.__send_frame when there is currently no server - connection. - """ - pass - - -class ConnectionListener(object): - """ - This class should be used as a base class for objects registered - using Connection.add_listener(). - """ - def on_connecting(self, host_and_port): - """ - Called by the STOMP connection once a TCP/IP connection to the - STOMP server has been established or re-established. Note that - at this point, no connection has been established on the STOMP - protocol level. For this, you need to invoke the "connect" - method on the connection. - - \param host_and_port a tuple containing the host name and port - number to which the connection has been established. - """ - pass - - def on_connected(self, headers, body): - """ - Called by the STOMP connection when a CONNECTED frame is - received, that is after a connection has been established or - re-established. - - \param headers a dictionary containing all headers sent by the - server as key/value pairs. - - \param body the frame's payload. This is usually empty for - CONNECTED frames. - """ - pass - - def on_disconnected(self): - """ - Called by the STOMP connection when a TCP/IP connection to the - STOMP server has been lost. No messages should be sent via - the connection until it has been reestablished. - """ - pass - - def on_message(self, headers, body): - """ - Called by the STOMP connection when a MESSAGE frame is - received. - - \param headers a dictionary containing all headers sent by the - server as key/value pairs. - - \param body the frame's payload - the message body. - """ - pass - - def on_receipt(self, headers, body): - """ - Called by the STOMP connection when a RECEIPT frame is - received, sent by the server if requested by the client using - the 'receipt' header. - - \param headers a dictionary containing all headers sent by the - server as key/value pairs. - - \param body the frame's payload. This is usually empty for - RECEIPT frames. - """ - pass - - def on_error(self, headers, body): - """ - Called by the STOMP connection when an ERROR frame is - received. - - \param headers a dictionary containing all headers sent by the - server as key/value pairs. - - \param body the frame's payload - usually a detailed error - description. - """ - pass - - -class Connection(object): - """ - Represents a STOMP client connection. - """ - - def __init__(self, - host_and_ports = [ ('localhost', 61613) ], - user = None, - passcode = None, - prefer_localhost = True, - try_loopback_connect = True, - reconnect_sleep_initial = 0.1, - reconnect_sleep_increase = 0.5, - reconnect_sleep_jitter = 0.1, - reconnect_sleep_max = 60.0): - """ - Initialize and start this connection. - - \param host_and_ports - a list of (host, port) tuples. - - \param prefer_localhost - if True and the local host is mentioned in the (host, - port) tuples, try to connect to this first - - \param try_loopback_connect - if True and the local host is found in the host - tuples, try connecting to it using loopback interface - (127.0.0.1) - - \param reconnect_sleep_initial - - initial delay in seconds to wait before reattempting - to establish a connection if connection to any of the - hosts fails. - - \param reconnect_sleep_increase - - factor by which the sleep delay is increased after - each connection attempt. For example, 0.5 means - to wait 50% longer than before the previous attempt, - 1.0 means wait twice as long, and 0.0 means keep - the delay constant. - - \param reconnect_sleep_max - - maximum delay between connection attempts, regardless - of the reconnect_sleep_increase. - - \param reconnect_sleep_jitter - - random additional time to wait (as a percentage of - the time determined using the previous parameters) - between connection attempts in order to avoid - stampeding. For example, a value of 0.1 means to wait - an extra 0%-10% (randomly determined) of the delay - calculated using the previous three parameters. - """ - - sorted_host_and_ports = [] - sorted_host_and_ports.extend(host_and_ports) - - # If localhost is preferred, make sure all (host, port) tuples - # that refer to the local host come first in the list - if prefer_localhost: - def is_local_host(host): - return host in Connection.__localhost_names - - sorted_host_and_ports.sort(lambda x, y: (int(is_local_host(y[0])) - - int(is_local_host(x[0])))) - - # If the user wishes to attempt connecting to local ports - # using the loopback interface, for each (host, port) tuple - # referring to a local host, add an entry with the host name - # replaced by 127.0.0.1 if it doesn't exist already - loopback_host_and_ports = [] - if try_loopback_connect: - for host_and_port in sorted_host_and_ports: - if is_local_host(host_and_port[0]): - port = host_and_port[1] - if (not ("127.0.0.1", port) in sorted_host_and_ports - and not ("localhost", port) in sorted_host_and_ports): - loopback_host_and_ports.append(("127.0.0.1", port)) - - # Assemble the final, possibly sorted list of (host, port) tuples - self.__host_and_ports = [] - self.__host_and_ports.extend(loopback_host_and_ports) - self.__host_and_ports.extend(sorted_host_and_ports) - - self.__recvbuf = '' - - self.__listeners = [ ] - - self.__reconnect_sleep_initial = reconnect_sleep_initial - self.__reconnect_sleep_increase = reconnect_sleep_increase - self.__reconnect_sleep_jitter = reconnect_sleep_jitter - self.__reconnect_sleep_max = reconnect_sleep_max - - self.__connect_headers = {} - if user is not None and passcode is not None: - self.__connect_headers['login'] = user - self.__connect_headers['passcode'] = passcode - - self.__socket = None - self.__current_host_and_port = None - - self.__receiver_thread_exit_condition = threading.Condition() - self.__receiver_thread_exited = False - - # - # Manage the connection - # - - def start(self): - """ - Start the connection. This should be called after all - listeners have been registered. If this method is not called, - no frames will be received by the connection. - """ - self.__running = True - self.__attempt_connection() - _thread.start_new_thread(self.__receiver_loop, ()) - - def stop(self): - """ - Stop the connection. This is equivalent to calling - disconnect() but will do a clean shutdown by waiting for the - receiver thread to exit. - """ - self.disconnect() - - self.__receiver_thread_exit_condition.acquire() - if not self.__receiver_thread_exited: - self.__receiver_thread_exit_condition.wait() - self.__receiver_thread_exit_condition.release() - - def get_host_and_port(self): - """ - Return a (host, port) tuple indicating which STOMP host and - port is currently connected, or None if there is currently no - connection. - """ - return self.__current_host_and_port - - def is_connected(self): - try: - return self.__socket is not None and self.__socket.getsockname()[1] != 0 - except socket.error: - return False - - # - # Manage objects listening to incoming frames - # - - def add_listener(self, listener): - self.__listeners.append(listener) - - def remove_listener(self, listener): - self.__listeners.remove(listener) - - # - # STOMP transmissions - # - - def subscribe(self, headers={}, **keyword_headers): - self.__send_frame_helper('SUBSCRIBE', '', self.__merge_headers([headers, keyword_headers]), [ 'destination' ]) - - def unsubscribe(self, headers={}, **keyword_headers): - self.__send_frame_helper('UNSUBSCRIBE', '', self.__merge_headers([headers, keyword_headers]), [ ('destination', 'id') ]) - - def send(self, message='', headers={}, **keyword_headers): - if '\x00' in message: - content_length_headers = {'content-length': len(message)} - else: - content_length_headers = {} - self.__send_frame_helper('SEND', message, self.__merge_headers([headers, - keyword_headers, - content_length_headers]), [ 'destination' ]) - - def ack(self, headers={}, **keyword_headers): - self.__send_frame_helper('ACK', '', self.__merge_headers([headers, keyword_headers]), [ 'message-id' ]) - - def begin(self, headers={}, **keyword_headers): - use_headers = self.__merge_headers([headers, keyword_headers]) - if not 'transaction' in list(use_headers.keys()): - use_headers['transaction'] = _uuid() - self.__send_frame_helper('BEGIN', '', use_headers, [ 'transaction' ]) - return use_headers['transaction'] - - def abort(self, headers={}, **keyword_headers): - self.__send_frame_helper('ABORT', '', self.__merge_headers([headers, keyword_headers]), [ 'transaction' ]) - - def commit(self, headers={}, **keyword_headers): - self.__send_frame_helper('COMMIT', '', self.__merge_headers([headers, keyword_headers]), [ 'transaction' ]) - - def connect(self, headers={}, **keyword_headers): - if 'wait' in keyword_headers and keyword_headers['wait']: - while not self.is_connected(): time.sleep(0.1) - del keyword_headers['wait'] - self.__send_frame_helper('CONNECT', '', self.__merge_headers([self.__connect_headers, headers, keyword_headers]), [ ]) - - def disconnect(self, headers={}, **keyword_headers): - self.__send_frame_helper('DISCONNECT', '', self.__merge_headers([self.__connect_headers, headers, keyword_headers]), [ ]) - self.__running = False - if hasattr(socket, 'SHUT_RDWR'): - self.__socket.shutdown(socket.SHUT_RDWR) - if self.__socket: - self.__socket.close() - self.__current_host_and_port = None - - # ========= PRIVATE MEMBERS ========= - - - # List of all host names (unqualified, fully-qualified, and IP - # addresses) that refer to the local host (both loopback interface - # and external interfaces). This is used for determining - # preferred targets. - __localhost_names = [ "localhost", - "127.0.0.1", - socket.gethostbyname(socket.gethostname()), - socket.gethostname(), - socket.getfqdn(socket.gethostname()) ] - # - # Used to parse STOMP header lines in the format "key:value", - # - __header_line_re = re.compile('(?P[^:]+)[:](?P.*)') - - # - # Used to parse the STOMP "content-length" header lines, - # - __content_length_re = re.compile('^content-length[:]\\s*(?P[0-9]+)', re.MULTILINE) - - def __merge_headers(self, header_map_list): - """ - Helper function for combining multiple header maps into one. - - Any underscores ('_') in header names (keys) will be replaced by dashes ('-'). - """ - headers = {} - for header_map in header_map_list: - for header_key in list(header_map.keys()): - headers[header_key] = header_map[header_key] - return headers - - def __convert_dict(self, payload): - """ - Encode python dictionary as ... structure. - """ - - xmlStr = "\n" - for key in payload: - xmlStr += "\n" - xmlStr += "%s" % key - xmlStr += "%s" % payload[key] - xmlStr += "\n" - xmlStr += "" - - return xmlStr - - def __send_frame_helper(self, command, payload, headers, required_header_keys): - """ - Helper function for sending a frame after verifying that a - given set of headers are present. - - \param command the command to send - - \param payload the frame's payload - - \param headers a dictionary containing the frame's headers - - \param required_header_keys a sequence enumerating all - required header keys. If an element in this sequence is itself - a tuple, that tuple is taken as a list of alternatives, one of - which must be present. - - \throws ArgumentError if one of the required header keys is - not present in the header map. - """ - for required_header_key in required_header_keys: - if type(required_header_key) == tuple: - found_alternative = False - for alternative in required_header_key: - if alternative in list(headers.keys()): - found_alternative = True - if not found_alternative: - raise KeyError("Command %s requires one of the following headers: %s" % (command, str(required_header_key))) - elif not required_header_key in list(headers.keys()): - raise KeyError("Command %s requires header %r" % (command, required_header_key)) - self.__send_frame(command, headers, payload) - - def __send_frame(self, command, headers={}, payload=''): - """ - Send a STOMP frame. - """ - if type(payload) == dict: - headers["transformation"] = "jms-map-xml" - payload = self.__convert_dict(payload) - - if self.__socket is not None: - frame = '%s\n%s\n%s\x00' % (command, - reduce(lambda accu, key: accu + ('%s:%s\n' % (key, headers[key])), list(headers.keys()), ''), - payload) - self.__socket.sendall(frame) - log.debug("Sent frame: type=%s, headers=%r, body=%r" % (command, headers, payload)) - else: - raise NotConnectedException() - - def __receiver_loop(self): - """ - Main loop listening for incoming data. - """ - try: - try: - threading.currentThread().setName("StompReceiver") - while self.__running: - log.debug('starting receiver loop') - - if self.__socket is None: - break - - try: - try: - for listener in self.__listeners: - if hasattr(listener, 'on_connecting'): - listener.on_connecting(self.__current_host_and_port) - - while self.__running: - frames = self.__read() - - for frame in frames: - (frame_type, headers, body) = self.__parse_frame(frame) - log.debug("Received frame: result=%r, headers=%r, body=%r" % (frame_type, headers, body)) - frame_type = frame_type.lower() - if frame_type in [ 'connected', - 'message', - 'receipt', - 'error' ]: - for listener in self.__listeners: - if hasattr(listener, 'on_%s' % frame_type): - eval('listener.on_%s(headers, body)' % frame_type) - else: - log.debug('listener %s has no such method on_%s' % (listener, frame_type)) - else: - log.warning('Unknown response frame type: "%s" (frame length was %d)' % (frame_type, len(frame))) - finally: - try: - self.__socket.close() - except: - pass # ignore errors when attempting to close socket - self.__socket = None - self.__current_host_and_port = None - except ConnectionClosedException: - if self.__running: - log.error("Lost connection") - # Notify listeners - for listener in self.__listeners: - if hasattr(listener, 'on_disconnected'): - listener.on_disconnected() - # Clear out any half-received messages after losing connection - self.__recvbuf = '' - continue - else: - break - except: - log.exception("An unhandled exception was encountered in the stomp receiver loop") - - finally: - self.__receiver_thread_exit_condition.acquire() - self.__receiver_thread_exited = True - self.__receiver_thread_exit_condition.notifyAll() - self.__receiver_thread_exit_condition.release() - - def __read(self): - """ - Read the next frame(s) from the socket. - """ - fastbuf = StringIO() - while self.__running: - try: - c = self.__socket.recv(1024) - except: - c = '' - if len(c) == 0: - raise ConnectionClosedException - fastbuf.write(c) - if '\x00' in c: - break - self.__recvbuf += fastbuf.getvalue() - fastbuf.close() - result = [] - - if len(self.__recvbuf) > 0 and self.__running: - while True: - pos = self.__recvbuf.find('\x00') - if pos >= 0: - frame = self.__recvbuf[0:pos] - preamble_end = frame.find('\n\n') - if preamble_end >= 0: - content_length_match = Connection.__content_length_re.search(frame[0:preamble_end]) - if content_length_match: - content_length = int(content_length_match.group('value')) - content_offset = preamble_end + 2 - frame_size = content_offset + content_length - if frame_size > len(frame): - # Frame contains NUL bytes, need to - # read more - if frame_size < len(self.__recvbuf): - pos = frame_size - frame = self.__recvbuf[0:pos] - else: - # Haven't read enough data yet, - # exit loop and wait for more to - # arrive - break - result.append(frame) - self.__recvbuf = self.__recvbuf[pos+1:] - else: - break - return result - - - def __transform(self, body, transType): - """ - Perform body transformation. Currently, the only supported transformation is - 'jms-map-xml', which converts a map into python dictionary. This can be extended - to support other transformation types. - - The body has the following format: - - - name - Dejan - - - city - Belgrade - - - - (see http://docs.codehaus.org/display/STOMP/Stomp+v1.1+Ideas) - """ - - if transType != 'jms-map-xml': - return body - - try: - entries = {} - doc = xml.dom.minidom.parseString(body) - rootElem = doc.documentElement - for entryElem in rootElem.getElementsByTagName("entry"): - pair = [] - for node in entryElem.childNodes: - if not isinstance(node, xml.dom.minidom.Element): continue - pair.append(node.firstChild.nodeValue) - assert len(pair) == 2 - entries[pair[0]] = pair[1] - return entries - except Exception as ex: - # unable to parse message. return original - return body - - - def __parse_frame(self, frame): - """ - Parse a STOMP frame into a (frame_type, headers, body) tuple, - where frame_type is the frame type as a string (e.g. MESSAGE), - headers is a map containing all header key/value pairs, and - body is a string containing the frame's payload. - """ - preamble_end = frame.find('\n\n') - preamble = frame[0:preamble_end] - preamble_lines = preamble.split('\n') - body = frame[preamble_end+2:] - - # Skip any leading newlines - first_line = 0 - while first_line < len(preamble_lines) and len(preamble_lines[first_line]) == 0: - first_line += 1 - - # Extract frame type - frame_type = preamble_lines[first_line] - - # Put headers into a key/value map - headers = {} - for header_line in preamble_lines[first_line+1:]: - header_match = Connection.__header_line_re.match(header_line) - if header_match: - headers[header_match.group('key')] = header_match.group('value') - - if 'transformation' in headers: - body = self.__transform(body, headers['transformation']) - - return (frame_type, headers, body) - - def __attempt_connection(self): - """ - Try connecting to the (host, port) tuples specified at construction time. - """ - - sleep_exp = 1 - while self.__running and self.__socket is None: - for host_and_port in self.__host_and_ports: - try: - log.debug("Attempting connection to host %s, port %s" % host_and_port) - self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.__socket.settimeout(None) - self.__socket.connect(host_and_port) - self.__current_host_and_port = host_and_port - log.info("Established connection to host %s, port %s" % host_and_port) - break - except socket.error: - self.__socket = None - if type(sys.exc_info()[1]) == tuple: - exc = sys.exc_info()[1][1] - else: - exc = sys.exc_info()[1] - log.warning("Could not connect to host %s, port %s: %s" % (host_and_port[0], host_and_port[1], exc)) - - if self.__socket is None: - sleep_duration = (min(self.__reconnect_sleep_max, - ((self.__reconnect_sleep_initial / (1.0 + self.__reconnect_sleep_increase)) - * math.pow(1.0 + self.__reconnect_sleep_increase, sleep_exp))) - * (1.0 + random.random() * self.__reconnect_sleep_jitter)) - sleep_end = time.time() + sleep_duration - log.debug("Sleeping for %.1f seconds before attempting reconnect" % sleep_duration) - while self.__running and time.time() < sleep_end: - time.sleep(0.2) - - if sleep_duration < self.__reconnect_sleep_max: - sleep_exp += 1 - -# -# command line testing -# -if __name__ == '__main__': - - # If the readline module is available, make command input easier - try: - import readline - def stomp_completer(text, state): - commands = [ 'subscribe', 'unsubscribe', - 'send', 'ack', - 'begin', 'abort', 'commit', - 'connect', 'disconnect' - ] - for command in commands[state:]: - if command.startswith(text): - return "%s " % command - return None - - readline.parse_and_bind("tab: complete") - readline.set_completer(stomp_completer) - readline.set_completer_delims("") - except ImportError: - pass # ignore unavailable readline module - - class StompTester(object): - def __init__(self, host='localhost', port=61613, user='', passcode=''): - self.c = Connection([(host, port)], user, passcode) - self.c.add_listener(self) - self.c.start() - - def __print_async(self, frame_type, headers, body): - #print("\r \r",) - print(frame_type) - for header_key in list(headers.keys()): - print('%s: %s' % (header_key, headers[header_key])) - print() - print(body) - print('> ', end=' ') - sys.stdout.flush() - - def on_connecting(self, host_and_port): - self.c.connect(wait=True) - - def on_disconnected(self): - print("lost connection") - - def on_message(self, headers, body): - self.__print_async("MESSAGE", headers, body) - - def on_error(self, headers, body): - self.__print_async("ERROR", headers, body) - - def on_receipt(self, headers, body): - self.__print_async("RECEIPT", headers, body) - - def on_connected(self, headers, body): - self.__print_async("CONNECTED", headers, body) - - def ack(self, args): - if len(args) < 3: - self.c.ack(message_id=args[1]) - else: - self.c.ack(message_id=args[1], transaction=args[2]) - - def abort(self, args): - self.c.abort(transaction=args[1]) - - def begin(self, args): - print('transaction id: %s' % self.c.begin()) - - def commit(self, args): - if len(args) < 2: - print('expecting: commit ') - else: - print('committing %s' % args[1]) - self.c.commit(transaction=args[1]) - - def disconnect(self, args): - try: - self.c.disconnect() - except NotConnectedException: - pass # ignore if no longer connected - - def send(self, args): - if len(args) < 3: - print('expecting: send ') - else: - self.c.send(destination=args[1], message=' '.join(args[2:])) - - def sendtrans(self, args): - if len(args) < 3: - print('expecting: sendtrans ') - else: - self.c.send(destination=args[1], message="%s\n" % ' '.join(args[3:]), transaction=args[2]) - - def subscribe(self, args): - if len(args) < 2: - print('expecting: subscribe [ack]') - elif len(args) > 2: - print('subscribing to "%s" with acknowledge set to "%s"' % (args[1], args[2])) - self.c.subscribe(destination=args[1], ack=args[2]) - else: - print('subscribing to "%s" with auto acknowledge' % args[1]) - self.c.subscribe(destination=args[1], ack='auto') - - def unsubscribe(self, args): - if len(args) < 2: - print('expecting: unsubscribe ') - else: - print('unsubscribing from "%s"' % args[1]) - self.c.unsubscribe(destination=args[1]) - - if len(sys.argv) > 5: - print('USAGE: stomp.py [host] [port] [user] [passcode]') - sys.exit(1) - - if len(sys.argv) >= 2: - host = sys.argv[1] - else: - host = "localhost" - if len(sys.argv) >= 3: - port = int(sys.argv[2]) - else: - port = 61613 - - if len(sys.argv) >= 5: - user = sys.argv[3] - passcode = sys.argv[4] - else: - user = None - passcode = None - - st = StompTester(host, port, user, passcode) - try: - while True: - line = input("\r> ") - if not line or line.lstrip().rstrip() == '': - continue - elif 'quit' in line or 'disconnect' in line: - break - split = line.split() - command = split[0] - if not command.startswith("on_") and hasattr(st, command): - getattr(st, command)(split) - else: - print('unrecognized command') - finally: - st.disconnect(None) - - diff --git a/awips/test/Test b/awips/test/Test deleted file mode 100644 index 7f8c36c..0000000 --- a/awips/test/Test +++ /dev/null @@ -1,26 +0,0 @@ -# -# Pure python logging mechanism for logging to AlertViz from -# pure python (ie not JEP). DO NOT USE IN PYTHON CALLED -# FROM JAVA. -# -# -# SOFTWARE HISTORY -# -# Date Ticket# Engineer Description -# ------------ ---------- ----------- -------------------------- -# 11/03/10 5849 cjeanbap Initial Creation. -# -# -# - -## to execute type python Test - - -import os -import logging -from awips import AlertVizHandler -from . import Record - -avh = AlertVizHandler.AlertVizHandler(host=os.getenv("BROKER_ADDR","localhost"), port=9581, category='LOCAL', source='ANNOUNCER', level=logging.NOTSET) -record = Record.Record(10) -avh.emit(record)