* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
using System;
using System.Collections.Generic;
using System.IO ;
using System.Reflection ;
using System.Threading ;
using log4net ;
using org.apache.qpid.client ;
using org.apache.qpid.transport.util;
using org.apache.qpid.transport.codec ;
namespace org.apache.qpid.console
* Main Interaction point for interaction with the bus. Can be used to locate objects
* on the bus, and invoke messages on them.
public class Session
public static ILog log = LogManager.GetLogger(typeof(Session)) ;
public static int CONTEXT_SYNC = 1 ;
public static int CONTEXT_STARTUP = 2 ;
public static int CONTEXT_MULTIGET = 3 ;
public static int DEFAULT_GET_WAIT_TIME = 60000 ;
public bool RecieveObjects = true ;
public bool RecieveEvents = true ;
public bool RecieveHeartbeat = true ;
public bool UserBindings = false ;
public Console Console ;
protected Dictionary<string, Dictionary<string, SchemaClass>> Packages = new Dictionary<string, Dictionary<string, SchemaClass>>() ;
protected List<Broker> Brokers = new List<Broker>() ;
protected SequenceManager SequenceManager = new SequenceManager() ;
protected object LockObject = new Object();
protected List<long> SyncSequenceList = new List<long>() ;
protected List<QMFObject> GetResult ;
protected Object SyncResult ;
public Session()
public Session(Console console) {
Console = console ;
public void AddBroker(string url) {
BrokerURL brokerurl = GetBrokerURL(url) ;
Broker broker = new Broker(this, brokerurl) ;
Brokers.Add(broker) ;
Dictionary<string, object> args = new Dictionary<string, object>() ;
args.Add("_class", "agent") ;
args.Add("_broker", broker) ;
this.GetObjects(args) ;
public void RemoveBroker(Broker broker) {
if (Brokers.Contains(broker)) {
Brokers.Remove(broker) ;
broker.Shutdown() ;
public void Close() {
foreach (Broker broker in Brokers.ToArray()) {
this.RemoveBroker(broker) ;
protected BrokerURL GetBrokerURL(string url) {
return new BrokerURL(url) ;
public List<QMFObject> GetObjects(Dictionary<string, object> args) {
List<Broker> brokerList = null ;
List<Agent> agentList = new List<Agent>() ;
if (args.ContainsKey("_broker")) {
brokerList = new List<Broker>() ;
brokerList.Add((Broker)args["_broker"]) ;
} else {
brokerList = this.Brokers ;
foreach (Broker broker in brokerList) {
broker.WaitForStable() ;
if (args.ContainsKey("_agent")) {
Agent agent = (Agent)args["_agent"] ;
if (brokerList.Contains(agent.Broker)) {
agentList.Add(agent) ;
} else {
throw new Exception("Agent is not managed by this console or the supplied broker") ;
} else {
if (args.ContainsKey("_objectId")) {
ObjectID oid = (ObjectID) args["_objectId"] ;
foreach (Broker broker in Brokers) {
foreach (Agent agent in broker.Agents.Values) {
if ((agent.AgentBank == oid.AgentBank()) && (agent.BrokerBank == oid.BrokerBank())) {
agentList.Add(agent) ;
else {
foreach (Broker broker in brokerList) {
foreach (Agent agent in broker.Agents.Values) {
if (agent.Broker.IsConnected()) {
agentList.Add(agent) ;
GetResult = new List<QMFObject>() ;
if (agentList.Count > 0) {
//FIXME Add a bunch of other suff too
foreach (Agent agent in agentList) {
Dictionary<string, object> getParameters = new Dictionary<string, object>() ;
Broker broker = agent.Broker ;
long seq = -1 ;
lock(LockObject) {
seq = SequenceManager.Reserve(Session.CONTEXT_MULTIGET) ;
SyncSequenceList.Add(seq) ;
object packageName = null ;
object className = null ;
object key = null ;
object sClass = null ;
object oid = null ;
object hash = null ;
args.TryGetValue("_schema", out sClass) ;
args.TryGetValue("_key", out key) ;
args.TryGetValue("_class", out className) ;
args.TryGetValue("_package", out packageName) ;
args.TryGetValue("_objectID", out oid) ;
args.TryGetValue("_hash", out hash) ;
if ((className == null) && (oid == null) && (oid == null)) {
throw new Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument") ;
if (oid != null) {
getParameters.Add("_objectID", oid) ;
else {
if (sClass != null) {
key = key ?? ((SchemaClass)sClass).Key ;
if (key != null) {
ClassKey cKey = (ClassKey)key ;
className = className ?? cKey.ClassName ;
packageName = packageName ?? cKey.PackageName ;
hash = hash ?? cKey.Hash ;
if (packageName != null) {
getParameters.Add("_package", packageName) ;
if (className != null) {
getParameters.Add("_class", className) ;
if (hash != null) {
getParameters.Add("_hash", hash) ;
foreach (KeyValuePair<string, object> pair in args) {
if (!pair.Key.StartsWith("_")) {
getParameters.Add(pair.Key, pair.Value) ;
IEncoder enc = broker.CreateEncoder('G', seq) ;
enc.WriteMap(getParameters) ;
string routingKey = String.Format("agent.{0}.{1}", agent.BrokerBank, agent.AgentBank) ;
Message msg = broker.CreateMessage(enc, routingKey) ;
log.Debug("Get Object Keys: ") ;
foreach (string pKey in getParameters.Keys) {
log.Debug(String.Format("\tKey: '{0}' Value: '{1}'", pKey, getParameters[pKey])) ;
broker.Send(msg) ;
int waittime = DEFAULT_GET_WAIT_TIME ;
bool timeout = false ;
if (args.ContainsKey("_timeout")) {
waittime = (int) args["_timeout"] ;
DateTime start = DateTime.Now ;
lock (LockObject) {
while (SyncSequenceList.Count > 0){
Monitor.Wait(LockObject,waittime) ;
TimeSpan duration = DateTime.Now - start;
if (duration.TotalMilliseconds > waittime) {
foreach (long pendingSeq in SyncSequenceList) {
SequenceManager.Release(pendingSeq) ;
SyncSequenceList.Clear() ;
timeout = true ;
//FIXME Add the error logic
if ((GetResult.Count == 0) && timeout) {
throw new Exception ("Get Request timed out") ;
return GetResult ;
public List<string> GetPackages() {
this.WaitForStable() ;
List<string> returnValue = new List<string>() ;
foreach (String name in Packages.Keys) {
returnValue.Add(name) ;
return returnValue ;
public List<ClassKey> GetClasses(string packageName) {
List<ClassKey> returnValue = new List<ClassKey>() ;
this.WaitForStable() ;
if (Packages.ContainsKey(packageName)) {
foreach (SchemaClass sClass in Packages[packageName].Values) {
returnValue.Add(sClass.Key) ;
return returnValue ;
public SchemaClass GetSchema(ClassKey key) {
return GetSchema(key, true) ;
protected SchemaClass GetSchema(ClassKey key, bool waitForStable) {
if (waitForStable) {
this.WaitForStable() ;
SchemaClass returnValue = null ;
try {
returnValue = Packages[key.PackageName][key.GetKeyString()] ;
catch (KeyNotFoundException) {
// eat it
return returnValue ;
protected void WaitForStable() {
foreach (Broker broker in Brokers) {
broker.WaitForStable() ;
public Broker GetBroker(long BrokerBank) {
Broker returnValue = null ;
foreach (Broker broker in Brokers) {
if (broker.BrokerBank() == BrokerBank) {
returnValue = broker ;
break ;
return returnValue ;
public MethodResult InvokeMethod(QMFObject obj, string name, List<object> args, bool synchronous, int timeToLive) {
Broker aBroker = this.GetBroker(obj.BrokerBank()) ;
long seq = this.SendMethodRequest(obj, aBroker, name, args, synchronous, timeToLive) ;
if (seq != 0) {
if (!synchronous) {
return null ;
try {
aBroker.WaitForSync(timeToLive) ;
} catch (Exception e) {
SequenceManager.Release(seq) ;
throw e ;
// FIXME missing error logic in the broker
return (MethodResult) SyncResult ;
return null ;
protected long SendMethodRequest(QMFObject obj, Broker aBroker, string name, List<object> args, bool synchronous, int timeToLive) {
SchemaMethod method = obj.Schema.GetMethod(name) ;
if (args == null) {
args = new List<object>() ;
long seq = 0 ;
if (method != null) {
KeyValuePair<SchemaMethod, bool> pair = new KeyValuePair<SchemaMethod, bool>(method, synchronous) ;
seq = SequenceManager.Reserve(pair) ;
IEncoder enc = aBroker.CreateEncoder('M', seq) ;
obj.ObjectID.encode(enc) ;
obj.Schema.Key.encode(enc) ;
enc.WriteStr8(name) ;
if (args.Count < method.InputArgCount) {
throw new Exception(String.Format("Incorrect number of arguments: expected {0}, got{1}", method.InputArgCount, args.Count)) ;
int argIndex = 0 ;
foreach (SchemaArgument arg in method.Arguments) {
if (arg.IsInput()) {;
this.EncodeValue(enc, arg.Type, args[argIndex]) ;
argIndex += 1 ;
Message msg = aBroker.CreateMessage(enc,obj.RoutingKey(),timeToLive) ;
if (synchronous) {
aBroker.SetSyncInFlight(true) ;
aBroker.Send(msg) ;
return seq ;
public QMFObject MakeObject(ClassKey key) {
SchemaClass sClass = this.GetSchema(key) ;
if (sClass == null) {
throw new Exception("No schema found for class " + key.ToString()) ;
return this.CreateQMFObject(sClass, true, true, false) ;
public QMFObject MakeObject(String keyString) {
return this.MakeObject(new ClassKey(keyString)) ;
// Callback Methods
public void HandleNewAgent(Agent agent) {
if (Console != null) {
Console.NewAgent(agent) ;
public void HandleAgentRemoved(Agent agent) {
if (Console != null) {
Console.AgentRemoved(agent) ;
public void HandleBrokerConnect(Broker broker) {
if (Console != null) {
Console.BrokerConnected(broker) ;
public void HandleBrokerDisconnect(Broker broker) {
if (Console != null) {
Console.BrokerDisconnected(broker) ;
public void HandleBrokerResponse(Broker broker, IDecoder decoder, long sequence) {
if (Console != null) {
Console.BrokerInformation(broker) ;
long seq = SequenceManager.Reserve(CONTEXT_STARTUP) ;
IEncoder endocder = broker.CreateEncoder('P', seq) ;
broker.Send(endocder) ;
public void HandlePackageIndicator(Broker broker, IDecoder decoder, long sequence) {
string packageName = decoder.ReadStr8() ;
bool notify = false ;
if (!Packages.ContainsKey(packageName)) {
lock (LockObject) {
Packages[packageName] = new Dictionary<string, SchemaClass>() ;
notify = true ;
if (notify && Console != null) {
Console.NewPackage(packageName) ;
broker.IncrementOutstanding() ;
long seq = SequenceManager.Reserve(Session.CONTEXT_STARTUP) ;
IEncoder enc = broker.CreateEncoder('Q', seq) ;
enc.WriteStr8(packageName) ;
broker.Send(enc) ;
public void HandleCommandComplete(Broker broker, IDecoder decoder, long sequence) {
long code = decoder.ReadUint32() ;
string text = decoder.ReadStr8() ;
Object context = this.SequenceManager.Release(sequence) ;
if (context.Equals(CONTEXT_STARTUP)) {
broker.DecrementOutstanding() ;
} else {
if ((context.Equals(CONTEXT_SYNC)) & broker.GetSyncInFlight()) {
broker.SetSyncInFlight(false) ;
} else {
if (context.Equals(CONTEXT_MULTIGET) && SyncSequenceList.Contains(sequence)) {
lock(LockObject) {
SyncSequenceList.Remove(sequence) ;
if (SyncSequenceList.Count == 0) {
Monitor.PulseAll(LockObject) ;
public void HandleClassIndicator(Broker broker, IDecoder decoder, long sequence) {
short kind = decoder.ReadUint8() ;
ClassKey classKey = new ClassKey(decoder) ;
bool unknown = false ;
lock (LockObject) {
if (Packages.ContainsKey(classKey.PackageName)) {
if (!Packages[classKey.PackageName].ContainsKey(classKey.GetKeyString())) {
unknown = true ;
if (unknown) {
broker.IncrementOutstanding() ;
long seq = SequenceManager.Reserve(Session.CONTEXT_STARTUP) ;
IEncoder enc = broker.CreateEncoder('S', seq) ;
classKey.encode(enc) ;
broker.Send(enc) ;
public void HandleMethodResponse(Broker broker, IDecoder decoder, long sequence) {
long code = decoder.ReadUint32() ;
string text = decoder.ReadStr16() ;
Dictionary<string, object> outArgs = new Dictionary<string, object>() ;
object obj = SequenceManager.Release(sequence) ;
if (obj == null) {
return ;
KeyValuePair<SchemaMethod, bool> pair = (KeyValuePair<SchemaMethod, bool>) obj ;
if (code == 0) {
foreach (SchemaArgument arg in pair.Key.Arguments) {
if (arg.IsOutput()) {
outArgs.Add(arg.Name, this.DecodeValue(decoder, arg.Type)) ;
MethodResult result = new MethodResult(code, text, outArgs) ;
if (pair.Value) {
this.SyncResult = result;
broker.SetSyncInFlight(false) ;
if (Console != null) {
Console.MethodResponse(broker, sequence, result) ;
public void HandleHeartbeatIndicator(Broker broker, IDecoder decoder, long sequence, IMessage msg) {
if (Console != null) {
long brokerBank = 1 ;
long agentBank = 0 ;
try {
string routingKey = msg.DeliveryProperties.GetRoutingKey() ;
if (routingKey != null) {
agentBank = Agent.GetBrokerBank(routingKey) ;
brokerBank = Agent.GetBrokerBank(routingKey) ;
catch (Exception e) {
log.Warn("Internal QPID error", e) ;
string agentKey = Agent.AgentKey(agentBank, brokerBank) ;
long timestamp = decoder.ReadUint64() ;
if (broker.Agents.ContainsKey(agentKey)) {
Agent agent = broker.Agents[agentKey] ;
Console.HearbeatRecieved(agent, timestamp) ;
public void HandleEventIndicator(Broker broker, IDecoder decoder, long sequence) {
if (Console != null) {
QMFEvent newEvent = new QMFEvent(this, decoder) ;
Console.EventRecieved(broker, newEvent) ;
public void HandleSchemaResponse(Broker broker, IDecoder decoder, long sequence) {
short kind = decoder.ReadUint8() ;
ClassKey classKey = new ClassKey(decoder) ;
SchemaClass sClass = new SchemaClass(kind, classKey, decoder, this) ;
lock(LockObject) {
Dictionary<string, SchemaClass> classMappings = Packages[sClass.PackageName] ;
classMappings.Remove(sClass.ClassKeyString) ;
classMappings.Add(sClass.ClassKeyString, sClass) ;
SequenceManager.Release(sequence) ;
broker.DecrementOutstanding() ;
if (Console != null) {
this.Console.NewClass(kind, classKey) ;
public void HandleContentIndicator(Broker broker, IDecoder decoder, long sequence, bool hasProperties, bool hasStatistics) {
ClassKey key = new ClassKey(decoder) ;
SchemaClass sClass = null ;;
lock (LockObject) {
sClass = GetSchema(key, false) ;
if (sClass != null) {
QMFObject obj = this.CreateQMFObject(sClass, decoder, hasProperties, hasStatistics, true) ;
if (key.PackageName.Equals("org.apache.qpid.broker") && key.ClassName.Equals("agent") && hasProperties) {
broker.UpdateAgent(obj) ;
lock (LockObject) {
if (SyncSequenceList.Contains(sequence)) {
if (!obj.IsDeleted() && this.SelectMatch(obj)) {
GetResult.Add(obj) ;
if (Console != null) {
if (hasProperties) {
Console.ObjectProperties(broker, obj) ;
if (hasStatistics) {
Console.ObjectStatistics(broker, obj) ;
public bool SelectMatch(QMFObject obj) {
return true ;
public object DecodeValue(IDecoder dec, short type) {
switch (type) {
case 1: return dec.ReadUint8() ; // U8
case 2: return dec.ReadUint16() ; // U16
case 3: return dec.ReadUint32() ; // U32
case 4: return dec.ReadUint64() ; // U64
case 6: return dec.ReadStr8() ; // SSTR
case 7: return dec.ReadStr16() ; // LSTR
case 8: return dec.ReadDatetime() ; // ABSTIME
case 9: return dec.ReadUint32() ; // DELTATIME
case 10: return new ObjectID(dec) ; // ref
case 11: return dec.ReadUint8() != 0 ; // bool
case 12: return dec.ReadFloat() ; // float
case 13: return dec.ReadDouble() ; // double
case 14: return dec.ReadUuid() ; // UUID
case 15: return dec.ReadMap() ; // Ftable
case 16: return dec.ReadInt8() ; // int8
case 17: return dec.ReadInt16() ; // int16
case 18: return dec.ReadInt32() ; // int32
case 19: return dec.ReadInt64() ; // int64
case 20: // Object
// Peek into the inner type code, make sure
// it is actually an object
object returnValue = null ;
short innerTypeCode = dec.ReadUint8() ;
if (innerTypeCode != 20) {
returnValue = this.DecodeValue(dec, innerTypeCode) ;
else {
ClassKey classKey = new ClassKey(dec) ;
lock(LockObject) {
SchemaClass sClass = GetSchema(classKey) ;
if (sClass != null) {
returnValue = this.CreateQMFObject(sClass, dec, true, true, false) ;
return returnValue;
case 21: // List
MSDecoder lDec = new MSDecoder();
lDec.Init(new MemoryStream(dec.ReadVbin32()));
long count = lDec.ReadUint32();
List<object> newList = new List<object>();
while (count > 0)
short innerType = lDec.ReadUint8();
newList.Add(this.DecodeValue(lDec, innerType));
count -= 1;
return newList;
case 22: // Array
MSDecoder aDec = new MSDecoder();
aDec.Init(new MemoryStream(dec.ReadVbin32()));
long cnt = aDec.ReadUint32();
short innerType = aDec.ReadUint8();
List<object> aList = new List<object>();
while (cnt > 0)
aList.Add(this.DecodeValue(aDec, innerType));
cnt -= 1;
return aList;
throw new Exception(String.Format("Invalid Type Code: {0}", type)) ;
public void EncodeValue(IEncoder enc, short type, object val) {
try {
switch ((int)type) {
case 1: enc.WriteUint8((short) val) ; break; // U8
case 2: enc.WriteUint16((int) val) ; break; // U16
case 3: enc.WriteUint32((long) val) ; break; // U32
case 4: enc.WriteUint64((long) val) ; break; // U64
case 6: enc.WriteStr8((string) val) ; break; // SSTR
case 7: enc.WriteStr16((string) val) ; break; // LSTR
case 8: enc.WriteDatetime((long) val); break; // ABSTIME
case 9: enc.WriteUint32((long) val); break; // DELTATIME
case 10: ((ObjectID)val).encode(enc) ; break; // ref
case 11:
if ((bool) val) {
enc.WriteUint8(1) ;
} else {
enc.WriteUint8(0) ;
break ;
case 12: enc.WriteFloat((float) val); break; // FLOAT
case 13: enc.WriteDouble((double) val); break; // DOUBLE
case 14: enc.WriteUuid((UUID) val) ; break ; // UUID
case 15: enc.WriteMap((Dictionary<string, object>) val) ; break ; // Ftable
case 16: enc.WriteInt8((short) val) ; break; // int8
case 17: enc.WriteInt16((int) val) ; break; // int16
case 18: enc.WriteInt32(long.Parse(""+ val)) ; break; // int32
case 19: enc.WriteInt64(long.Parse("" + val)) ; break; // int64
case 20: // Object
// Check that the object has a session, if not
// take ownership of it
QMFObject qObj = (QMFObject) val ;
if (qObj.Session == null) {
qObj.Session = this ;
qObj.Encode(enc) ;
case 21: // List
List<object> items = (List<object>) val ;
MSEncoder lEnc = new MSEncoder(1) ;
lEnc.Init() ;
lEnc.WriteUint32(items.Count) ;
foreach (object obj in items) {
short innerType = Util.QMFType(obj) ;
lEnc.WriteUint8(innerType) ;
this.EncodeValue(lEnc,innerType,obj) ;
enc.WriteVbin32(lEnc.Segment().ToArray()) ;
break ;
case 22: // Array
List<object> aItems = (List<object>) val ;
MSEncoder aEnc = new MSEncoder(1) ;
aEnc.Init() ;
long aCount = aItems.Count ;
aEnc.WriteUint32(aCount) ;
if (aCount > 0) {
Object anObj = aItems[0] ;
short innerType = Util.QMFType(anObj) ;
aEnc.WriteUint8(innerType) ;
foreach (object obj in aItems) {
this.EncodeValue(aEnc,innerType,obj) ;
enc.WriteVbin32(aEnc.Segment().ToArray()) ;
break ;
throw new Exception(String.Format("Invalid Type Code: {0}", type)) ;
catch (System.InvalidCastException e) {
string msg = String.Format("Class cast exception for typecode {0}, type {1} ", type, val.GetType()) ;
log.Error(msg) ;
throw new Exception(msg + type, e) ;
public List<string> BindingKeys() {
List<string> bindings = new List<string>() ;
bindings.Add("schema.#") ;
if (RecieveObjects & RecieveEvents & RecieveHeartbeat & !UserBindings) {
bindings.Add("console.#") ;
else {
if (RecieveObjects & !UserBindings) {
bindings.Add("console.obj.#") ;
else {
bindings.Add("console.obj.*.*.org.apache.qpid.broker.agent") ;
if (RecieveEvents) {
bindings.Add("console.event.#") ;
if (RecieveHeartbeat) {
bindings.Add("console.heartbeat.#") ;
return bindings ;
protected QMFObject CreateQMFObject(SchemaClass schema, bool hasProperties, bool hasStats , bool isManaged) {
Type realClass = typeof(QMFObject) ;
if (Console != null) {
realClass = Console.TypeMapping(schema.Key) ;
Type[] types = new Type[] {typeof(Session), typeof(SchemaClass), typeof(bool), typeof(bool),typeof(bool)} ;
object[] args = new object[] {this, schema, hasProperties, hasStats, isManaged} ;
ConstructorInfo ci = realClass.GetConstructor(types);
return (QMFObject) ci.Invoke(args) ;
protected QMFObject CreateQMFObject(SchemaClass schema, IDecoder dec, bool hasProperties, bool hasStats , bool isManaged) {
Type realClass = typeof(QMFObject) ;
if (Console != null) {
realClass = Console.TypeMapping(schema.Key) ;
Type[] types = new Type[] {typeof(Session), typeof(SchemaClass), typeof(IDecoder), typeof(bool), typeof(bool),typeof(bool)} ;
object[] args = new object[] {this, schema, dec, hasProperties, hasStats, isManaged} ;
ConstructorInfo ci = realClass.GetConstructor(types);
return (QMFObject) ci.Invoke(args) ;