Former-commit-id:06a8b51d6d
[formerlya02aeb236c
[formerly9f19e3f712
] [formerly06a8b51d6d
[formerly 64fa9254b946eae7e61bbc3f513b7c3696c4f54f]]] Former-commit-id:a02aeb236c
[formerly9f19e3f712
] Former-commit-id:a02aeb236c
Former-commit-id:133dc97f67
495 lines
20 KiB
Python
Executable file
495 lines
20 KiB
Python
Executable file
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
|
|
import os
|
|
import getopt
|
|
import sys
|
|
import locale
|
|
from qmf.console import Session
|
|
|
|
# ---------------------------------------------------------------------------
# Command-line settings.
#
# These module-level values hold the defaults; the option loop in the main
# program overwrites them, and the BrokerManager methods read them.
# ---------------------------------------------------------------------------
_recursive = False        # -b/--bindings: list bindings with queues/exchanges
_host = "localhost"       # -a/--broker-addr: broker address given to SetBroker
_connTimeout = 10         # --timeout: passed to addBroker (0 is turned into None)
_altern_ex = None         # --alternate-exchange: name, or None when not given
_passive = False          # --passive: forwarded as passive= on declare
_durable = False          # --durable: forwarded as durable= on declare
_clusterDurable = False   # --cluster-durable: sets CLUSTER_DURABLE on the queue
_if_empty = True          # queue_delete if_empty= (cleared by --force*)
_if_unused = True         # queue_delete if_unused= (cleared by --force*)
_fileCount = 8            # --file-count: only sent when the queue is durable
_fileSize = 24            # --file-size: only sent when the queue is durable
_maxQueueSize = None      # --max-queue-size
_maxQueueCount = None     # --max-queue-count
_limitPolicy = None       # --limit-policy: none|reject|flow-to-disk|ring|ring-strict
_order = None             # --order: fifo|lvq|lvq-no-browse
_msgSequence = False      # --sequence (exchange option)
_ive = False              # --ive (exchange option)
_eventGeneration = None   # --generate-queue-events

# ---------------------------------------------------------------------------
# Keys used in the "arguments" map of queue/exchange declarations, and read
# back from the arguments of listed queues/exchanges.
# ---------------------------------------------------------------------------
FILECOUNT = "qpid.file_count"
FILESIZE = "qpid.file_size"
MAX_QUEUE_SIZE = "qpid.max_size"
MAX_QUEUE_COUNT = "qpid.max_count"
POLICY_TYPE = "qpid.policy_type"
CLUSTER_DURABLE = "qpid.persist_last_node"
LVQ = "qpid.last_value_queue"
LVQNB = "qpid.last_value_queue_no_browse"
MSG_SEQUENCE = "qpid.msg_sequence"
IVE = "qpid.ive"
QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
|
|
|
|
def Usage ():
    """Print the command-line help to standard output and exit with status 1.

    This function never returns: it always ends with ``sys.exit(1)``, so
    callers can invoke it as a terminal "bad arguments" action.
    """
    # One entry per output line; an empty string is a blank separator line.
    helpLines = (
        "Usage: qpid-config [OPTIONS]",
        " qpid-config [OPTIONS] exchanges [filter-string]",
        " qpid-config [OPTIONS] queues [filter-string]",
        " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]",
        " qpid-config [OPTIONS] del exchange <name>",
        " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]",
        " qpid-config [OPTIONS] del queue <name> [DelQueueOptions]",
        " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]",
        " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]",
        "",
        "Options:",
        " --timeout seconds (10) Maximum time to wait for broker connection",
        " -b [ --bindings ] Show bindings in queue or exchange list",
        " -a [ --broker-addr ] Address (localhost) Address of qpidd broker",
        " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]",
        " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost",
        "",
        "Add Queue Options:",
        " --alternate-exchange [name of the alternate exchange]",
        " The alternate-exchange field specifies how messages on this queue should",
        " be treated when they are rejected by a subscriber, or when they are",
        " orphaned by queue deletion. When present, rejected or orphaned messages",
        " MUST be routed to the alternate-exchange. In all cases the messages MUST",
        " be removed from the queue.",
        " --passive Do not actually change the broker state (queue will not be created)",
        " --durable Queue is durable",
        " --cluster-durable Queue becomes durable if there is only one functioning cluster node",
        " --file-count N (8) Number of files in queue's persistence journal",
        " --file-size N (24) File size in pages (64Kib/page)",
        " --max-queue-size N Maximum in-memory queue size as bytes",
        " --max-queue-count N Maximum in-memory queue size as a number of messages",
        " --limit-policy [none | reject | flow-to-disk | ring | ring-strict]",
        " Action taken when queue limit is reached:",
        " none (default) - Use broker's default policy",
        " reject - Reject enqueued messages",
        " flow-to-disk - Page messages to disk",
        " ring - Replace oldest unacquired message with new",
        " ring-strict - Replace oldest message, reject if oldest is acquired",
        " --order [fifo | lvq | lvq-no-browse]",
        " Set queue ordering policy:",
        " fifo (default) - First in, first out",
        " lvq - Last Value Queue ordering, allows queue browsing",
        " lvq-no-browse - Last Value Queue ordering, browsing clients may lose data",
        " --generate-queue-events N",
        " If set to 1, every enqueue will generate an event that can be processed by",
        " registered listeners (e.g. for replication). If set to 2, events will be",
        " generated for enqueues and dequeues",
        "",
        "Del Queue Options:",
        " --force Force delete of queue even if it's currently used or it's not empty",
        " --force-if-not-empty Force delete of queue even if it's not empty",
        " --force-if-used Force delete of queue even if it's currently used",
        "",
        "Add Exchange <type> values:",
        " direct Direct exchange for point-to-point communication",
        " fanout Fanout exchange for broadcast communication",
        " topic Topic exchange that routes messages using binding keys with wildcards",
        " headers Headers exchange that matches header fields against the binding keys",
        "",
        "Add Exchange Options:",
        " --alternate-exchange [name of the alternate exchange]",
        " In the event that a message cannot be routed, this is the name of the exchange to",
        " which the message will be sent. Messages transferred using message.transfer will",
        " be routed to the alternate-exchange only if they are sent with the \"none\"",
        " accept-mode, and the discard-unroutable delivery property is set to false, and",
        " there is no queue to route to for the given message according to the bindings",
        " on this exchange.",
        " --passive Do not actually change the broker state (exchange will not be created)",
        " --durable Exchange is durable",
        " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header",
        " with a value that increments for each message forwarded.",
        " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference",
        " to the last message forwarded and enqueuing that message to newly bound",
        " queues.",
        "",
    )
    # Single-argument print(...) behaves the same as a statement (Python 2)
    # and as a function call (Python 3).
    for helpLine in helpLines:
        print(helpLine)
    sys.exit (1)
|
|
|
|
class BrokerManager:
    """Holds one QMF console session and the broker connection made through it.

    The listing methods (Overview, ExchangeList, QueueList and their
    *Recurse variants) read "exchange", "queue" and "binding" objects from
    the QMF session and print them.  The Add*/Del*/Bind/Unbind methods issue
    commands on the broker's AMQP session.  Command options are read from
    the module-level ``_xxx`` settings.
    """

    def __init__ (self):
        self.brokerName = None
        self.url = None
        self.qmf = None
        self.broker = None
        # Filled in by SetBroker(); stays None when no agent reports bank 0.
        self.brokerAgent = None

    def SetBroker (self, brokerUrl):
        """Open a QMF session to `brokerUrl` and remember the bank-0 agent.

        The connection timeout comes from the module-level `_connTimeout`.
        """
        self.url = brokerUrl
        self.qmf = Session()
        self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
        for agent in self.qmf.getAgents():
            # No early exit: as before, the last agent with bank 0 wins.
            if agent.getAgentBank() == 0:
                self.brokerAgent = agent

    def Disconnect(self):
        """Remove the broker from the QMF session; no-op when not connected."""
        if self.broker:
            self.qmf.delBroker(self.broker)

    def _getObjects (self, className):
        """Return the QMF objects of class `className` seen by the broker agent.

        Raises Exception with a readable message when no broker agent was
        found, instead of failing later on a missing/None attribute.
        """
        if self.brokerAgent is None:
            raise Exception("No broker agent available for '%s'" % self.url)
        return self.qmf.getObjects(_class=className, _agent=self.brokerAgent)

    def Overview (self):
        """Print exchange totals per type and durable/non-durable queue totals."""
        exchanges = self._getObjects("exchange")
        queues = self._getObjects("queue")

        print("Total Exchanges: %d" % len (exchanges))
        typeCounts = {}
        for ex in exchanges:
            typeCounts[ex.type] = typeCounts.get(ex.type, 0) + 1
        for typ in typeCounts:
            print("%15s: %d" % (typ, typeCounts[typ]))

        print("")
        print(" Total Queues: %d" % len (queues))
        # Local counter; deliberately not named like the module-level _durable.
        durableCount = 0
        for queue in queues:
            if queue.durable:
                durableCount = durableCount + 1
        print(" durable: %d" % durableCount)
        print(" non-durable: %d" % (len (queues) - durableCount))

    def ExchangeList (self, filter):
        """Print a table of exchanges whose name contains `filter`.

        Each row shows the type, the name and the options the exchange
        carries (--durable, --sequence, --ive, --alternate-exchange=NAME).
        """
        exchanges = self._getObjects("exchange")
        typeWidth = 10          # width of the type column, shared by header and rows
        caption1 = "Type"
        caption2 = "Exchange Name"
        selected = [ex for ex in exchanges if self.match (ex.name, filter)]

        maxNameLen = len(caption2)
        for ex in selected:
            if len(ex.name) > maxNameLen:
                maxNameLen = len(ex.name)

        # The header pads the type caption to the same width the rows use,
        # so the "Exchange Name" caption sits above the names.
        print("%-*s%-*s Attributes" % (typeWidth, caption1, maxNameLen, caption2))
        print("=====" * (((maxNameLen + typeWidth) // 5) + 5))

        for ex in selected:
            # Row pieces are joined with a single space, which is what a
            # chain of trailing-comma print statements produced.
            row = ["%-*s%-*s " % (typeWidth, ex.type, maxNameLen, ex.name)]
            args = ex.arguments
            if ex.durable:
                row.append("--durable")
            if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1:
                row.append("--sequence")
            if IVE in args and args[IVE] == 1:
                row.append("--ive")
            if ex.altExchange:
                row.append("--alternate-exchange=%s" % ex._altExchange_.name)
            print(" ".join(row))

    def ExchangeListRecurse (self, filter):
        """Print each matching exchange followed by its bindings and queue names."""
        exchanges = self._getObjects("exchange")
        bindings = self._getObjects("binding")
        queues = self._getObjects("queue")
        for ex in exchanges:
            if not self.match (ex.name, filter):
                continue
            print("Exchange '%s' (%s)" % (ex.name, ex.type))
            for bind in bindings:
                if bind.exchangeRef == ex.getObjectId():
                    qname = "<unknown>"
                    queue = self.findById (queues, bind.queueRef)
                    if queue is not None:
                        qname = queue.name
                    print(" bind [%s] => %s" % (bind.bindingKey, qname))

    def QueueList (self, filter):
        """Print a table of queues whose name contains `filter`.

        Each row shows the name and the attributes found on the queue and
        in its declaration arguments.
        """
        queues = self._getObjects("queue")
        caption = "Queue Name"
        selected = [q for q in queues if self.match (q.name, filter)]

        maxNameLen = len(caption)
        for q in selected:
            if len(q.name) > maxNameLen:
                maxNameLen = len(q.name)

        print("%-*s Attributes" % (maxNameLen, caption))
        print("=====" * ((maxNameLen // 5) + 5))

        for q in selected:
            row = ["%-*s " % (maxNameLen, q.name)]
            args = q.arguments
            if q.durable:
                row.append("--durable")
            if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1:
                row.append("--cluster-durable")
            if q.autoDelete:
                row.append("auto-del")
            if q.exclusive:
                row.append("excl")
            if FILESIZE in args:
                row.append("--file-size=%d" % args[FILESIZE])
            if FILECOUNT in args:
                row.append("--file-count=%d" % args[FILECOUNT])
            if MAX_QUEUE_SIZE in args:
                row.append("--max-queue-size=%d" % args[MAX_QUEUE_SIZE])
            if MAX_QUEUE_COUNT in args:
                row.append("--max-queue-count=%d" % args[MAX_QUEUE_COUNT])
            if POLICY_TYPE in args:
                # Stored with underscores; shown in the command-line spelling.
                row.append("--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"))
            if LVQ in args and args[LVQ] == 1:
                row.append("--order lvq")
            if LVQNB in args and args[LVQNB] == 1:
                row.append("--order lvq-no-browse")
            if QUEUE_EVENT_GENERATION in args:
                row.append("--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION])
            if q.altExchange:
                row.append("--alternate-exchange=%s" % q._altExchange_.name)
            print(" ".join(row))

    def QueueListRecurse (self, filter):
        """Print each matching queue followed by its bindings and exchange names."""
        exchanges = self._getObjects("exchange")
        bindings = self._getObjects("binding")
        queues = self._getObjects("queue")
        for queue in queues:
            if not self.match (queue.name, filter):
                continue
            print("Queue '%s'" % queue.name)
            for bind in bindings:
                if bind.queueRef == queue.getObjectId():
                    ename = "<unknown>"
                    ex = self.findById (exchanges, bind.exchangeRef)
                    if ex is not None:
                        ename = ex.name
                        # An exchange with an empty name is shown as ''.
                        if ename == "":
                            ename = "''"
                    print(" bind [%s] => %s" % (bind.bindingKey, ename))

    def AddExchange (self, args):
        """Declare an exchange; `args` is [type, name].

        Calls Usage() (which exits) when fewer than two values are given.
        """
        if len (args) < 2:
            Usage ()
        etype = args[0]
        ename = args[1]
        declArgs = {}
        if _msgSequence:
            declArgs[MSG_SEQUENCE] = 1
        if _ive:
            declArgs[IVE] = 1

        declare = {"exchange": ename, "type": etype, "passive": _passive,
                   "durable": _durable, "arguments": declArgs}
        # alternate_exchange is only sent when the option was given.
        if _altern_ex is not None:
            declare["alternate_exchange"] = _altern_ex
        self.broker.getAmqpSession().exchange_declare (**declare)

    def DelExchange (self, args):
        """Delete the exchange named by args[0]; Usage() exits if it is missing."""
        if len (args) < 1:
            Usage ()
        ename = args[0]
        self.broker.getAmqpSession().exchange_delete (exchange=ename)

    def AddQueue (self, args):
        """Declare the queue named by args[0] using the module-level options.

        Calls Usage() (which exits) when no name is given.
        """
        if len (args) < 1:
            Usage ()
        qname = args[0]
        declArgs = {}
        # Journal geometry is only sent for durable queues.
        if _durable:
            declArgs[FILECOUNT] = _fileCount
            declArgs[FILESIZE] = _fileSize

        if _maxQueueSize:
            declArgs[MAX_QUEUE_SIZE] = _maxQueueSize
        if _maxQueueCount:
            declArgs[MAX_QUEUE_COUNT] = _maxQueueCount

        # Command-line spelling -> value sent to the broker.  "none" (and an
        # unset option) has no entry, so no policy argument is sent for it.
        policyValues = {"reject": "reject",
                        "flow-to-disk": "flow_to_disk",
                        "ring": "ring",
                        "ring-strict": "ring_strict"}
        if _limitPolicy in policyValues:
            declArgs[POLICY_TYPE] = policyValues[_limitPolicy]

        if _clusterDurable:
            declArgs[CLUSTER_DURABLE] = 1

        # "fifo" (and an unset option) sends no ordering argument.
        if _order == "lvq":
            declArgs[LVQ] = 1
        elif _order == "lvq-no-browse":
            declArgs[LVQNB] = 1

        if _eventGeneration:
            declArgs[QUEUE_EVENT_GENERATION] = _eventGeneration

        declare = {"queue": qname, "passive": _passive,
                   "durable": _durable, "arguments": declArgs}
        # alternate_exchange is only sent when the option was given.
        if _altern_ex is not None:
            declare["alternate_exchange"] = _altern_ex
        self.broker.getAmqpSession().queue_declare (**declare)

    def DelQueue (self, args):
        """Delete the queue named by args[0], honouring _if_empty/_if_unused."""
        if len (args) < 1:
            Usage ()
        qname = args[0]
        self.broker.getAmqpSession().queue_delete (queue=qname, if_empty=_if_empty, if_unused=_if_unused)

    def _bindingArgs (self, args):
        """Split [exchange, queue, key?] into (exchange, queue, key).

        The key defaults to "" when absent.  Calls Usage() (which exits)
        when fewer than two values are given.
        """
        if len (args) < 2:
            Usage ()
        key = ""
        if len (args) > 2:
            key = args[2]
        return args[0], args[1], key

    def Bind (self, args):
        """Bind a queue to an exchange; `args` is [exchange, queue, key?]."""
        ename, qname, key = self._bindingArgs (args)
        self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key)

    def Unbind (self, args):
        """Remove a binding; `args` is [exchange, queue, key?]."""
        ename, qname, key = self._bindingArgs (args)
        self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key)

    def findById (self, items, id):
        """Return the first item whose getObjectId() equals `id`, else None."""
        for item in items:
            if item.getObjectId() == id:
                return item
        return None

    def match (self, name, filter):
        """Return True when `filter` is empty or is a substring of `name`."""
        return filter == "" or filter in name
|
|
|
|
def YN (bool):
    """Return 'Y' when the argument is truthy, otherwise 'N'.

    The parameter keeps its original name (which shadows the builtin inside
    this function only) so that keyword callers keep working.
    """
    if not bool:
        return 'N'
    return 'Y'
|
|
|
|
|
|
##
## Main Program
##

def _intOption (optName, optValue):
    """Convert a numeric option value, exiting with an error if it is not an integer."""
    try:
        return int (optValue)
    except ValueError:
        print("Error: Invalid numeric argument for %s: '%s'" % (optName, optValue))
        sys.exit (1)

try:
    # "force-if-used" must be spelled with hyphens: that is the form the
    # option loop below tests for and the form the help text documents.
    longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=",
                "file-size=", "max-queue-size=", "max-queue-count=", "limit-policy=",
                "order=", "sequence", "ive", "generate-queue-events=", "force", "force-if-not-empty",
                "force-if-used", "alternate-exchange=", "passive", "timeout=")
    (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts)
except getopt.GetoptError:
    # Unknown option or missing option argument; Usage() exits.
    Usage ()

# Best effort: decode the positional arguments with the locale's preferred
# encoding; if that is not possible, use them as they are.
try:
    encoding = locale.getpreferredencoding()
    cargs = [a.decode(encoding) for a in encArgs]
except Exception:
    cargs = encArgs

for optName, optValue in optlist:
    if optName in ("-b", "--bindings"):
        _recursive = True
    elif optName in ("-a", "--broker-addr"):
        _host = optValue
    elif optName == "--timeout":
        _connTimeout = _intOption (optName, optValue)
        # A timeout of 0 is passed on as None.
        if _connTimeout == 0:
            _connTimeout = None
    elif optName == "--alternate-exchange":
        _altern_ex = optValue
    elif optName == "--passive":
        _passive = True
    elif optName == "--durable":
        _durable = True
    elif optName == "--cluster-durable":
        _clusterDurable = True
    elif optName == "--file-count":
        _fileCount = _intOption (optName, optValue)
    elif optName == "--file-size":
        _fileSize = _intOption (optName, optValue)
    elif optName == "--max-queue-size":
        _maxQueueSize = _intOption (optName, optValue)
    elif optName == "--max-queue-count":
        _maxQueueCount = _intOption (optName, optValue)
    elif optName == "--limit-policy":
        _limitPolicy = optValue
        if _limitPolicy not in ("none", "reject", "flow-to-disk", "ring", "ring-strict"):
            print("Error: Invalid --limit-policy argument")
            sys.exit(1)
    elif optName == "--order":
        _order = optValue
        if _order not in ("fifo", "lvq", "lvq-no-browse"):
            print("Error: Invalid --order argument")
            sys.exit(1)
    elif optName == "--sequence":
        _msgSequence = True
    elif optName == "--ive":
        _ive = True
    elif optName == "--generate-queue-events":
        _eventGeneration = _intOption (optName, optValue)
    elif optName == "--force":
        _if_empty = False
        _if_unused = False
    elif optName == "--force-if-not-empty":
        _if_empty = False
    elif optName == "--force-if-used":
        _if_unused = False

nargs = len (cargs)
bm = BrokerManager ()

# The outer try/finally makes sure the broker is released on every exit
# path, including the sys.exit() raised by Usage() and by the error handler.
try:
    try:
        bm.SetBroker(_host)
        if nargs == 0:
            bm.Overview ()
        else:
            cmd = cargs[0]
            modifier = ""
            if nargs > 1:
                modifier = cargs[1]
            if cmd == "exchanges":
                if _recursive:
                    bm.ExchangeListRecurse (modifier)
                else:
                    bm.ExchangeList (modifier)
            elif cmd == "queues":
                if _recursive:
                    bm.QueueListRecurse (modifier)
                else:
                    bm.QueueList (modifier)
            elif cmd == "add":
                if modifier == "exchange":
                    bm.AddExchange (cargs[2:])
                elif modifier == "queue":
                    bm.AddQueue (cargs[2:])
                else:
                    Usage ()
            elif cmd == "del":
                if modifier == "exchange":
                    bm.DelExchange (cargs[2:])
                elif modifier == "queue":
                    bm.DelQueue (cargs[2:])
                else:
                    Usage ()
            elif cmd == "bind":
                bm.Bind (cargs[1:])
            elif cmd == "unbind":
                bm.Unbind (cargs[1:])
            else:
                Usage ()
    except KeyboardInterrupt:
        print("")
    except Exception:
        # sys.exc_info() is used instead of "except ..., e" / "except ... as e"
        # so the handler parses on both old and new interpreters.
        e = sys.exc_info()[1]
        print("Failed: %s: %s" % (e.__class__.__name__, e))
        sys.exit(1)
finally:
    bm.Disconnect()
|