435 lines
12 KiB
Python
435 lines
12 KiB
Python
|
#
|
||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||
|
# or more contributor license agreements. See the NOTICE file
|
||
|
# distributed with this work for additional information
|
||
|
# regarding copyright ownership. The ASF licenses this file
|
||
|
# to you under the Apache License, Version 2.0 (the
|
||
|
# "License"); you may not use this file except in compliance
|
||
|
# with the License. You may obtain a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing,
|
||
|
# software distributed under the License is distributed on an
|
||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||
|
# KIND, either express or implied. See the License for the
|
||
|
# specific language governing permissions and limitations
|
||
|
# under the License.
|
||
|
#
|
||
|
|
||
|
class Sim:
|
||
|
def __init__(self):
|
||
|
self.brokers = {}
|
||
|
self.clients = {}
|
||
|
self.errors = 0
|
||
|
self.warnings = 0
|
||
|
|
||
|
def error(self, text):
|
||
|
self.errors += 1
|
||
|
print "###### Error:", text
|
||
|
|
||
|
def warning(self, text):
|
||
|
self.warnings += 1
|
||
|
print "###### Warning:", text
|
||
|
|
||
|
def end(self):
|
||
|
print "========================"
|
||
|
print "Errors: %d, Warnings: %d" % (self.errors, self.warnings)
|
||
|
print "========================"
|
||
|
|
||
|
def dumpState(self):
|
||
|
print "============================"
|
||
|
print "===== Federation State ====="
|
||
|
print "============================"
|
||
|
for broker in self.brokers:
|
||
|
for exchange in self.brokers[broker].exchanges:
|
||
|
print "Exchange %s.%s" % (broker, exchange)
|
||
|
for key in self.brokers[broker].exchanges[exchange].keys:
|
||
|
print " Key %s" % key
|
||
|
for queue in self.brokers[broker].exchanges[exchange].keys[key]:
|
||
|
print " Queue %s origins=%s" % \
|
||
|
(queue.name, self.brokers[broker].exchanges[exchange].keys[key][queue].originList)
|
||
|
|
||
|
def addBroker(self, name):
|
||
|
if name in self.brokers:
|
||
|
raise Exception("Broker of same name already exists")
|
||
|
broker = Broker(self, name)
|
||
|
self.brokers[name] = broker
|
||
|
return broker
|
||
|
|
||
|
def addClient(self, name, broker):
|
||
|
if name in self.clients:
|
||
|
raise Exception("Client of same name already exists")
|
||
|
client = Client(self, name, broker)
|
||
|
self.clients[name] = client
|
||
|
return client
|
||
|
|
||
|
def link(self, left, right, bidir=True):
|
||
|
print "====== link %s to %s, bidir=%s" % (left.tag, right.tag, bidir)
|
||
|
l1 = left.createLink(right)
|
||
|
l1.bridge("amq.direct")
|
||
|
if bidir:
|
||
|
l2 = right.createLink(left)
|
||
|
l2.bridge("amq.direct")
|
||
|
|
||
|
def bind(self, client, key):
|
||
|
print "====== bind Client(%s): k=%s" % (client.name, key)
|
||
|
client.bind(key)
|
||
|
|
||
|
def unbind(self, client, key):
|
||
|
print "====== unbind Client(%s): k=%s" % (client.name, key)
|
||
|
client.unbind(key)
|
||
|
|
||
|
def sendMessage(self, key, broker, body="Message Body"):
|
||
|
print "====== sendMessage: broker=%s k=%s" % (broker.tag, key)
|
||
|
msg = Message(key, body)
|
||
|
exchange = broker.exchanges["amq.direct"]
|
||
|
for client in self.clients:
|
||
|
self.clients[client].expect(key);
|
||
|
exchange.receive(key, msg, True)
|
||
|
for client in self.clients:
|
||
|
self.clients[client].checkReception()
|
||
|
|
||
|
|
||
|
class Destination:
|
||
|
def receive(self, key, msg, fromUser=False):
|
||
|
pass
|
||
|
|
||
|
|
||
|
class Client(Destination):
|
||
|
def __init__(self, sim, name, broker):
|
||
|
self.sim = sim
|
||
|
self.name = name
|
||
|
self.broker = broker
|
||
|
self.broker.connect(self)
|
||
|
self.queue = self.broker.declare_queue(name)
|
||
|
self.subscription = self.broker.subscribe(self, name)
|
||
|
self.expected = None
|
||
|
self.boundKeys = []
|
||
|
|
||
|
def bind(self, key):
|
||
|
self.boundKeys.append(key)
|
||
|
self.broker.bind("amq.direct", self.name, key)
|
||
|
|
||
|
def unbind(self, key):
|
||
|
self.boundKeys.remove(key)
|
||
|
self.broker.unbind("amq.direct", self.name, key)
|
||
|
|
||
|
def receive(self, key, msg, fromUser=False):
|
||
|
print "Client(%s) received [%s]: %s" % (self.name, key, msg.body)
|
||
|
if self.expected == key:
|
||
|
self.expected = None
|
||
|
else:
|
||
|
self.sim.error("Client(%s) received unexpected message with key [%s]" % \
|
||
|
(self.name, self.expected))
|
||
|
|
||
|
def expect(self, key):
|
||
|
if key in self.boundKeys:
|
||
|
self.expected = key
|
||
|
|
||
|
def checkReception(self):
|
||
|
if self.expected:
|
||
|
self.sim.error("Client(%s) never received message with key [%s]" % \
|
||
|
(self.name, self.expected))
|
||
|
|
||
|
class Broker(Client):
|
||
|
def __init__(self, sim, tag):
|
||
|
self.sim = sim
|
||
|
self.tag = tag
|
||
|
self.connections = {}
|
||
|
self.exchanges = {}
|
||
|
self.queues = {}
|
||
|
self.subscriptions = {}
|
||
|
self.links = {}
|
||
|
self.directExchange = self.declare_exchange("amq.direct")
|
||
|
|
||
|
def connect(self, client):
|
||
|
if client in self.connections:
|
||
|
raise Exception("Client already connected")
|
||
|
self.connections[client] = Connection(client)
|
||
|
|
||
|
def declare_queue(self, name, tag=None, exclude=None):
|
||
|
if name in self.queues:
|
||
|
raise Exception("Queue already exists")
|
||
|
self.queues[name] = Queue(self, name, tag, exclude)
|
||
|
|
||
|
def subscribe(self, dest, queueName):
|
||
|
if queueName not in self.queues:
|
||
|
raise Exception("Queue does not exist")
|
||
|
self.queues[queueName].setDest(dest)
|
||
|
|
||
|
def declare_exchange(self, name):
|
||
|
if name in self.exchanges:
|
||
|
return
|
||
|
exchange = Exchange(self, name)
|
||
|
self.exchanges[name] = exchange
|
||
|
return exchange
|
||
|
|
||
|
def bind(self, exchangeName, queueName, key, tagList=[], fedOp=None, origin=None):
|
||
|
if exchangeName not in self.exchanges:
|
||
|
raise Exception("Exchange not found")
|
||
|
if queueName not in self.queues:
|
||
|
raise Exception("Queue not found")
|
||
|
exchange = self.exchanges[exchangeName]
|
||
|
queue = self.queues[queueName]
|
||
|
exchange.bind(queue, key, tagList, fedOp, origin)
|
||
|
|
||
|
def unbind(self, exchangeName, queueName, key):
|
||
|
if exchangeName not in self.exchanges:
|
||
|
raise Exception("Exchange not found")
|
||
|
if queueName not in self.queues:
|
||
|
raise Exception("Queue not found")
|
||
|
exchange = self.exchanges[exchangeName]
|
||
|
queue = self.queues[queueName]
|
||
|
exchange.unbind(queue, key)
|
||
|
|
||
|
def createLink(self, other):
|
||
|
if other in self.links:
|
||
|
raise Exception("Peer broker already linked")
|
||
|
link = Link(self, other)
|
||
|
self.links[other] = link
|
||
|
return link
|
||
|
|
||
|
|
||
|
class Connection:
|
||
|
def __init__(self, client):
|
||
|
self.client = client
|
||
|
|
||
|
|
||
|
class Exchange(Destination):
|
||
|
def __init__(self, broker, name):
|
||
|
self.broker = broker
|
||
|
self.sim = broker.sim
|
||
|
self.name = name
|
||
|
self.keys = {}
|
||
|
self.bridges = []
|
||
|
|
||
|
def bind(self, queue, key, tagList, fedOp, origin):
|
||
|
if not fedOp: fedOp = "bind"
|
||
|
print "Exchange(%s.%s) bind q=%s, k=%s, tags=%s, op=%s, origin=%s" % \
|
||
|
(self.broker.tag, self.name, queue.name, key, tagList, fedOp, origin),
|
||
|
|
||
|
if self.broker.tag in tagList:
|
||
|
print "(tag ignored)"
|
||
|
return
|
||
|
|
||
|
if fedOp == "bind" or fedOp == "unbind":
|
||
|
if key not in self.keys:
|
||
|
self.keys[key] = {}
|
||
|
queueMap = self.keys[key]
|
||
|
|
||
|
if fedOp == "bind":
|
||
|
##
|
||
|
## Add local or federation binding case
|
||
|
##
|
||
|
if queue in queueMap:
|
||
|
if origin and origin in queueMap[queue].originList:
|
||
|
print "(dup ignored)"
|
||
|
elif origin:
|
||
|
queueMap[queue].originList.append(origin)
|
||
|
print "(origin added)"
|
||
|
else:
|
||
|
binding = Binding(origin)
|
||
|
queueMap[queue] = binding
|
||
|
print "(binding added)"
|
||
|
|
||
|
elif fedOp == "unbind":
|
||
|
##
|
||
|
## Delete federation binding case
|
||
|
##
|
||
|
if queue in queueMap:
|
||
|
binding = queueMap[queue]
|
||
|
if origin and origin in binding.originList:
|
||
|
binding.originList.remove(origin)
|
||
|
if len(binding.originList) == 0:
|
||
|
queueMap.pop(queue)
|
||
|
if len(queueMap) == 0:
|
||
|
self.keys.pop(key)
|
||
|
print "(last origin del)"
|
||
|
else:
|
||
|
print "(removed origin)"
|
||
|
else:
|
||
|
print "(origin not found)"
|
||
|
else:
|
||
|
print "(queue not found)"
|
||
|
|
||
|
elif fedOp == "reorigin":
|
||
|
print "(ok)"
|
||
|
self.reorigin()
|
||
|
|
||
|
elif fedOp == "hello":
|
||
|
print "(ok)"
|
||
|
|
||
|
else:
|
||
|
raise Exception("Unknown fed-opcode '%s'" % fedOp)
|
||
|
|
||
|
newTagList = []
|
||
|
newTagList.append(self.broker.tag)
|
||
|
for tag in tagList:
|
||
|
newTagList.append(tag)
|
||
|
if origin:
|
||
|
propOrigin = origin
|
||
|
else:
|
||
|
propOrigin = self.broker.tag
|
||
|
|
||
|
for bridge in self.bridges:
|
||
|
if bridge.isDynamic():
|
||
|
bridge.propagate(key, newTagList, fedOp, propOrigin)
|
||
|
|
||
|
def reorigin(self):
|
||
|
myTag = []
|
||
|
myTag.append(self.broker.tag)
|
||
|
for key in self.keys:
|
||
|
queueMap = self.keys[key]
|
||
|
found = False
|
||
|
for queue in queueMap:
|
||
|
binding = queueMap[queue]
|
||
|
if binding.isLocal():
|
||
|
found = True
|
||
|
if found:
|
||
|
for bridge in self.bridges:
|
||
|
if bridge.isDynamic():
|
||
|
bridge.propagate(key, myTag, "bind", self.broker.tag)
|
||
|
|
||
|
def unbind(self, queue, key):
|
||
|
print "Exchange(%s.%s) unbind q=%s, k=%s" % (self.broker.tag, self.name, queue.name, key),
|
||
|
if key not in self.keys:
|
||
|
print "(key not known)"
|
||
|
return
|
||
|
queueMap = self.keys[key]
|
||
|
if queue not in queueMap:
|
||
|
print "(queue not bound)"
|
||
|
return
|
||
|
queueMap.pop(queue)
|
||
|
if len(queueMap) == 0:
|
||
|
self.keys.pop(key)
|
||
|
print "(ok, remove bound-key)"
|
||
|
else:
|
||
|
print "(ok)"
|
||
|
|
||
|
count = 0
|
||
|
for queue in queueMap:
|
||
|
if len(queueMap[queue].originList) == 0:
|
||
|
count += 1
|
||
|
|
||
|
if count == 0:
|
||
|
myTag = []
|
||
|
myTag.append(self.broker.tag)
|
||
|
for bridge in self.bridges:
|
||
|
if bridge.isDynamic():
|
||
|
bridge.propagate(key, myTag, "unbind", self.broker.tag)
|
||
|
|
||
|
def receive(self, key, msg, fromUser=False):
|
||
|
sent = False
|
||
|
if key in self.keys:
|
||
|
queueMap = self.keys[key]
|
||
|
for queue in queueMap:
|
||
|
if queue.enqueue(msg):
|
||
|
sent = True
|
||
|
if not sent and not fromUser:
|
||
|
self.sim.warning("Exchange(%s.%s) received unroutable message: k=%s" % \
|
||
|
(self.broker.tag, self.name, key))
|
||
|
|
||
|
def addDynamicBridge(self, bridge):
|
||
|
if bridge in self.bridges:
|
||
|
raise Exception("Dynamic bridge already added to exchange")
|
||
|
self.bridges.append(bridge)
|
||
|
|
||
|
for b in self.bridges:
|
||
|
if b != bridge:
|
||
|
b.sendReorigin()
|
||
|
self.reorigin()
|
||
|
|
||
|
class Queue:
|
||
|
def __init__(self, broker, name, tag=None, exclude=None):
|
||
|
self.broker = broker
|
||
|
self.name = name
|
||
|
self.tag = tag
|
||
|
self.exclude = exclude
|
||
|
self.dest = None
|
||
|
|
||
|
def setDest(self, dest):
|
||
|
self.dest = dest
|
||
|
|
||
|
def enqueue(self, msg):
|
||
|
print "Queue(%s.%s) rcvd k=%s, tags=%s" % (self.broker.tag, self.name, msg.key, msg.tags),
|
||
|
if self.dest == None:
|
||
|
print "(dropped, no dest)"
|
||
|
return False
|
||
|
if self.exclude and msg.tagFound(self.exclude):
|
||
|
print "(dropped, tag)"
|
||
|
return False
|
||
|
if self.tag:
|
||
|
msg.appendTag(self.tag)
|
||
|
print "(ok)"
|
||
|
self.dest.receive(msg.key, msg)
|
||
|
return True
|
||
|
|
||
|
|
||
|
class Binding:
|
||
|
def __init__(self, origin):
|
||
|
self.originList = []
|
||
|
if origin:
|
||
|
self.originList.append(origin)
|
||
|
|
||
|
def isLocal(self):
|
||
|
return len(self.originList) == 0
|
||
|
|
||
|
|
||
|
class Link:
|
||
|
def __init__(self, local, remote):
|
||
|
self.local = local
|
||
|
self.remote = remote
|
||
|
self.remote.connect(self)
|
||
|
self.bridges = []
|
||
|
|
||
|
def bridge(self, exchangeName):
|
||
|
bridge = Bridge(self, exchangeName)
|
||
|
|
||
|
|
||
|
class Bridge:
|
||
|
def __init__(self, link, exchangeName):
|
||
|
self.link = link
|
||
|
self.exchangeName = exchangeName
|
||
|
if self.exchangeName not in link.local.exchanges:
|
||
|
raise Exception("Exchange not found")
|
||
|
self.exchange = link.local.exchanges[self.exchangeName]
|
||
|
self.queueName = "bridge." + link.local.tag
|
||
|
self.link.remote.declare_queue(self.queueName, self.link.remote.tag, self.link.local.tag)
|
||
|
self.link.remote.subscribe(self.exchange, self.queueName)
|
||
|
self.exchange.addDynamicBridge(self)
|
||
|
|
||
|
def isDynamic(self):
|
||
|
return True
|
||
|
|
||
|
def localTag(self):
|
||
|
return self.link.local.tag
|
||
|
|
||
|
def remoteTag(self):
|
||
|
return self.link.remote.tag
|
||
|
|
||
|
def propagate(self, key, tagList, fedOp, origin):
|
||
|
if self.link.remote.tag not in tagList:
|
||
|
self.link.remote.bind(self.exchangeName, self.queueName, key, tagList, fedOp, origin)
|
||
|
|
||
|
def sendReorigin(self):
|
||
|
myTag = []
|
||
|
myTag.append(self.link.local.tag)
|
||
|
self.link.remote.bind(self.exchangeName, self.queueName, "", myTag, "reorigin", "")
|
||
|
|
||
|
|
||
|
class Message:
|
||
|
def __init__(self, key, body):
|
||
|
self.key = key
|
||
|
self.body = body
|
||
|
self.tags = []
|
||
|
|
||
|
def appendTag(self, tag):
|
||
|
if tag not in self.tags:
|
||
|
self.tags.append(tag)
|
||
|
|
||
|
def tagFound(self, tag):
|
||
|
return tag in self.tags
|
||
|
|
||
|
|