#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import getopt
import locale
import os
import re
import socket
import sys

from qmf.console import Session, Console
from qpid.disp import Display, Header, Sorter

# Module-level settings. The main program overwrites them from the command
# line; the classes below read them as globals.

# Broker address used when no positional argument is given.
_host = "localhost"
# Seconds to wait for the broker connection (--timeout). The option parser
# turns a value of 0 into None.
_connTimeout = 10
# Report-type letters collected from single-letter options; the main program
# falls back to 'q' when none were given.
_types = ""
# Row limit (-L / --limit); handed to the Sorter when a sort column is set.
_limit = 200
# True selects increasing sort order (-I / --increasing).
_increasing = False
# Column name to sort by (-S / --sort-by); None disables sorting.
_sortcol = None
# Connection addresses of the form "a.b.c.d:port" (dotted quad plus port).
pattern = re.compile(r"^\d+\.\d+\.\d+\.\d+:\d+$")

def Usage():
    """Write the command-line help text to stdout and exit with status 1.

    Raises:
        SystemExit: always, with exit code 1.
    """
    lines = (
        "Usage: qpid-queue-count [OPTIONS] [broker-addr]",
        "",
        " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]",
        " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost",
        "",
        "General Options:",
        " --timeout seconds (10) Maximum time to wait for broker connection",
        # The numeric option is deliberately left out of the help text:
        # " -n [--numeric] Don't resolve names"
        "",
        "Display Options:",
        " -S [--sort-by] COLNAME Sort by column name",
        " -I [--increasing] Sort by increasing value (default = decreasing)",
        " -L [--limit] NUM Limit output to NUM rows (default = 200)",
        "",
    )
    # sys.stdout.write behaves the same on Python 2 and Python 3, unlike the
    # print statement.
    sys.stdout.write("\n".join(lines) + "\n")
    sys.exit(1)


class IpAddr:
    """IPv4 address and port parsed from a broker address string.

    Accepts ``[credentials@]host[:port]``. The host is resolved with
    ``socket.gethostbyname``.

    Attributes:
        port: integer port; 5672 when the text carries no ``:port`` part.
        dottedQuad: the resolved address as a dotted-quad string.
        addr: the same address packed into a 32-bit integer.
    """

    def __init__(self, text):
        """Parse *text* and resolve its host part.

        Raises:
            ValueError: if the port part is not an integer.
            socket.error: if the host cannot be resolved.
        """
        # Credentials may themselves contain "@" (e.g. in the password), so
        # the host is whatever follows the LAST "@".
        if "@" in text:
            text = text.rsplit("@", 1)[1]
        if ":" in text:
            tokens = text.split(":")
            text = tokens[0]
            self.port = int(tokens[1])
        else:
            self.port = 5672
        self.dottedQuad = socket.gethostbyname(text)
        nums = self.dottedQuad.split(".")
        self.addr = ((int(nums[0]) << 24) + (int(nums[1]) << 16) +
                     (int(nums[2]) << 8) + int(nums[3]))

    def bestAddr(self, addrPortList):
        """Return the entry of *addrPortList* numerically closest to this address.

        Closeness is the XOR of the two 32-bit addresses: the smaller the
        value, the longer the shared leading-bit prefix. Each entry is a
        sequence whose first element is a host; the entry itself is returned
        unchanged. Ties go to the earliest entry.

        Returns:
            The best entry, or None when *addrPortList* is empty.
        """
        bestDiff = None
        bestAddr = None
        for addrPort in addrPortList:
            diff = IpAddr(addrPort[0]).addr ^ self.addr
            # Starting from None (not 0xFFFFFFFF) lets a candidate that
            # differs in every bit still win when it is the only one.
            if bestDiff is None or diff < bestDiff:
                bestDiff = diff
                bestAddr = addrPort
        return bestAddr


class Broker(object):
    """Snapshot of one broker's management objects.

    On construction the broker's connections, sessions, exchanges and queues
    are fetched through the QMF session and stored in dictionaries keyed by
    object id.

    Attributes:
        broker: the connection handle passed in (its getUrl() names the broker).
        brokerAgent: the agent whose getAgentBank() is 0.
        currentTime: first element of the broker object's getTimestamps().
        uptime: the broker object's ``uptime``, or 0 when it cannot be read.
        connections, sessions, exchanges, queues: dicts keyed by object id.
    """

    def __init__(self, qmf, broker):
        """Query *qmf* for the objects of *broker*.

        Args:
            qmf: QMF session providing getAgents() and getObjects().
            broker: broker handle returned by the session's addBroker().

        Raises:
            Exception: if no agent with bank 0 is known to the session, or the
                agent reports no broker object.
        """
        self.broker = broker
        package = "org.apache.qpid.broker"

        # The last agent reporting bank 0 wins, as before.
        # NOTE(review): the lookup is not restricted to *broker*; with several
        # brokers on one session, confirm this selects the intended agent.
        self.brokerAgent = None
        for agent in qmf.getAgents():
            if agent.getAgentBank() == 0:
                self.brokerAgent = agent
        if self.brokerAgent is None:
            raise Exception("No broker agent (agent bank 0) found")

        brokerObjs = qmf.getObjects(_class="broker", _package=package,
                                    _agent=self.brokerAgent)
        if not brokerObjs:
            raise Exception("No broker object reported by the broker agent")
        bobj = brokerObjs[0]
        self.currentTime = bobj.getTimestamps()[0]
        try:
            self.uptime = bobj.uptime
        except Exception:
            # Best effort: fall back to 0 when the property is unavailable,
            # without swallowing KeyboardInterrupt/SystemExit.
            self.uptime = 0

        self.connections = {}
        self.sessions = {}
        self.exchanges = {}
        self.queues = {}

        # Keep only connections whose address matches the module-level
        # 'pattern' (dotted quad plus port).
        for conn in qmf.getObjects(_class="connection", _package=package,
                                   _agent=self.brokerAgent):
            if pattern.match(conn.address):
                self.connections[conn.getObjectId()] = conn

        # Keep only sessions that belong to one of the retained connections.
        for sess in qmf.getObjects(_class="session", _package=package,
                                   _agent=self.brokerAgent):
            if sess.connectionRef in self.connections:
                self.sessions[sess.getObjectId()] = sess

        for exchange in qmf.getObjects(_class="exchange", _package=package,
                                       _agent=self.brokerAgent):
            self.exchanges[exchange.getObjectId()] = exchange

        for queue in qmf.getObjects(_class="queue", _package=package,
                                    _agent=self.brokerAgent):
            self.queues[queue.getObjectId()] = queue

    def getName(self):
        """Return the URL of the underlying broker handle."""
        return self.broker.getUrl()

    def getCurrentTime(self):
        """Return the timestamp captured when the snapshot was taken."""
        return self.currentTime

    def getUptime(self):
        """Return the broker uptime captured at construction (0 if unknown)."""
        return self.uptime


class BrokerManager(Console):
    """Connect to a broker (or each member of its cluster) and print its queues.

    Reads the module-level settings ``_host``, ``_connTimeout``, ``_types``,
    ``_sortcol``, ``_limit`` and ``_increasing``.
    """

    def __init__(self):
        self.brokerName = None
        self.url = None
        self.qmf = None           # QMF Session, created by SetBroker()
        self.broker = None        # handle of the initial broker connection
        self.brokerAgent = None   # agent with bank 0, set by SetBroker()
        self.brokers = []         # Broker snapshots built by display()
        self.cluster = None       # cluster object, set by _getCluster()

    def SetBroker(self, brokerUrl):
        """Open a QMF session to *brokerUrl* and remember its bank-0 agent."""
        self.url = brokerUrl
        self.qmf = Session()
        self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
        for agent in self.qmf.getAgents():
            if agent.getAgentBank() == 0:
                self.brokerAgent = agent

    def Disconnect(self):
        """Remove every broker connection this manager added to the session.

        Safe to call before SetBroker() and more than once.
        """
        if self.qmf is None:
            return
        if self.broker:
            self.qmf.delBroker(self.broker)
        # In cluster mode display() drops the initial connection and opens one
        # per member; those handles are only reachable through self.brokers.
        # In single-broker mode the snapshot wraps self.broker itself, which
        # was already removed above.
        for member in self.brokers:
            if member.broker is not None and member.broker is not self.broker:
                self.qmf.delBroker(member.broker)
        self.broker = None
        self.brokers = []

    def _getCluster(self):
        """Set self.cluster to the broker's cluster object, if it has one.

        Leaves self.cluster untouched when the cluster package is absent or no
        cluster object is reported. Always returns None.
        """
        packages = self.qmf.getPackages()
        if "org.apache.qpid.cluster" not in packages:
            return None

        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
        if len(clusters) == 0:
            print("Clustering is installed but not enabled on the broker.")
            return None

        self.cluster = clusters[0]

    def _getHostList(self, urlList):
        """Pick one "host:port" per cluster member URL.

        Each URL must look like ``amqp:proto:host:port[,proto:host:port...]``.

        Args:
            urlList: member URLs, one per cluster member.

        Returns:
            List of "host:port" strings, one per URL, in input order.

        Raises:
            Exception: "Invalid URL 1" when a URL lacks the ``amqp:`` prefix,
                "Invalid URL 2" when an address does not have three
                colon-separated parts.
        """
        hosts = []
        hostAddr = IpAddr(_host)
        for url in urlList:
            if not url.startswith("amqp:"):
                raise Exception("Invalid URL 1")
            url = url[5:]
            addrList = []
            for addr in str(url).split(","):
                tokens = addr.split(":")
                if len(tokens) != 3:
                    raise Exception("Invalid URL 2")
                addrList.append((tokens[1], tokens[2]))

            # Find the address in the list that is most likely to be in the
            # same subnet as the address with which we made the original QMF
            # connection. This increases the probability that we will be able
            # to reach the cluster member.
            best = hostAddr.bestAddr(addrList)
            if best is None:
                # addrList is never empty here; fall back to the first
                # advertised address rather than failing on None.
                best = addrList[0]
            hosts.append(best[0] + ":" + best[1])
        return hosts

    def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
        """Recursively walk the sub-report letters in *subs*.

        Only 'c' (connections) does any work; 'b', 's', 'e' and 'q' are
        placeholders. A blank line is printed after each level that had a
        letter to process.
        """
        if len(subs) == 0:
            return
        this = subs[0]
        remaining = subs[1:]
        newindent = indent + " "
        if this == 'c' and broker:
            for oid in broker.connections:
                iconn = broker.connections[oid]
                # NOTE(review): printConnSub is not defined in this class in
                # the visible source; nothing here calls displaySubs, so the
                # call is kept as found.
                self.printConnSub(indent, broker.getName(), iconn)
                self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
                                 sess=sess, exchange=exchange, queue=queue)
        print("")

    def displayQueue(self, subs):
        """Print one table row per queue of every broker in self.brokers.

        Columns are queue name, message depth, byte depth and consumer count,
        preceded by the broker name when a cluster is being displayed.
        *subs* is accepted but not used.
        """
        disp = Display(prefix=" ")
        heads = []
        if self.cluster:
            heads.append(Header('broker'))
        heads.append(Header("queue"))
        heads.append(Header("msg", Header.KMG))
        heads.append(Header("bytes", Header.KMG))
        heads.append(Header("cons", Header.KMG))

        rows = []
        for broker in self.brokers:
            for oid in broker.queues:
                q = broker.queues[oid]
                row = []
                if self.cluster:
                    row.append(broker.getName())
                row.append(q.name)
                row.append(q.msgDepth)
                row.append(q.byteDepth)
                row.append(q.consumerCount)
                rows.append(row)

        title = "Queues"
        if self.cluster:
            title += " for cluster '%s'" % self.cluster.clusterName

        # NOTE(review): _limit is only handed to the Sorter; without a sort
        # column every row is shown. Confirm whether -L should also apply to
        # unsorted output.
        if _sortcol:
            sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
            dispRows = sorter.getSorted()
        else:
            dispRows = rows
        disp.formattedTable(title, heads, dispRows)

    def displayMain(self, main, subs):
        """Show the main report; only the queue report exists, so *main* is ignored."""
        self.displayQueue(subs)

    def display(self):
        """Collect broker snapshots (one per cluster member, if any) and print them."""
        self._getCluster()
        if self.cluster:
            memberList = self.cluster.members.split(";")
            hostList = self._getHostList(memberList)
            # Replace the initial connection with one connection per member.
            self.qmf.delBroker(self.broker)
            self.broker = None
            if _host.find("@") > 0:
                # Reuse the credentials; split on the LAST "@" because the
                # password itself may contain one.
                authString = _host.rsplit("@", 1)[0] + "@"
            else:
                authString = ""
            for host in hostList:
                b = self.qmf.addBroker(authString + host, _connTimeout)
                self.brokers.append(Broker(self.qmf, b))
        else:
            self.brokers.append(Broker(self.qmf, self.broker))

        # Slicing instead of indexing tolerates an empty _types string.
        self.displayMain(_types[:1], _types[1:])


##
## Main Program
##

# Parse the command line; any unknown option shows the usage text and exits.
try:
    longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing", "timeout=")
    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bceqS:L:I", longOpts)
except getopt.GetoptError:
    Usage()

# Decode positional arguments using the preferred locale encoding; when that
# is not possible (e.g. they are already text) use them unchanged.
try:
    encoding = locale.getpreferredencoding()
    cargs = [a.decode(encoding) for a in encArgs]
except Exception:
    cargs = encArgs

try:
    for opt in optlist:
        if opt[0] == "--timeout":
            _connTimeout = int(opt[1])
            if _connTimeout == 0:
                _connTimeout = None
        elif opt[0] == "-n" or opt[0] == "--numeric":
            _numeric = True
        elif opt[0] == "-S" or opt[0] == "--sort-by":
            _sortcol = opt[1]
        elif opt[0] == "-I" or opt[0] == "--increasing":
            _increasing = True
        elif opt[0] == "-L" or opt[0] == "--limit":
            _limit = int(opt[1])
        elif len(opt[0]) == 2:
            # Remaining single-letter options select report types.
            char = opt[0][1]
            if "bcseq".find(char) != -1:
                _types += char
            else:
                Usage()
        else:
            Usage()
except ValueError:
    # Non-numeric value given to --timeout or --limit.
    Usage()

if len(_types) == 0:
    _types = 'q'

nargs = len(cargs)
if nargs > 1:
    # Only one broker address is supported; do not silently ignore the rest.
    Usage()

bm = BrokerManager()

if nargs == 1:
    _host = cargs[0]

try:
    bm.SetBroker(_host)
    bm.display()
except KeyboardInterrupt:
    print("")
except Exception:
    # sys.exc_info() keeps this clause valid on both Python 2 and Python 3.
    e = sys.exc_info()[1]
    print("Failed: %s - %s" % (e.__class__.__name__, e))
    sys.exit(1)

bm.Disconnect()