mirror of
https://github.com/Unidata/python-awips.git
synced 2025-02-23 14:57:56 -05:00
Cleanup unused and unnecessary files
This commit is contained in:
parent
7a88a6d3db
commit
900a03958d
9 changed files with 6 additions and 1212 deletions
|
@ -1,52 +0,0 @@
|
|||
##
|
||||
##
|
||||
|
||||
|
||||
#
|
||||
# Pure python logging mechanism for logging to AlertViz from
|
||||
# pure python (ie not JEP). DO NOT USE IN PYTHON CALLED
|
||||
# FROM JAVA.
|
||||
#
|
||||
#
|
||||
# SOFTWARE HISTORY
|
||||
#
|
||||
# Date Ticket# Engineer Description
|
||||
# ------------ ---------- ----------- --------------------------
|
||||
# 08/18/10 njensen Initial Creation.
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
import logging
|
||||
from . import NotificationMessage
|
||||
|
||||
class AlertVizHandler(logging.Handler):
|
||||
|
||||
def __init__(self, host='localhost', port=61999, category='LOCAL', source='ANNOUNCER', level=logging.NOTSET):
|
||||
logging.Handler.__init__(self, level)
|
||||
self._category = category
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._source = source
|
||||
|
||||
|
||||
def emit(self, record):
|
||||
"Implements logging.Handler's interface. Record argument is a logging.LogRecord."
|
||||
priority = None
|
||||
if record.levelno >= 50:
|
||||
priority = 'CRITICAL'
|
||||
elif record.levelno >= 40:
|
||||
priority = 'SIGNIFICANT'
|
||||
elif record.levelno >= 30:
|
||||
priority = 'PROBLEM'
|
||||
elif record.levelno >= 20:
|
||||
priority = 'EVENTA'
|
||||
elif record.levelno >= 10:
|
||||
priority = 'EVENTB'
|
||||
else:
|
||||
priority = 'VERBOSE'
|
||||
|
||||
msg = self.format(record)
|
||||
|
||||
notify = NotificationMessage.NotificationMessage(self._host, self._port, msg, priority, self._category, self._source)
|
||||
notify.send()
|
|
@ -1,39 +0,0 @@
|
|||
##
|
||||
##
|
||||
|
||||
#
|
||||
# A set of utility functions for dealing with configuration files.
|
||||
#
|
||||
#
|
||||
#
|
||||
# SOFTWARE HISTORY
|
||||
#
|
||||
# Date Ticket# Engineer Description
|
||||
# ------------ ---------- ----------- --------------------------
|
||||
# 09/27/10 dgilling Initial Creation.
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
|
||||
def parseKeyValueFile(fileName):
|
||||
propDict= dict()
|
||||
|
||||
try:
|
||||
propFile= open(fileName, "rU")
|
||||
for propLine in propFile:
|
||||
propDef= propLine.strip()
|
||||
if len(propDef) == 0:
|
||||
continue
|
||||
if propDef[0] in ( '#' ):
|
||||
continue
|
||||
punctuation= [ propDef.find(c) for c in ':= ' ] + [ len(propDef) ]
|
||||
found= min( [ pos for pos in punctuation if pos != -1 ] )
|
||||
name= propDef[:found].rstrip()
|
||||
value= propDef[found:].lstrip(":= ").rstrip()
|
||||
propDict[name]= value
|
||||
propFile.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
return propDict
|
|
@ -1,6 +1,3 @@
|
|||
# #
|
||||
# #
|
||||
|
||||
#
|
||||
# Functions for converting between the various "Java" dynamic serialize types
|
||||
# used by EDEX to the native python time datetime.
|
||||
|
|
|
@ -1,160 +0,0 @@
|
|||
import ctypes
|
||||
from . import stomp
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from . import ThriftClient
|
||||
from dynamicserialize.dstypes.com.raytheon.uf.common.alertviz import AlertVizRequest
|
||||
|
||||
#
|
||||
# Provides a capability of constructing notification messages and sending
|
||||
# them to a STOMP data source.
|
||||
#
|
||||
#
|
||||
# SOFTWARE HISTORY
|
||||
#
|
||||
# Date Ticket# Engineer Description
|
||||
# ------------ ---------- ----------- --------------------------
|
||||
# 09/30/08 chammack Initial Creation.
|
||||
# 11/03/10 5849 cjeanbap Moved to awips package from
|
||||
# com.raytheon.uf.tools.cli
|
||||
# 01/07/11 5645 cjeanbap Added audio file to Status Message.
|
||||
# 05/27/11 3050 cjeanbap Added if-statement to check Priority
|
||||
# value
|
||||
# 07/27/15 4654 skorolev Added filters
|
||||
# 11/11/15 5120 rferrel Cannot serialize empty filters.
|
||||
#
|
||||
class NotificationMessage:
|
||||
|
||||
priorityMap = {
|
||||
0: 'CRITICAL',
|
||||
1: 'SIGNIFICANT',
|
||||
2: 'PROBLEM',
|
||||
3: 'EVENTA',
|
||||
4: 'EVENTB',
|
||||
5: 'VERBOSE'}
|
||||
|
||||
def __init__(self, host='localhost', port=61999, message='', priority='PROBLEM', category="LOCAL", source="ANNOUNCER", audioFile="NONE", filters=None):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.message = message
|
||||
self.audioFile = audioFile
|
||||
self.source = source
|
||||
self.category = category
|
||||
self.filters = filters
|
||||
|
||||
priorityInt = None
|
||||
|
||||
try:
|
||||
priorityInt = int(priority)
|
||||
except:
|
||||
pass
|
||||
|
||||
if priorityInt is None:
|
||||
#UFStatus.java contains mapping of Priority to Logging level mapping
|
||||
if priority == 'CRITICAL' or priority == 'FATAL':
|
||||
priorityInt = int(0)
|
||||
elif priority == 'SIGNIFICANT' or priority == 'ERROR':
|
||||
priorityInt = int(1)
|
||||
elif priority == 'PROBLEM' or priority == 'WARN':
|
||||
priorityInt = int(2)
|
||||
elif priority == 'EVENTA' or priority == 'INFO':
|
||||
priorityInt = int(3)
|
||||
elif priority == 'EVENTB':
|
||||
priorityInt = int(4)
|
||||
elif priority == 'VERBOSE' or priority == 'DEBUG':
|
||||
priorityInt = int(5)
|
||||
|
||||
if (priorityInt < 0 or priorityInt > 5):
|
||||
print("Error occurred, supplied an invalid Priority value: " + str(priorityInt))
|
||||
print("Priority values are 0, 1, 2, 3, 4 and 5.")
|
||||
sys.exit(1)
|
||||
|
||||
if priorityInt is not None:
|
||||
self.priority = self.priorityMap[priorityInt]
|
||||
else:
|
||||
self.priority = priority
|
||||
|
||||
def connection_timeout(self, connection):
|
||||
if (connection is not None and not connection.is_connected()):
|
||||
print("Connection Retry Timeout")
|
||||
for tid, tobj in list(threading._active.items()):
|
||||
if tobj.name is "MainThread":
|
||||
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(SystemExit))
|
||||
if res != 0 and res != 1:
|
||||
# problem, reset state
|
||||
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
|
||||
|
||||
def send(self):
|
||||
# depending on the value of the port number indicates the distribution
|
||||
# of the message to AlertViz
|
||||
# 9581 is global distribution thru ThriftClient to Edex
|
||||
# 61999 is local distribution
|
||||
if (int(self.port) == 61999):
|
||||
# use stomp.py
|
||||
conn = stomp.Connection(host_and_ports=[(self.host, 61999)])
|
||||
timeout = threading.Timer(5.0, self.connection_timeout, [conn])
|
||||
|
||||
try:
|
||||
timeout.start();
|
||||
conn.start()
|
||||
finally:
|
||||
timeout.cancel()
|
||||
|
||||
conn.connect()
|
||||
|
||||
sm = ET.Element("statusMessage")
|
||||
sm.set("machine", socket.gethostname())
|
||||
sm.set("priority", self.priority)
|
||||
sm.set("category", self.category)
|
||||
sm.set("sourceKey", self.source)
|
||||
sm.set("audioFile", self.audioFile)
|
||||
if self.filters is not None and len(self.filters) > 0:
|
||||
sm.set("filters", self.filters)
|
||||
msg = ET.SubElement(sm, "message")
|
||||
msg.text = self.message
|
||||
# details = ET.SubElement(sm, "details")
|
||||
msg = ET.tostring(sm, "UTF-8")
|
||||
|
||||
try :
|
||||
conn.send(msg, destination='/queue/messages')
|
||||
time.sleep(2)
|
||||
finally:
|
||||
conn.stop()
|
||||
else:
|
||||
# use ThriftClient
|
||||
alertVizRequest = createRequest(self.message, self.priority, self.source, self.category, self.audioFile, self.filters)
|
||||
thriftClient = ThriftClient.ThriftClient(self.host, self.port, "/services")
|
||||
|
||||
serverResponse = None
|
||||
try:
|
||||
serverResponse = thriftClient.sendRequest(alertVizRequest)
|
||||
except Exception as ex:
|
||||
print("Caught exception submitting AlertVizRequest: ", str(ex))
|
||||
|
||||
if (serverResponse != "None"):
|
||||
print("Error occurred submitting Notification Message to AlertViz receiver: ", serverResponse)
|
||||
sys.exit(1)
|
||||
else:
|
||||
print("Response: " + str(serverResponse))
|
||||
|
||||
def createRequest(message, priority, source, category, audioFile, filters):
|
||||
obj = AlertVizRequest()
|
||||
|
||||
obj.setMachine(socket.gethostname())
|
||||
obj.setPriority(priority)
|
||||
obj.setCategory(category)
|
||||
obj.setSourceKey(source)
|
||||
obj.setMessage(message)
|
||||
if (audioFile is not None):
|
||||
obj.setAudioFile(audioFile)
|
||||
else:
|
||||
obj.setAudioFile('\0')
|
||||
obj.setFilters(filters)
|
||||
return obj
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -1,6 +1,3 @@
|
|||
##
|
||||
##
|
||||
|
||||
#
|
||||
# Provides a Python-based interface for subscribing to qpid queues and topics.
|
||||
#
|
||||
|
@ -26,6 +23,7 @@ import zlib
|
|||
from Queue import Empty
|
||||
from qpid.exceptions import Closed
|
||||
|
||||
|
||||
class QpidSubscriber:
|
||||
|
||||
def __init__(self, host='127.0.0.1', port=5672, decompress=False, ssl=None):
|
||||
|
|
|
@ -1,7 +1,3 @@
|
|||
##
|
||||
##
|
||||
|
||||
|
||||
#
|
||||
# __init__.py for awips package
|
||||
#
|
||||
|
|
|
@ -74,6 +74,7 @@ from qpid.datatypes import Message, uuid4
|
|||
QPID_USERNAME = 'guest'
|
||||
QPID_PASSWORD = 'guest'
|
||||
|
||||
|
||||
class IngestViaQPID:
|
||||
def __init__(self, host='localhost', port=5672, ssl=None):
|
||||
'''
|
||||
|
|
921
awips/stomp.py
921
awips/stomp.py
|
@ -1,921 +0,0 @@
|
|||
from __future__ import print_function
|
||||
import hashlib
|
||||
import math
|
||||
import random
|
||||
import re
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import xml.dom.minidom
|
||||
try:
|
||||
from StringIO import StringIO
|
||||
except ImportError:
|
||||
from io import StringIO
|
||||
from functools import reduce
|
||||
try:
|
||||
import _thread
|
||||
except ImportError:
|
||||
import thread as _thread
|
||||
|
||||
"""Stomp Protocol Connectivity
|
||||
|
||||
This provides basic connectivity to a message broker supporting the 'stomp' protocol.
|
||||
At the moment ACK, SEND, SUBSCRIBE, UNSUBSCRIBE, BEGIN, ABORT, COMMIT, CONNECT and DISCONNECT operations
|
||||
are supported.
|
||||
|
||||
This changes the previous version which required a listener per subscription -- now a listener object
|
||||
just calls the 'addlistener' method and will receive all messages sent in response to all/any subscriptions.
|
||||
(The reason for the change is that the handling of an 'ack' becomes problematic unless the listener mechanism
|
||||
is decoupled from subscriptions).
|
||||
|
||||
Note that you must 'start' an instance of Connection to begin receiving messages. For example:
|
||||
|
||||
conn = stomp.Connection([('localhost', 62003)], 'myuser', 'mypass')
|
||||
conn.start()
|
||||
|
||||
Meta-Data
|
||||
---------
|
||||
Author: Jason R Briggs
|
||||
License: http://www.apache.org/licenses/LICENSE-2.0
|
||||
Start Date: 2005/12/01
|
||||
Last Revision Date: $Date: 2008/09/11 00:16 $
|
||||
|
||||
Notes/Attribution
|
||||
-----------------
|
||||
* uuid method courtesy of Carl Free Jr:
|
||||
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/213761
|
||||
* patch from Andreas Schobel
|
||||
* patches from Julian Scheid of Rising Sun Pictures (http://open.rsp.com.au)
|
||||
* patch from Fernando
|
||||
* patches from Eugene Strulyov
|
||||
|
||||
Updates
|
||||
-------
|
||||
* 2007/03/31 : (Andreas Schobel) patch to fix newlines problem in ActiveMQ 4.1
|
||||
* 2007/09 : (JRB) updated to get stomp.py working in Jython as well as Python
|
||||
* 2007/09/05 : (Julian Scheid) patch to allow sending custom headers
|
||||
* 2007/09/18 : (JRB) changed code to use logging instead of just print. added logger for jython to work
|
||||
* 2007/09/18 : (Julian Scheid) various updates, including:
|
||||
- change incoming message handling so that callbacks are invoked on the listener not only for MESSAGE, but also for
|
||||
CONNECTED, RECEIPT and ERROR frames.
|
||||
- callbacks now get not only the payload but any headers specified by the server
|
||||
- all outgoing messages now sent via a single method
|
||||
- only one connection used
|
||||
- change to use thread instead of threading
|
||||
- sends performed on the calling thread
|
||||
- receiver loop now deals with multiple messages in one received chunk of data
|
||||
- added reconnection attempts and connection fail-over
|
||||
- changed defaults for "user" and "passcode" to None instead of empty string (fixed transmission of those values)
|
||||
- added readline support
|
||||
* 2008/03/26 : (Fernando) added cStringIO for faster performance on large messages
|
||||
* 2008/09/10 : (Eugene) remove lower() on headers to support case-sensitive header names
|
||||
* 2008/09/11 : (JRB) fix incompatibilities with RabbitMQ, add wait for socket-connect
|
||||
* 2008/10/28 : (Eugene) add jms map (from stomp1.1 ideas)
|
||||
* 2008/11/25 : (Eugene) remove superfluous (incorrect) locking code
|
||||
* 2009/02/05 : (JRB) remove code to replace underscores with dashes in header names (causes a problem in rabbit-mq)
|
||||
* 2009/03/29 : (JRB) minor change to add logging config file
|
||||
(JRB) minor change to add socket timeout, suggested by Israel
|
||||
* 2009/04/01 : (Gavin) patch to change md5 to hashlib (for 2.6 compatibility)
|
||||
* 2009/04/02 : (Fernando Ciciliati) fix overflow bug when waiting too long to connect to the broker
|
||||
|
||||
"""
|
||||
|
||||
#
|
||||
# stomp.py version number
|
||||
#
|
||||
_version = 1.8
|
||||
|
||||
|
||||
def _uuid( *args ):
|
||||
"""
|
||||
uuid courtesy of Carl Free Jr:
|
||||
(http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/213761)
|
||||
"""
|
||||
|
||||
t = int( time.time() * 1000 )
|
||||
r = int( random.random() * 100000000000000000 )
|
||||
|
||||
try:
|
||||
a = socket.gethostbyname( socket.gethostname() )
|
||||
except:
|
||||
# if we can't get a network address, just imagine one
|
||||
a = random.random() * 100000000000000000
|
||||
data = str(t) + ' ' + str(r) + ' ' + str(a) + ' ' + str(args)
|
||||
md5 = hashlib.md5()
|
||||
md5.update(data)
|
||||
data = md5.hexdigest()
|
||||
return data
|
||||
|
||||
|
||||
class DevNullLogger(object):
|
||||
"""
|
||||
dummy logging class for environments without the logging module
|
||||
"""
|
||||
def log(self, msg):
|
||||
print(msg)
|
||||
|
||||
def devnull(self, msg):
|
||||
pass
|
||||
|
||||
debug = devnull
|
||||
info = devnull
|
||||
warning = log
|
||||
error = log
|
||||
critical = log
|
||||
exception = log
|
||||
|
||||
def isEnabledFor(self, lvl):
|
||||
return False
|
||||
|
||||
|
||||
#
|
||||
# add logging if available
|
||||
#
|
||||
try:
|
||||
import logging
|
||||
import logging.config
|
||||
logging.config.fileConfig("stomp.log.conf")
|
||||
log = logging.getLogger('root')
|
||||
except:
|
||||
log = DevNullLogger()
|
||||
|
||||
|
||||
class ConnectionClosedException(Exception):
|
||||
"""
|
||||
Raised in the receiver thread when the connection has been closed
|
||||
by the server.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class NotConnectedException(Exception):
|
||||
"""
|
||||
Raised by Connection.__send_frame when there is currently no server
|
||||
connection.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class ConnectionListener(object):
|
||||
"""
|
||||
This class should be used as a base class for objects registered
|
||||
using Connection.add_listener().
|
||||
"""
|
||||
def on_connecting(self, host_and_port):
|
||||
"""
|
||||
Called by the STOMP connection once a TCP/IP connection to the
|
||||
STOMP server has been established or re-established. Note that
|
||||
at this point, no connection has been established on the STOMP
|
||||
protocol level. For this, you need to invoke the "connect"
|
||||
method on the connection.
|
||||
|
||||
\param host_and_port a tuple containing the host name and port
|
||||
number to which the connection has been established.
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_connected(self, headers, body):
|
||||
"""
|
||||
Called by the STOMP connection when a CONNECTED frame is
|
||||
received, that is after a connection has been established or
|
||||
re-established.
|
||||
|
||||
\param headers a dictionary containing all headers sent by the
|
||||
server as key/value pairs.
|
||||
|
||||
\param body the frame's payload. This is usually empty for
|
||||
CONNECTED frames.
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_disconnected(self):
|
||||
"""
|
||||
Called by the STOMP connection when a TCP/IP connection to the
|
||||
STOMP server has been lost. No messages should be sent via
|
||||
the connection until it has been reestablished.
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_message(self, headers, body):
|
||||
"""
|
||||
Called by the STOMP connection when a MESSAGE frame is
|
||||
received.
|
||||
|
||||
\param headers a dictionary containing all headers sent by the
|
||||
server as key/value pairs.
|
||||
|
||||
\param body the frame's payload - the message body.
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_receipt(self, headers, body):
|
||||
"""
|
||||
Called by the STOMP connection when a RECEIPT frame is
|
||||
received, sent by the server if requested by the client using
|
||||
the 'receipt' header.
|
||||
|
||||
\param headers a dictionary containing all headers sent by the
|
||||
server as key/value pairs.
|
||||
|
||||
\param body the frame's payload. This is usually empty for
|
||||
RECEIPT frames.
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_error(self, headers, body):
|
||||
"""
|
||||
Called by the STOMP connection when an ERROR frame is
|
||||
received.
|
||||
|
||||
\param headers a dictionary containing all headers sent by the
|
||||
server as key/value pairs.
|
||||
|
||||
\param body the frame's payload - usually a detailed error
|
||||
description.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class Connection(object):
|
||||
"""
|
||||
Represents a STOMP client connection.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
host_and_ports = [ ('localhost', 61613) ],
|
||||
user = None,
|
||||
passcode = None,
|
||||
prefer_localhost = True,
|
||||
try_loopback_connect = True,
|
||||
reconnect_sleep_initial = 0.1,
|
||||
reconnect_sleep_increase = 0.5,
|
||||
reconnect_sleep_jitter = 0.1,
|
||||
reconnect_sleep_max = 60.0):
|
||||
"""
|
||||
Initialize and start this connection.
|
||||
|
||||
\param host_and_ports
|
||||
a list of (host, port) tuples.
|
||||
|
||||
\param prefer_localhost
|
||||
if True and the local host is mentioned in the (host,
|
||||
port) tuples, try to connect to this first
|
||||
|
||||
\param try_loopback_connect
|
||||
if True and the local host is found in the host
|
||||
tuples, try connecting to it using loopback interface
|
||||
(127.0.0.1)
|
||||
|
||||
\param reconnect_sleep_initial
|
||||
|
||||
initial delay in seconds to wait before reattempting
|
||||
to establish a connection if connection to any of the
|
||||
hosts fails.
|
||||
|
||||
\param reconnect_sleep_increase
|
||||
|
||||
factor by which the sleep delay is increased after
|
||||
each connection attempt. For example, 0.5 means
|
||||
to wait 50% longer than before the previous attempt,
|
||||
1.0 means wait twice as long, and 0.0 means keep
|
||||
the delay constant.
|
||||
|
||||
\param reconnect_sleep_max
|
||||
|
||||
maximum delay between connection attempts, regardless
|
||||
of the reconnect_sleep_increase.
|
||||
|
||||
\param reconnect_sleep_jitter
|
||||
|
||||
random additional time to wait (as a percentage of
|
||||
the time determined using the previous parameters)
|
||||
between connection attempts in order to avoid
|
||||
stampeding. For example, a value of 0.1 means to wait
|
||||
an extra 0%-10% (randomly determined) of the delay
|
||||
calculated using the previous three parameters.
|
||||
"""
|
||||
|
||||
sorted_host_and_ports = []
|
||||
sorted_host_and_ports.extend(host_and_ports)
|
||||
|
||||
# If localhost is preferred, make sure all (host, port) tuples
|
||||
# that refer to the local host come first in the list
|
||||
if prefer_localhost:
|
||||
def is_local_host(host):
|
||||
return host in Connection.__localhost_names
|
||||
|
||||
sorted_host_and_ports.sort(lambda x, y: (int(is_local_host(y[0]))
|
||||
- int(is_local_host(x[0]))))
|
||||
|
||||
# If the user wishes to attempt connecting to local ports
|
||||
# using the loopback interface, for each (host, port) tuple
|
||||
# referring to a local host, add an entry with the host name
|
||||
# replaced by 127.0.0.1 if it doesn't exist already
|
||||
loopback_host_and_ports = []
|
||||
if try_loopback_connect:
|
||||
for host_and_port in sorted_host_and_ports:
|
||||
if is_local_host(host_and_port[0]):
|
||||
port = host_and_port[1]
|
||||
if (not ("127.0.0.1", port) in sorted_host_and_ports
|
||||
and not ("localhost", port) in sorted_host_and_ports):
|
||||
loopback_host_and_ports.append(("127.0.0.1", port))
|
||||
|
||||
# Assemble the final, possibly sorted list of (host, port) tuples
|
||||
self.__host_and_ports = []
|
||||
self.__host_and_ports.extend(loopback_host_and_ports)
|
||||
self.__host_and_ports.extend(sorted_host_and_ports)
|
||||
|
||||
self.__recvbuf = ''
|
||||
|
||||
self.__listeners = [ ]
|
||||
|
||||
self.__reconnect_sleep_initial = reconnect_sleep_initial
|
||||
self.__reconnect_sleep_increase = reconnect_sleep_increase
|
||||
self.__reconnect_sleep_jitter = reconnect_sleep_jitter
|
||||
self.__reconnect_sleep_max = reconnect_sleep_max
|
||||
|
||||
self.__connect_headers = {}
|
||||
if user is not None and passcode is not None:
|
||||
self.__connect_headers['login'] = user
|
||||
self.__connect_headers['passcode'] = passcode
|
||||
|
||||
self.__socket = None
|
||||
self.__current_host_and_port = None
|
||||
|
||||
self.__receiver_thread_exit_condition = threading.Condition()
|
||||
self.__receiver_thread_exited = False
|
||||
|
||||
#
|
||||
# Manage the connection
|
||||
#
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start the connection. This should be called after all
|
||||
listeners have been registered. If this method is not called,
|
||||
no frames will be received by the connection.
|
||||
"""
|
||||
self.__running = True
|
||||
self.__attempt_connection()
|
||||
_thread.start_new_thread(self.__receiver_loop, ())
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stop the connection. This is equivalent to calling
|
||||
disconnect() but will do a clean shutdown by waiting for the
|
||||
receiver thread to exit.
|
||||
"""
|
||||
self.disconnect()
|
||||
|
||||
self.__receiver_thread_exit_condition.acquire()
|
||||
if not self.__receiver_thread_exited:
|
||||
self.__receiver_thread_exit_condition.wait()
|
||||
self.__receiver_thread_exit_condition.release()
|
||||
|
||||
def get_host_and_port(self):
|
||||
"""
|
||||
Return a (host, port) tuple indicating which STOMP host and
|
||||
port is currently connected, or None if there is currently no
|
||||
connection.
|
||||
"""
|
||||
return self.__current_host_and_port
|
||||
|
||||
def is_connected(self):
|
||||
try:
|
||||
return self.__socket is not None and self.__socket.getsockname()[1] != 0
|
||||
except socket.error:
|
||||
return False
|
||||
|
||||
#
|
||||
# Manage objects listening to incoming frames
|
||||
#
|
||||
|
||||
def add_listener(self, listener):
|
||||
self.__listeners.append(listener)
|
||||
|
||||
def remove_listener(self, listener):
|
||||
self.__listeners.remove(listener)
|
||||
|
||||
#
|
||||
# STOMP transmissions
|
||||
#
|
||||
|
||||
def subscribe(self, headers={}, **keyword_headers):
|
||||
self.__send_frame_helper('SUBSCRIBE', '', self.__merge_headers([headers, keyword_headers]), [ 'destination' ])
|
||||
|
||||
def unsubscribe(self, headers={}, **keyword_headers):
|
||||
self.__send_frame_helper('UNSUBSCRIBE', '', self.__merge_headers([headers, keyword_headers]), [ ('destination', 'id') ])
|
||||
|
||||
def send(self, message='', headers={}, **keyword_headers):
|
||||
if '\x00' in message:
|
||||
content_length_headers = {'content-length': len(message)}
|
||||
else:
|
||||
content_length_headers = {}
|
||||
self.__send_frame_helper('SEND', message, self.__merge_headers([headers,
|
||||
keyword_headers,
|
||||
content_length_headers]), [ 'destination' ])
|
||||
|
||||
def ack(self, headers={}, **keyword_headers):
|
||||
self.__send_frame_helper('ACK', '', self.__merge_headers([headers, keyword_headers]), [ 'message-id' ])
|
||||
|
||||
def begin(self, headers={}, **keyword_headers):
|
||||
use_headers = self.__merge_headers([headers, keyword_headers])
|
||||
if not 'transaction' in list(use_headers.keys()):
|
||||
use_headers['transaction'] = _uuid()
|
||||
self.__send_frame_helper('BEGIN', '', use_headers, [ 'transaction' ])
|
||||
return use_headers['transaction']
|
||||
|
||||
def abort(self, headers={}, **keyword_headers):
|
||||
self.__send_frame_helper('ABORT', '', self.__merge_headers([headers, keyword_headers]), [ 'transaction' ])
|
||||
|
||||
def commit(self, headers={}, **keyword_headers):
|
||||
self.__send_frame_helper('COMMIT', '', self.__merge_headers([headers, keyword_headers]), [ 'transaction' ])
|
||||
|
||||
def connect(self, headers={}, **keyword_headers):
|
||||
if 'wait' in keyword_headers and keyword_headers['wait']:
|
||||
while not self.is_connected(): time.sleep(0.1)
|
||||
del keyword_headers['wait']
|
||||
self.__send_frame_helper('CONNECT', '', self.__merge_headers([self.__connect_headers, headers, keyword_headers]), [ ])
|
||||
|
||||
def disconnect(self, headers={}, **keyword_headers):
|
||||
self.__send_frame_helper('DISCONNECT', '', self.__merge_headers([self.__connect_headers, headers, keyword_headers]), [ ])
|
||||
self.__running = False
|
||||
if hasattr(socket, 'SHUT_RDWR'):
|
||||
self.__socket.shutdown(socket.SHUT_RDWR)
|
||||
if self.__socket:
|
||||
self.__socket.close()
|
||||
self.__current_host_and_port = None
|
||||
|
||||
# ========= PRIVATE MEMBERS =========
|
||||
|
||||
|
||||
# List of all host names (unqualified, fully-qualified, and IP
|
||||
# addresses) that refer to the local host (both loopback interface
|
||||
# and external interfaces). This is used for determining
|
||||
# preferred targets.
|
||||
__localhost_names = [ "localhost",
|
||||
"127.0.0.1",
|
||||
socket.gethostbyname(socket.gethostname()),
|
||||
socket.gethostname(),
|
||||
socket.getfqdn(socket.gethostname()) ]
|
||||
#
|
||||
# Used to parse STOMP header lines in the format "key:value",
|
||||
#
|
||||
__header_line_re = re.compile('(?P<key>[^:]+)[:](?P<value>.*)')
|
||||
|
||||
#
|
||||
# Used to parse the STOMP "content-length" header lines,
|
||||
#
|
||||
__content_length_re = re.compile('^content-length[:]\\s*(?P<value>[0-9]+)', re.MULTILINE)
|
||||
|
||||
def __merge_headers(self, header_map_list):
|
||||
"""
|
||||
Helper function for combining multiple header maps into one.
|
||||
|
||||
Any underscores ('_') in header names (keys) will be replaced by dashes ('-').
|
||||
"""
|
||||
headers = {}
|
||||
for header_map in header_map_list:
|
||||
for header_key in list(header_map.keys()):
|
||||
headers[header_key] = header_map[header_key]
|
||||
return headers
|
||||
|
||||
def __convert_dict(self, payload):
|
||||
"""
|
||||
Encode python dictionary as <map>...</map> structure.
|
||||
"""
|
||||
|
||||
xmlStr = "<map>\n"
|
||||
for key in payload:
|
||||
xmlStr += "<entry>\n"
|
||||
xmlStr += "<string>%s</string>" % key
|
||||
xmlStr += "<string>%s</string>" % payload[key]
|
||||
xmlStr += "</entry>\n"
|
||||
xmlStr += "</map>"
|
||||
|
||||
return xmlStr
|
||||
|
||||
def __send_frame_helper(self, command, payload, headers, required_header_keys):
|
||||
"""
|
||||
Helper function for sending a frame after verifying that a
|
||||
given set of headers are present.
|
||||
|
||||
\param command the command to send
|
||||
|
||||
\param payload the frame's payload
|
||||
|
||||
\param headers a dictionary containing the frame's headers
|
||||
|
||||
\param required_header_keys a sequence enumerating all
|
||||
required header keys. If an element in this sequence is itself
|
||||
a tuple, that tuple is taken as a list of alternatives, one of
|
||||
which must be present.
|
||||
|
||||
\throws ArgumentError if one of the required header keys is
|
||||
not present in the header map.
|
||||
"""
|
||||
for required_header_key in required_header_keys:
|
||||
if type(required_header_key) == tuple:
|
||||
found_alternative = False
|
||||
for alternative in required_header_key:
|
||||
if alternative in list(headers.keys()):
|
||||
found_alternative = True
|
||||
if not found_alternative:
|
||||
raise KeyError("Command %s requires one of the following headers: %s" % (command, str(required_header_key)))
|
||||
elif not required_header_key in list(headers.keys()):
|
||||
raise KeyError("Command %s requires header %r" % (command, required_header_key))
|
||||
self.__send_frame(command, headers, payload)
|
||||
|
||||
def __send_frame(self, command, headers={}, payload=''):
|
||||
"""
|
||||
Send a STOMP frame.
|
||||
"""
|
||||
if type(payload) == dict:
|
||||
headers["transformation"] = "jms-map-xml"
|
||||
payload = self.__convert_dict(payload)
|
||||
|
||||
if self.__socket is not None:
|
||||
frame = '%s\n%s\n%s\x00' % (command,
|
||||
reduce(lambda accu, key: accu + ('%s:%s\n' % (key, headers[key])), list(headers.keys()), ''),
|
||||
payload)
|
||||
self.__socket.sendall(frame)
|
||||
log.debug("Sent frame: type=%s, headers=%r, body=%r" % (command, headers, payload))
|
||||
else:
|
||||
raise NotConnectedException()
|
||||
|
||||
def __receiver_loop(self):
|
||||
"""
|
||||
Main loop listening for incoming data.
|
||||
"""
|
||||
try:
|
||||
try:
|
||||
threading.currentThread().setName("StompReceiver")
|
||||
while self.__running:
|
||||
log.debug('starting receiver loop')
|
||||
|
||||
if self.__socket is None:
|
||||
break
|
||||
|
||||
try:
|
||||
try:
|
||||
for listener in self.__listeners:
|
||||
if hasattr(listener, 'on_connecting'):
|
||||
listener.on_connecting(self.__current_host_and_port)
|
||||
|
||||
while self.__running:
|
||||
frames = self.__read()
|
||||
|
||||
for frame in frames:
|
||||
(frame_type, headers, body) = self.__parse_frame(frame)
|
||||
log.debug("Received frame: result=%r, headers=%r, body=%r" % (frame_type, headers, body))
|
||||
frame_type = frame_type.lower()
|
||||
if frame_type in [ 'connected',
|
||||
'message',
|
||||
'receipt',
|
||||
'error' ]:
|
||||
for listener in self.__listeners:
|
||||
if hasattr(listener, 'on_%s' % frame_type):
|
||||
eval('listener.on_%s(headers, body)' % frame_type)
|
||||
else:
|
||||
log.debug('listener %s has no such method on_%s' % (listener, frame_type))
|
||||
else:
|
||||
log.warning('Unknown response frame type: "%s" (frame length was %d)' % (frame_type, len(frame)))
|
||||
finally:
|
||||
try:
|
||||
self.__socket.close()
|
||||
except:
|
||||
pass # ignore errors when attempting to close socket
|
||||
self.__socket = None
|
||||
self.__current_host_and_port = None
|
||||
except ConnectionClosedException:
|
||||
if self.__running:
|
||||
log.error("Lost connection")
|
||||
# Notify listeners
|
||||
for listener in self.__listeners:
|
||||
if hasattr(listener, 'on_disconnected'):
|
||||
listener.on_disconnected()
|
||||
# Clear out any half-received messages after losing connection
|
||||
self.__recvbuf = ''
|
||||
continue
|
||||
else:
|
||||
break
|
||||
except:
|
||||
log.exception("An unhandled exception was encountered in the stomp receiver loop")
|
||||
|
||||
finally:
|
||||
self.__receiver_thread_exit_condition.acquire()
|
||||
self.__receiver_thread_exited = True
|
||||
self.__receiver_thread_exit_condition.notifyAll()
|
||||
self.__receiver_thread_exit_condition.release()
|
||||
|
||||
def __read(self):
|
||||
"""
|
||||
Read the next frame(s) from the socket.
|
||||
"""
|
||||
fastbuf = StringIO()
|
||||
while self.__running:
|
||||
try:
|
||||
c = self.__socket.recv(1024)
|
||||
except:
|
||||
c = ''
|
||||
if len(c) == 0:
|
||||
raise ConnectionClosedException
|
||||
fastbuf.write(c)
|
||||
if '\x00' in c:
|
||||
break
|
||||
self.__recvbuf += fastbuf.getvalue()
|
||||
fastbuf.close()
|
||||
result = []
|
||||
|
||||
if len(self.__recvbuf) > 0 and self.__running:
|
||||
while True:
|
||||
pos = self.__recvbuf.find('\x00')
|
||||
if pos >= 0:
|
||||
frame = self.__recvbuf[0:pos]
|
||||
preamble_end = frame.find('\n\n')
|
||||
if preamble_end >= 0:
|
||||
content_length_match = Connection.__content_length_re.search(frame[0:preamble_end])
|
||||
if content_length_match:
|
||||
content_length = int(content_length_match.group('value'))
|
||||
content_offset = preamble_end + 2
|
||||
frame_size = content_offset + content_length
|
||||
if frame_size > len(frame):
|
||||
# Frame contains NUL bytes, need to
|
||||
# read more
|
||||
if frame_size < len(self.__recvbuf):
|
||||
pos = frame_size
|
||||
frame = self.__recvbuf[0:pos]
|
||||
else:
|
||||
# Haven't read enough data yet,
|
||||
# exit loop and wait for more to
|
||||
# arrive
|
||||
break
|
||||
result.append(frame)
|
||||
self.__recvbuf = self.__recvbuf[pos+1:]
|
||||
else:
|
||||
break
|
||||
return result
|
||||
|
||||
|
||||
def __transform(self, body, transType):
|
||||
"""
|
||||
Perform body transformation. Currently, the only supported transformation is
|
||||
'jms-map-xml', which converts a map into python dictionary. This can be extended
|
||||
to support other transformation types.
|
||||
|
||||
The body has the following format:
|
||||
<map>
|
||||
<entry>
|
||||
<string>name</string>
|
||||
<string>Dejan</string>
|
||||
</entry>
|
||||
<entry>
|
||||
<string>city</string>
|
||||
<string>Belgrade</string>
|
||||
</entry>
|
||||
</map>
|
||||
|
||||
(see http://docs.codehaus.org/display/STOMP/Stomp+v1.1+Ideas)
|
||||
"""
|
||||
|
||||
if transType != 'jms-map-xml':
|
||||
return body
|
||||
|
||||
try:
|
||||
entries = {}
|
||||
doc = xml.dom.minidom.parseString(body)
|
||||
rootElem = doc.documentElement
|
||||
for entryElem in rootElem.getElementsByTagName("entry"):
|
||||
pair = []
|
||||
for node in entryElem.childNodes:
|
||||
if not isinstance(node, xml.dom.minidom.Element): continue
|
||||
pair.append(node.firstChild.nodeValue)
|
||||
assert len(pair) == 2
|
||||
entries[pair[0]] = pair[1]
|
||||
return entries
|
||||
except Exception as ex:
|
||||
# unable to parse message. return original
|
||||
return body
|
||||
|
||||
|
||||
def __parse_frame(self, frame):
|
||||
"""
|
||||
Parse a STOMP frame into a (frame_type, headers, body) tuple,
|
||||
where frame_type is the frame type as a string (e.g. MESSAGE),
|
||||
headers is a map containing all header key/value pairs, and
|
||||
body is a string containing the frame's payload.
|
||||
"""
|
||||
preamble_end = frame.find('\n\n')
|
||||
preamble = frame[0:preamble_end]
|
||||
preamble_lines = preamble.split('\n')
|
||||
body = frame[preamble_end+2:]
|
||||
|
||||
# Skip any leading newlines
|
||||
first_line = 0
|
||||
while first_line < len(preamble_lines) and len(preamble_lines[first_line]) == 0:
|
||||
first_line += 1
|
||||
|
||||
# Extract frame type
|
||||
frame_type = preamble_lines[first_line]
|
||||
|
||||
# Put headers into a key/value map
|
||||
headers = {}
|
||||
for header_line in preamble_lines[first_line+1:]:
|
||||
header_match = Connection.__header_line_re.match(header_line)
|
||||
if header_match:
|
||||
headers[header_match.group('key')] = header_match.group('value')
|
||||
|
||||
if 'transformation' in headers:
|
||||
body = self.__transform(body, headers['transformation'])
|
||||
|
||||
return (frame_type, headers, body)
|
||||
|
||||
def __attempt_connection(self):
|
||||
"""
|
||||
Try connecting to the (host, port) tuples specified at construction time.
|
||||
"""
|
||||
|
||||
sleep_exp = 1
|
||||
while self.__running and self.__socket is None:
|
||||
for host_and_port in self.__host_and_ports:
|
||||
try:
|
||||
log.debug("Attempting connection to host %s, port %s" % host_and_port)
|
||||
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.__socket.settimeout(None)
|
||||
self.__socket.connect(host_and_port)
|
||||
self.__current_host_and_port = host_and_port
|
||||
log.info("Established connection to host %s, port %s" % host_and_port)
|
||||
break
|
||||
except socket.error:
|
||||
self.__socket = None
|
||||
if type(sys.exc_info()[1]) == tuple:
|
||||
exc = sys.exc_info()[1][1]
|
||||
else:
|
||||
exc = sys.exc_info()[1]
|
||||
log.warning("Could not connect to host %s, port %s: %s" % (host_and_port[0], host_and_port[1], exc))
|
||||
|
||||
if self.__socket is None:
|
||||
sleep_duration = (min(self.__reconnect_sleep_max,
|
||||
((self.__reconnect_sleep_initial / (1.0 + self.__reconnect_sleep_increase))
|
||||
* math.pow(1.0 + self.__reconnect_sleep_increase, sleep_exp)))
|
||||
* (1.0 + random.random() * self.__reconnect_sleep_jitter))
|
||||
sleep_end = time.time() + sleep_duration
|
||||
log.debug("Sleeping for %.1f seconds before attempting reconnect" % sleep_duration)
|
||||
while self.__running and time.time() < sleep_end:
|
||||
time.sleep(0.2)
|
||||
|
||||
if sleep_duration < self.__reconnect_sleep_max:
|
||||
sleep_exp += 1
|
||||
|
||||
#
|
||||
# command line testing
|
||||
#
|
||||
if __name__ == '__main__':
|
||||
|
||||
# If the readline module is available, make command input easier
|
||||
try:
|
||||
import readline
|
||||
def stomp_completer(text, state):
|
||||
commands = [ 'subscribe', 'unsubscribe',
|
||||
'send', 'ack',
|
||||
'begin', 'abort', 'commit',
|
||||
'connect', 'disconnect'
|
||||
]
|
||||
for command in commands[state:]:
|
||||
if command.startswith(text):
|
||||
return "%s " % command
|
||||
return None
|
||||
|
||||
readline.parse_and_bind("tab: complete")
|
||||
readline.set_completer(stomp_completer)
|
||||
readline.set_completer_delims("")
|
||||
except ImportError:
|
||||
pass # ignore unavailable readline module
|
||||
|
||||
class StompTester(object):
|
||||
def __init__(self, host='localhost', port=61613, user='', passcode=''):
|
||||
self.c = Connection([(host, port)], user, passcode)
|
||||
self.c.add_listener(self)
|
||||
self.c.start()
|
||||
|
||||
def __print_async(self, frame_type, headers, body):
|
||||
#print("\r \r",)
|
||||
print(frame_type)
|
||||
for header_key in list(headers.keys()):
|
||||
print('%s: %s' % (header_key, headers[header_key]))
|
||||
print()
|
||||
print(body)
|
||||
print('> ', end=' ')
|
||||
sys.stdout.flush()
|
||||
|
||||
def on_connecting(self, host_and_port):
|
||||
self.c.connect(wait=True)
|
||||
|
||||
def on_disconnected(self):
|
||||
print("lost connection")
|
||||
|
||||
def on_message(self, headers, body):
|
||||
self.__print_async("MESSAGE", headers, body)
|
||||
|
||||
def on_error(self, headers, body):
|
||||
self.__print_async("ERROR", headers, body)
|
||||
|
||||
def on_receipt(self, headers, body):
|
||||
self.__print_async("RECEIPT", headers, body)
|
||||
|
||||
def on_connected(self, headers, body):
|
||||
self.__print_async("CONNECTED", headers, body)
|
||||
|
||||
def ack(self, args):
|
||||
if len(args) < 3:
|
||||
self.c.ack(message_id=args[1])
|
||||
else:
|
||||
self.c.ack(message_id=args[1], transaction=args[2])
|
||||
|
||||
def abort(self, args):
|
||||
self.c.abort(transaction=args[1])
|
||||
|
||||
def begin(self, args):
|
||||
print('transaction id: %s' % self.c.begin())
|
||||
|
||||
def commit(self, args):
|
||||
if len(args) < 2:
|
||||
print('expecting: commit <transid>')
|
||||
else:
|
||||
print('committing %s' % args[1])
|
||||
self.c.commit(transaction=args[1])
|
||||
|
||||
def disconnect(self, args):
|
||||
try:
|
||||
self.c.disconnect()
|
||||
except NotConnectedException:
|
||||
pass # ignore if no longer connected
|
||||
|
||||
def send(self, args):
|
||||
if len(args) < 3:
|
||||
print('expecting: send <destination> <message>')
|
||||
else:
|
||||
self.c.send(destination=args[1], message=' '.join(args[2:]))
|
||||
|
||||
def sendtrans(self, args):
|
||||
if len(args) < 3:
|
||||
print('expecting: sendtrans <destination> <transid> <message>')
|
||||
else:
|
||||
self.c.send(destination=args[1], message="%s\n" % ' '.join(args[3:]), transaction=args[2])
|
||||
|
||||
def subscribe(self, args):
|
||||
if len(args) < 2:
|
||||
print('expecting: subscribe <destination> [ack]')
|
||||
elif len(args) > 2:
|
||||
print('subscribing to "%s" with acknowledge set to "%s"' % (args[1], args[2]))
|
||||
self.c.subscribe(destination=args[1], ack=args[2])
|
||||
else:
|
||||
print('subscribing to "%s" with auto acknowledge' % args[1])
|
||||
self.c.subscribe(destination=args[1], ack='auto')
|
||||
|
||||
def unsubscribe(self, args):
|
||||
if len(args) < 2:
|
||||
print('expecting: unsubscribe <destination>')
|
||||
else:
|
||||
print('unsubscribing from "%s"' % args[1])
|
||||
self.c.unsubscribe(destination=args[1])
|
||||
|
||||
if len(sys.argv) > 5:
|
||||
print('USAGE: stomp.py [host] [port] [user] [passcode]')
|
||||
sys.exit(1)
|
||||
|
||||
if len(sys.argv) >= 2:
|
||||
host = sys.argv[1]
|
||||
else:
|
||||
host = "localhost"
|
||||
if len(sys.argv) >= 3:
|
||||
port = int(sys.argv[2])
|
||||
else:
|
||||
port = 61613
|
||||
|
||||
if len(sys.argv) >= 5:
|
||||
user = sys.argv[3]
|
||||
passcode = sys.argv[4]
|
||||
else:
|
||||
user = None
|
||||
passcode = None
|
||||
|
||||
st = StompTester(host, port, user, passcode)
|
||||
try:
|
||||
while True:
|
||||
line = input("\r> ")
|
||||
if not line or line.lstrip().rstrip() == '':
|
||||
continue
|
||||
elif 'quit' in line or 'disconnect' in line:
|
||||
break
|
||||
split = line.split()
|
||||
command = split[0]
|
||||
if not command.startswith("on_") and hasattr(st, command):
|
||||
getattr(st, command)(split)
|
||||
else:
|
||||
print('unrecognized command')
|
||||
finally:
|
||||
st.disconnect(None)
|
||||
|
||||
|
|
@ -1,26 +0,0 @@
|
|||
#
|
||||
# Pure python logging mechanism for logging to AlertViz from
|
||||
# pure python (ie not JEP). DO NOT USE IN PYTHON CALLED
|
||||
# FROM JAVA.
|
||||
#
|
||||
#
|
||||
# SOFTWARE HISTORY
|
||||
#
|
||||
# Date Ticket# Engineer Description
|
||||
# ------------ ---------- ----------- --------------------------
|
||||
# 11/03/10 5849 cjeanbap Initial Creation.
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
## to execute type python Test
|
||||
|
||||
|
||||
import os
|
||||
import logging
|
||||
from awips import AlertVizHandler
|
||||
from . import Record
|
||||
|
||||
avh = AlertVizHandler.AlertVizHandler(host=os.getenv("BROKER_ADDR","localhost"), port=9581, category='LOCAL', source='ANNOUNCER', level=logging.NOTSET)
|
||||
record = Record.Record(10)
|
||||
avh.emit(record)
|
Loading…
Add table
Reference in a new issue