Former-commit-id:133dc97f67
[formerlya02aeb236c
] [formerly9f19e3f712
] [formerly06a8b51d6d
[formerly9f19e3f712
[formerly 64fa9254b946eae7e61bbc3f513b7c3696c4f54f]]] Former-commit-id:06a8b51d6d
Former-commit-id:377dcd10b9
[formerly3360eb6c5f
] Former-commit-id:8e80217e59
524 lines
19 KiB
Python
Executable file
524 lines
19 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import getopt
|
|
import sys
|
|
import socket
|
|
import os
|
|
import locale
|
|
from qmf.console import Session, BrokerURL
|
|
|
|
def Usage():
|
|
print "Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]"
|
|
print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>"
|
|
print
|
|
print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]"
|
|
print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>"
|
|
print " qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>"
|
|
print " qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>"
|
|
print " qpid-route [OPTIONS] route list [<dest-broker>]"
|
|
print " qpid-route [OPTIONS] route flush [<dest-broker>]"
|
|
print " qpid-route [OPTIONS] route map [<broker>]"
|
|
print
|
|
print " qpid-route [OPTIONS] link add <dest-broker> <src-broker>"
|
|
print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>"
|
|
print " qpid-route [OPTIONS] link list [<dest-broker>]"
|
|
print
|
|
print "Options:"
|
|
print " --timeout seconds (10) Maximum time to wait for broker connection"
|
|
print " -v [ --verbose ] Verbose output"
|
|
print " -q [ --quiet ] Quiet output, don't print duplicate warnings"
|
|
print " -d [ --durable ] Added configuration shall be durable"
|
|
print " -e [ --del-empty-link ] Delete link after deleting last route on the link"
|
|
print " -s [ --src-local ] Make connection to source broker (push route)"
|
|
print " --ack N Acknowledge transfers over the bridge in batches of N"
|
|
print " -t <transport> [ --transport <transport>]"
|
|
print " Specify transport to use for links, defaults to tcp"
|
|
print
|
|
print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]"
|
|
print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
|
|
print
|
|
sys.exit(1)
|
|
|
|
# Command-line option state.  The option loop in the main program rebinds
# these at module level before any RouteManager method reads them.

# Boolean switches (-v, -q, -d, -e, -s); all off unless given.
_verbose = _quiet = _durable = _dellink = _srclocal = False

_transport = "tcp"   # handed to broker.connect() when a link is created
_ack = 0             # handed to link.bridge() as its final argument
_connTimeout = 10    # handed to Session.addBroker(); --timeout 0 turns it into None
|
|
|
|
class RouteManager:
|
|
def __init__(self, localBroker):
|
|
self.local = BrokerURL(localBroker)
|
|
self.remote = None
|
|
self.qmf = Session()
|
|
self.broker = self.qmf.addBroker(localBroker, _connTimeout)
|
|
|
|
def disconnect(self):
|
|
self.qmf.delBroker(self.broker)
|
|
|
|
def getLink(self):
|
|
links = self.qmf.getObjects(_class="link")
|
|
for link in links:
|
|
if self.remote.match(link.host, link.port):
|
|
return link
|
|
return None
|
|
|
|
def addLink(self, remoteBroker):
|
|
self.remote = BrokerURL(remoteBroker)
|
|
if self.local.match(self.remote.host, self.remote.port):
|
|
raise Exception("Linking broker to itself is not permitted")
|
|
|
|
brokers = self.qmf.getObjects(_class="broker")
|
|
broker = brokers[0]
|
|
link = self.getLink()
|
|
if link == None:
|
|
if not self.remote.authName or self.remote.authName == "anonymous":
|
|
mech = "ANONYMOUS"
|
|
else:
|
|
mech = "PLAIN"
|
|
res = broker.connect(self.remote.host, self.remote.port, _durable,
|
|
mech, self.remote.authName or "", self.remote.authPass or "",
|
|
_transport)
|
|
if _verbose:
|
|
print "Connect method returned:", res.status, res.text
|
|
|
|
def delLink(self, remoteBroker):
|
|
self.remote = BrokerURL(remoteBroker)
|
|
brokers = self.qmf.getObjects(_class="broker")
|
|
broker = brokers[0]
|
|
link = self.getLink()
|
|
if link == None:
|
|
raise Exception("Link not found")
|
|
|
|
res = link.close()
|
|
if _verbose:
|
|
print "Close method returned:", res.status, res.text
|
|
|
|
def listLinks(self):
|
|
links = self.qmf.getObjects(_class="link")
|
|
if len(links) == 0:
|
|
print "No Links Found"
|
|
else:
|
|
print
|
|
print "Host Port Transport Durable State Last Error"
|
|
print "============================================================================="
|
|
for link in links:
|
|
print "%-16s%-8d%-13s%c %-18s%s" % \
|
|
(link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError)
|
|
|
|
def mapRoutes(self):
|
|
qmf = self.qmf
|
|
print
|
|
print "Finding Linked Brokers:"
|
|
|
|
brokerList = {}
|
|
brokerList[self.local.name()] = self.broker
|
|
print " %s... Ok" % self.local
|
|
|
|
added = True
|
|
while added:
|
|
added = False
|
|
links = qmf.getObjects(_class="link")
|
|
for link in links:
|
|
url = BrokerURL("%s:%d" % (link.host, link.port))
|
|
if url.name() not in brokerList:
|
|
print " %s..." % url.name(),
|
|
try:
|
|
b = qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout)
|
|
brokerList[url.name()] = b
|
|
added = True
|
|
print "Ok"
|
|
except Exception, e:
|
|
print e
|
|
|
|
print
|
|
print "Dynamic Routes:"
|
|
bridges = qmf.getObjects(_class="bridge", dynamic=True)
|
|
fedExchanges = []
|
|
for bridge in bridges:
|
|
if bridge.src not in fedExchanges:
|
|
fedExchanges.append(bridge.src)
|
|
if len(fedExchanges) == 0:
|
|
print " none found"
|
|
print
|
|
|
|
for ex in fedExchanges:
|
|
print " Exchange %s:" % ex
|
|
pairs = []
|
|
for bridge in bridges:
|
|
if bridge.src == ex:
|
|
link = bridge._linkRef_
|
|
fromUrl = "%s:%s" % (link.host, link.port)
|
|
toUrl = bridge.getBroker().getUrl()
|
|
found = False
|
|
for pair in pairs:
|
|
if pair.matches(fromUrl, toUrl):
|
|
found = True
|
|
if not found:
|
|
pairs.append(RoutePair(fromUrl, toUrl))
|
|
for pair in pairs:
|
|
print " %s" % pair
|
|
print
|
|
|
|
print "Static Routes:"
|
|
bridges = qmf.getObjects(_class="bridge", dynamic=False)
|
|
if len(bridges) == 0:
|
|
print " none found"
|
|
print
|
|
|
|
for bridge in bridges:
|
|
link = bridge._linkRef_
|
|
fromUrl = "%s:%s" % (link.host, link.port)
|
|
toUrl = bridge.getBroker().getUrl()
|
|
leftType = "ex"
|
|
rightType = "ex"
|
|
if bridge.srcIsLocal:
|
|
arrow = "=>"
|
|
left = bridge.src
|
|
right = bridge.dest
|
|
if bridge.srcIsQueue:
|
|
leftType = "queue"
|
|
else:
|
|
arrow = "<="
|
|
left = bridge.dest
|
|
right = bridge.src
|
|
if bridge.srcIsQueue:
|
|
rightType = "queue"
|
|
|
|
if bridge.srcIsQueue:
|
|
print " %s(%s=%s) %s %s(%s=%s)" % \
|
|
(toUrl, leftType, left, arrow, fromUrl, rightType, right)
|
|
else:
|
|
print " %s(%s=%s) %s %s(%s=%s) key=%s" % \
|
|
(toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key)
|
|
print
|
|
|
|
for broker in brokerList:
|
|
if broker != self.local.name():
|
|
qmf.delBroker(brokerList[broker])
|
|
|
|
|
|
def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False):
|
|
if dynamic and _srclocal:
|
|
raise Exception("--src-local is not permitted on dynamic routes")
|
|
|
|
self.addLink(remoteBroker)
|
|
link = self.getLink()
|
|
if link == None:
|
|
raise Exception("Link failed to create")
|
|
|
|
bridges = self.qmf.getObjects(_class="bridge")
|
|
for bridge in bridges:
|
|
if bridge.linkRef == link.getObjectId() and \
|
|
bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
|
|
if not _quiet:
|
|
raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
|
|
sys.exit(0)
|
|
|
|
if _verbose:
|
|
print "Creating inter-broker binding..."
|
|
res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic, _ack)
|
|
if res.status != 0:
|
|
raise Exception(res.text)
|
|
if _verbose:
|
|
print "Bridge method returned:", res.status, res.text
|
|
|
|
def addQueueRoute(self, remoteBroker, exchange, queue):
|
|
self.addLink(remoteBroker)
|
|
link = self.getLink()
|
|
if link == None:
|
|
raise Exception("Link failed to create")
|
|
|
|
bridges = self.qmf.getObjects(_class="bridge")
|
|
for bridge in bridges:
|
|
if bridge.linkRef == link.getObjectId() and \
|
|
bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
|
|
if not _quiet:
|
|
raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
|
|
sys.exit(0)
|
|
|
|
if _verbose:
|
|
print "Creating inter-broker binding..."
|
|
res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False, _ack)
|
|
if res.status != 0:
|
|
raise Exception(res.text)
|
|
if _verbose:
|
|
print "Bridge method returned:", res.status, res.text
|
|
|
|
def delQueueRoute(self, remoteBroker, exchange, queue):
|
|
self.remote = BrokerURL(remoteBroker)
|
|
link = self.getLink()
|
|
if link == None:
|
|
if not _quiet:
|
|
raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
|
|
sys.exit(0)
|
|
|
|
bridges = self.qmf.getObjects(_class="bridge")
|
|
for bridge in bridges:
|
|
if bridge.linkRef == link.getObjectId() and \
|
|
bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
|
|
if _verbose:
|
|
print "Closing bridge..."
|
|
res = bridge.close()
|
|
if res.status != 0:
|
|
raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
|
|
if len(bridges) == 1 and _dellink:
|
|
link = self.getLink()
|
|
if link == None:
|
|
sys.exit(0)
|
|
if _verbose:
|
|
print "Last bridge on link, closing link..."
|
|
res = link.close()
|
|
if res.status != 0:
|
|
raise Exception("Error closing link: %d - %s" % (res.status, res.text))
|
|
sys.exit(0)
|
|
if not _quiet:
|
|
raise Exception("Route not found")
|
|
|
|
def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
|
|
self.remote = BrokerURL(remoteBroker)
|
|
link = self.getLink()
|
|
if link == None:
|
|
if not _quiet:
|
|
raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
|
|
sys.exit(0)
|
|
|
|
bridges = self.qmf.getObjects(_class="bridge")
|
|
for bridge in bridges:
|
|
if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \
|
|
and bridge.dynamic == dynamic:
|
|
if _verbose:
|
|
print "Closing bridge..."
|
|
res = bridge.close()
|
|
if res.status != 0:
|
|
raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
|
|
if len(bridges) == 1 and _dellink:
|
|
link = self.getLink()
|
|
if link == None:
|
|
sys.exit(0)
|
|
if _verbose:
|
|
print "Last bridge on link, closing link..."
|
|
res = link.close()
|
|
if res.status != 0:
|
|
raise Exception("Error closing link: %d - %s" % (res.status, res.text))
|
|
sys.exit(0)
|
|
if not _quiet:
|
|
raise Exception("Route not found")
|
|
|
|
def listRoutes(self):
|
|
links = self.qmf.getObjects(_class="link")
|
|
bridges = self.qmf.getObjects(_class="bridge")
|
|
|
|
for bridge in bridges:
|
|
myLink = None
|
|
for link in links:
|
|
if bridge.linkRef == link.getObjectId():
|
|
myLink = link
|
|
break
|
|
if myLink != None:
|
|
if bridge.dynamic:
|
|
keyText = "<dynamic>"
|
|
else:
|
|
keyText = bridge.key
|
|
print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText)
|
|
|
|
def clearAllRoutes(self):
|
|
links = self.qmf.getObjects(_class="link")
|
|
bridges = self.qmf.getObjects(_class="bridge")
|
|
|
|
for bridge in bridges:
|
|
if _verbose:
|
|
myLink = None
|
|
for link in links:
|
|
if bridge.linkRef == link.getObjectId():
|
|
myLink = link
|
|
break
|
|
if myLink != None:
|
|
print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key),
|
|
res = bridge.close()
|
|
if res.status != 0:
|
|
print "Error: %d - %s" % (res.status, res.text)
|
|
elif _verbose:
|
|
print "Ok"
|
|
|
|
if _dellink:
|
|
links = self.qmf.getObjects(_class="link")
|
|
for link in links:
|
|
if _verbose:
|
|
print "Deleting Link: %s:%d... " % (link.host, link.port),
|
|
res = link.close()
|
|
if res.status != 0:
|
|
print "Error: %d - %s" % (res.status, res.text)
|
|
elif _verbose:
|
|
print "Ok"
|
|
|
|
class RoutePair:
    """Two broker URLs joined by a route, with a flag for two-way routes."""

    def __init__(self, fromUrl, toUrl):
        """Start as a one-way route from fromUrl to toUrl."""
        self.fromUrl, self.toUrl = fromUrl, toUrl
        self.bidir = False

    def __repr__(self):
        """Render as "<from> <arrow> <to>"; the arrow is "<=>" once bidirectional."""
        arrow = " =>"
        if self.bidir:
            arrow = "<=>"
        return "%s %s %s" % (self.fromUrl, arrow, self.toUrl)

    def matches(self, fromUrl, toUrl):
        """Tell whether the given endpoints are this pair, in either direction.

        Seeing the endpoints in reverse order also marks the pair as
        bidirectional, so this method is not a pure query.
        """
        candidate = (fromUrl, toUrl)
        if candidate == (self.fromUrl, self.toUrl):
            return True
        if candidate == (self.toUrl, self.fromUrl):
            self.bidir = True
            return True
        return False
|
|
|
|
|
|
def YN(val):
    """Return 'Y' when val equals 1 (True included), otherwise 'N'."""
    if val == 1:
        flag = 'Y'
    else:
        flag = 'N'
    return flag
|
|
|
|
##
|
|
## Main Program
|
|
##
|
|
|
|
try:
|
|
longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=", "ack=", "timeout=")
|
|
(optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts)
|
|
except:
|
|
Usage()
|
|
|
|
try:
|
|
encoding = locale.getpreferredencoding()
|
|
cargs = [a.decode(encoding) for a in encArgs]
|
|
except:
|
|
cargs = encArgs
|
|
|
|
for opt in optlist:
|
|
if opt[0] == "--timeout":
|
|
_connTimeout = int(opt[1])
|
|
if _connTimeout == 0:
|
|
_connTimeout = None
|
|
if opt[0] == "-v" or opt[0] == "--verbose":
|
|
_verbose = True
|
|
if opt[0] == "-q" or opt[0] == "--quiet":
|
|
_quiet = True
|
|
if opt[0] == "-d" or opt[0] == "--durable":
|
|
_durable = True
|
|
if opt[0] == "-e" or opt[0] == "--del-empty-link":
|
|
_dellink = True
|
|
if opt[0] == "-s" or opt[0] == "--src-local":
|
|
_srclocal = True
|
|
if opt[0] == "-t" or opt[0] == "--transport":
|
|
_transport = opt[1]
|
|
if opt[0] == "--ack":
|
|
_ack = int(opt[1])
|
|
|
|
nargs = len(cargs)
|
|
if nargs < 2:
|
|
Usage()
|
|
if nargs == 2:
|
|
localBroker = "localhost"
|
|
else:
|
|
if _srclocal:
|
|
localBroker = cargs[3]
|
|
remoteBroker = cargs[2]
|
|
else:
|
|
localBroker = cargs[2]
|
|
if nargs > 3:
|
|
remoteBroker = cargs[3]
|
|
|
|
group = cargs[0]
|
|
cmd = cargs[1]
|
|
|
|
try:
|
|
rm = RouteManager(localBroker)
|
|
if group == "link":
|
|
if cmd == "add":
|
|
if nargs != 4:
|
|
Usage()
|
|
rm.addLink(remoteBroker)
|
|
elif cmd == "del":
|
|
if nargs != 4:
|
|
Usage()
|
|
rm.delLink(remoteBroker)
|
|
elif cmd == "list":
|
|
rm.listLinks()
|
|
|
|
elif group == "dynamic":
|
|
if cmd == "add":
|
|
if nargs < 5 or nargs > 7:
|
|
Usage()
|
|
|
|
tag = ""
|
|
excludes = ""
|
|
if nargs > 5: tag = cargs[5]
|
|
if nargs > 6: excludes = cargs[6]
|
|
rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True)
|
|
elif cmd == "del":
|
|
if nargs != 5:
|
|
Usage()
|
|
else:
|
|
rm.delRoute(remoteBroker, cargs[4], "", dynamic=True)
|
|
|
|
elif group == "route":
|
|
if cmd == "add":
|
|
if nargs < 6 or nargs > 8:
|
|
Usage()
|
|
|
|
tag = ""
|
|
excludes = ""
|
|
if nargs > 6: tag = cargs[6]
|
|
if nargs > 7: excludes = cargs[7]
|
|
rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False)
|
|
elif cmd == "del":
|
|
if nargs != 6:
|
|
Usage()
|
|
rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False)
|
|
elif cmd == "map":
|
|
rm.mapRoutes()
|
|
else:
|
|
if cmd == "list":
|
|
rm.listRoutes()
|
|
elif cmd == "flush":
|
|
rm.clearAllRoutes()
|
|
else:
|
|
Usage()
|
|
|
|
elif group == "queue":
|
|
if nargs != 6:
|
|
Usage()
|
|
if cmd == "add":
|
|
rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
|
|
elif cmd == "del":
|
|
rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
|
|
else:
|
|
Usage()
|
|
|
|
except Exception,e:
|
|
print "Failed: %s - %s" % (e.__class__.__name__, e)
|
|
sys.exit(1)
|
|
|
|
rm.disconnect()
|