Former-commit-id:133dc97f67
[formerlya02aeb236c
] [formerly9f19e3f712
] [formerly06a8b51d6d
[formerly9f19e3f712
[formerly 64fa9254b946eae7e61bbc3f513b7c3696c4f54f]]] Former-commit-id:06a8b51d6d
Former-commit-id:377dcd10b9
[formerly3360eb6c5f
] Former-commit-id:8e80217e59
1198 lines
47 KiB
C#
1198 lines
47 KiB
C#
/*
|
|
*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*
|
|
*/
|
|
using System;
|
|
using System.Collections;
|
|
using System.Text.RegularExpressions;
|
|
using System.Threading;
|
|
using log4net;
|
|
using Apache.Qpid.Buffer;
|
|
using Apache.Qpid.Client.Message;
|
|
using Apache.Qpid.Client.Util;
|
|
using Apache.Qpid.Collections;
|
|
using Apache.Qpid.Framing;
|
|
using Apache.Qpid.Messaging;
|
|
using Apache.Qpid.Protocol;
|
|
|
|
namespace Apache.Qpid.Client
|
|
{
|
|
/// <summary>
|
|
/// <p/><table id="crc"><caption>CRC Card</caption>
|
|
/// <tr><th> Responsibilities <th> Collaborations
|
|
/// <tr><td> Declare queues.
|
|
/// <tr><td> Declare exchanges.
|
|
/// <tr><td> Bind queues to exchanges.
|
|
/// <tr><td> Create messages.
|
|
/// <tr><td> Set up message consumers on the channel.
|
|
/// <tr><td> Set up message producers on the channel.
|
|
/// <tr><td> Commit the current transaction.
|
|
/// <tr><td> Roll-back the current transaction.
|
|
/// <tr><td> Close the channel.
|
|
/// </table>
|
|
/// </summary>
|
|
public class AmqChannel : Closeable, IChannel
|
|
{
|
|
private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel));
|
|
|
|
internal const int BASIC_CONTENT_TYPE = 60;
|
|
|
|
public const int DEFAULT_PREFETCH_HIGH_MARK = 5000;
|
|
|
|
public const int DEFAULT_PREFETCH_LOW_MARK = 2500;
|
|
|
|
private static int _nextSessionNumber = 0;
|
|
|
|
private AMQConnection _connection;
|
|
|
|
private int _sessionNumber;
|
|
|
|
private bool _suspended;
|
|
|
|
private object _suspensionLock = new object();
|
|
|
|
// Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature.
|
|
private int _nextConsumerNumber = 1;
|
|
|
|
private bool _transacted;
|
|
|
|
private AcknowledgeMode _acknowledgeMode;
|
|
|
|
private ushort _channelId;
|
|
|
|
private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
|
|
|
|
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
|
|
|
|
private FlowControlQueue _queue;
|
|
|
|
private Dispatcher _dispatcher;
|
|
|
|
private MessageFactoryRegistry _messageFactoryRegistry;
|
|
|
|
/// <summary> Holds all of the producers created by this channel. </summary>
|
|
private Hashtable _producers = Hashtable.Synchronized(new Hashtable());
|
|
|
|
/// <summary> Holds all of the consumers created by this channel. </summary>
|
|
private Hashtable _consumers = Hashtable.Synchronized(new Hashtable());
|
|
|
|
private ArrayList _replayFrames = new ArrayList();
|
|
|
|
/// <summary>
|
|
/// The counter of the _next producer id. This id is generated by the session and used only to allow the
|
|
/// producer to identify itself to the session when deregistering itself.
|
|
///
|
|
/// Access to this id does not require to be synchronized since according to the JMS specification only one
|
|
/// thread of control is allowed to create producers for any given session instance.
|
|
/// </summary>
|
|
private long _nextProducerId;
|
|
|
|
/// <summary>
|
|
/// Initializes a new instance of the <see cref="AmqChannel"/> class.
|
|
/// </summary>
|
|
/// <param name="con">The connection.</param>
|
|
/// <param name="channelId">The channel id.</param>
|
|
/// <param name="transacted">if set to <c>true</c> [transacted].</param>
|
|
/// <param name="acknowledgeMode">The acknowledge mode.</param>
|
|
/// <param name="defaultPrefetchHigh">Default prefetch high value</param>
|
|
/// <param name="defaultPrefetchLow">Default prefetch low value</param>
|
|
internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode,
|
|
int defaultPrefetchHigh, int defaultPrefetchLow)
|
|
: this()
|
|
{
|
|
_sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
|
|
_connection = con;
|
|
_transacted = transacted;
|
|
|
|
if ( transacted )
|
|
{
|
|
_acknowledgeMode = AcknowledgeMode.SessionTransacted;
|
|
}
|
|
else
|
|
{
|
|
_acknowledgeMode = acknowledgeMode;
|
|
}
|
|
|
|
_channelId = channelId;
|
|
_defaultPrefetchHighMark = defaultPrefetchHigh;
|
|
_defaultPrefetchLowMark = defaultPrefetchLow;
|
|
|
|
if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge )
|
|
{
|
|
_queue = new FlowControlQueue(_defaultPrefetchLowMark, _defaultPrefetchHighMark,
|
|
new ThresholdMethod(OnPrefetchLowMark),
|
|
new ThresholdMethod(OnPrefetchHighMark));
|
|
}
|
|
else
|
|
{
|
|
// low and upper are the same
|
|
_queue = new FlowControlQueue(_defaultPrefetchHighMark, _defaultPrefetchHighMark,
|
|
null, null);
|
|
}
|
|
}
|
|
|
|
private AmqChannel()
|
|
{
|
|
_messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Acknowledge mode for messages received.
|
|
/// </summary>
|
|
public AcknowledgeMode AcknowledgeMode
|
|
{
|
|
get
|
|
{
|
|
CheckNotClosed();
|
|
return _acknowledgeMode;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// True if the channel should use transactions.
|
|
/// </summary>
|
|
public bool Transacted
|
|
{
|
|
get
|
|
{
|
|
CheckNotClosed();
|
|
return _transacted;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Prefetch value to be used as the default for
|
|
/// consumers created on this channel.
|
|
/// </summary>
|
|
public int DefaultPrefetch
|
|
{
|
|
get { return DefaultPrefetchHigh; }
|
|
}
|
|
|
|
/// <summary>
|
|
/// Prefetch low value to be used as the default for
|
|
/// consumers created on this channel.
|
|
/// </summary>
|
|
public int DefaultPrefetchLow
|
|
{
|
|
get { return _defaultPrefetchLowMark; }
|
|
}
|
|
|
|
/// <summary>
|
|
/// Prefetch high value to be used as the default for
|
|
/// consumers created on this channel.
|
|
/// </summary>
|
|
public int DefaultPrefetchHigh
|
|
{
|
|
get { return _defaultPrefetchHighMark; }
|
|
}
|
|
|
|
/// <summary> Indicates whether or not this channel is currently suspended. </summary>
|
|
public bool IsSuspended
|
|
{
|
|
get { return _suspended; }
|
|
}
|
|
|
|
/// <summary> Provides the channels number within the the connection. </summary>
|
|
public ushort ChannelId
|
|
{
|
|
get { return _channelId; }
|
|
}
|
|
|
|
/// <summary> Provides the connection that this channel runs over. </summary>
|
|
public AMQConnection Connection
|
|
{
|
|
get { return _connection; }
|
|
}
|
|
|
|
/// <summary>
|
|
/// Declare a new exchange.
|
|
/// </summary>
|
|
/// <param name="exchangeName">Name of the exchange</param>
|
|
/// <param name="exchangeClass">Class of the exchange, from <see cref="ExchangeClassConstants"/></param>
|
|
public void DeclareExchange(String exchangeName, String exchangeClass)
|
|
{
|
|
_logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass));
|
|
|
|
DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Declare a new exchange using the default exchange class.
|
|
/// </summary>
|
|
/// <param name="exchangeName">Name of the exchange</param>
|
|
public void DeleteExchange(string exchangeName)
|
|
{
|
|
throw new NotImplementedException();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Declare a new queue with the specified set of arguments.
|
|
/// </summary>
|
|
/// <param name="queueName">Name of the queue</param>
|
|
/// <param name="isDurable">True if the queue should be durable</param>
|
|
/// <param name="isExclusive">True if the queue should be exclusive to this channel</param>
|
|
/// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param>
|
|
public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete)
|
|
{
|
|
DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Delete a queue with the specifies arguments.
|
|
/// </summary>
|
|
/// <param name="queueName">Name of the queue to delete</param>
|
|
/// <param name="ifUnused">If true, the queue will not deleted if it has no consumers</param>
|
|
/// <param name="ifEmpty">If true, the queue will not deleted if it has no messages</param>
|
|
/// <param name="noWait">If true, the server will not respond to the method</param>
|
|
public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
|
|
{
|
|
DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Generate a new Unique name to use for a queue.
|
|
/// </summary>
|
|
/// <returns>A unique name to this channel</returns>
|
|
public string GenerateUniqueName()
|
|
{
|
|
string result = _connection.ProtocolSession.GenerateQueueName();
|
|
return Regex.Replace(result, "[^a-z0-9_]", "_");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Removes all messages from a queue.
|
|
/// </summary>
|
|
/// <param name="queueName">Name of the queue to delete</param>
|
|
/// <param name="noWait">If true, the server will not respond to the method</param>
|
|
public void PurgeQueue(string queueName, bool noWait)
|
|
{
|
|
DoPurgeQueue(queueName, noWait);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Bind a queue to the specified exchange.
|
|
/// </summary>
|
|
/// <param name="queueName">Name of queue to bind</param>
|
|
/// <param name="exchangeName">Name of exchange to bind to</param>
|
|
/// <param name="routingKey">Routing key</param>
|
|
public void Bind(string queueName, string exchangeName, string routingKey)
|
|
{
|
|
DoBind(queueName, exchangeName, routingKey, new FieldTable());
|
|
}
|
|
|
|
/// <summary>
|
|
/// Bind a queue to the specified exchange.
|
|
/// </summary>
|
|
/// <param name="queueName">Name of queue to bind</param>
|
|
/// <param name="exchangeName">Name of exchange to bind to</param>
|
|
/// <param name="routingKey">Routing key</param>
|
|
/// <param name="args">Table of arguments for the binding. Used to bind with a Headers Exchange</param>
|
|
public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args)
|
|
{
|
|
DoBind(queueName, exchangeName, routingKey, (FieldTable)args);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Create a new empty message with no body.
|
|
/// </summary>
|
|
/// <returns>The new message</returns>
|
|
public IMessage CreateMessage()
|
|
{
|
|
return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Create a new message of the specified MIME type.
|
|
/// </summary>
|
|
/// <param name="mimeType">The mime type to create</param>
|
|
/// <returns>The new message</returns>
|
|
public IMessage CreateMessage(string mimeType)
|
|
{
|
|
return _messageFactoryRegistry.CreateMessage(mimeType);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a new message for bytes (application/octet-stream).
|
|
/// </summary>
|
|
/// <returns>The new message</returns>
|
|
public IBytesMessage CreateBytesMessage()
|
|
{
|
|
return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a new text message (text/plain) with empty content.
|
|
/// </summary>
|
|
/// <returns>The new message</returns>
|
|
public ITextMessage CreateTextMessage()
|
|
{
|
|
return CreateTextMessage(String.Empty);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a new text message (text/plain) with a body.
|
|
/// </summary>
|
|
/// <param name="text">Initial body of the message</param>
|
|
/// <returns>The new message</returns>
|
|
public ITextMessage CreateTextMessage(string text)
|
|
{
|
|
ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
|
|
msg.Text = text;
|
|
return msg;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a new Consumer using the builder pattern.
|
|
/// </summary>
|
|
/// <param name="queueName">Name of queue to receive messages from</param>
|
|
/// <returns>The builder object</returns>
|
|
public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
|
|
{
|
|
return new MessageConsumerBuilder(this, queueName);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a new consumer.
|
|
/// </summary>
|
|
/// <param name="queueName">Name of queue to receive messages from</param>
|
|
/// <param name="prefetchLow">Low prefetch value</param>
|
|
/// <param name="prefetchHigh">High prefetch value</param>
|
|
/// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param>
|
|
/// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param>
|
|
/// <returns>The new consumer</returns>
|
|
public IMessageConsumer CreateConsumer(string queueName,
|
|
int prefetchLow,
|
|
int prefetchHigh,
|
|
bool noLocal,
|
|
bool exclusive)
|
|
{
|
|
_logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} ",
|
|
queueName, prefetchLow, prefetchHigh, noLocal, exclusive));
|
|
|
|
return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Unsubscribe from a queue.
|
|
/// </summary>
|
|
/// <param name="subscriptionName">Subscription name</param>
|
|
public void Unsubscribe(String name)
|
|
{
|
|
throw new NotImplementedException();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Create a new message publisher using the builder pattern.
|
|
/// </summary>
|
|
/// <returns>The builder object</returns>
|
|
public MessagePublisherBuilder CreatePublisherBuilder()
|
|
{
|
|
return new MessagePublisherBuilder(this);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Create a new message publisher.
|
|
/// </summary>
|
|
/// <param name="exchangeName">Name of exchange to publish to</param>
|
|
/// <param name="routingKey">Routing key</param>
|
|
/// <param name="deliveryMode">Default delivery mode</param>
|
|
/// <param name="timeToLive">Default TTL time of messages</param>
|
|
/// <param name="immediate">If true, sent immediately</param>
|
|
/// <param name="mandatory">If true, the broker will return an error
|
|
/// (as a connection exception) if the message cannot be delivered</param>
|
|
/// <param name="priority">Default message priority</param>
|
|
/// <returns>The new message publisher</returns>
|
|
public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode,
|
|
long timeToLive, bool immediate, bool mandatory, int priority)
|
|
{
|
|
_logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}",
|
|
exchangeName, "none", routingKey));
|
|
|
|
return CreateProducerImpl(exchangeName, routingKey, deliveryMode,
|
|
timeToLive, immediate, mandatory, priority);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Recover after transaction failure.
|
|
/// </summary>
|
|
/// <remarks>The 0-8 protocol does not support this, not implemented exception will always be thrown.</remarks>
|
|
public void Recover()
|
|
{
|
|
CheckNotClosed();
|
|
CheckNotTransacted();
|
|
|
|
throw new NotImplementedException();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Commit the transaction.
|
|
/// </summary>
|
|
public void Commit()
|
|
{
|
|
// FIXME: Fail over safety. Needs FailoverSupport?
|
|
CheckNotClosed();
|
|
CheckTransacted(); // throws IllegalOperationException if not a transacted session
|
|
|
|
try
|
|
{
|
|
// Acknowledge up to message last delivered (if any) for each consumer.
|
|
// Need to send ack for messages delivered to consumers so far.
|
|
foreach (BasicMessageConsumer consumer in _consumers.Values)
|
|
{
|
|
// Sends acknowledgement to server.
|
|
consumer.AcknowledgeDelivered();
|
|
}
|
|
|
|
// Commits outstanding messages sent and outstanding acknowledgements.
|
|
_connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody));
|
|
}
|
|
catch (AMQException e)
|
|
{
|
|
throw new QpidException("Failed to commit", e);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Rollback the transaction.
|
|
/// </summary>
|
|
public void Rollback()
|
|
{
|
|
lock (_suspensionLock)
|
|
{
|
|
CheckTransacted(); // throws IllegalOperationException if not a transacted session
|
|
|
|
try
|
|
{
|
|
bool suspended = IsSuspended;
|
|
if (!suspended)
|
|
{
|
|
Suspend(true);
|
|
}
|
|
|
|
// Reject up to message last delivered (if any) for each consumer.
|
|
// Need to send reject for messages delivered to consumers so far.
|
|
foreach (BasicMessageConsumer consumer in _consumers.Values)
|
|
{
|
|
// Sends acknowledgement to server.
|
|
consumer.RejectUnacked();
|
|
}
|
|
|
|
_connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
|
|
|
|
if ( !suspended )
|
|
{
|
|
Suspend(false);
|
|
}
|
|
}
|
|
catch (AMQException e)
|
|
{
|
|
throw new QpidException("Failed to rollback", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Create a disconnected channel that will fault
|
|
/// for most things, but is useful for testing
|
|
/// </summary>
|
|
/// <returns>A new disconnected channel</returns>
|
|
public static IChannel CreateDisconnectedChannel()
|
|
{
|
|
return new AmqChannel();
|
|
}
|
|
|
|
public override void Close()
|
|
{
|
|
lock (_connection.FailoverMutex)
|
|
{
|
|
// We must close down all producers and consumers in an orderly fashion. This is the only method
|
|
// that can be called from a different thread of control from the one controlling the session
|
|
|
|
lock (_closingLock)
|
|
{
|
|
SetClosed();
|
|
|
|
// we pass null since this is not an error case
|
|
CloseProducersAndConsumers(null);
|
|
|
|
try
|
|
{
|
|
_connection.CloseSession(this);
|
|
}
|
|
catch (AMQException e)
|
|
{
|
|
throw new QpidException("Error closing session: " + e);
|
|
}
|
|
finally
|
|
{
|
|
_connection.DeregisterSession(_channelId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Called when the server initiates the closure of the session
|
|
* unilaterally.
|
|
* @param e the exception that caused this session to be closed. Null causes the
|
|
*/
|
|
public void ClosedWithException(Exception e)
|
|
{
|
|
lock (_connection.FailoverMutex)
|
|
{
|
|
// An AMQException has an error code and message already and will be passed in when closure occurs as a
|
|
// result of a channel close request
|
|
SetClosed();
|
|
AMQException amqe;
|
|
|
|
if (e is AMQException)
|
|
{
|
|
amqe = (AMQException) e;
|
|
}
|
|
else
|
|
{
|
|
amqe = new AMQException("Closing session forcibly", e);
|
|
}
|
|
|
|
_connection.DeregisterSession(_channelId);
|
|
CloseProducersAndConsumers(amqe);
|
|
}
|
|
}
|
|
|
|
public void MessageReceived(UnprocessedMessage message)
|
|
{
|
|
if (_logger.IsDebugEnabled)
|
|
{
|
|
_logger.Debug("Message received in session with channel id " + _channelId);
|
|
}
|
|
|
|
if ( message.DeliverBody == null )
|
|
{
|
|
ReturnBouncedMessage(message);
|
|
}
|
|
else
|
|
{
|
|
_queue.Enqueue(message);
|
|
}
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
Close();
|
|
}
|
|
|
|
private void SetClosed()
|
|
{
|
|
Interlocked.Exchange(ref _closed, CLOSED);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Close all producers or consumers. This is called either in the error case or when closing the session normally.
|
|
/// <param name="amqe">the exception, may be null to indicate no error has occurred</param>
|
|
///
|
|
private void CloseProducersAndConsumers(AMQException amqe)
|
|
{
|
|
try
|
|
{
|
|
CloseProducers();
|
|
}
|
|
catch (QpidException e)
|
|
{
|
|
_logger.Error("Error closing session: " + e, e);
|
|
}
|
|
try
|
|
{
|
|
CloseConsumers(amqe);
|
|
}
|
|
catch (QpidException e)
|
|
{
|
|
_logger.Error("Error closing session: " + e, e);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
|
|
/// currently no way of propagating errors to message producers (this is a JMS limitation).
|
|
/// </summary>
|
|
private void CloseProducers()
|
|
{
|
|
_logger.Debug("Closing producers on session " + this);
|
|
|
|
// we need to clone the list of producers since the close() method updates the _producers collection
|
|
// which would result in a concurrent modification exception
|
|
ArrayList clonedProducers = new ArrayList(_producers.Values);
|
|
|
|
foreach (BasicMessageProducer prod in clonedProducers)
|
|
{
|
|
_logger.Debug("Closing producer " + prod);
|
|
prod.Close();
|
|
}
|
|
|
|
// at this point the _producers map is empty
|
|
}
|
|
|
|
/// <summary>
|
|
/// Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
|
|
/// <param name="error">not null if this is a result of an error occurring at the connection level</param>
|
|
private void CloseConsumers(Exception error)
|
|
{
|
|
if (_dispatcher != null)
|
|
{
|
|
_dispatcher.StopDispatcher();
|
|
}
|
|
|
|
// we need to clone the list of consumers since the close() method updates the _consumers collection
|
|
// which would result in a concurrent modification exception
|
|
ArrayList clonedConsumers = new ArrayList(_consumers.Values);
|
|
|
|
foreach (BasicMessageConsumer con in clonedConsumers)
|
|
{
|
|
if (error != null)
|
|
{
|
|
con.NotifyError(error);
|
|
}
|
|
else
|
|
{
|
|
con.Close();
|
|
}
|
|
}
|
|
|
|
// at this point the _consumers map will be empty
|
|
}
|
|
|
|
private IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey,
|
|
DeliveryMode deliveryMode,
|
|
long timeToLive, bool immediate, bool mandatory, int priority)
|
|
{
|
|
lock (_closingLock)
|
|
{
|
|
CheckNotClosed();
|
|
|
|
try
|
|
{
|
|
return new BasicMessageProducer(exchangeName, routingKey, _transacted, _channelId,
|
|
this, GetNextProducerId(),
|
|
deliveryMode, timeToLive, immediate, mandatory, priority);
|
|
}
|
|
catch (AMQException e)
|
|
{
|
|
_logger.Error("Error creating message producer: " + e, e);
|
|
throw new QpidException("Error creating message producer", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary> Creates a message consumer on this channel.</summary>
|
|
///
|
|
/// <param name="queueName">The name of the queue to attach the consumer to.</param>
|
|
/// <param name="prefetchLow">The pre-fetch buffer low-water mark.</param>
|
|
/// <param name="prefetchHigh">The pre-fetch buffer high-water mark.</param>
|
|
/// <param name="noLocal">The no-local flag, <tt>true</tt> means that the consumer does not receive messages sent on this channel.</param>
|
|
/// <param name="exclusive">The exclusive flag, <tt>true</tt> gives this consumer exclusive receive access to the queue.</param>
|
|
///
|
|
/// <return>The message consumer.</return>
|
|
private IMessageConsumer CreateConsumerImpl(string queueName,
|
|
int prefetchLow,
|
|
int prefetchHigh,
|
|
bool noLocal,
|
|
bool exclusive)
|
|
{
|
|
lock (_closingLock)
|
|
{
|
|
CheckNotClosed();
|
|
|
|
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal,
|
|
_messageFactoryRegistry, this,
|
|
prefetchHigh, prefetchLow, exclusive);
|
|
try
|
|
{
|
|
RegisterConsumer(consumer);
|
|
}
|
|
catch (AMQException e)
|
|
{
|
|
throw new QpidException("Error registering consumer: " + e, e);
|
|
}
|
|
|
|
return consumer;
|
|
}
|
|
}
|
|
|
|
private void CheckTransacted()
|
|
{
|
|
if (!Transacted)
|
|
{
|
|
throw new InvalidOperationException("Channel is not transacted");
|
|
}
|
|
}
|
|
|
|
private void CheckNotTransacted()
|
|
{
|
|
if (Transacted)
|
|
{
|
|
throw new InvalidOperationException("Channel is transacted");
|
|
}
|
|
}
|
|
|
|
internal void Start()
|
|
{
|
|
_dispatcher = new Dispatcher(this);
|
|
Thread dispatcherThread = new Thread(new ThreadStart(_dispatcher.RunDispatcher));
|
|
dispatcherThread.IsBackground = true;
|
|
dispatcherThread.Start();
|
|
}
|
|
|
|
internal void Stop()
|
|
{
|
|
Suspend(true);
|
|
if (_dispatcher != null)
|
|
{
|
|
_dispatcher.StopDispatcher();
|
|
}
|
|
}
|
|
|
|
internal void RegisterConsumer(string consumerTag, IMessageConsumer consumer)
|
|
{
|
|
_consumers[consumerTag] = consumer;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Called by the MessageConsumer when closing, to deregister the consumer from the
|
|
/// map from consumerTag to consumer instance.
|
|
/// </summary>
|
|
/// <param name="consumerTag">the consumer tag, that was broker-generated</param>
|
|
internal void DeregisterConsumer(string consumerTag)
|
|
{
|
|
_consumers.Remove(consumerTag);
|
|
}
|
|
|
|
internal void RegisterProducer(long producerId, IMessagePublisher publisher)
|
|
{
|
|
_producers[producerId] = publisher;
|
|
}
|
|
|
|
internal void DeregisterProducer(long producerId)
|
|
{
|
|
_producers.Remove(producerId);
|
|
}
|
|
|
|
private long GetNextProducerId()
|
|
{
|
|
return ++_nextProducerId;
|
|
}
|
|
|
|
/**
|
|
* Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
|
|
* failover when the client has veoted resubscription.
|
|
*
|
|
* The caller of this method must already hold the failover mutex.
|
|
*/
|
|
internal void MarkClosed()
|
|
{
|
|
SetClosed();
|
|
_connection.DeregisterSession(_channelId);
|
|
MarkClosedProducersAndConsumers();
|
|
}
|
|
|
|
private void MarkClosedProducersAndConsumers()
|
|
{
|
|
try
|
|
{
|
|
// no need for a markClosed* method in this case since there is no protocol traffic closing a producer
|
|
CloseProducers();
|
|
}
|
|
catch (QpidException e)
|
|
{
|
|
_logger.Error("Error closing session: " + e, e);
|
|
}
|
|
try
|
|
{
|
|
MarkClosedConsumers();
|
|
}
|
|
catch (QpidException e)
|
|
{
|
|
_logger.Error("Error closing session: " + e, e);
|
|
}
|
|
}
|
|
|
|
private void MarkClosedConsumers()
|
|
{
|
|
if (_dispatcher != null)
|
|
{
|
|
_dispatcher.StopDispatcher();
|
|
}
|
|
// we need to clone the list of consumers since the close() method updates the _consumers collection
|
|
// which would result in a concurrent modification exception
|
|
ArrayList clonedConsumers = new ArrayList(_consumers.Values);
|
|
|
|
foreach (BasicMessageConsumer consumer in clonedConsumers)
|
|
{
|
|
consumer.MarkClosed();
|
|
}
|
|
// at this point the _consumers map will be empty
|
|
}
|
|
|
|
private void DoPurgeQueue(string queueName, bool noWait)
|
|
{
|
|
try
|
|
{
|
|
_logger.DebugFormat("PurgeQueue {0}", queueName);
|
|
|
|
AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait);
|
|
|
|
if (noWait)
|
|
_connection.ProtocolWriter.Write(purgeQueue);
|
|
else
|
|
_connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody));
|
|
}
|
|
catch (AMQException)
|
|
{
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Replays frame on fail over.
|
|
*
|
|
* @throws AMQException
|
|
*/
|
|
internal void ReplayOnFailOver()
|
|
{
|
|
_logger.Debug(string.Format("Replaying frames for channel {0}", _channelId));
|
|
foreach (AMQFrame frame in _replayFrames)
|
|
{
|
|
_logger.Debug(string.Format("Replaying frame=[{0}]", frame));
|
|
_connection.ProtocolWriter.Write(frame);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Callers must hold the failover mutex before calling this method.
|
|
/// </summary>
|
|
/// <param name="consumer"></param>
|
|
private void RegisterConsumer(BasicMessageConsumer consumer)
|
|
{
|
|
// Need to generate a consumer tag on the client so we can exploit the nowait flag.
|
|
String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
|
|
consumer.ConsumerTag = tag;
|
|
_consumers.Add(tag, consumer);
|
|
|
|
String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
|
|
consumer.Exclusive, consumer.AcknowledgeMode, tag);
|
|
|
|
}
|
|
|
|
internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
|
|
{
|
|
|
|
_logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}",
|
|
queueName, exchangeName, routingKey, args));
|
|
|
|
AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
|
|
queueName, exchangeName,
|
|
routingKey, false, args);
|
|
|
|
|
|
lock (_connection.FailoverMutex)
|
|
{
|
|
_connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody));
|
|
}
|
|
// AS FIXME: wasnae me
|
|
_replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0,
|
|
queueName, exchangeName,
|
|
routingKey, true, args));
|
|
}
|
|
|
|
private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag)
|
|
{
|
|
|
|
AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
|
|
queueName, tag, noLocal,
|
|
acknowledgeMode == AcknowledgeMode.NoAcknowledge,
|
|
exclusive, true, new FieldTable());
|
|
|
|
_replayFrames.Add(basicConsume);
|
|
|
|
_connection.ProtocolWriter.Write(basicConsume);
|
|
return tag;
|
|
}
|
|
|
|
private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
|
|
{
|
|
try
|
|
{
|
|
_logger.Debug(string.Format("DeleteQueue name={0}", queueName));
|
|
|
|
AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait);
|
|
|
|
if (noWait)
|
|
{
|
|
_connection.ProtocolWriter.Write(queueDelete);
|
|
}
|
|
else
|
|
{
|
|
_connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
|
|
}
|
|
// AS FIXME: wasnae me
|
|
_replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true));
|
|
}
|
|
catch (AMQException)
|
|
{
|
|
throw;
|
|
}
|
|
}
|
|
|
|
private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete)
|
|
{
|
|
_logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}",
|
|
queueName, isDurable, isExclusive, isAutoDelete));
|
|
|
|
AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
|
|
isAutoDelete, false, null);
|
|
|
|
|
|
lock (_connection.FailoverMutex)
|
|
{
|
|
_connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody));
|
|
}
|
|
// AS FIXME: wasnae me
|
|
_replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
|
|
isAutoDelete, true, null));
|
|
}
|
|
|
|
// AMQP-level method.
|
|
private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName,
|
|
string exchangeClass, bool passive, bool durable,
|
|
bool autoDelete, bool xinternal, bool noWait, FieldTable args)
|
|
{
|
|
_logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}",
|
|
_channelId, exchangeName, exchangeClass));
|
|
|
|
AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive,
|
|
durable, autoDelete, xinternal, noWait, args);
|
|
|
|
if (noWait)
|
|
{
|
|
lock (_connection.FailoverMutex)
|
|
{
|
|
_connection.ProtocolWriter.Write(declareExchange);
|
|
}
|
|
// AS FIXME: wasnae me
|
|
_replayFrames.Add(declareExchange);
|
|
}
|
|
else
|
|
{
|
|
throw new NotImplementedException("Don't use nowait=false with DeclareExchange");
|
|
//_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
|
|
* a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
|
|
* AUTO_ACK or similar.
|
|
*
|
|
* @param deliveryTag the tag of the last message to be acknowledged
|
|
* @param multiple if true will acknowledge all messages up to and including the one specified by the
|
|
* delivery tag
|
|
*/
|
|
internal void AcknowledgeMessage(ulong deliveryTag, bool multiple)
|
|
{
|
|
AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple);
|
|
if (_logger.IsDebugEnabled)
|
|
{
|
|
_logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
|
|
}
|
|
// FIXME: lock FailoverMutex here?
|
|
_connection.ProtocolWriter.Write(ackFrame);
|
|
}
|
|
|
|
public void RejectMessage(ulong deliveryTag, bool requeue)
|
|
{
|
|
if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || (_acknowledgeMode == AcknowledgeMode.SessionTransacted))
|
|
{
|
|
AMQFrame rejectFrame = BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue);
|
|
_connection.ProtocolWriter.Write(rejectFrame);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Handle a message that bounced from the server, creating
|
|
/// the corresponding exception and notifying the connection about it
|
|
/// </summary>
|
|
/// <param name="message">Unprocessed message</param>
|
|
private void ReturnBouncedMessage(UnprocessedMessage message)
|
|
{
|
|
try
|
|
{
|
|
AbstractQmsMessage bouncedMessage =
|
|
_messageFactoryRegistry.CreateMessage(0, false, message.ContentHeader, message.Bodies);
|
|
|
|
int errorCode = message.BounceBody.ReplyCode;
|
|
string reason = message.BounceBody.ReplyText;
|
|
_logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
|
|
AMQException exception;
|
|
|
|
if (errorCode == AMQConstant.NO_CONSUMERS.Code)
|
|
{
|
|
exception = new AMQNoConsumersException(reason, bouncedMessage);
|
|
}
|
|
else if (errorCode == AMQConstant.NO_ROUTE.Code)
|
|
{
|
|
exception = new AMQNoRouteException(reason, bouncedMessage);
|
|
}
|
|
else
|
|
{
|
|
exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage);
|
|
}
|
|
|
|
_connection.ExceptionReceived(exception);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex);
|
|
}
|
|
}
|
|
|
|
private void OnPrefetchLowMark(int count)
|
|
{
|
|
if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge)
|
|
{
|
|
_logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count);
|
|
Suspend(false);
|
|
}
|
|
}
|
|
|
|
private void OnPrefetchHighMark(int count)
|
|
{
|
|
if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge)
|
|
{
|
|
_logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count);
|
|
Suspend(true);
|
|
}
|
|
}
|
|
|
|
private void Suspend(bool suspend)
|
|
{
|
|
lock (_suspensionLock)
|
|
{
|
|
if (_logger.IsDebugEnabled)
|
|
{
|
|
_logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
|
|
}
|
|
|
|
_suspended = suspend;
|
|
AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend);
|
|
|
|
Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody));
|
|
}
|
|
}
|
|
|
|
/// <summary>A Dispatcher turns the consumption of incoming messages from an arrival queue, into event notifications on consumers.
|
|
/// The arrival queue is typically a blocking queue, on which a dispatcher waits for messages to consume. Upon receipt of a message
|
|
/// the dispatcher finds the consumer that is listening to the queue to which the message has been send and notifies it of the new
|
|
/// message.
|
|
///
|
|
/// <p/>The Dispatcher also contains logic to recognize bounced messages. Bounced messages returned from the broker can be
|
|
/// told apart from regular deliveries because they do not have a delivery queue set on them. When the dispatcher receives a
|
|
/// bounced message it creates an exception and notifies the connection, to which its containing channel belongs, of the condition.
|
|
///
|
|
/// <p/><table id="crc"><caption>CRC Card</caption>
|
|
/// <tr><th> Responsibilities <th> Collaborations
|
|
/// <tr><td> Notify consumers of message arrivals on their queues. <td> <see cref="BasicMessageConsumer"/>
|
|
/// <tr><td> Notify the containing connection of bounced message arrivals. <td> <see cref="AMQConnection"/>
|
|
/// </table>
|
|
/// </summary>
|
|
///
|
|
/// <remarks>Stop mechanism seems wrong, as queue consume is evaluated after stop flag, so could consume and notify one more message.
|
|
/// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on
|
|
/// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks>
|
|
///
|
|
/// <remarks>Exception swallowed, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should
|
|
/// fall through and terminate the loop, as it is a bug if it occurrs.</remarks>
|
|
private class Dispatcher
|
|
{
|
|
/// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). </summary>
|
|
private int _stopped = 0;
|
|
|
|
/// <summary> The channel for which this is a dispatcher. </summary>
|
|
private AmqChannel _containingChannel;
|
|
|
|
/// <summary> Creates a dispatcher on the specified channel. </summary>
|
|
///
|
|
/// <param name="containingChannel"> The channel on which this is a dispatcher. </param>
|
|
public Dispatcher(AmqChannel containingChannel)
|
|
{
|
|
_containingChannel = containingChannel;
|
|
}
|
|
|
|
/// <summary>The message dispatch loop. Consumes messages from the channels queue, notifying consumers of regular deliveries, and
|
|
/// the connection of bounced messages.</summary>
|
|
public void RunDispatcher()
|
|
{
|
|
UnprocessedMessage message;
|
|
|
|
while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null)
|
|
{
|
|
if (message.DeliverBody != null)
|
|
{
|
|
BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag];
|
|
|
|
if (consumer == null)
|
|
{
|
|
_logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring...");
|
|
}
|
|
else
|
|
{
|
|
consumer.NotifyMessage(message, _containingChannel.ChannelId);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
try
|
|
{
|
|
// Bounced message is processed here, away from the transport thread
|
|
AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry.
|
|
CreateMessage(0, false, message.ContentHeader, message.Bodies);
|
|
|
|
int errorCode = message.BounceBody.ReplyCode;
|
|
string reason = message.BounceBody.ReplyText;
|
|
|
|
_logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
|
|
|
|
_containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
_logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
_logger.Debug("Dispatcher thread terminating for channel: " + _containingChannel._channelId + ".");
|
|
}
|
|
|
|
/// <summary> Sets a stop flag on this dispatcher, which causes its dispatch loop to exit at the next available opportunity. </summary>
|
|
public void StopDispatcher()
|
|
{
|
|
Interlocked.Exchange(ref _stopped, 1);
|
|
}
|
|
}
|
|
}
|
|
}
|