Issue #1844 - update qpid-queue-count and qpid-stat to use the new REST-based ws
- removed debug print statements - currently, authenticating against the REST service is not supported; need to setup a qpid that requires REST authentication to ensure proper implementation Change-Id: I9790f7a6678f30892ee3e5a29bcc8f2b5606b478 Former-commit-id:e5ee38d896
[formerlye5ee38d896
[formerly a45c818e6df529fb4e85f76cd5155a48c67fe0d9]] Former-commit-id:6811a31cd5
Former-commit-id:02710de565
This commit is contained in:
parent
3a8e2159de
commit
5e92cd512c
2 changed files with 239 additions and 507 deletions
|
@ -25,6 +25,8 @@ import sys
|
|||
import locale
|
||||
import socket
|
||||
import re
|
||||
import httplib
|
||||
import json
|
||||
from qmf.console import Session, Console
|
||||
from qpid.disp import Display, Header, Sorter
|
||||
|
||||
|
@ -53,213 +55,36 @@ def Usage ():
|
|||
print
|
||||
sys.exit (1)
|
||||
|
||||
class IpAddr:
|
||||
def __init__(self, text):
|
||||
if text.find("@") != -1:
|
||||
tokens = text.split("@")
|
||||
text = tokens[1]
|
||||
if text.find(":") != -1:
|
||||
tokens = text.split(":")
|
||||
text = tokens[0]
|
||||
self.port = int(tokens[1])
|
||||
else:
|
||||
self.port = 5672
|
||||
self.dottedQuad = socket.gethostbyname(text)
|
||||
nums = self.dottedQuad.split(".")
|
||||
self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
|
||||
|
||||
def bestAddr(self, addrPortList):
|
||||
bestDiff = 0xFFFFFFFFL
|
||||
bestAddr = None
|
||||
for addrPort in addrPortList:
|
||||
diff = IpAddr(addrPort[0]).addr ^ self.addr
|
||||
if diff < bestDiff:
|
||||
bestDiff = diff
|
||||
bestAddr = addrPort
|
||||
return bestAddr
|
||||
|
||||
class Broker(object):
|
||||
def __init__(self, qmf, broker):
|
||||
self.broker = broker
|
||||
|
||||
agents = qmf.getAgents()
|
||||
for a in agents:
|
||||
if a.getAgentBank() == 0:
|
||||
self.brokerAgent = a
|
||||
|
||||
bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0]
|
||||
self.currentTime = bobj.getTimestamps()[0]
|
||||
try:
|
||||
self.uptime = bobj.uptime
|
||||
except:
|
||||
self.uptime = 0
|
||||
self.connections = {}
|
||||
self.sessions = {}
|
||||
self.exchanges = {}
|
||||
self.queues = {}
|
||||
package = "org.apache.qpid.broker"
|
||||
|
||||
list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent)
|
||||
for conn in list:
|
||||
if pattern.match(conn.address):
|
||||
self.connections[conn.getObjectId()] = conn
|
||||
|
||||
list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent)
|
||||
for sess in list:
|
||||
if sess.connectionRef in self.connections:
|
||||
self.sessions[sess.getObjectId()] = sess
|
||||
|
||||
list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent)
|
||||
for exchange in list:
|
||||
self.exchanges[exchange.getObjectId()] = exchange
|
||||
|
||||
list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent)
|
||||
for queue in list:
|
||||
self.queues[queue.getObjectId()] = queue
|
||||
|
||||
def getName(self):
|
||||
return self.broker.getUrl()
|
||||
|
||||
def getCurrentTime(self):
|
||||
return self.currentTime
|
||||
|
||||
def getUptime(self):
|
||||
return self.uptime
|
||||
|
||||
class BrokerManager(Console):
|
||||
class RestManager(Console):
|
||||
def __init__(self):
|
||||
self.brokerName = None
|
||||
self.qmf = None
|
||||
self.broker = None
|
||||
self.brokers = []
|
||||
self.cluster = None
|
||||
self._host = None
|
||||
self._port = 0
|
||||
|
||||
def SetBroker(self, brokerUrl):
|
||||
self.url = brokerUrl
|
||||
self.qmf = Session()
|
||||
self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
|
||||
agents = self.qmf.getAgents()
|
||||
for a in agents:
|
||||
if a.getAgentBank() == 0:
|
||||
self.brokerAgent = a
|
||||
def setPort(self, port):
|
||||
self._port = port
|
||||
|
||||
def Disconnect(self):
|
||||
if self.broker:
|
||||
self.qmf.delBroker(self.broker)
|
||||
|
||||
def _getCluster(self):
|
||||
packages = self.qmf.getPackages()
|
||||
if "org.apache.qpid.cluster" not in packages:
|
||||
return None
|
||||
|
||||
clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
|
||||
if len(clusters) == 0:
|
||||
print "Clustering is installed but not enabled on the broker."
|
||||
return None
|
||||
|
||||
self.cluster = clusters[0]
|
||||
|
||||
def _getHostList(self, urlList):
|
||||
hosts = []
|
||||
hostAddr = IpAddr(_host)
|
||||
for url in urlList:
|
||||
if url.find("amqp:") != 0:
|
||||
raise Exception("Invalid URL 1")
|
||||
url = url[5:]
|
||||
addrs = str(url).split(",")
|
||||
addrList = []
|
||||
for addr in addrs:
|
||||
tokens = addr.split(":")
|
||||
if len(tokens) != 3:
|
||||
raise Exception("Invalid URL 2")
|
||||
addrList.append((tokens[1], tokens[2]))
|
||||
|
||||
# Find the address in the list that is most likely to be in the same subnet as the address
|
||||
# with which we made the original QMF connection. This increases the probability that we will
|
||||
# be able to reach the cluster member.
|
||||
|
||||
best = hostAddr.bestAddr(addrList)
|
||||
bestUrl = best[0] + ":" + best[1]
|
||||
hosts.append(bestUrl)
|
||||
return hosts
|
||||
|
||||
def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
|
||||
if len(subs) == 0:
|
||||
return
|
||||
this = subs[0]
|
||||
remaining = subs[1:]
|
||||
newindent = indent + " "
|
||||
if this == 'b':
|
||||
pass
|
||||
elif this == 'c':
|
||||
if broker:
|
||||
for oid in broker.connections:
|
||||
iconn = broker.connections[oid]
|
||||
self.printConnSub(indent, broker.getName(), iconn)
|
||||
self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
|
||||
sess=sess, exchange=exchange, queue=queue)
|
||||
elif this == 's':
|
||||
pass
|
||||
elif this == 'e':
|
||||
pass
|
||||
elif this == 'q':
|
||||
pass
|
||||
print
|
||||
|
||||
def displayQueue(self, subs):
|
||||
disp = Display(prefix=" ")
|
||||
heads = []
|
||||
if self.cluster:
|
||||
heads.append(Header('broker'))
|
||||
heads.append(Header("queue"))
|
||||
heads.append(Header("msg", Header.KMG))
|
||||
heads.append(Header("bytes", Header.KMG))
|
||||
heads.append(Header("cons", Header.KMG))
|
||||
rows = []
|
||||
for broker in self.brokers:
|
||||
for oid in broker.queues:
|
||||
q = broker.queues[oid]
|
||||
row = []
|
||||
if self.cluster:
|
||||
row.append(broker.getName())
|
||||
row.append(q.name)
|
||||
row.append(q.msgDepth)
|
||||
row.append(q.byteDepth)
|
||||
row.append(q.consumerCount)
|
||||
rows.append(row)
|
||||
title = "Queues"
|
||||
if self.cluster:
|
||||
title += " for cluster '%s'" % self.cluster.clusterName
|
||||
if _sortcol:
|
||||
sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
|
||||
dispRows = sorter.getSorted()
|
||||
def setHost(self, host):
|
||||
self._host = host
|
||||
|
||||
def execute(self, service):
|
||||
if (self._port is None):
|
||||
httpConn = httplib.HTTPConnection(self._host)
|
||||
else:
|
||||
dispRows = rows
|
||||
disp.formattedTable(title, heads, dispRows)
|
||||
|
||||
def displayMain(self, main, subs):
|
||||
self.displayQueue(subs)
|
||||
|
||||
def display(self):
|
||||
self._getCluster()
|
||||
if self.cluster:
|
||||
memberList = self.cluster.members.split(";")
|
||||
hostList = self._getHostList(memberList)
|
||||
self.qmf.delBroker(self.broker)
|
||||
self.broker = None
|
||||
if _host.find("@") > 0:
|
||||
authString = _host.split("@")[0] + "@"
|
||||
else:
|
||||
authString = ""
|
||||
for host in hostList:
|
||||
b = self.qmf.addBroker(authString + host, _connTimeout)
|
||||
self.brokers.append(Broker(self.qmf, b))
|
||||
else:
|
||||
self.brokers.append(Broker(self.qmf, self.broker))
|
||||
|
||||
self.displayMain(_types[0], _types[1:])
|
||||
httpConn = httplib.HTTPConnection(self._host, self._port)
|
||||
|
||||
if (_connTimeout is not None):
|
||||
httpConn.timeout = _connTimeout
|
||||
|
||||
httpConn.connect()
|
||||
httpConn.request("GET", "/rest/" + service)
|
||||
response = httpConn.getresponse()
|
||||
|
||||
if (response.status != 200):
|
||||
print "Unable to post request to server!"
|
||||
print response.reason
|
||||
sys.exit(1)
|
||||
|
||||
return response
|
||||
##
|
||||
## Main Program
|
||||
##
|
||||
|
@ -301,18 +126,69 @@ if len(_types) == 0:
|
|||
_types='q'
|
||||
|
||||
nargs = len(cargs)
|
||||
bm = BrokerManager()
|
||||
rm = RestManager()
|
||||
_user = None
|
||||
_password = None
|
||||
_port = 8180
|
||||
|
||||
if nargs == 1:
|
||||
_host = cargs[0]
|
||||
_broker_addr = cargs[0]
|
||||
_host = _broker_addr
|
||||
|
||||
# check for username
|
||||
if _host.find("@") != -1:
|
||||
tokens = _host.split("@")
|
||||
_user = tokens[0]
|
||||
_host = tokens[1]
|
||||
# check for password
|
||||
if _user.find("/") != -1:
|
||||
tokens = _user.split("/")
|
||||
_user = tokens[0]
|
||||
_password = tokens[1]
|
||||
# check for port
|
||||
if _host.find(":") != -1:
|
||||
tokens = _host.split(":")
|
||||
_host = tokens[0]
|
||||
_port = int(tokens[1])
|
||||
|
||||
try:
|
||||
bm.SetBroker(_host)
|
||||
bm.display()
|
||||
rm.setHost(_host)
|
||||
rm.setPort(_port)
|
||||
response = rm.execute('queue')
|
||||
|
||||
# evaluate the JSON
|
||||
jsonStr = response.read()
|
||||
jsonObjArray = json.loads(jsonStr)
|
||||
|
||||
# create rows of queues to display
|
||||
rows = []
|
||||
for staticDict in jsonObjArray:
|
||||
row = []
|
||||
row.append(staticDict.get("name"))
|
||||
statistics = staticDict.get("statistics")
|
||||
row.append(statistics.get("queueDepthMessages"))
|
||||
row.append(statistics.get("queueDepthBytes"))
|
||||
row.append(statistics.get("consumerCount"))
|
||||
rows.append(row)
|
||||
|
||||
# sort the queues if required
|
||||
if _sortcol:
|
||||
sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
|
||||
dispRows = sorter.getSorted()
|
||||
else:
|
||||
dispRows = rows
|
||||
|
||||
# prepare to build & display the table
|
||||
disp = Display(prefix=" ")
|
||||
# table header
|
||||
heads = []
|
||||
heads.append(Header("queue"))
|
||||
heads.append(Header("msg", Header.KMG))
|
||||
heads.append(Header("bytes", Header.KMG))
|
||||
heads.append(Header("cons", Header.KMG))
|
||||
disp.formattedTable("Queues", heads, dispRows)
|
||||
except KeyboardInterrupt:
|
||||
print
|
||||
except Exception,e:
|
||||
print "Failed: %s - %s" % (e.__class__.__name__, e)
|
||||
sys.exit(1)
|
||||
|
||||
bm.Disconnect()
|
||||
|
|
|
@ -25,6 +25,10 @@ import sys
|
|||
import locale
|
||||
import socket
|
||||
import re
|
||||
import httplib
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime
|
||||
from qmf.console import Session, Console
|
||||
from qpid.disp import Display, Header, Sorter
|
||||
|
||||
|
@ -60,182 +64,33 @@ def Usage ():
|
|||
print
|
||||
sys.exit (1)
|
||||
|
||||
class IpAddr:
|
||||
def __init__(self, text):
|
||||
if text.find("@") != -1:
|
||||
tokens = text.split("@")
|
||||
text = tokens[1]
|
||||
if text.find(":") != -1:
|
||||
tokens = text.split(":")
|
||||
text = tokens[0]
|
||||
self.port = int(tokens[1])
|
||||
else:
|
||||
self.port = 5672
|
||||
self.dottedQuad = socket.gethostbyname(text)
|
||||
nums = self.dottedQuad.split(".")
|
||||
self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
|
||||
class DisplayManager(Console):
|
||||
|
||||
def bestAddr(self, addrPortList):
|
||||
bestDiff = 0xFFFFFFFFL
|
||||
bestAddr = None
|
||||
for addrPort in addrPortList:
|
||||
diff = IpAddr(addrPort[0]).addr ^ self.addr
|
||||
if diff < bestDiff:
|
||||
bestDiff = diff
|
||||
bestAddr = addrPort
|
||||
return bestAddr
|
||||
|
||||
class Broker(object):
|
||||
def __init__(self, qmf, broker):
|
||||
self.broker = broker
|
||||
|
||||
agents = qmf.getAgents()
|
||||
for a in agents:
|
||||
if a.getAgentBank() == 0:
|
||||
self.brokerAgent = a
|
||||
|
||||
bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0]
|
||||
self.currentTime = bobj.getTimestamps()[0]
|
||||
try:
|
||||
self.uptime = bobj.uptime
|
||||
except:
|
||||
self.uptime = 0
|
||||
self.connections = {}
|
||||
self.sessions = {}
|
||||
self.exchanges = {}
|
||||
self.queues = {}
|
||||
self.subscriptions = {}
|
||||
package = "org.apache.qpid.broker"
|
||||
|
||||
list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent)
|
||||
for conn in list:
|
||||
self.connections[conn.getObjectId()] = conn
|
||||
|
||||
list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent)
|
||||
for sess in list:
|
||||
if sess.connectionRef in self.connections:
|
||||
self.sessions[sess.getObjectId()] = sess
|
||||
|
||||
list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent)
|
||||
for exchange in list:
|
||||
self.exchanges[exchange.getObjectId()] = exchange
|
||||
|
||||
list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent)
|
||||
for queue in list:
|
||||
self.queues[queue.getObjectId()] = queue
|
||||
|
||||
list = qmf.getObjects(_class="subscription", _package=package, _agent=self.brokerAgent)
|
||||
for subs in list:
|
||||
self.subscriptions[subs.getObjectId()] = subs
|
||||
|
||||
def getName(self):
|
||||
return self.broker.getUrl()
|
||||
|
||||
def getCurrentTime(self):
|
||||
return self.currentTime
|
||||
|
||||
def getUptime(self):
|
||||
return self.uptime
|
||||
|
||||
class BrokerManager(Console):
|
||||
def __init__(self):
|
||||
self.brokerName = None
|
||||
self.qmf = None
|
||||
self.broker = None
|
||||
self.brokers = []
|
||||
self.cluster = None
|
||||
|
||||
def SetBroker(self, brokerUrl):
|
||||
self.url = brokerUrl
|
||||
self.qmf = Session()
|
||||
self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
|
||||
agents = self.qmf.getAgents()
|
||||
for a in agents:
|
||||
if a.getAgentBank() == 0:
|
||||
self.brokerAgent = a
|
||||
|
||||
def Disconnect(self):
|
||||
if self.broker:
|
||||
self.qmf.delBroker(self.broker)
|
||||
|
||||
def _getCluster(self):
|
||||
packages = self.qmf.getPackages()
|
||||
if "org.apache.qpid.cluster" not in packages:
|
||||
return None
|
||||
|
||||
clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
|
||||
if len(clusters) == 0:
|
||||
print "Clustering is installed but not enabled on the broker."
|
||||
return None
|
||||
|
||||
self.cluster = clusters[0]
|
||||
|
||||
def _getHostList(self, urlList):
|
||||
hosts = []
|
||||
hostAddr = IpAddr(_host)
|
||||
for url in urlList:
|
||||
if url.find("amqp:") != 0:
|
||||
raise Exception("Invalid URL 1")
|
||||
url = url[5:]
|
||||
addrs = str(url).split(",")
|
||||
addrList = []
|
||||
for addr in addrs:
|
||||
tokens = addr.split(":")
|
||||
if len(tokens) != 3:
|
||||
raise Exception("Invalid URL 2")
|
||||
addrList.append((tokens[1], tokens[2]))
|
||||
|
||||
# Find the address in the list that is most likely to be in the same subnet as the address
|
||||
# with which we made the original QMF connection. This increases the probability that we will
|
||||
# be able to reach the cluster member.
|
||||
|
||||
best = hostAddr.bestAddr(addrList)
|
||||
bestUrl = best[0] + ":" + best[1]
|
||||
hosts.append(bestUrl)
|
||||
return hosts
|
||||
|
||||
def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
|
||||
if len(subs) == 0:
|
||||
return
|
||||
this = subs[0]
|
||||
remaining = subs[1:]
|
||||
newindent = indent + " "
|
||||
if this == 'b':
|
||||
pass
|
||||
elif this == 'c':
|
||||
if broker:
|
||||
for oid in broker.connections:
|
||||
iconn = broker.connections[oid]
|
||||
self.printConnSub(indent, broker.getName(), iconn)
|
||||
self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
|
||||
sess=sess, exchange=exchange, queue=queue)
|
||||
elif this == 's':
|
||||
pass
|
||||
elif this == 'e':
|
||||
pass
|
||||
elif this == 'q':
|
||||
pass
|
||||
print
|
||||
|
||||
def displayBroker(self, subs):
|
||||
def displayBroker(self, jsonObjArray):
|
||||
disp = Display(prefix=" ")
|
||||
heads = []
|
||||
heads.append(Header('broker'))
|
||||
heads.append(Header('cluster'))
|
||||
heads.append(Header('uptime', Header.DURATION))
|
||||
heads.append(Header('conn', Header.KMG))
|
||||
heads.append(Header('sess', Header.KMG))
|
||||
heads.append(Header('exch', Header.KMG))
|
||||
heads.append(Header('queue', Header.KMG))
|
||||
rows = []
|
||||
for broker in self.brokers:
|
||||
if self.cluster:
|
||||
ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status)
|
||||
else:
|
||||
ctext = "<standalone>"
|
||||
row = (broker.getName(), ctext, broker.getUptime(),
|
||||
len(broker.connections), len(broker.sessions),
|
||||
len(broker.exchanges), len(broker.queues))
|
||||
for staticDict in jsonObjArray:
|
||||
row = []
|
||||
row.append(staticDict.get("name"))
|
||||
# count the connections, exchanges, and queues
|
||||
# in each virtual host
|
||||
virtualhosts = staticDict.get("virtualhosts")
|
||||
_connectionCount = 0
|
||||
_exchangeCount = 0
|
||||
_queueCount = 0
|
||||
for virtualHost in virtualhosts:
|
||||
statistics = virtualHost.get("statistics")
|
||||
_connectionCount += statistics.get("connectionCount")
|
||||
_exchangeCount += statistics.get("exchangeCount")
|
||||
_queueCount += statistics.get("queueCount")
|
||||
row.append(_connectionCount)
|
||||
row.append(_exchangeCount)
|
||||
row.append(_queueCount)
|
||||
rows.append(row)
|
||||
title = "Brokers"
|
||||
if _sortcol:
|
||||
|
@ -245,39 +100,28 @@ class BrokerManager(Console):
|
|||
dispRows = rows
|
||||
disp.formattedTable(title, heads, dispRows)
|
||||
|
||||
def displayConn(self, subs):
|
||||
def displayConn(self, jsonObjArray):
|
||||
disp = Display(prefix=" ")
|
||||
heads = []
|
||||
if self.cluster:
|
||||
heads.append(Header('broker'))
|
||||
heads.append(Header('client-addr'))
|
||||
heads.append(Header('cproc'))
|
||||
heads.append(Header('cpid'))
|
||||
heads.append(Header('client-version'))
|
||||
heads.append(Header('auth'))
|
||||
heads.append(Header('connected', Header.DURATION))
|
||||
heads.append(Header('idle', Header.DURATION))
|
||||
heads.append(Header('last-i/o'))
|
||||
heads.append(Header('msgIn', Header.KMG))
|
||||
heads.append(Header('msgOut', Header.KMG))
|
||||
rows = []
|
||||
for broker in self.brokers:
|
||||
for oid in broker.connections:
|
||||
conn = broker.connections[oid]
|
||||
row = []
|
||||
if self.cluster:
|
||||
row.append(broker.getName())
|
||||
row.append(conn.address)
|
||||
row.append(conn.remoteProcessName)
|
||||
row.append(conn.remotePid)
|
||||
row.append(conn.authIdentity)
|
||||
row.append(broker.getCurrentTime() - conn.getTimestamps()[1])
|
||||
idle = broker.getCurrentTime() - conn.getTimestamps()[0]
|
||||
row.append(broker.getCurrentTime() - conn.getTimestamps()[0])
|
||||
row.append(conn.framesFromClient)
|
||||
row.append(conn.framesToClient)
|
||||
rows.append(row)
|
||||
for staticDict in jsonObjArray:
|
||||
row = []
|
||||
row.append(staticDict.get("remoteAddress"))
|
||||
row.append(staticDict.get("clientVersion"))
|
||||
row.append(staticDict.get("principal"))
|
||||
statistics = staticDict.get("statistics")
|
||||
lastIOTime = time.gmtime(long(statistics.get("lastIoTime")) / 1000)
|
||||
row.append(time.strftime('%m/%d/%y %H:%M:%S', lastIOTime))
|
||||
row.append(statistics.get("messagesIn"))
|
||||
row.append(statistics.get("messagesOut"))
|
||||
rows.append(row)
|
||||
title = "Connections"
|
||||
if self.cluster:
|
||||
title += " for cluster '%s'" % self.cluster.clusterName
|
||||
if _sortcol:
|
||||
sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
|
||||
dispRows = sorter.getSorted()
|
||||
|
@ -285,38 +129,23 @@ class BrokerManager(Console):
|
|||
dispRows = rows
|
||||
disp.formattedTable(title, heads, dispRows)
|
||||
|
||||
def displaySubscription(self, subs):
|
||||
def displaySession(self, jsonObjArray):
|
||||
disp = Display(prefix=" ")
|
||||
heads = []
|
||||
if self.cluster:
|
||||
heads.append(Header('broker'))
|
||||
heads.append(Header('connection'))
|
||||
heads.append(Header('session'))
|
||||
heads.append(Header('queue'))
|
||||
heads.append(Header('connected', Header.DURATION))
|
||||
heads.append(Header('msg', Header.KMG))
|
||||
heads.append(Header('id'))
|
||||
heads.append(Header('name'))
|
||||
heads.append(Header('consumers'))
|
||||
heads.append(Header('unack-msg', Header.KMG))
|
||||
rows = []
|
||||
for broker in self.brokers:
|
||||
for oid in broker.subscriptions:
|
||||
subs = broker.subscriptions[oid]
|
||||
sess = broker.sessions[subs.sessionRef]
|
||||
conn = broker.connections[sess.connectionRef]
|
||||
q = broker.queues[subs.queueRef]
|
||||
|
||||
row = []
|
||||
|
||||
if self.cluster:
|
||||
row.append(broker.getName())
|
||||
|
||||
row.append(conn.address)
|
||||
row.append(sess.name)
|
||||
row.append(q.name)
|
||||
row.append(broker.getCurrentTime() - conn.getTimestamps()[1])
|
||||
row.append(q.msgDepth)
|
||||
rows.append(row)
|
||||
title = "Subscriptions"
|
||||
if self.cluster:
|
||||
title += " for cluster '%s'" % self.cluster.clusterName
|
||||
for staticDict in jsonObjArray:
|
||||
row = []
|
||||
row.append(staticDict.get("id"))
|
||||
row.append(staticDict.get("name"))
|
||||
statistics = staticDict.get("statistics")
|
||||
row.append(statistics.get("consumerCount"))
|
||||
row.append(statistics.get("unacknowledgedMessages"))
|
||||
rows.append(row)
|
||||
title = "Sessions"
|
||||
if _sortcol:
|
||||
sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
|
||||
dispRows = sorter.getSorted()
|
||||
|
@ -324,42 +153,31 @@ class BrokerManager(Console):
|
|||
dispRows = rows
|
||||
disp.formattedTable(title, heads, dispRows)
|
||||
|
||||
def displayExchange(self, subs):
|
||||
def displayExchange(self, jsonObjArray):
|
||||
disp = Display(prefix=" ")
|
||||
heads = []
|
||||
if self.cluster:
|
||||
heads.append(Header('broker'))
|
||||
heads.append(Header("exchange"))
|
||||
heads.append(Header("type"))
|
||||
heads.append(Header("dur", Header.Y))
|
||||
heads.append(Header("bind", Header.KMG))
|
||||
heads.append(Header("msgIn", Header.KMG))
|
||||
heads.append(Header("msgOut", Header.KMG))
|
||||
heads.append(Header("msgDrop", Header.KMG))
|
||||
heads.append(Header("byteIn", Header.KMG))
|
||||
heads.append(Header("byteOut", Header.KMG))
|
||||
heads.append(Header("byteDrop", Header.KMG))
|
||||
rows = []
|
||||
for broker in self.brokers:
|
||||
for oid in broker.exchanges:
|
||||
ex = broker.exchanges[oid]
|
||||
row = []
|
||||
if self.cluster:
|
||||
row.append(broker.getName())
|
||||
row.append(ex.name)
|
||||
row.append(ex.type)
|
||||
row.append(ex.durable)
|
||||
row.append(ex.bindingCount)
|
||||
row.append(ex.msgReceives)
|
||||
row.append(ex.msgRoutes)
|
||||
row.append(ex.msgDrops)
|
||||
row.append(ex.byteReceives)
|
||||
row.append(ex.byteRoutes)
|
||||
row.append(ex.byteDrops)
|
||||
rows.append(row)
|
||||
for staticDict in jsonObjArray:
|
||||
row = []
|
||||
row.append(staticDict.get("name"))
|
||||
row.append(staticDict.get("type"))
|
||||
row.append(staticDict.get("durable"))
|
||||
statistics = staticDict.get("statistics")
|
||||
row.append(statistics.get("bindingCount"))
|
||||
row.append(statistics.get("messagesIn"))
|
||||
row.append(statistics.get("messagesDropped"))
|
||||
row.append(statistics.get("bytesIn"))
|
||||
row.append(statistics.get("bytesDropped"))
|
||||
rows.append(row)
|
||||
title = "Exchanges"
|
||||
if self.cluster:
|
||||
title += " for cluster '%s'" % self.cluster.clusterName
|
||||
if _sortcol:
|
||||
sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
|
||||
dispRows = sorter.getSorted()
|
||||
|
@ -367,14 +185,11 @@ class BrokerManager(Console):
|
|||
dispRows = rows
|
||||
disp.formattedTable(title, heads, dispRows)
|
||||
|
||||
def displayQueue(self, subs):
|
||||
def displayQueue(self, jsonObjArray):
|
||||
disp = Display(prefix=" ")
|
||||
heads = []
|
||||
if self.cluster:
|
||||
heads.append(Header('broker'))
|
||||
heads.append(Header("queue"))
|
||||
heads.append(Header("dur", Header.Y))
|
||||
heads.append(Header("autoDel", Header.Y))
|
||||
heads.append(Header("excl", Header.Y))
|
||||
heads.append(Header("msg", Header.KMG))
|
||||
heads.append(Header("msgIn", Header.KMG))
|
||||
|
@ -385,28 +200,22 @@ class BrokerManager(Console):
|
|||
heads.append(Header("cons", Header.KMG))
|
||||
heads.append(Header("bind", Header.KMG))
|
||||
rows = []
|
||||
for broker in self.brokers:
|
||||
for oid in broker.queues:
|
||||
q = broker.queues[oid]
|
||||
row = []
|
||||
if self.cluster:
|
||||
row.append(broker.getName())
|
||||
row.append(q.name)
|
||||
row.append(q.durable)
|
||||
row.append(q.autoDelete)
|
||||
row.append(q.exclusive)
|
||||
row.append(q.msgDepth)
|
||||
row.append(q.msgTotalEnqueues)
|
||||
row.append(q.msgTotalDequeues)
|
||||
row.append(q.byteDepth)
|
||||
row.append(q.byteTotalEnqueues)
|
||||
row.append(q.byteTotalDequeues)
|
||||
row.append(q.consumerCount)
|
||||
row.append(q.bindingCount)
|
||||
rows.append(row)
|
||||
for staticDict in jsonObjArray:
|
||||
row = []
|
||||
row.append(staticDict.get("name"))
|
||||
row.append(staticDict.get("durable"))
|
||||
row.append(staticDict.get("exclusive"))
|
||||
statistics = staticDict.get("statistics")
|
||||
row.append(statistics.get("queueDepthMessages"))
|
||||
row.append(statistics.get("totalEnqueuedMessages"))
|
||||
row.append(statistics.get("totalDequeuedMessages"))
|
||||
row.append(statistics.get("queueDepthBytes"))
|
||||
row.append(statistics.get("totalEnqueuedBytes"))
|
||||
row.append(statistics.get("totalDequeuedBytes"))
|
||||
row.append(statistics.get("consumerCount"))
|
||||
row.append(statistics.get("bindingCount"))
|
||||
rows.append(row)
|
||||
title = "Queues"
|
||||
if self.cluster:
|
||||
title += " for cluster '%s'" % self.cluster.clusterName
|
||||
if _sortcol:
|
||||
sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
|
||||
dispRows = sorter.getSorted()
|
||||
|
@ -414,32 +223,43 @@ class BrokerManager(Console):
|
|||
dispRows = rows
|
||||
disp.formattedTable(title, heads, dispRows)
|
||||
|
||||
def displayMain(self, main, subs):
|
||||
if main == 'b': self.displayBroker(subs)
|
||||
elif main == 'c': self.displayConn(subs)
|
||||
elif main == 's': self.displaySubscription(subs)
|
||||
elif main == 'e': self.displayExchange(subs)
|
||||
elif main == 'q': self.displayQueue(subs)
|
||||
def displayMain(self, main, jsonObject):
|
||||
if main == 'b': self.displayBroker(jsonObject)
|
||||
elif main == 'c': self.displayConn(jsonObject)
|
||||
elif main == 's': self.displaySession(jsonObject)
|
||||
elif main == 'e': self.displayExchange(jsonObject)
|
||||
elif main == 'q': self.displayQueue(jsonObject)
|
||||
|
||||
def display(self):
|
||||
self._getCluster()
|
||||
if self.cluster:
|
||||
memberList = self.cluster.members.split(";")
|
||||
hostList = self._getHostList(memberList)
|
||||
self.qmf.delBroker(self.broker)
|
||||
self.broker = None
|
||||
if _host.find("@") > 0:
|
||||
authString = _host.split("@")[0] + "@"
|
||||
else:
|
||||
authString = ""
|
||||
for host in hostList:
|
||||
b = self.qmf.addBroker(authString + host, _connTimeout)
|
||||
self.brokers.append(Broker(self.qmf, b))
|
||||
class RestManager(Console):
|
||||
def __init__(self):
|
||||
self._host = None
|
||||
self._port = 0
|
||||
|
||||
def setPort(self, port):
|
||||
self._port = port
|
||||
|
||||
def setHost(self, host):
|
||||
self._host = host
|
||||
|
||||
def execute(self, service):
|
||||
if (self._port is None):
|
||||
httpConn = httplib.HTTPConnection(self._host)
|
||||
else:
|
||||
self.brokers.append(Broker(self.qmf, self.broker))
|
||||
httpConn = httplib.HTTPConnection(self._host, self._port)
|
||||
|
||||
self.displayMain(_types[0], _types[1:])
|
||||
if (_connTimeout is not None):
|
||||
httpConn.timeout = _connTimeout
|
||||
|
||||
httpConn.connect()
|
||||
httpConn.request("GET", "/rest/" + service)
|
||||
response = httpConn.getresponse()
|
||||
|
||||
if (response.status != 200):
|
||||
print "Unable to post request to server!"
|
||||
print response.reason
|
||||
sys.exit(1)
|
||||
|
||||
return response
|
||||
|
||||
##
|
||||
## Main Program
|
||||
|
@ -483,18 +303,54 @@ if len(_types) == 0:
|
|||
Usage()
|
||||
|
||||
nargs = len(cargs)
|
||||
bm = BrokerManager()
|
||||
rm = RestManager()
|
||||
_user = None
|
||||
_password = None
|
||||
_port = 8180
|
||||
|
||||
if nargs == 1:
|
||||
_host = cargs[0]
|
||||
_broker_addr = cargs[0]
|
||||
_host = _broker_addr
|
||||
|
||||
# check for username
|
||||
if _host.find("@") != -1:
|
||||
tokens = _host.split("@")
|
||||
_user = tokens[0]
|
||||
_host = tokens[1]
|
||||
# check for password
|
||||
if _user.find("/") != -1:
|
||||
tokens = _user.split("/")
|
||||
_user = tokens[0]
|
||||
_password = tokens[1]
|
||||
# check for port
|
||||
if _host.find(":") != -1:
|
||||
tokens = _host.split(":")
|
||||
_host = tokens[0]
|
||||
_port = int(tokens[1])
|
||||
|
||||
try:
|
||||
bm.SetBroker(_host)
|
||||
bm.display()
|
||||
rm.setHost(_host)
|
||||
rm.setPort(_port)
|
||||
# determine which REST service we will be utilizing
|
||||
_serviceEndpoint = None
|
||||
if _types[0] == 'b': _serviceEndpoint = "broker"
|
||||
elif _types[0] == 'c': _serviceEndpoint = "connection"
|
||||
elif _types[0] == 's': _serviceEndpoint = "session"
|
||||
elif _types[0] == 'e': _serviceEndpoint = "exchange"
|
||||
elif _types[0] == 'q': _serviceEndpoint = "queue"
|
||||
|
||||
response = rm.execute(_serviceEndpoint)
|
||||
|
||||
# evaluate the JSON
|
||||
jsonStr = response.read()
|
||||
jsonObjArray = json.loads(jsonStr)
|
||||
|
||||
# display construct
|
||||
dm = DisplayManager()
|
||||
dm.displayMain(_types[0], jsonObjArray)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print
|
||||
except Exception,e:
|
||||
print "Failed: %s - %s" % (e.__class__.__name__, e)
|
||||
sys.exit(1)
|
||||
|
||||
bm.Disconnect()
|
||||
sys.exit(1)
|
Loading…
Add table
Reference in a new issue