python-awips/awips/stomp.py
2018-09-06 12:11:36 -06:00

925 lines
34 KiB
Python

#!/usr/bin/env python
##
##
"""Stomp Protocol Connectivity
This provides basic connectivity to a message broker supporting the 'stomp' protocol.
At the moment ACK, SEND, SUBSCRIBE, UNSUBSCRIBE, BEGIN, ABORT, COMMIT, CONNECT and DISCONNECT operations
are supported.
This changes the previous version which required a listener per subscription -- now a listener object
just calls the 'addlistener' method and will receive all messages sent in response to all/any subscriptions.
(The reason for the change is that the handling of an 'ack' becomes problematic unless the listener mechanism
is decoupled from subscriptions).
Note that you must 'start' an instance of Connection to begin receiving messages. For example:
conn = stomp.Connection([('localhost', 62003)], 'myuser', 'mypass')
conn.start()
Meta-Data
---------
Author: Jason R Briggs
License: http://www.apache.org/licenses/LICENSE-2.0
Start Date: 2005/12/01
Last Revision Date: $Date: 2008/09/11 00:16 $
Notes/Attribution
-----------------
* uuid method courtesy of Carl Free Jr:
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/213761
* patch from Andreas Schobel
* patches from Julian Scheid of Rising Sun Pictures (http://open.rsp.com.au)
* patch from Fernando
* patches from Eugene Strulyov
Updates
-------
* 2007/03/31 : (Andreas Schobel) patch to fix newlines problem in ActiveMQ 4.1
* 2007/09 : (JRB) updated to get stomp.py working in Jython as well as Python
* 2007/09/05 : (Julian Scheid) patch to allow sending custom headers
* 2007/09/18 : (JRB) changed code to use logging instead of just print. added logger for jython to work
* 2007/09/18 : (Julian Scheid) various updates, including:
- change incoming message handling so that callbacks are invoked on the listener not only for MESSAGE, but also for
CONNECTED, RECEIPT and ERROR frames.
- callbacks now get not only the payload but any headers specified by the server
- all outgoing messages now sent via a single method
- only one connection used
- change to use thread instead of threading
- sends performed on the calling thread
- receiver loop now deals with multiple messages in one received chunk of data
- added reconnection attempts and connection fail-over
- changed defaults for "user" and "passcode" to None instead of empty string (fixed transmission of those values)
- added readline support
* 2008/03/26 : (Fernando) added cStringIO for faster performance on large messages
* 2008/09/10 : (Eugene) remove lower() on headers to support case-sensitive header names
* 2008/09/11 : (JRB) fix incompatibilities with RabbitMQ, add wait for socket-connect
* 2008/10/28 : (Eugene) add jms map (from stomp1.1 ideas)
* 2008/11/25 : (Eugene) remove superfluous (incorrect) locking code
* 2009/02/05 : (JRB) remove code to replace underscores with dashes in header names (causes a problem in rabbit-mq)
* 2009/03/29 : (JRB) minor change to add logging config file
(JRB) minor change to add socket timeout, suggested by Israel
* 2009/04/01 : (Gavin) patch to change md5 to hashlib (for 2.6 compatibility)
* 2009/04/02 : (Fernando Ciciliati) fix overflow bug when waiting too long to connect to the broker
"""
from __future__ import print_function
import hashlib
import math
import random
import re
import socket
import sys
import threading
import time
import types
import xml.dom.minidom
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
from functools import reduce
try:
import _thread
except ImportError:
import thread
#
# stomp.py version number
#
_version = 1.8
def _uuid( *args ):
"""
uuid courtesy of Carl Free Jr:
(http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/213761)
"""
t = int( time.time() * 1000 )
r = int( random.random() * 100000000000000000 )
try:
a = socket.gethostbyname( socket.gethostname() )
except:
# if we can't get a network address, just imagine one
a = random.random() * 100000000000000000
data = str(t) + ' ' + str(r) + ' ' + str(a) + ' ' + str(args)
md5 = hashlib.md5()
md5.update(data)
data = md5.hexdigest()
return data
class DevNullLogger(object):
"""
dummy logging class for environments without the logging module
"""
def log(self, msg):
print(msg)
def devnull(self, msg):
pass
debug = devnull
info = devnull
warning = log
error = log
critical = log
exception = log
def isEnabledFor(self, lvl):
return False
#
# add logging if available
#
try:
import logging
import logging.config
logging.config.fileConfig("stomp.log.conf")
log = logging.getLogger('root')
except:
log = DevNullLogger()
class ConnectionClosedException(Exception):
"""
Raised in the receiver thread when the connection has been closed
by the server.
"""
pass
class NotConnectedException(Exception):
"""
Raised by Connection.__send_frame when there is currently no server
connection.
"""
pass
class ConnectionListener(object):
"""
This class should be used as a base class for objects registered
using Connection.add_listener().
"""
def on_connecting(self, host_and_port):
"""
Called by the STOMP connection once a TCP/IP connection to the
STOMP server has been established or re-established. Note that
at this point, no connection has been established on the STOMP
protocol level. For this, you need to invoke the "connect"
method on the connection.
\param host_and_port a tuple containing the host name and port
number to which the connection has been established.
"""
pass
def on_connected(self, headers, body):
"""
Called by the STOMP connection when a CONNECTED frame is
received, that is after a connection has been established or
re-established.
\param headers a dictionary containing all headers sent by the
server as key/value pairs.
\param body the frame's payload. This is usually empty for
CONNECTED frames.
"""
pass
def on_disconnected(self):
"""
Called by the STOMP connection when a TCP/IP connection to the
STOMP server has been lost. No messages should be sent via
the connection until it has been reestablished.
"""
pass
def on_message(self, headers, body):
"""
Called by the STOMP connection when a MESSAGE frame is
received.
\param headers a dictionary containing all headers sent by the
server as key/value pairs.
\param body the frame's payload - the message body.
"""
pass
def on_receipt(self, headers, body):
"""
Called by the STOMP connection when a RECEIPT frame is
received, sent by the server if requested by the client using
the 'receipt' header.
\param headers a dictionary containing all headers sent by the
server as key/value pairs.
\param body the frame's payload. This is usually empty for
RECEIPT frames.
"""
pass
def on_error(self, headers, body):
"""
Called by the STOMP connection when an ERROR frame is
received.
\param headers a dictionary containing all headers sent by the
server as key/value pairs.
\param body the frame's payload - usually a detailed error
description.
"""
pass
class Connection(object):
"""
Represents a STOMP client connection.
"""
def __init__(self,
host_and_ports = [ ('localhost', 61613) ],
user = None,
passcode = None,
prefer_localhost = True,
try_loopback_connect = True,
reconnect_sleep_initial = 0.1,
reconnect_sleep_increase = 0.5,
reconnect_sleep_jitter = 0.1,
reconnect_sleep_max = 60.0):
"""
Initialize and start this connection.
\param host_and_ports
a list of (host, port) tuples.
\param prefer_localhost
if True and the local host is mentioned in the (host,
port) tuples, try to connect to this first
\param try_loopback_connect
if True and the local host is found in the host
tuples, try connecting to it using loopback interface
(127.0.0.1)
\param reconnect_sleep_initial
initial delay in seconds to wait before reattempting
to establish a connection if connection to any of the
hosts fails.
\param reconnect_sleep_increase
factor by which the sleep delay is increased after
each connection attempt. For example, 0.5 means
to wait 50% longer than before the previous attempt,
1.0 means wait twice as long, and 0.0 means keep
the delay constant.
\param reconnect_sleep_max
maximum delay between connection attempts, regardless
of the reconnect_sleep_increase.
\param reconnect_sleep_jitter
random additional time to wait (as a percentage of
the time determined using the previous parameters)
between connection attempts in order to avoid
stampeding. For example, a value of 0.1 means to wait
an extra 0%-10% (randomly determined) of the delay
calculated using the previous three parameters.
"""
sorted_host_and_ports = []
sorted_host_and_ports.extend(host_and_ports)
# If localhost is preferred, make sure all (host, port) tuples
# that refer to the local host come first in the list
if prefer_localhost:
def is_local_host(host):
return host in Connection.__localhost_names
sorted_host_and_ports.sort(lambda x, y: (int(is_local_host(y[0]))
- int(is_local_host(x[0]))))
# If the user wishes to attempt connecting to local ports
# using the loopback interface, for each (host, port) tuple
# referring to a local host, add an entry with the host name
# replaced by 127.0.0.1 if it doesn't exist already
loopback_host_and_ports = []
if try_loopback_connect:
for host_and_port in sorted_host_and_ports:
if is_local_host(host_and_port[0]):
port = host_and_port[1]
if (not ("127.0.0.1", port) in sorted_host_and_ports
and not ("localhost", port) in sorted_host_and_ports):
loopback_host_and_ports.append(("127.0.0.1", port))
# Assemble the final, possibly sorted list of (host, port) tuples
self.__host_and_ports = []
self.__host_and_ports.extend(loopback_host_and_ports)
self.__host_and_ports.extend(sorted_host_and_ports)
self.__recvbuf = ''
self.__listeners = [ ]
self.__reconnect_sleep_initial = reconnect_sleep_initial
self.__reconnect_sleep_increase = reconnect_sleep_increase
self.__reconnect_sleep_jitter = reconnect_sleep_jitter
self.__reconnect_sleep_max = reconnect_sleep_max
self.__connect_headers = {}
if user is not None and passcode is not None:
self.__connect_headers['login'] = user
self.__connect_headers['passcode'] = passcode
self.__socket = None
self.__current_host_and_port = None
self.__receiver_thread_exit_condition = threading.Condition()
self.__receiver_thread_exited = False
#
# Manage the connection
#
def start(self):
"""
Start the connection. This should be called after all
listeners have been registered. If this method is not called,
no frames will be received by the connection.
"""
self.__running = True
self.__attempt_connection()
_thread.start_new_thread(self.__receiver_loop, ())
def stop(self):
"""
Stop the connection. This is equivalent to calling
disconnect() but will do a clean shutdown by waiting for the
receiver thread to exit.
"""
self.disconnect()
self.__receiver_thread_exit_condition.acquire()
if not self.__receiver_thread_exited:
self.__receiver_thread_exit_condition.wait()
self.__receiver_thread_exit_condition.release()
def get_host_and_port(self):
"""
Return a (host, port) tuple indicating which STOMP host and
port is currently connected, or None if there is currently no
connection.
"""
return self.__current_host_and_port
def is_connected(self):
try:
return self.__socket is not None and self.__socket.getsockname()[1] != 0
except socket.error:
return False
#
# Manage objects listening to incoming frames
#
def add_listener(self, listener):
self.__listeners.append(listener)
def remove_listener(self, listener):
self.__listeners.remove(listener)
#
# STOMP transmissions
#
def subscribe(self, headers={}, **keyword_headers):
self.__send_frame_helper('SUBSCRIBE', '', self.__merge_headers([headers, keyword_headers]), [ 'destination' ])
def unsubscribe(self, headers={}, **keyword_headers):
self.__send_frame_helper('UNSUBSCRIBE', '', self.__merge_headers([headers, keyword_headers]), [ ('destination', 'id') ])
def send(self, message='', headers={}, **keyword_headers):
if '\x00' in message:
content_length_headers = {'content-length': len(message)}
else:
content_length_headers = {}
self.__send_frame_helper('SEND', message, self.__merge_headers([headers,
keyword_headers,
content_length_headers]), [ 'destination' ])
def ack(self, headers={}, **keyword_headers):
self.__send_frame_helper('ACK', '', self.__merge_headers([headers, keyword_headers]), [ 'message-id' ])
def begin(self, headers={}, **keyword_headers):
use_headers = self.__merge_headers([headers, keyword_headers])
if not 'transaction' in list(use_headers.keys()):
use_headers['transaction'] = _uuid()
self.__send_frame_helper('BEGIN', '', use_headers, [ 'transaction' ])
return use_headers['transaction']
def abort(self, headers={}, **keyword_headers):
self.__send_frame_helper('ABORT', '', self.__merge_headers([headers, keyword_headers]), [ 'transaction' ])
def commit(self, headers={}, **keyword_headers):
self.__send_frame_helper('COMMIT', '', self.__merge_headers([headers, keyword_headers]), [ 'transaction' ])
def connect(self, headers={}, **keyword_headers):
if 'wait' in keyword_headers and keyword_headers['wait']:
while not self.is_connected(): time.sleep(0.1)
del keyword_headers['wait']
self.__send_frame_helper('CONNECT', '', self.__merge_headers([self.__connect_headers, headers, keyword_headers]), [ ])
def disconnect(self, headers={}, **keyword_headers):
self.__send_frame_helper('DISCONNECT', '', self.__merge_headers([self.__connect_headers, headers, keyword_headers]), [ ])
self.__running = False
if hasattr(socket, 'SHUT_RDWR'):
self.__socket.shutdown(socket.SHUT_RDWR)
if self.__socket:
self.__socket.close()
self.__current_host_and_port = None
# ========= PRIVATE MEMBERS =========
# List of all host names (unqualified, fully-qualified, and IP
# addresses) that refer to the local host (both loopback interface
# and external interfaces). This is used for determining
# preferred targets.
__localhost_names = [ "localhost",
"127.0.0.1",
socket.gethostbyname(socket.gethostname()),
socket.gethostname(),
socket.getfqdn(socket.gethostname()) ]
#
# Used to parse STOMP header lines in the format "key:value",
#
__header_line_re = re.compile('(?P<key>[^:]+)[:](?P<value>.*)')
#
# Used to parse the STOMP "content-length" header lines,
#
__content_length_re = re.compile('^content-length[:]\\s*(?P<value>[0-9]+)', re.MULTILINE)
def __merge_headers(self, header_map_list):
"""
Helper function for combining multiple header maps into one.
Any underscores ('_') in header names (keys) will be replaced by dashes ('-').
"""
headers = {}
for header_map in header_map_list:
for header_key in header_map.keys():
headers[header_key] = header_map[header_key]
return headers
def __convert_dict(self, payload):
"""
Encode python dictionary as <map>...</map> structure.
"""
xmlStr = "<map>\n"
for key in payload:
xmlStr += "<entry>\n"
xmlStr += "<string>%s</string>" % key
xmlStr += "<string>%s</string>" % payload[key]
xmlStr += "</entry>\n"
xmlStr += "</map>"
return xmlStr
def __send_frame_helper(self, command, payload, headers, required_header_keys):
"""
Helper function for sending a frame after verifying that a
given set of headers are present.
\param command the command to send
\param payload the frame's payload
\param headers a dictionary containing the frame's headers
\param required_header_keys a sequence enumerating all
required header keys. If an element in this sequence is itself
a tuple, that tuple is taken as a list of alternatives, one of
which must be present.
\throws ArgumentError if one of the required header keys is
not present in the header map.
"""
for required_header_key in required_header_keys:
if type(required_header_key) == tuple:
found_alternative = False
for alternative in required_header_key:
if alternative in list(headers.keys()):
found_alternative = True
if not found_alternative:
raise KeyError("Command %s requires one of the following headers: %s" % (command, str(required_header_key)))
elif not required_header_key in list(headers.keys()):
raise KeyError("Command %s requires header %r" % (command, required_header_key))
self.__send_frame(command, headers, payload)
def __send_frame(self, command, headers={}, payload=''):
"""
Send a STOMP frame.
"""
if type(payload) == dict:
headers["transformation"] = "jms-map-xml"
payload = self.__convert_dict(payload)
if self.__socket is not None:
frame = '%s\n%s\n%s\x00' % (command,
reduce(lambda accu, key: accu + ('%s:%s\n' % (key, list(headers[key]))), headers.keys(), ''),
payload)
self.__socket.sendall(frame)
log.debug("Sent frame: type=%s, headers=%r, body=%r" % (command, headers, payload))
else:
raise NotConnectedException()
def __receiver_loop(self):
"""
Main loop listening for incoming data.
"""
try:
try:
threading.currentThread().setName("StompReceiver")
while self.__running:
log.debug('starting receiver loop')
if self.__socket is None:
break
try:
try:
for listener in self.__listeners:
if hasattr(listener, 'on_connecting'):
listener.on_connecting(self.__current_host_and_port)
while self.__running:
frames = self.__read()
for frame in frames:
(frame_type, headers, body) = self.__parse_frame(frame)
log.debug("Received frame: result=%r, headers=%r, body=%r" % (frame_type, headers, body))
frame_type = frame_type.lower()
if frame_type in [ 'connected',
'message',
'receipt',
'error' ]:
for listener in self.__listeners:
if hasattr(listener, 'on_%s' % frame_type):
eval('listener.on_%s(headers, body)' % frame_type)
else:
log.debug('listener %s has no such method on_%s' % (listener, frame_type))
else:
log.warning('Unknown response frame type: "%s" (frame length was %d)' % (frame_type, len(frame)))
finally:
try:
self.__socket.close()
except:
pass # ignore errors when attempting to close socket
self.__socket = None
self.__current_host_and_port = None
except ConnectionClosedException:
if self.__running:
log.error("Lost connection")
# Notify listeners
for listener in self.__listeners:
if hasattr(listener, 'on_disconnected'):
listener.on_disconnected()
# Clear out any half-received messages after losing connection
self.__recvbuf = ''
continue
else:
break
except:
log.exception("An unhandled exception was encountered in the stomp receiver loop")
finally:
self.__receiver_thread_exit_condition.acquire()
self.__receiver_thread_exited = True
self.__receiver_thread_exit_condition.notifyAll()
self.__receiver_thread_exit_condition.release()
def __read(self):
"""
Read the next frame(s) from the socket.
"""
fastbuf = StringIO()
while self.__running:
try:
c = self.__socket.recv(1024)
except:
c = ''
if len(c) == 0:
raise ConnectionClosedException
fastbuf.write(c)
if '\x00' in c:
break
self.__recvbuf += fastbuf.getvalue()
fastbuf.close()
result = []
if len(self.__recvbuf) > 0 and self.__running:
while True:
pos = self.__recvbuf.find('\x00')
if pos >= 0:
frame = self.__recvbuf[0:pos]
preamble_end = frame.find('\n\n')
if preamble_end >= 0:
content_length_match = Connection.__content_length_re.search(frame[0:preamble_end])
if content_length_match:
content_length = int(content_length_match.group('value'))
content_offset = preamble_end + 2
frame_size = content_offset + content_length
if frame_size > len(frame):
# Frame contains NUL bytes, need to
# read more
if frame_size < len(self.__recvbuf):
pos = frame_size
frame = self.__recvbuf[0:pos]
else:
# Haven't read enough data yet,
# exit loop and wait for more to
# arrive
break
result.append(frame)
self.__recvbuf = self.__recvbuf[pos+1:]
else:
break
return result
def __transform(self, body, transType):
"""
Perform body transformation. Currently, the only supported transformation is
'jms-map-xml', which converts a map into python dictionary. This can be extended
to support other transformation types.
The body has the following format:
<map>
<entry>
<string>name</string>
<string>Dejan</string>
</entry>
<entry>
<string>city</string>
<string>Belgrade</string>
</entry>
</map>
(see http://docs.codehaus.org/display/STOMP/Stomp+v1.1+Ideas)
"""
if transType != 'jms-map-xml':
return body
try:
entries = {}
doc = xml.dom.minidom.parseString(body)
rootElem = doc.documentElement
for entryElem in rootElem.getElementsByTagName("entry"):
pair = []
for node in entryElem.childNodes:
if not isinstance(node, xml.dom.minidom.Element): continue
pair.append(node.firstChild.nodeValue)
assert len(pair) == 2
entries[pair[0]] = pair[1]
return entries
except Exception as ex:
# unable to parse message. return original
return body
def __parse_frame(self, frame):
"""
Parse a STOMP frame into a (frame_type, headers, body) tuple,
where frame_type is the frame type as a string (e.g. MESSAGE),
headers is a map containing all header key/value pairs, and
body is a string containing the frame's payload.
"""
preamble_end = frame.find('\n\n')
preamble = frame[0:preamble_end]
preamble_lines = preamble.split('\n')
body = frame[preamble_end+2:]
# Skip any leading newlines
first_line = 0
while first_line < len(preamble_lines) and len(preamble_lines[first_line]) == 0:
first_line += 1
# Extract frame type
frame_type = preamble_lines[first_line]
# Put headers into a key/value map
headers = {}
for header_line in preamble_lines[first_line+1:]:
header_match = Connection.__header_line_re.match(header_line)
if header_match:
headers[header_match.group('key')] = header_match.group('value')
if 'transformation' in headers:
body = self.__transform(body, headers['transformation'])
return (frame_type, headers, body)
def __attempt_connection(self):
"""
Try connecting to the (host, port) tuples specified at construction time.
"""
sleep_exp = 1
while self.__running and self.__socket is None:
for host_and_port in self.__host_and_ports:
try:
log.debug("Attempting connection to host %s, port %s" % host_and_port)
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__socket.settimeout(None)
self.__socket.connect(host_and_port)
self.__current_host_and_port = host_and_port
log.info("Established connection to host %s, port %s" % host_and_port)
break
except socket.error:
self.__socket = None
if type(sys.exc_info()[1]) == tuple:
exc = sys.exc_info()[1][1]
else:
exc = sys.exc_info()[1]
log.warning("Could not connect to host %s, port %s: %s" % (host_and_port[0], host_and_port[1], exc))
if self.__socket is None:
sleep_duration = (min(self.__reconnect_sleep_max,
((self.__reconnect_sleep_initial / (1.0 + self.__reconnect_sleep_increase))
* math.pow(1.0 + self.__reconnect_sleep_increase, sleep_exp)))
* (1.0 + random.random() * self.__reconnect_sleep_jitter))
sleep_end = time.time() + sleep_duration
log.debug("Sleeping for %.1f seconds before attempting reconnect" % sleep_duration)
while self.__running and time.time() < sleep_end:
time.sleep(0.2)
if sleep_duration < self.__reconnect_sleep_max:
sleep_exp += 1
#
# command line testing
#
if __name__ == '__main__':
# If the readline module is available, make command input easier
try:
import readline
def stomp_completer(text, state):
commands = [ 'subscribe', 'unsubscribe',
'send', 'ack',
'begin', 'abort', 'commit',
'connect', 'disconnect'
]
for command in commands[state:]:
if command.startswith(text):
return "%s " % command
return None
readline.parse_and_bind("tab: complete")
readline.set_completer(stomp_completer)
readline.set_completer_delims("")
except ImportError:
pass # ignore unavailable readline module
class StompTester(object):
def __init__(self, host='localhost', port=61613, user='', passcode=''):
self.c = Connection([(host, port)], user, passcode)
self.c.add_listener(self)
self.c.start()
def __print_async(self, frame_type, headers, body):
print("\r \r",)
print(frame_type)
for header_key in list(headers.keys()):
print('%s: %s' % (header_key, headers[header_key]))
print("")
print(body)
print('> ',)
sys.stdout.flush()
def on_connecting(self, host_and_port):
self.c.connect(wait=True)
def on_disconnected(self):
print("lost connection")
def on_message(self, headers, body):
self.__print_async("MESSAGE", headers, body)
def on_error(self, headers, body):
self.__print_async("ERROR", headers, body)
def on_receipt(self, headers, body):
self.__print_async("RECEIPT", headers, body)
def on_connected(self, headers, body):
self.__print_async("CONNECTED", headers, body)
def ack(self, args):
if len(args) < 3:
self.c.ack(message_id=args[1])
else:
self.c.ack(message_id=args[1], transaction=args[2])
def abort(self, args):
self.c.abort(transaction=args[1])
def begin(self, args):
print('transaction id: %s' % self.c.begin())
def commit(self, args):
if len(args) < 2:
print('expecting: commit <transid>')
else:
print('committing %s' % args[1])
self.c.commit(transaction=args[1])
def disconnect(self, args):
try:
self.c.disconnect()
except NotConnectedException:
pass # ignore if no longer connected
def send(self, args):
if len(args) < 3:
print('expecting: send <destination> <message>')
else:
self.c.send(destination=args[1], message=' '.join(args[2:]))
def sendtrans(self, args):
if len(args) < 3:
print('expecting: sendtrans <destination> <transid> <message>')
else:
self.c.send(destination=args[1], message="%s\n" % ' '.join(args[3:]), transaction=args[2])
def subscribe(self, args):
if len(args) < 2:
print('expecting: subscribe <destination> [ack]')
elif len(args) > 2:
print('subscribing to "%s" with acknowledge set to "%s"' % (args[1], args[2]))
self.c.subscribe(destination=args[1], ack=args[2])
else:
print('subscribing to "%s" with auto acknowledge' % args[1])
self.c.subscribe(destination=args[1], ack='auto')
def unsubscribe(self, args):
if len(args) < 2:
print('expecting: unsubscribe <destination>')
else:
print('unsubscribing from "%s"' % args[1])
self.c.unsubscribe(destination=args[1])
if len(sys.argv) > 5:
print('USAGE: stomp.py [host] [port] [user] [passcode]')
sys.exit(1)
if len(sys.argv) >= 2:
host = sys.argv[1]
else:
host = "localhost"
if len(sys.argv) >= 3:
port = int(sys.argv[2])
else:
port = 61613
if len(sys.argv) >= 5:
user = sys.argv[3]
passcode = sys.argv[4]
else:
user = None
passcode = None
st = StompTester(host, port, user, passcode)
try:
while True:
line = input("\r> ")
if not line or line.lstrip().rstrip() == '':
continue
elif 'quit' in line or 'disconnect' in line:
break
split = line.split()
command = split[0]
if not command.startswith("on_") and hasattr(st, command):
getattr(st, command)(split)
else:
print('unrecognized command')
finally:
st.disconnect(None)