458 lines
12 KiB
Ruby
458 lines
12 KiB
Ruby
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
require 'monitor'
|
|
|
|
module Qpid
|
|
|
|
class Session < Invoker
|
|
|
|
# Returns the logger registered under "qpid.io.cmd" in Qpid::logger.
def log
  Qpid.logger["qpid.io.cmd"]
end
|
|
# Returns the logger registered under "qpid.io.msg" in Qpid::logger.
def msg
  Qpid.logger["qpid.io.msg"]
end
|
|
|
|
|
|
# Base class for the errors raised by Session.
# Note: inside Qpid::Session this shadows ::Exception for bare references.
class Exception < RuntimeError
end

# Raised by do_invoke when the session has started closing.
class Closed < Qpid::Session::Exception
end

# Raised by do_invoke when the session has no channel.
class Detached < Qpid::Session::Exception
end
|
|
|
|
|
|
# Sentinel a delegate handler can return so that dispatch does not mark the
# command's segments as completed.
INCOMPLETE = Object.new
|
|
|
|
# Forwards every argument to the module-level method Qpid.Client.
#
# NOTE(review): `Qpid::Client(*args)` is a method call, not a constructor;
# unless Qpid defines a method named `Client` elsewhere this raises
# NoMethodError. Possibly `Client.new(*args)` was intended - confirm.
def self.client(*args)
  Qpid.Client(*args)
end
|
|
|
|
# Forwards every argument to a method named `Server` resolved on the
# receiver.
#
# NOTE(review): `Server(*args)` is a method call, not a constructor, and no
# such method is defined in this file - confirm where it is expected to
# come from.
def self.server(*args)
  Server(*args)
end
|
|
|
|
# Read-only state.
attr_reader :name, :spec, :timeout, :results, :exceptions
# Readable and writable state (each name declared once; the resulting set
# of readers and writers is the same as before).
attr_accessor :channel, :auto_sync, :send_id, :receiver, :sender
|
|
|
|
# FIXME: Pass delegate through a block ?
|
|
# Builds a session called +name+ that speaks the protocol described by
# +spec+.
#
# Recognised keys in +kwargs+:
#   :auto_sync - stored as @auto_sync; true when the key is absent
#   :timeout   - stored as @timeout; 10 when absent or nil
#   :delegate  - callable invoked with this session; its return value
#                becomes @delegate (left unset when absent)
def initialize(name, spec, kwargs = {})
  @name = name
  @spec = spec
  # An explicit false must win over the default, hence key? and not ||.
  @auto_sync = kwargs.key?(:auto_sync) ? kwargs[:auto_sync] : true
  @timeout = kwargs[:timeout] || 10
  @invoke_lock = Monitor.new
  @closing = false
  @closed = false

  @cond_lock = Monitor.new
  @condition = @cond_lock.new_cond

  @send_id = true
  @receiver = Receiver.new(self)
  @sender = Sender.new(self)

  @lock = Monitor.new
  @incoming = {}
  @results = {}
  @exceptions = []

  @assembly = nil

  # The factory runs before the segment-type codes below are assigned.
  delegate_factory = kwargs[:delegate]
  @delegate = delegate_factory.call(self) if delegate_factory

  segment_types = spec[:segment_type].enum
  @ctl_seg = segment_types[:control].value
  @cmd_seg = segment_types[:command].value
  @hdr_seg = segment_types[:header].value
  @body_seg = segment_types[:body].value
end
|
|
|
|
# Returns the Incoming queue for +destination+, creating and remembering it
# on first use. Lookup and creation happen under @lock.
def incoming(destination)
  @lock.synchronize do
    @incoming[destination] ||= Incoming.new(self, destination)
  end
end
|
|
|
|
# True once at least one exception has been recorded in @exceptions.
def error?
  !@exceptions.empty?
end
|
|
|
|
# Blocks until the most recently issued command (id next_id - 1) appears in
# the sender's completed set, or until an exception has been recorded.
#
# timeout - passed through to the condition's wait_for.
#
# Raises Qpid::Session::Exception ("deadlock detected") when called from the
# connection's own thread, Qpid::Timeout when the wait reports failure, and
# Qpid::Session::Exception carrying @exceptions when an error was recorded.
def sync(timeout=nil)
  on_connection_thread = channel && Thread.current == channel.connection.thread
  raise Qpid::Session::Exception, "deadlock detected" if on_connection_thread

  # Without auto_sync nothing has asked the peer to report completion yet.
  execution_sync(:sync => true) unless @auto_sync

  last = @sender.next_id - 1
  @cond_lock.synchronize do
    finished = @condition.wait_for(timeout) do
      @sender.completed.include?(last) || error?
    end
    raise Qpid::Timeout unless finished
  end

  raise Qpid::Session::Exception, @exceptions if error?
end
|
|
|
|
# Starts closing the session and waits for the close to be confirmed.
#
# Under @invoke_lock it sets @closing and sends session_detach for this
# session's name on the channel; it then waits on the shared condition until
# @closed is set.
#
# timeout - passed through to the condition's wait_for.
#
# Raises Qpid::Timeout when the wait reports failure.
def close(timeout=nil)
  @invoke_lock.synchronize do
    @closing = true
    channel.session_detach(name)
  end

  @cond_lock.synchronize do
    confirmed = @condition.wait_for(timeout) { @closed }
    raise Qpid::Timeout unless confirmed
  end
end
|
|
|
|
# Marks the session as closed; does nothing if it is already closed.
#
# Under @lock: passes the recorded exceptions to every pending result via
# its `error` method and clears the result table, closes every incoming
# queue with the same exceptions, sets @closed and wakes the threads waiting
# on the shared condition.
def closed
  @lock.synchronize do
    return if @closed

    @results.each_value { |future| future.error(exceptions) }
    @results.clear

    @incoming.values.each { |queue| queue.close(exceptions) }
    @closed = true
    # The condition is shared by waiters with different predicates (sync,
    # close, wait_for). `signal` would wake only one of them, which may not
    # be the one waiting for @closed, so wake them all.
    @cond_lock.synchronize { @condition.broadcast }
  end
end
|
|
|
|
# Maps +name+ to an invocation built from the spec.
#
# A Command or Struct child yields invocation(:method, child); a Domain
# child with an enum yields invocation(:value, enum). Anything else yields
# invocation(:error, hint), where hint lists the spec children whose names
# contain +name+ (nil when there are none).
def resolve_method(name)
  node = @spec.children[name]
  case node
  when Qpid::Spec010::Command, Qpid::Spec010::Struct
    return invocation(:method, node)
  when Qpid::Spec010::Domain
    return invocation(:value, node.enum) unless node.enum.nil?
  end

  wanted = name.to_s
  similar = @spec.children.select { |child| child.name.to_s.include?(wanted) }
  candidates = similar.map { |child| child.name.to_s }.sort

  hint = case candidates.size
         when 0 then nil
         when 1 then "Did you mean #{candidates[0]} ? "
         else "Did you mean one of #{candidates.join(",")} ? "
         end
  invocation(:error, hint)
end
|
|
|
|
# Entry point for invoking +type+ with +args+.
#
# Types that do not respond to :track are simply instantiated via
# type.create(*args); everything else goes through do_invoke while holding
# @invoke_lock.
def invoke(type, args)
  # XXX
  return type.create(*args) unless type.respond_to?(:track)

  @invoke_lock.synchronize { do_invoke(type, args) }
end
|
|
|
|
# Encodes one command of +type+ (plus its optional message) into segments
# and emits them. invoke calls this while holding @invoke_lock.
#
# args - positional field values. A trailing Hash is treated as keyword
#        options when there are more args than fields, or when the field in
#        that position is not of the spec's map type. For payload types the
#        message is the extra trailing positional argument or the :message
#        option. +args+ is modified in place.
#
# Returns the command's result (auto_sync) or its Future (no auto_sync) for
# types that declare a result; otherwise the value of sync when auto_sync is
# on, else nil.
#
# Raises Qpid::Session::Closed once closing has started and
# Qpid::Session::Detached when there is no channel.
#
# NOTE(review): because of short-circuiting, :sync is only removed from the
# options when @auto_sync is falsy; with auto_sync on, a caller-supplied
# :sync is passed on to type.create - confirm this is intended.
def do_invoke(type, args)
  raise Qpid::Session::Closed if @closing
  raise Qpid::Session::Detached unless channel

  # Clumsy simulation of Python's keyword args
  kwargs = {}
  if !args.empty? && args.last.is_a?(Hash)
    trailing_hash_is_options =
      args.size > type.fields.size ||
      type.fields[args.size - 1].type != @spec[:map]
    kwargs = args.pop if trailing_hash_is_options
  end

  message = if !type.payload
              nil
            elsif args.size == type.fields.size + 1
              args.pop
            else
              kwargs.delete(:message) # XXX Really ?
            end

  hdr = Qpid::struct(@spec[:header])
  hdr.sync = @auto_sync || kwargs.delete(:sync)

  cmd = type.create(*args.push(kwargs))
  codec = Qpid::StringCodec.new(@spec)
  codec.write_command(hdr, cmd)

  # The command segment is also the last one when nothing else follows.
  command_only = message.nil? ||
                 (message.headers.nil? && message.body.nil?)
  seg = Segment.new(true, command_only, type.segment_type, type.track,
                    @channel.id, codec.encoded)

  # Register the future before emitting, under the id the command will get.
  future = nil
  unless type.result.nil?
    future = Future.new(Exception)
    @results[@sender.next_id] = future
  end
  emit(seg)

  log.debug("SENT %s %s %s" % [seg.id, hdr, cmd]) if log

  unless message.nil?
    unless message.headers.nil?
      codec = Qpid::StringCodec.new(@spec)
      message.headers.each { |st| codec.write_struct32(st) }

      emit(Segment.new(false, message.body.nil?, @hdr_seg,
                       type.track, @channel.id, codec.encoded))
    end
    unless message.body.nil?
      emit(Segment.new(false, true, @body_seg, type.track,
                       @channel.id, message.body))
    end
    msg.debug("SENT %s" % message) if msg
  end

  if future
    return @auto_sync ? future.get(@timeout) : future
  elsif @auto_sync
    sync(@timeout)
  end
end
|
|
|
|
# Accepts one incoming segment: lets the receiver number it, appends it to
# the assembly being collected, and dispatches the assembly once its last
# segment has arrived.
#
# Raises Qpid::Session::Exception when a first segment arrives while an
# assembly is still open, or when a continuation segment arrives with no
# assembly open (previously the latter surfaced as NoMethodError on nil).
def received(seg)
  @receiver.received(seg)
  if seg.first_segment?
    unless @assembly.nil?
      raise Qpid::Session::Exception,
            "first segment received while an assembly is still in progress"
    end
    @assembly = []
  elsif @assembly.nil?
    raise Qpid::Session::Exception,
          "continuation segment received with no assembly in progress"
  end
  @assembly << seg
  if seg.last_segment?
    begin
      dispatch(@assembly)
    ensure
      # Reset even when dispatch raises; otherwise every later first
      # segment would be rejected as overlapping a stale assembly.
      @assembly = nil
    end
    nil
  end
end
|
|
|
|
# Decodes a complete assembly and hands the command to the delegate.
#
# Each segment is decoded against the spec and sorted by type into the
# command (header struct + command), the message headers and the body. The
# delegate method named after the command's type is then called with
# (cmd, headers, body) for payload types and (cmd) otherwise. When the type
# declares a result, the handler's return value is sent back through
# execution_result.
#
# Unless the handler returned INCOMPLETE, every segment is reported to the
# receiver as completed and, when the command header asked for sync, the
# completed set is sent to the peer once, after all segments are recorded.
# (It used to be sent once per segment, the first copies going out before
# the command itself had been marked complete.)
#
# Raises Qpid::Session::Exception for a segment of unknown type.
def dispatch(assembly)
  hdr = nil
  cmd = nil
  header = nil
  body = nil
  assembly.each do |seg|
    decoded = seg.decode(@spec)
    case seg.type
    when @cmd_seg
      hdr, cmd = decoded
    when @hdr_seg
      header = decoded
    when @body_seg
      body = decoded
    else
      raise Qpid::Session::Exception, "unexpected segment type: #{seg.type}"
    end
  end
  log.debug("RECV %s %s %s" % [cmd.id, hdr, cmd]) if log

  handler = cmd.st_type.name
  result = if cmd.st_type.payload
             @delegate.send(handler, cmd, header, body)
           else
             @delegate.send(handler, cmd)
           end

  execution_result(cmd.id, result) unless cmd.st_type.result.nil?

  # Identity check: INCOMPLETE is a sentinel, so never rely on the result's
  # own == implementation.
  unless result.equal?(INCOMPLETE)
    assembly.each { |seg| @receiver.has_completed(seg) }
    # XXX: don't forget to obey sync for manual completion as well
    @channel.session_completed(@receiver.completed) if hdr.sync
  end
end
|
|
|
|
# Python calls this 'send', but that has a special meaning
|
|
# in Ruby, so we call it 'emit'
|
|
# Hands +seg+ to the session's sender and returns whatever the sender
# returns.
def emit(seg)
  @sender.emit(seg)
end
|
|
|
|
# Wakes the threads blocked on the session's condition so they re-check
# what they are waiting for.
#
# All waiters are woken: sync, close and wait_for share this one condition
# with different predicates, so waking a single thread could pick one whose
# predicate is still false and leave the intended waiter asleep.
def signal
  @cond_lock.synchronize { @condition.broadcast }
end
|
|
|
|
# Waits on the session's condition, under @cond_lock, by forwarding
# +timeout+ and the block to the condition's wait_for; returns its result.
def wait_for(timeout = nil, &block)
  @cond_lock.synchronize do
    @condition.wait_for(timeout, &block)
  end
end
|
|
|
|
def to_s
|
|
"<Session: #{name}, #{channel}>"
|
|
end
|
|
|
|
# Numbers incoming segments and tracks which received commands have been
# completed.
class Receiver

  attr_reader :completed
  attr_accessor :next_id, :next_offset

  # next_id and next_offset start out nil and must be assigned before the
  # first segment is received.
  def initialize(session)
    @session = session
    @next_id = nil
    @next_offset = nil
    @completed = Qpid::RangedSet.new
  end

  # Stamps +seg+ with the current command id and offset, then advances: the
  # last segment of a command moves on to the next id at offset 0, any other
  # segment advances the offset by its payload size.
  def received(seg)
    raise Exception, "todo" if @next_id.nil? || @next_offset.nil?

    seg.id = @next_id
    seg.offset = @next_offset
    if seg.last_segment?
      @next_id += 1
      @next_offset = 0
    else
      @next_offset += seg.payload.size
    end
  end

  # Records the command as completed once its last segment is reported.
  # Raises ArgumentError for a segment that was never given an id.
  def has_completed(seg)
    raise ArgumentError, "cannot complete unidentified segment" if seg.id.nil?

    @completed.add(seg.id) if seg.last_segment?
  end

  # Rebuilds the completed set, keeping only the ranges that are not
  # entirely contained in one of the ranges of +commands+.
  def known_completed(commands)
    remaining = Qpid::RangedSet.new
    @completed.ranges.each do |range|
      covered = commands.ranges.find do |known|
        known.contains(range.lower) && known.contains(range.upper)
      end
      remaining.add_range(range) unless covered
    end
    @completed = remaining
  end
end
|
|
|
|
# Numbers outgoing segments, writes them to the connection and remembers
# them until the peer reports the corresponding commands as completed.
class Sender

  attr_reader :next_id, :completed

  def initialize(session)
    @session = session
    @next_id = 0.to_serial
    @next_offset = 0
    @segments = []
    @completed = RangedSet.new
  end

  # Stamps +seg+ with the current command id and offset, advances the
  # position and writes the segment to the session's connection. If the
  # session's send_id flag is set, it is cleared and a session_command_point
  # carrying the segment's id and offset is sent first.
  def emit(seg)
    seg.id = @next_id
    seg.offset = @next_offset
    advance_position(seg)
    @segments << seg

    if @session.send_id
      @session.send_id = false
      @session.channel.session_command_point(seg.id, seg.offset)
    end
    @session.channel.connection.write_segment(seg)
  end

  # Forgets the segments whose command ids are included in +commands+ and
  # adds every range of +commands+ to the completed set.
  def has_completed(commands)
    @segments = @segments.reject { |seg| commands.include?(seg.id) }
    commands.ranges.each do |range|
      @completed.add(range.lower, range.upper)
    end
  end

  private

  # Moves to the next command after a last segment, otherwise advances the
  # offset by the payload size.
  def advance_position(seg)
    if seg.last_segment?
      @next_id += 1
      @next_offset = 0
    else
      @next_offset += seg.payload.size
    end
  end
end
|
|
|
|
# Queue of messages arriving for one destination of a session.
class Incoming < Qpid::Queue

  def initialize(session, destination)
    super()
    @session = session
    @destination = destination
  end

  # Issues a message_flow of 0xFFFFFFFF for this destination, once for each
  # choice of the session's message_credit_unit.
  def start
    credit_units = @session.message_credit_unit.choices
    credit_units.each do |unit|
      @session.message_flow(@destination, unit.value, 0xFFFFFFFF)
    end
  end

  # Cancels the subscription for this destination, then calls listen with no
  # block.
  def stop
    @session.message_cancel(@destination)
    listen # Kill the listener
  end
end
|
|
|
|
# Base handler for commands received on a session.
class Delegate

  def initialize(session)
    @session = session
  end

  #XXX: do something with incoming accepts
  def message_accept(ma)
    nil
  end

  # Removes the future registered for the result's command id from the
  # session and sets its value.
  def execution_result(er)
    @session.results.delete(er.command_id).set(er.value)
  end

  # Appends the received exception to the session's exception list.
  def execution_exception(ex)
    @session.exceptions << ex
  end
end
|
|
|
|
# Delegate that queues incoming message transfers on the session.
class Client < Delegate

  # Logger registered under "qpid.io.msg" in Qpid::logger.
  def log
    Qpid.logger["qpid.io.msg"]
  end

  # Wraps the transfer in a Qpid::Message (body, headers, command id) and
  # puts it on the session's incoming queue for the command's destination.
  # Returns INCOMPLETE.
  def message_transfer(cmd, headers, body)
    message = Qpid::Message.new(body)
    message.headers = headers
    message.id = cmd.id
    @session.incoming(cmd.destination).put(message)

    logger = log
    logger.debug("RECV %s" % message) if logger
    INCOMPLETE
  end
end
|
|
end
|
|
end
|