python-awips/awips/stomp.py

936 lines
34 KiB
Python
Raw Normal View History

2015-06-12 11:57:06 -06:00
#!/usr/bin/env python
##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
2016-03-16 16:32:17 -05:00
#
2015-06-12 11:57:06 -06:00
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
2016-03-16 16:32:17 -05:00
#
2015-06-12 11:57:06 -06:00
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
2016-03-16 16:32:17 -05:00
#
2015-06-12 11:57:06 -06:00
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
"""Stomp Protocol Connectivity
This provides basic connectivity to a message broker supporting the 'stomp' protocol.
At the moment ACK, SEND, SUBSCRIBE, UNSUBSCRIBE, BEGIN, ABORT, COMMIT, CONNECT and DISCONNECT operations
are supported.
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
This changes the previous version which required a listener per subscription -- now a listener object
just calls the 'addlistener' method and will receive all messages sent in response to all/any subscriptions.
(The reason for the change is that the handling of an 'ack' becomes problematic unless the listener mechanism
is decoupled from subscriptions).
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
Note that you must 'start' an instance of Connection to begin receiving messages. For example:
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
conn = stomp.Connection([('localhost', 62003)], 'myuser', 'mypass')
conn.start()
Meta-Data
---------
Author: Jason R Briggs
License: http://www.apache.org/licenses/LICENSE-2.0
Start Date: 2005/12/01
Last Revision Date: $Date: 2008/09/11 00:16 $
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
Notes/Attribution
-----------------
* uuid method courtesy of Carl Free Jr:
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/213761
* patch from Andreas Schobel
* patches from Julian Scheid of Rising Sun Pictures (http://open.rsp.com.au)
* patch from Fernando
* patches from Eugene Strulyov
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
Updates
-------
* 2007/03/31 : (Andreas Schobel) patch to fix newlines problem in ActiveMQ 4.1
* 2007/09 : (JRB) updated to get stomp.py working in Jython as well as Python
* 2007/09/05 : (Julian Scheid) patch to allow sending custom headers
* 2007/09/18 : (JRB) changed code to use logging instead of just print. added logger for jython to work
* 2007/09/18 : (Julian Scheid) various updates, including:
2016-03-16 16:32:17 -05:00
- change incoming message handling so that callbacks are invoked on the listener not only for MESSAGE, but also for
2015-06-12 11:57:06 -06:00
CONNECTED, RECEIPT and ERROR frames.
- callbacks now get not only the payload but any headers specified by the server
- all outgoing messages now sent via a single method
2016-03-16 16:32:17 -05:00
- only one connection used
2015-06-12 11:57:06 -06:00
- change to use thread instead of threading
- sends performed on the calling thread
- receiver loop now deals with multiple messages in one received chunk of data
- added reconnection attempts and connection fail-over
- changed defaults for "user" and "passcode" to None instead of empty string (fixed transmission of those values)
- added readline support
2016-03-16 16:32:17 -05:00
* 2008/03/26 : (Fernando) added cStringIO for faster performance on large messages
2015-06-12 11:57:06 -06:00
* 2008/09/10 : (Eugene) remove lower() on headers to support case-sensitive header names
* 2008/09/11 : (JRB) fix incompatibilities with RabbitMQ, add wait for socket-connect
* 2008/10/28 : (Eugene) add jms map (from stomp1.1 ideas)
* 2008/11/25 : (Eugene) remove superfluous (incorrect) locking code
* 2009/02/05 : (JRB) remove code to replace underscores with dashes in header names (causes a problem in rabbit-mq)
* 2009/03/29 : (JRB) minor change to add logging config file
(JRB) minor change to add socket timeout, suggested by Israel
* 2009/04/01 : (Gavin) patch to change md5 to hashlib (for 2.6 compatibility)
* 2009/04/02 : (Fernando Ciciliati) fix overflow bug when waiting too long to connect to the broker
"""
import hashlib
import math
import random
import re
import socket
import sys
2016-04-16 17:00:50 -06:00
import _thread
2015-06-12 11:57:06 -06:00
import threading
import time
import types
import xml.dom.minidom
2016-04-16 17:00:50 -06:00
from io import StringIO
from functools import reduce
2015-06-12 11:57:06 -06:00
#
# stomp.py version number
#
_version = 1.8
def _uuid( *args ):
"""
uuid courtesy of Carl Free Jr:
(http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/213761)
"""
2016-03-16 16:32:17 -05:00
2016-04-16 17:00:50 -06:00
t = int( time.time() * 1000 )
r = int( random.random() * 100000000000000000 )
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
try:
a = socket.gethostbyname( socket.gethostname() )
except:
# if we can't get a network address, just imagine one
2016-04-16 17:00:50 -06:00
a = random.random() * 100000000000000000
2015-06-12 11:57:06 -06:00
data = str(t) + ' ' + str(r) + ' ' + str(a) + ' ' + str(args)
md5 = hashlib.md5()
md5.update(data)
data = md5.hexdigest()
return data
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
class DevNullLogger(object):
"""
dummy logging class for environments without the logging module
"""
def log(self, msg):
2016-04-16 17:00:50 -06:00
print(msg)
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def devnull(self, msg):
pass
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
debug = devnull
info = devnull
warning = log
error = log
critical = log
exception = log
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def isEnabledFor(self, lvl):
return False
#
# add logging if available
#
try:
import logging
import logging.config
logging.config.fileConfig("stomp.log.conf")
log = logging.getLogger('root')
except:
log = DevNullLogger()
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
class ConnectionClosedException(Exception):
"""
Raised in the receiver thread when the connection has been closed
by the server.
"""
pass
class NotConnectedException(Exception):
"""
Raised by Connection.__send_frame when there is currently no server
connection.
"""
pass
class ConnectionListener(object):
"""
This class should be used as a base class for objects registered
using Connection.add_listener().
"""
def on_connecting(self, host_and_port):
"""
Called by the STOMP connection once a TCP/IP connection to the
STOMP server has been established or re-established. Note that
at this point, no connection has been established on the STOMP
protocol level. For this, you need to invoke the "connect"
method on the connection.
\param host_and_port a tuple containing the host name and port
number to which the connection has been established.
"""
pass
def on_connected(self, headers, body):
"""
Called by the STOMP connection when a CONNECTED frame is
received, that is after a connection has been established or
re-established.
\param headers a dictionary containing all headers sent by the
server as key/value pairs.
\param body the frame's payload. This is usually empty for
CONNECTED frames.
"""
pass
def on_disconnected(self):
"""
Called by the STOMP connection when a TCP/IP connection to the
STOMP server has been lost. No messages should be sent via
the connection until it has been reestablished.
"""
pass
def on_message(self, headers, body):
"""
Called by the STOMP connection when a MESSAGE frame is
received.
\param headers a dictionary containing all headers sent by the
server as key/value pairs.
\param body the frame's payload - the message body.
"""
pass
def on_receipt(self, headers, body):
"""
Called by the STOMP connection when a RECEIPT frame is
received, sent by the server if requested by the client using
the 'receipt' header.
\param headers a dictionary containing all headers sent by the
server as key/value pairs.
\param body the frame's payload. This is usually empty for
RECEIPT frames.
"""
pass
def on_error(self, headers, body):
"""
Called by the STOMP connection when an ERROR frame is
received.
\param headers a dictionary containing all headers sent by the
server as key/value pairs.
\param body the frame's payload - usually a detailed error
description.
"""
pass
class Connection(object):
"""
Represents a STOMP client connection.
"""
2016-03-16 16:32:17 -05:00
def __init__(self,
host_and_ports = [ ('localhost', 61613) ],
2015-06-12 11:57:06 -06:00
user = None,
passcode = None,
prefer_localhost = True,
try_loopback_connect = True,
reconnect_sleep_initial = 0.1,
reconnect_sleep_increase = 0.5,
reconnect_sleep_jitter = 0.1,
reconnect_sleep_max = 60.0):
"""
Initialize and start this connection.
2016-03-16 16:32:17 -05:00
\param host_and_ports
2015-06-12 11:57:06 -06:00
a list of (host, port) tuples.
\param prefer_localhost
if True and the local host is mentioned in the (host,
port) tuples, try to connect to this first
2016-03-16 16:32:17 -05:00
\param try_loopback_connect
2015-06-12 11:57:06 -06:00
if True and the local host is found in the host
tuples, try connecting to it using loopback interface
(127.0.0.1)
2016-03-16 16:32:17 -05:00
\param reconnect_sleep_initial
2015-06-12 11:57:06 -06:00
initial delay in seconds to wait before reattempting
to establish a connection if connection to any of the
hosts fails.
2016-03-16 16:32:17 -05:00
\param reconnect_sleep_increase
2015-06-12 11:57:06 -06:00
factor by which the sleep delay is increased after
each connection attempt. For example, 0.5 means
to wait 50% longer than before the previous attempt,
1.0 means wait twice as long, and 0.0 means keep
the delay constant.
\param reconnect_sleep_max
maximum delay between connection attempts, regardless
of the reconnect_sleep_increase.
\param reconnect_sleep_jitter
random additional time to wait (as a percentage of
the time determined using the previous parameters)
between connection attempts in order to avoid
stampeding. For example, a value of 0.1 means to wait
an extra 0%-10% (randomly determined) of the delay
calculated using the previous three parameters.
"""
sorted_host_and_ports = []
sorted_host_and_ports.extend(host_and_ports)
# If localhost is preferred, make sure all (host, port) tuples
# that refer to the local host come first in the list
if prefer_localhost:
def is_local_host(host):
return host in Connection.__localhost_names
2016-03-16 16:32:17 -05:00
sorted_host_and_ports.sort(lambda x, y: (int(is_local_host(y[0]))
2015-06-12 11:57:06 -06:00
- int(is_local_host(x[0]))))
# If the user wishes to attempt connecting to local ports
# using the loopback interface, for each (host, port) tuple
# referring to a local host, add an entry with the host name
# replaced by 127.0.0.1 if it doesn't exist already
loopback_host_and_ports = []
if try_loopback_connect:
for host_and_port in sorted_host_and_ports:
if is_local_host(host_and_port[0]):
port = host_and_port[1]
2016-03-16 16:32:17 -05:00
if (not ("127.0.0.1", port) in sorted_host_and_ports
2015-06-12 11:57:06 -06:00
and not ("localhost", port) in sorted_host_and_ports):
loopback_host_and_ports.append(("127.0.0.1", port))
# Assemble the final, possibly sorted list of (host, port) tuples
self.__host_and_ports = []
self.__host_and_ports.extend(loopback_host_and_ports)
self.__host_and_ports.extend(sorted_host_and_ports)
self.__recvbuf = ''
self.__listeners = [ ]
self.__reconnect_sleep_initial = reconnect_sleep_initial
self.__reconnect_sleep_increase = reconnect_sleep_increase
self.__reconnect_sleep_jitter = reconnect_sleep_jitter
self.__reconnect_sleep_max = reconnect_sleep_max
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
self.__connect_headers = {}
if user is not None and passcode is not None:
self.__connect_headers['login'] = user
self.__connect_headers['passcode'] = passcode
self.__socket = None
self.__current_host_and_port = None
self.__receiver_thread_exit_condition = threading.Condition()
self.__receiver_thread_exited = False
#
# Manage the connection
#
def start(self):
"""
Start the connection. This should be called after all
listeners have been registered. If this method is not called,
no frames will be received by the connection.
"""
self.__running = True
self.__attempt_connection()
2016-04-16 17:00:50 -06:00
_thread.start_new_thread(self.__receiver_loop, ())
2015-06-12 11:57:06 -06:00
def stop(self):
"""
Stop the connection. This is equivalent to calling
disconnect() but will do a clean shutdown by waiting for the
receiver thread to exit.
"""
self.disconnect()
self.__receiver_thread_exit_condition.acquire()
if not self.__receiver_thread_exited:
self.__receiver_thread_exit_condition.wait()
self.__receiver_thread_exit_condition.release()
def get_host_and_port(self):
"""
Return a (host, port) tuple indicating which STOMP host and
port is currently connected, or None if there is currently no
connection.
"""
return self.__current_host_and_port
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def is_connected(self):
try:
return self.__socket is not None and self.__socket.getsockname()[1] != 0
except socket.error:
return False
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
#
# Manage objects listening to incoming frames
#
def add_listener(self, listener):
self.__listeners.append(listener)
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def remove_listener(self, listener):
self.__listeners.remove(listener)
#
# STOMP transmissions
#
def subscribe(self, headers={}, **keyword_headers):
self.__send_frame_helper('SUBSCRIBE', '', self.__merge_headers([headers, keyword_headers]), [ 'destination' ])
def unsubscribe(self, headers={}, **keyword_headers):
self.__send_frame_helper('UNSUBSCRIBE', '', self.__merge_headers([headers, keyword_headers]), [ ('destination', 'id') ])
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def send(self, message='', headers={}, **keyword_headers):
if '\x00' in message:
content_length_headers = {'content-length': len(message)}
else:
content_length_headers = {}
2016-03-16 16:32:17 -05:00
self.__send_frame_helper('SEND', message, self.__merge_headers([headers,
2015-06-12 11:57:06 -06:00
keyword_headers,
content_length_headers]), [ 'destination' ])
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def ack(self, headers={}, **keyword_headers):
self.__send_frame_helper('ACK', '', self.__merge_headers([headers, keyword_headers]), [ 'message-id' ])
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def begin(self, headers={}, **keyword_headers):
use_headers = self.__merge_headers([headers, keyword_headers])
2016-04-16 17:00:50 -06:00
if not 'transaction' in list(use_headers.keys()):
2015-06-12 11:57:06 -06:00
use_headers['transaction'] = _uuid()
self.__send_frame_helper('BEGIN', '', use_headers, [ 'transaction' ])
return use_headers['transaction']
def abort(self, headers={}, **keyword_headers):
self.__send_frame_helper('ABORT', '', self.__merge_headers([headers, keyword_headers]), [ 'transaction' ])
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def commit(self, headers={}, **keyword_headers):
self.__send_frame_helper('COMMIT', '', self.__merge_headers([headers, keyword_headers]), [ 'transaction' ])
def connect(self, headers={}, **keyword_headers):
2016-04-16 17:00:50 -06:00
if 'wait' in keyword_headers and keyword_headers['wait']:
2015-06-12 11:57:06 -06:00
while not self.is_connected(): time.sleep(0.1)
del keyword_headers['wait']
self.__send_frame_helper('CONNECT', '', self.__merge_headers([self.__connect_headers, headers, keyword_headers]), [ ])
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def disconnect(self, headers={}, **keyword_headers):
self.__send_frame_helper('DISCONNECT', '', self.__merge_headers([self.__connect_headers, headers, keyword_headers]), [ ])
self.__running = False
if hasattr(socket, 'SHUT_RDWR'):
self.__socket.shutdown(socket.SHUT_RDWR)
if self.__socket:
self.__socket.close()
self.__current_host_and_port = None
# ========= PRIVATE MEMBERS =========
# List of all host names (unqualified, fully-qualified, and IP
# addresses) that refer to the local host (both loopback interface
# and external interfaces). This is used for determining
# preferred targets.
__localhost_names = [ "localhost",
"127.0.0.1",
socket.gethostbyname(socket.gethostname()),
socket.gethostname(),
socket.getfqdn(socket.gethostname()) ]
#
# Used to parse STOMP header lines in the format "key:value",
#
2016-03-16 16:32:17 -05:00
__header_line_re = re.compile('(?P<key>[^:]+)[:](?P<value>.*)')
2015-06-12 11:57:06 -06:00
#
# Used to parse the STOMP "content-length" header lines,
#
__content_length_re = re.compile('^content-length[:]\\s*(?P<value>[0-9]+)', re.MULTILINE)
def __merge_headers(self, header_map_list):
"""
Helper function for combining multiple header maps into one.
Any underscores ('_') in header names (keys) will be replaced by dashes ('-').
"""
headers = {}
for header_map in header_map_list:
2016-04-16 17:00:50 -06:00
for header_key in list(header_map.keys()):
2015-06-12 11:57:06 -06:00
headers[header_key] = header_map[header_key]
return headers
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def __convert_dict(self, payload):
"""
Encode python dictionary as <map>...</map> structure.
"""
xmlStr = "<map>\n"
for key in payload:
xmlStr += "<entry>\n"
xmlStr += "<string>%s</string>" % key
xmlStr += "<string>%s</string>" % payload[key]
xmlStr += "</entry>\n"
xmlStr += "</map>"
return xmlStr
def __send_frame_helper(self, command, payload, headers, required_header_keys):
"""
Helper function for sending a frame after verifying that a
given set of headers are present.
\param command the command to send
\param payload the frame's payload
\param headers a dictionary containing the frame's headers
\param required_header_keys a sequence enumerating all
required header keys. If an element in this sequence is itself
a tuple, that tuple is taken as a list of alternatives, one of
which must be present.
\throws ArgumentError if one of the required header keys is
not present in the header map.
"""
for required_header_key in required_header_keys:
if type(required_header_key) == tuple:
found_alternative = False
for alternative in required_header_key:
2016-04-16 17:00:50 -06:00
if alternative in list(headers.keys()):
2015-06-12 11:57:06 -06:00
found_alternative = True
if not found_alternative:
raise KeyError("Command %s requires one of the following headers: %s" % (command, str(required_header_key)))
2016-04-16 17:00:50 -06:00
elif not required_header_key in list(headers.keys()):
2015-06-12 11:57:06 -06:00
raise KeyError("Command %s requires header %r" % (command, required_header_key))
self.__send_frame(command, headers, payload)
def __send_frame(self, command, headers={}, payload=''):
"""
Send a STOMP frame.
"""
if type(payload) == dict:
headers["transformation"] = "jms-map-xml"
2016-03-16 16:32:17 -05:00
payload = self.__convert_dict(payload)
2015-06-12 11:57:06 -06:00
if self.__socket is not None:
frame = '%s\n%s\n%s\x00' % (command,
2016-04-16 17:00:50 -06:00
reduce(lambda accu, key: accu + ('%s:%s\n' % (key, headers[key])), list(headers.keys()), ''),
2016-03-16 16:32:17 -05:00
payload)
2015-06-12 11:57:06 -06:00
self.__socket.sendall(frame)
log.debug("Sent frame: type=%s, headers=%r, body=%r" % (command, headers, payload))
else:
raise NotConnectedException()
def __receiver_loop(self):
"""
Main loop listening for incoming data.
"""
try:
try:
threading.currentThread().setName("StompReceiver")
while self.__running:
log.debug('starting receiver loop')
if self.__socket is None:
break
try:
try:
for listener in self.__listeners:
if hasattr(listener, 'on_connecting'):
listener.on_connecting(self.__current_host_and_port)
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
while self.__running:
frames = self.__read()
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
for frame in frames:
(frame_type, headers, body) = self.__parse_frame(frame)
log.debug("Received frame: result=%r, headers=%r, body=%r" % (frame_type, headers, body))
frame_type = frame_type.lower()
2016-03-16 16:32:17 -05:00
if frame_type in [ 'connected',
'message',
'receipt',
2015-06-12 11:57:06 -06:00
'error' ]:
for listener in self.__listeners:
if hasattr(listener, 'on_%s' % frame_type):
eval('listener.on_%s(headers, body)' % frame_type)
else:
2016-03-16 16:32:17 -05:00
log.debug('listener %s has no such method on_%s' % (listener, frame_type))
2015-06-12 11:57:06 -06:00
else:
log.warning('Unknown response frame type: "%s" (frame length was %d)' % (frame_type, len(frame)))
finally:
try:
self.__socket.close()
except:
pass # ignore errors when attempting to close socket
self.__socket = None
self.__current_host_and_port = None
except ConnectionClosedException:
if self.__running:
log.error("Lost connection")
# Notify listeners
for listener in self.__listeners:
if hasattr(listener, 'on_disconnected'):
listener.on_disconnected()
# Clear out any half-received messages after losing connection
self.__recvbuf = ''
continue
else:
break
except:
log.exception("An unhandled exception was encountered in the stomp receiver loop")
finally:
self.__receiver_thread_exit_condition.acquire()
self.__receiver_thread_exited = True
self.__receiver_thread_exit_condition.notifyAll()
self.__receiver_thread_exit_condition.release()
def __read(self):
"""
Read the next frame(s) from the socket.
"""
fastbuf = StringIO()
while self.__running:
try:
c = self.__socket.recv(1024)
except:
c = ''
if len(c) == 0:
raise ConnectionClosedException
fastbuf.write(c)
if '\x00' in c:
break
self.__recvbuf += fastbuf.getvalue()
2016-03-16 16:32:17 -05:00
fastbuf.close()
2015-06-12 11:57:06 -06:00
result = []
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
if len(self.__recvbuf) > 0 and self.__running:
while True:
pos = self.__recvbuf.find('\x00')
if pos >= 0:
frame = self.__recvbuf[0:pos]
preamble_end = frame.find('\n\n')
if preamble_end >= 0:
content_length_match = Connection.__content_length_re.search(frame[0:preamble_end])
if content_length_match:
content_length = int(content_length_match.group('value'))
content_offset = preamble_end + 2
frame_size = content_offset + content_length
if frame_size > len(frame):
# Frame contains NUL bytes, need to
# read more
if frame_size < len(self.__recvbuf):
pos = frame_size
frame = self.__recvbuf[0:pos]
else:
# Haven't read enough data yet,
# exit loop and wait for more to
# arrive
break
result.append(frame)
self.__recvbuf = self.__recvbuf[pos+1:]
else:
break
return result
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def __transform(self, body, transType):
"""
Perform body transformation. Currently, the only supported transformation is
'jms-map-xml', which converts a map into python dictionary. This can be extended
to support other transformation types.
2016-03-16 16:32:17 -05:00
The body has the following format:
2015-06-12 11:57:06 -06:00
<map>
<entry>
<string>name</string>
<string>Dejan</string>
</entry>
<entry>
<string>city</string>
<string>Belgrade</string>
</entry>
</map>
(see http://docs.codehaus.org/display/STOMP/Stomp+v1.1+Ideas)
"""
if transType != 'jms-map-xml':
return body
try:
entries = {}
doc = xml.dom.minidom.parseString(body)
rootElem = doc.documentElement
for entryElem in rootElem.getElementsByTagName("entry"):
pair = []
for node in entryElem.childNodes:
if not isinstance(node, xml.dom.minidom.Element): continue
pair.append(node.firstChild.nodeValue)
assert len(pair) == 2
entries[pair[0]] = pair[1]
return entries
2016-04-16 17:00:50 -06:00
except Exception as ex:
2015-06-12 11:57:06 -06:00
# unable to parse message. return original
return body
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def __parse_frame(self, frame):
"""
Parse a STOMP frame into a (frame_type, headers, body) tuple,
where frame_type is the frame type as a string (e.g. MESSAGE),
headers is a map containing all header key/value pairs, and
body is a string containing the frame's payload.
"""
preamble_end = frame.find('\n\n')
preamble = frame[0:preamble_end]
preamble_lines = preamble.split('\n')
body = frame[preamble_end+2:]
# Skip any leading newlines
first_line = 0
while first_line < len(preamble_lines) and len(preamble_lines[first_line]) == 0:
first_line += 1
# Extract frame type
frame_type = preamble_lines[first_line]
# Put headers into a key/value map
headers = {}
for header_line in preamble_lines[first_line+1:]:
header_match = Connection.__header_line_re.match(header_line)
if header_match:
headers[header_match.group('key')] = header_match.group('value')
if 'transformation' in headers:
body = self.__transform(body, headers['transformation'])
return (frame_type, headers, body)
def __attempt_connection(self):
"""
Try connecting to the (host, port) tuples specified at construction time.
"""
sleep_exp = 1
while self.__running and self.__socket is None:
for host_and_port in self.__host_and_ports:
try:
log.debug("Attempting connection to host %s, port %s" % host_and_port)
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__socket.settimeout(None)
self.__socket.connect(host_and_port)
self.__current_host_and_port = host_and_port
log.info("Established connection to host %s, port %s" % host_and_port)
break
except socket.error:
self.__socket = None
2016-04-16 17:00:50 -06:00
if type(sys.exc_info()[1]) == tuple:
2015-06-12 11:57:06 -06:00
exc = sys.exc_info()[1][1]
else:
exc = sys.exc_info()[1]
log.warning("Could not connect to host %s, port %s: %s" % (host_and_port[0], host_and_port[1], exc))
if self.__socket is None:
2016-03-16 16:32:17 -05:00
sleep_duration = (min(self.__reconnect_sleep_max,
((self.__reconnect_sleep_initial / (1.0 + self.__reconnect_sleep_increase))
2015-06-12 11:57:06 -06:00
* math.pow(1.0 + self.__reconnect_sleep_increase, sleep_exp)))
* (1.0 + random.random() * self.__reconnect_sleep_jitter))
sleep_end = time.time() + sleep_duration
log.debug("Sleeping for %.1f seconds before attempting reconnect" % sleep_duration)
while self.__running and time.time() < sleep_end:
time.sleep(0.2)
if sleep_duration < self.__reconnect_sleep_max:
sleep_exp += 1
#
# command line testing
#
if __name__ == '__main__':
# If the readline module is available, make command input easier
try:
import readline
def stomp_completer(text, state):
2016-03-16 16:32:17 -05:00
commands = [ 'subscribe', 'unsubscribe',
'send', 'ack',
'begin', 'abort', 'commit',
2015-06-12 11:57:06 -06:00
'connect', 'disconnect'
]
for command in commands[state:]:
if command.startswith(text):
return "%s " % command
return None
readline.parse_and_bind("tab: complete")
readline.set_completer(stomp_completer)
readline.set_completer_delims("")
except ImportError:
pass # ignore unavailable readline module
class StompTester(object):
def __init__(self, host='localhost', port=61613, user='', passcode=''):
self.c = Connection([(host, port)], user, passcode)
self.c.add_listener(self)
self.c.start()
def __print_async(self, frame_type, headers, body):
2016-04-16 17:00:50 -06:00
print("\r \r", end=' ')
print(frame_type)
for header_key in list(headers.keys()):
print('%s: %s' % (header_key, headers[header_key]))
print()
print(body)
print('> ', end=' ')
2015-06-12 11:57:06 -06:00
sys.stdout.flush()
def on_connecting(self, host_and_port):
self.c.connect(wait=True)
def on_disconnected(self):
2016-04-16 17:00:50 -06:00
print("lost connection")
2015-06-12 11:57:06 -06:00
def on_message(self, headers, body):
self.__print_async("MESSAGE", headers, body)
def on_error(self, headers, body):
self.__print_async("ERROR", headers, body)
def on_receipt(self, headers, body):
self.__print_async("RECEIPT", headers, body)
def on_connected(self, headers, body):
self.__print_async("CONNECTED", headers, body)
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def ack(self, args):
if len(args) < 3:
self.c.ack(message_id=args[1])
else:
self.c.ack(message_id=args[1], transaction=args[2])
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def abort(self, args):
self.c.abort(transaction=args[1])
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def begin(self, args):
2016-04-16 17:00:50 -06:00
print('transaction id: %s' % self.c.begin())
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def commit(self, args):
if len(args) < 2:
2016-04-16 17:00:50 -06:00
print('expecting: commit <transid>')
2015-06-12 11:57:06 -06:00
else:
2016-04-16 17:00:50 -06:00
print('committing %s' % args[1])
2015-06-12 11:57:06 -06:00
self.c.commit(transaction=args[1])
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def disconnect(self, args):
try:
self.c.disconnect()
except NotConnectedException:
pass # ignore if no longer connected
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def send(self, args):
if len(args) < 3:
2016-04-16 17:00:50 -06:00
print('expecting: send <destination> <message>')
2015-06-12 11:57:06 -06:00
else:
self.c.send(destination=args[1], message=' '.join(args[2:]))
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def sendtrans(self, args):
if len(args) < 3:
2016-04-16 17:00:50 -06:00
print('expecting: sendtrans <destination> <transid> <message>')
2015-06-12 11:57:06 -06:00
else:
self.c.send(destination=args[1], message="%s\n" % ' '.join(args[3:]), transaction=args[2])
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def subscribe(self, args):
if len(args) < 2:
2016-04-16 17:00:50 -06:00
print('expecting: subscribe <destination> [ack]')
2015-06-12 11:57:06 -06:00
elif len(args) > 2:
2016-04-16 17:00:50 -06:00
print('subscribing to "%s" with acknowledge set to "%s"' % (args[1], args[2]))
2015-06-12 11:57:06 -06:00
self.c.subscribe(destination=args[1], ack=args[2])
else:
2016-04-16 17:00:50 -06:00
print('subscribing to "%s" with auto acknowledge' % args[1])
2015-06-12 11:57:06 -06:00
self.c.subscribe(destination=args[1], ack='auto')
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
def unsubscribe(self, args):
if len(args) < 2:
2016-04-16 17:00:50 -06:00
print('expecting: unsubscribe <destination>')
2015-06-12 11:57:06 -06:00
else:
2016-04-16 17:00:50 -06:00
print('unsubscribing from "%s"' % args[1])
2015-06-12 11:57:06 -06:00
self.c.unsubscribe(destination=args[1])
if len(sys.argv) > 5:
2016-04-16 17:00:50 -06:00
print('USAGE: stomp.py [host] [port] [user] [passcode]')
2015-06-12 11:57:06 -06:00
sys.exit(1)
if len(sys.argv) >= 2:
host = sys.argv[1]
else:
host = "localhost"
if len(sys.argv) >= 3:
port = int(sys.argv[2])
else:
port = 61613
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
if len(sys.argv) >= 5:
user = sys.argv[3]
passcode = sys.argv[4]
else:
user = None
passcode = None
2016-03-16 16:32:17 -05:00
2015-06-12 11:57:06 -06:00
st = StompTester(host, port, user, passcode)
try:
while True:
2016-04-16 17:00:50 -06:00
line = input("\r> ")
2015-06-12 11:57:06 -06:00
if not line or line.lstrip().rstrip() == '':
continue
elif 'quit' in line or 'disconnect' in line:
break
split = line.split()
command = split[0]
if not command.startswith("on_") and hasattr(st, command):
getattr(st, command)(split)
else:
2016-04-16 17:00:50 -06:00
print('unrecognized command')
2015-06-12 11:57:06 -06:00
finally:
st.disconnect(None)