146 lines
4.5 KiB
Python
Executable file
146 lines
4.5 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import os
|
|
import optparse
|
|
import sys
|
|
import re
|
|
import socket
|
|
import qpid
|
|
from threading import Condition
|
|
from qmf.console import Session, Console
|
|
from qpid.peer import Closed
|
|
from qpid.connection import Connection, ConnectionFailed
|
|
from time import sleep
|
|
|
|
class BrokerManager(Console):
|
|
def __init__(self, host):
|
|
self.url = host
|
|
self.objects = {}
|
|
self.filter = None
|
|
self.session = Session(self, rcvEvents=False, rcvHeartbeats=False,
|
|
userBindings=True, manageConnections=True)
|
|
self.broker = self.session.addBroker(self.url)
|
|
self.firstError = True
|
|
|
|
def setFilter(self,filter):
|
|
self.filter = filter
|
|
|
|
def brokerConnected(self, broker):
|
|
if not self.firstError:
|
|
print "*** Broker connected"
|
|
self.firstError = False
|
|
|
|
def brokerDisconnected(self, broker):
|
|
print "*** Broker connection lost - %s, retrying..." % broker.getError()
|
|
self.firstError = False
|
|
self.objects.clear()
|
|
|
|
def objectProps(self, broker, record):
|
|
className = record.getClassKey().getClassName()
|
|
if className != "queue":
|
|
return
|
|
|
|
id = record.getObjectId().__repr__()
|
|
if id not in self.objects:
|
|
self.objects[id] = (record.name, None, None)
|
|
|
|
def objectStats(self, broker, record):
|
|
className = record.getClassKey().getClassName()
|
|
if className != "queue":
|
|
return
|
|
|
|
id = record.getObjectId().__repr__()
|
|
if id not in self.objects:
|
|
return
|
|
|
|
(name, first, last) = self.objects[id]
|
|
if first == None:
|
|
self.objects[id] = (name, record, None)
|
|
return
|
|
|
|
if len(self.filter) > 0 :
|
|
match = False
|
|
|
|
for x in self.filter:
|
|
if x.match(name):
|
|
match = True
|
|
break
|
|
if match == False:
|
|
return
|
|
|
|
if last == None:
|
|
lastSample = first
|
|
else:
|
|
lastSample = last
|
|
|
|
self.objects[id] = (name, first, record)
|
|
|
|
deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0])
|
|
if deltaTime < 1000000000.0:
|
|
return
|
|
enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \
|
|
(deltaTime / 1000000000.0)
|
|
dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \
|
|
(deltaTime / 1000000000.0)
|
|
print "%-41s%10.2f%11d%13.2f%13.2f" % \
|
|
(name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate)
|
|
sys.stdout.flush()
|
|
|
|
|
|
def Display (self):
|
|
self.session.bindClass("org.apache.qpid.broker", "queue")
|
|
print "Queue Name Sec Depth Enq Rate Deq Rate"
|
|
print "========================================================================================"
|
|
sys.stdout.flush()
|
|
try:
|
|
while True:
|
|
sleep (1)
|
|
if self.firstError and self.broker.getError():
|
|
self.firstError = False
|
|
print "*** Error: %s, retrying..." % self.broker.getError()
|
|
except KeyboardInterrupt:
|
|
print
|
|
self.session.delBroker(self.broker)
|
|
|
|
##
|
|
## Main Program
|
|
##
|
|
def main():
|
|
p = optparse.OptionParser()
|
|
p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost')
|
|
p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show')
|
|
|
|
options, arguments = p.parse_args()
|
|
|
|
host = options.broker_address
|
|
filter = []
|
|
if options.filter != None:
|
|
for s in options.filter.split(","):
|
|
filter.append(re.compile(s))
|
|
|
|
bm = BrokerManager(host)
|
|
bm.setFilter(filter)
|
|
bm.Display()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|
|
|