awips2/edexOsgi/com.raytheon.uf.tools.gfesuite/cli/getNotify.py
ucar-tmeyer ddb3fd58dc Transition from ufpy to awips python package
-Had to update the awips-ade and awips-devel dockers
-Update rpm building to include awips2-python-awips (which includes awips, dynamicserialize, thrift packages), and remove rpm install dependencies on ufpy, dynamicserialize, and thrift
-Update LDM scripts and edexBridge to use awips instead of ufpy
-Update multiple python scripts across several repos to use awips instead of ufpy
  -awips2
  -awips2-core
  -awips2-nativelib
  -awips2-ncep
  -awips2-rpm
2024-01-16 16:10:09 +00:00

219 lines
6.6 KiB
Python

#!/awips2/python/bin/python3
##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
#
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
#
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
#
# Provides a Python-based interface listening to gfe notifications
#
# SOFTWARE HISTORY
#
# Date Ticket# Engineer Description
# ------------- -------- ------------ --------------------------------------------
# Nov 18, 2010 njensen Initial Creation.
# Jun 13, 2013 2044 randerso Fixed to use correct python
# Jun 23, 2015 4573 randerso Removed obsolete A1 options, added new options for A2
# Jul 23, 2019 7724 mrichardson Upgrade Qpid to Qpid Proton
# Jun 24, 2020 8187 randerso Added program for qpid connection_id
#
#
import time, sys
import threading
from collections import OrderedDict
import dynamicserialize
from dynamicserialize.dstypes.com.raytheon.uf.common.dataplugin.gfe.server.notify import *
from dynamicserialize.dstypes.com.raytheon.uf.common.activetable import VTECTableChangeNotification
from awips import QpidSubscriber
printoutMap = OrderedDict([
('L', 'LOCK'),
('G', 'GRID'),
('D', 'DB'),
('U', 'USER'),
('V', 'VTEC'),
('C', 'COMBO'),
('S', 'SBU'),
('H', 'HISTORY'),
])
classMap = {
LockNotification: 'L',
GridUpdateNotification: 'G',
DBInvChangeNotification: 'D',
UserMessageNotification: 'U',
VTECTableChangeNotification: 'V',
CombinationsFileChangedNotification: 'C',
ServiceBackupJobStatusNotification: 'S',
GridHistoryUpdateNotification: 'H'
}
topicList = ['edex.alerts.gfe', 'edex.alerts.vtec']
messageQueueMap = {}
for letter in printoutMap:
messageQueueMap[letter] = []
class GetGfeNotifyTopicListener(threading.Thread):
def __init__(self, topicName, hostname, portNumber, L, G, D, U, V, C, S, H):
self.hostname = hostname
self.portNumber = portNumber
self.topicName = topicName
self.L = L
self.G = G
self.D = D
self.U = U
self.V = V
self.C = C
self.S = S
self.H = H
self.qs = None
threading.Thread.__init__(self)
def run(self):
self.qs = QpidSubscriber.QpidSubscriber(host=self.hostname, port=self.portNumber, program="getNotify")
self.qs.topicSubscribe(self.topicName, self.receivedMessage)
def addMessageToQueue(self, obj, t):
if t in classMap:
messageQueueMap[classMap[t]].append(obj)
else:
print("Does not yet support type", t)
def receivedMessage(self, msg):
try:
obj = dynamicserialize.deserialize(msg)
t = type(obj)
if t is list:
for notification in obj:
self.addMessageToQueue(notification, type(notification))
else:
self.addMessageToQueue(obj, t)
except:
import traceback
traceback.print_exc()
def stop(self):
self.qs.close()
def decodeOptions():
import getopt
optionlist, arglist = getopt.getopt(sys.argv[1:], 'h:p:lgduvcsH')
badFormat = False
optionMap = {"hostname": "", "portNumber": 5672, }
for letter in printoutMap:
optionMap[letter] = False
for option in optionlist:
if option[0] == '-h':
optionMap["hostname"] = option[1]
elif option[0] == '-p':
optionMap["portNumber"] = int(option[1])
elif option[0] == '-l':
optionMap["L"] = True
elif option[0] == '-g':
optionMap["G"] = True
elif option[0] == '-d':
optionMap["D"] = True
elif option[0] == '-u':
optionMap["U"] = True
elif option[0] == '-v':
optionMap["V"] = True
elif option[0] == '-c':
optionMap["C"] = True
elif option[0] == '-s':
optionMap["S"] = True
elif option[0] == '-H':
optionMap["H"] = True
else:
badFormat = True
break
if len(optionMap["hostname"]) == 0 == 0 or badFormat:
usage()
return None
return optionMap
def usage():
s = """
Usage: getNotify -h <hostname> [-p <portNumber>] [-l] [-g] [-d] [-u] [-v] [-c] [-s] [-H]
-h hostname : upon which the JMS broker is running
-p portNumber: the port that JMS broker is using, defaults to 5672
-l: display detailed lock notifications
-g: display detailed grid update notifications
-d: display detailed database update notifications
-u: display detailed user message notifications
-v: display detailed VTEC notifications
-c: display detailed combinations file change notifications
-s: display detailed service backup job status notifications
-H: display detailed grid history update notifications
"""
print(s)
def printLoop(options):
try:
while True:
time.sleep(3)
msg = ''
for letter in printoutMap:
msg += letter + '=' + str(len(messageQueueMap[letter])) + ","
print(msg)
for letter in printoutMap:
if options[letter] and len(messageQueueMap[letter]) > 0:
msg = printoutMap[letter] + "=[" + "\n"
for obj in messageQueueMap[letter]:
msg += str(obj) + "," + "\n"
messageQueueMap[letter].remove(obj)
msg += ']'
print(msg)
else:
messageQueueMap[letter] = []
sys.stdout.flush()
except KeyboardInterrupt:
pass
def main():
print("Get Notifications Diagnostic Program")
options = decodeOptions()
if options is not None:
threadList = []
for topic in topicList:
options["topicName"] = topic
thread = GetGfeNotifyTopicListener(**options)
threadList.append(thread)
try:
for thread in threadList:
thread.start()
printLoop(options)
finally:
for thread in threadList:
thread.stop()
if __name__ == '__main__':
main()