awips2/pythonPackages/qpid/bin/qpid-stat
Bryan Kowal cd864eedf8 Issue #1844 - update qpid-queue-count and qpid-stat to use the new REST-based ws
- removed debug print statements
- currently, authenticating against the REST service is not supported; need to
  setup a qpid that requires REST authentication to ensure proper implementation

Change-Id: I9790f7a6678f30892ee3e5a29bcc8f2b5606b478

Former-commit-id: df07748ab8 [formerly 9aab25bdd4] [formerly e5ee38d896] [formerly 6811a31cd5 [formerly e5ee38d896 [formerly a45c818e6df529fb4e85f76cd5155a48c67fe0d9]]]
Former-commit-id: 6811a31cd5
Former-commit-id: 295d34185f0c763165f5e941a848fce19fe923d0 [formerly 02710de565]
Former-commit-id: f4c91f4abe
2013-04-09 14:21:09 -05:00

356 lines
No EOL
12 KiB
Python
Executable file

#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
import getopt
import sys
import locale
import socket
import re
import httplib
import json
import time
from datetime import datetime
from qmf.console import Session, Console
from qpid.disp import Display, Header, Sorter
_host = "localhost"
_connTimeout = 10
_types = ""
_limit = 250
_increasing = False
_sortcol = None
pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
def Usage ():
print "Usage: qpid-stat [OPTIONS] [broker-addr]"
print
print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
print
print "General Options:"
print " --timeout seconds (10) Maximum time to wait for broker connection"
# print " -n [--numeric] Don't resolve names"
print
print "Display Options:"
print
print " -b Show Brokers"
print " -c Show Connections"
print " -s Show Subscriptions"
print " -e Show Exchanges"
print " -q Show Queues"
print
print " -S [--sort-by] COLNAME Sort by column name"
print " -I [--increasing] Sort by increasing value (default = decreasing)"
print " -L [--limit] NUM Limit output to NUM rows (default = 250)"
print
sys.exit (1)
class DisplayManager(Console):
def displayBroker(self, jsonObjArray):
disp = Display(prefix=" ")
heads = []
heads.append(Header('broker'))
heads.append(Header('conn', Header.KMG))
heads.append(Header('exch', Header.KMG))
heads.append(Header('queue', Header.KMG))
rows = []
for staticDict in jsonObjArray:
row = []
row.append(staticDict.get("name"))
# count the connections, exchanges, and queues
# in each virtual host
virtualhosts = staticDict.get("virtualhosts")
_connectionCount = 0
_exchangeCount = 0
_queueCount = 0
for virtualHost in virtualhosts:
statistics = virtualHost.get("statistics")
_connectionCount += statistics.get("connectionCount")
_exchangeCount += statistics.get("exchangeCount")
_queueCount += statistics.get("queueCount")
row.append(_connectionCount)
row.append(_exchangeCount)
row.append(_queueCount)
rows.append(row)
title = "Brokers"
if _sortcol:
sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
dispRows = sorter.getSorted()
else:
dispRows = rows
disp.formattedTable(title, heads, dispRows)
def displayConn(self, jsonObjArray):
disp = Display(prefix=" ")
heads = []
heads.append(Header('client-addr'))
heads.append(Header('client-version'))
heads.append(Header('auth'))
heads.append(Header('last-i/o'))
heads.append(Header('msgIn', Header.KMG))
heads.append(Header('msgOut', Header.KMG))
rows = []
for staticDict in jsonObjArray:
row = []
row.append(staticDict.get("remoteAddress"))
row.append(staticDict.get("clientVersion"))
row.append(staticDict.get("principal"))
statistics = staticDict.get("statistics")
lastIOTime = time.gmtime(long(statistics.get("lastIoTime")) / 1000)
row.append(time.strftime('%m/%d/%y %H:%M:%S', lastIOTime))
row.append(statistics.get("messagesIn"))
row.append(statistics.get("messagesOut"))
rows.append(row)
title = "Connections"
if _sortcol:
sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
dispRows = sorter.getSorted()
else:
dispRows = rows
disp.formattedTable(title, heads, dispRows)
def displaySession(self, jsonObjArray):
disp = Display(prefix=" ")
heads = []
heads.append(Header('id'))
heads.append(Header('name'))
heads.append(Header('consumers'))
heads.append(Header('unack-msg', Header.KMG))
rows = []
for staticDict in jsonObjArray:
row = []
row.append(staticDict.get("id"))
row.append(staticDict.get("name"))
statistics = staticDict.get("statistics")
row.append(statistics.get("consumerCount"))
row.append(statistics.get("unacknowledgedMessages"))
rows.append(row)
title = "Sessions"
if _sortcol:
sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
dispRows = sorter.getSorted()
else:
dispRows = rows
disp.formattedTable(title, heads, dispRows)
def displayExchange(self, jsonObjArray):
disp = Display(prefix=" ")
heads = []
heads.append(Header("exchange"))
heads.append(Header("type"))
heads.append(Header("dur", Header.Y))
heads.append(Header("bind", Header.KMG))
heads.append(Header("msgIn", Header.KMG))
heads.append(Header("msgDrop", Header.KMG))
heads.append(Header("byteIn", Header.KMG))
heads.append(Header("byteDrop", Header.KMG))
rows = []
for staticDict in jsonObjArray:
row = []
row.append(staticDict.get("name"))
row.append(staticDict.get("type"))
row.append(staticDict.get("durable"))
statistics = staticDict.get("statistics")
row.append(statistics.get("bindingCount"))
row.append(statistics.get("messagesIn"))
row.append(statistics.get("messagesDropped"))
row.append(statistics.get("bytesIn"))
row.append(statistics.get("bytesDropped"))
rows.append(row)
title = "Exchanges"
if _sortcol:
sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
dispRows = sorter.getSorted()
else:
dispRows = rows
disp.formattedTable(title, heads, dispRows)
def displayQueue(self, jsonObjArray):
disp = Display(prefix=" ")
heads = []
heads.append(Header("queue"))
heads.append(Header("dur", Header.Y))
heads.append(Header("excl", Header.Y))
heads.append(Header("msg", Header.KMG))
heads.append(Header("msgIn", Header.KMG))
heads.append(Header("msgOut", Header.KMG))
heads.append(Header("bytes", Header.KMG))
heads.append(Header("bytesIn", Header.KMG))
heads.append(Header("bytesOut", Header.KMG))
heads.append(Header("cons", Header.KMG))
heads.append(Header("bind", Header.KMG))
rows = []
for staticDict in jsonObjArray:
row = []
row.append(staticDict.get("name"))
row.append(staticDict.get("durable"))
row.append(staticDict.get("exclusive"))
statistics = staticDict.get("statistics")
row.append(statistics.get("queueDepthMessages"))
row.append(statistics.get("totalEnqueuedMessages"))
row.append(statistics.get("totalDequeuedMessages"))
row.append(statistics.get("queueDepthBytes"))
row.append(statistics.get("totalEnqueuedBytes"))
row.append(statistics.get("totalDequeuedBytes"))
row.append(statistics.get("consumerCount"))
row.append(statistics.get("bindingCount"))
rows.append(row)
title = "Queues"
if _sortcol:
sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
dispRows = sorter.getSorted()
else:
dispRows = rows
disp.formattedTable(title, heads, dispRows)
def displayMain(self, main, jsonObject):
if main == 'b': self.displayBroker(jsonObject)
elif main == 'c': self.displayConn(jsonObject)
elif main == 's': self.displaySession(jsonObject)
elif main == 'e': self.displayExchange(jsonObject)
elif main == 'q': self.displayQueue(jsonObject)
class RestManager(Console):
def __init__(self):
self._host = None
self._port = 0
def setPort(self, port):
self._port = port
def setHost(self, host):
self._host = host
def execute(self, service):
if (self._port is None):
httpConn = httplib.HTTPConnection(self._host)
else:
httpConn = httplib.HTTPConnection(self._host, self._port)
if (_connTimeout is not None):
httpConn.timeout = _connTimeout
httpConn.connect()
httpConn.request("GET", "/rest/" + service)
response = httpConn.getresponse()
if (response.status != 200):
print "Unable to post request to server!"
print response.reason
sys.exit(1)
return response
##
## Main Program
##
try:
longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing", "timeout=")
(optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bceqsS:L:I", longOpts)
except:
Usage()
try:
encoding = locale.getpreferredencoding()
cargs = [a.decode(encoding) for a in encArgs]
except:
cargs = encArgs
for opt in optlist:
if opt[0] == "--timeout":
_connTimeout = int(opt[1])
if _connTimeout == 0:
_connTimeout = None
elif opt[0] == "-n" or opt[0] == "--numeric":
_numeric = True
elif opt[0] == "-S" or opt[0] == "--sort-by":
_sortcol = opt[1]
elif opt[0] == "-I" or opt[0] == "--increasing":
_increasing = True
elif opt[0] == "-L" or opt[0] == "--limit":
_limit = int(opt[1])
elif len(opt[0]) == 2:
char = opt[0][1]
if "bcseqs".find(char) != -1:
_types += char
else:
Usage()
else:
Usage()
if len(_types) == 0:
Usage()
nargs = len(cargs)
rm = RestManager()
_user = None
_password = None
_port = 8180
if nargs == 1:
_broker_addr = cargs[0]
_host = _broker_addr
# check for username
if _host.find("@") != -1:
tokens = _host.split("@")
_user = tokens[0]
_host = tokens[1]
# check for password
if _user.find("/") != -1:
tokens = _user.split("/")
_user = tokens[0]
_password = tokens[1]
# check for port
if _host.find(":") != -1:
tokens = _host.split(":")
_host = tokens[0]
_port = int(tokens[1])
try:
rm.setHost(_host)
rm.setPort(_port)
# determine which REST service we will be utilizing
_serviceEndpoint = None
if _types[0] == 'b': _serviceEndpoint = "broker"
elif _types[0] == 'c': _serviceEndpoint = "connection"
elif _types[0] == 's': _serviceEndpoint = "session"
elif _types[0] == 'e': _serviceEndpoint = "exchange"
elif _types[0] == 'q': _serviceEndpoint = "queue"
response = rm.execute(_serviceEndpoint)
# evaluate the JSON
jsonStr = response.read()
jsonObjArray = json.loads(jsonStr)
# display construct
dm = DisplayManager()
dm.displayMain(_types[0], jsonObjArray)
except KeyboardInterrupt:
print
except Exception,e:
print "Failed: %s - %s" % (e.__class__.__name__, e)
sys.exit(1)