282 lines
12 KiB
C#
282 lines
12 KiB
C#
/*
|
|
*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*
|
|
*/
|
|
using System;
|
|
using System.Threading;
|
|
using log4net;
|
|
using NUnit.Framework;
|
|
using Apache.Qpid.Framing;
|
|
using Apache.Qpid.Messaging;
|
|
using Apache.Qpid.Client.Qms;
|
|
using Apache.Qpid.Client;
|
|
|
|
namespace Apache.Qpid.Integration.Tests.testcases
|
|
{
|
|
/// <summary>
|
|
/// Sets up a producer/consumer pair to send test messages through a header exchange. The header exchange matching pattern is tested to
|
|
/// verify that it correctly matches or filters out messages based on their headers.
|
|
///
|
|
/// Check that a message matching all fields of a headers exchange is passed by the exchange.
|
|
/// Check that a message containing values for empty fields of a headers exchange is passed by the exchange.
|
|
/// Check that a message matching only some fields of a headers exhcnage is not passed by the exchange.
|
|
/// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by the exchange.
|
|
/// </summary>
|
|
///
|
|
/// <todo>Remove the HeadersMatchingProducer class and rename this to HeaderExchangeTest. The producer and consumer are implemented
|
|
/// in a single test class to make running this as part of an automated test suite possible.</todo>
|
|
///
|
|
/// <todo>Consider not using a delegate to callback the OnMessage method. Easier to just call receive on the consumer but using the
|
|
/// callback does demonstrate how to do so.</todo>
|
|
[TestFixture, Category("Integration")]
|
|
public class HeadersExchangeTest : BaseMessagingTestFixture
|
|
{
|
|
private static ILog _logger = LogManager.GetLogger(typeof(HeadersExchangeTest));
|
|
|
|
/// <summary> Holds the default test timeout for broker communications before tests give up. </summary>
|
|
private static readonly int TIMEOUT = 2000;
|
|
|
|
/// <summary> Holds the name of the headers exchange to create to send test messages on. </summary>
|
|
private string _exchangeName = "ServiceQ1";
|
|
|
|
/// <summary> Used to preserve the most recent exception in case test cases need to examine it. </summary>
|
|
private Exception _lastException = null;
|
|
|
|
/// <summary> Used to preserve the most recent message from the test consumer. </summary>
|
|
private IMessage _lastMessage = null;
|
|
|
|
/// <summary> The test consumer to get messages from the broker with. </summary>
|
|
private IMessageConsumer _consumer;
|
|
|
|
private IMessagePublisher _publisher;
|
|
|
|
private AutoResetEvent _evt = new AutoResetEvent(false);
|
|
|
|
private MessageReceivedDelegate _msgRecDelegate;
|
|
private ExceptionListenerDelegate _exceptionDelegate;
|
|
|
|
/// <summary> Holds the test connection. </summary>
|
|
protected IConnection _connection;
|
|
|
|
/// <summary> Holds the test channel. </summary>
|
|
protected IChannel _channel;
|
|
|
|
[SetUp]
|
|
public override void Init()
|
|
{
|
|
// Ensure that the base init method is called. It establishes a connection with the broker.
|
|
base.Init();
|
|
|
|
connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
|
|
_connection = new AMQConnection(connectionInfo);
|
|
_channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 500, 300);
|
|
|
|
_logger.Info("Starting...");
|
|
_logger.Info("Exchange name is '" + _exchangeName + "'...");
|
|
|
|
// Register this to listen for exceptions on the test connection.
|
|
_exceptionDelegate = new ExceptionListenerDelegate(OnException);
|
|
_connection.ExceptionListener += _exceptionDelegate;
|
|
|
|
// Declare a new headers exchange with the name of the test service.
|
|
_channel.DeclareExchange(_exchangeName, ExchangeClassConstants.HEADERS);
|
|
|
|
// Create a non-durable, temporary (aka auto-delete), exclusive queue.
|
|
string queueName = _channel.GenerateUniqueName();
|
|
_channel.DeclareQueue(queueName, false, true, true);
|
|
|
|
// Bind the queue to the new headers exchange, setting up some header patterns for the exchange to match.
|
|
_channel.Bind(queueName, _exchangeName, null, CreatePatternAsFieldTable());
|
|
|
|
// Create a test consumer to consume messages from the test exchange.
|
|
_consumer = _channel.CreateConsumerBuilder(queueName)
|
|
.WithPrefetchLow(100)
|
|
.WithPrefetchHigh(500)
|
|
.WithNoLocal(false) // make sure we get our own messages
|
|
.Create();
|
|
|
|
// Register this to listen for messages on the consumer.
|
|
_msgRecDelegate = new MessageReceivedDelegate(OnMessage);
|
|
_consumer.OnMessage += _msgRecDelegate;
|
|
|
|
// Clear the most recent message and exception.
|
|
_lastException = null;
|
|
_lastMessage = null;
|
|
|
|
_publisher = _channel.CreatePublisherBuilder()
|
|
.WithExchangeName(_exchangeName)
|
|
.WithMandatory(true)
|
|
.Create();
|
|
|
|
_publisher.DeliveryMode = DeliveryMode.NonPersistent;
|
|
|
|
// Start all channel
|
|
_connection.Start();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Deregisters the on message delegate before closing the connection.
|
|
/// </summary>
|
|
[TearDown]
|
|
public override void Shutdown()
|
|
{
|
|
_logger.Info("public void Shutdown(): called");
|
|
|
|
//_consumer.OnMessage -= _msgRecDelegate;
|
|
//_connection.ExceptionListener -= _exceptionDelegate;
|
|
|
|
_connection.Stop();
|
|
_connection.Close();
|
|
_connection.Dispose();
|
|
|
|
base.Shutdown();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Callback method that is passed any messages received on the test channel.
|
|
/// </summary>
|
|
///
|
|
/// <param name="message">The received message.</param>
|
|
public void OnMessage(IMessage message)
|
|
{
|
|
_logger.Debug(string.Format("message.Type = {0}", message.GetType()));
|
|
_logger.Debug("Got message '" + message + "'");
|
|
|
|
// Preserve the most recent exception so that test cases can examine it.
|
|
_lastMessage = message;
|
|
|
|
// Notify any waiting threads that a message has been received.
|
|
_evt.Set();
|
|
}
|
|
|
|
/// <summary>Callback method to handle any exceptions raised by the test connection.</summary>
|
|
///
|
|
/// <param name="e">The connection exception.</param>
|
|
public void OnException(Exception e)
|
|
{
|
|
// Preserve the most recent exception in case test cases need to examine it.
|
|
_lastException = e;
|
|
|
|
// Notify any waiting threads that an exception event has occurred.
|
|
_evt.Set();
|
|
}
|
|
|
|
/// <summary>Check that a message matching all fields of a headers exchange is passed by the exchange.</summary>
|
|
[Test]
|
|
public void TestMatchAll()
|
|
{
|
|
IMessage msg = _channel.CreateTextMessage("matches match2=''");
|
|
msg.Headers["match1"] = "foo";
|
|
msg.Headers["match2"] = "";
|
|
|
|
// Use the SendTestMessage helper method to verify that the message was sent and received.
|
|
SendTestMessage(msg, true);
|
|
}
|
|
|
|
/// <summary>Check that a message containing values for empty fields of a headers exchange is passed by the exchange.</summary>
|
|
[Test]
|
|
public void TestMatchEmptyMatchesAnything()
|
|
{
|
|
// Send a test message that matches the headers exchange.
|
|
IMessage msg = _channel.CreateTextMessage("matches match1='foo' and match2='bar'");
|
|
msg.Headers["match1"] = "foo";
|
|
msg.Headers["match2"] = "bar";
|
|
|
|
// Use the SendTestMessage helper method to verify that the message was sent and received.
|
|
SendTestMessage(msg, true);
|
|
}
|
|
|
|
/// <summary>Check that a message matching only some fields of a headers exchange is not passed by the exchange.</summary>
|
|
[Test]
|
|
public void TestMatchOneFails()
|
|
{
|
|
IMessage msg = _channel.CreateTextMessage("not match - only match1");
|
|
msg.Headers["match1"] = "foo";
|
|
|
|
// Use the SendTestMessage helper method to verify that the message was sent and not received.
|
|
SendTestMessage(msg, false);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by
|
|
/// the exchange.
|
|
/// </summary>
|
|
[Test]
|
|
public void TestMatchExtraFields()
|
|
{
|
|
IMessage msg = _channel.CreateTextMessage("matches - extra headers");
|
|
msg.Headers["match1"] = "foo";
|
|
msg.Headers["match2"] = "bar";
|
|
msg.Headers["match3"] = "not required";
|
|
|
|
// Use the SendTestMessage helper method to verify that the message was sent and received.
|
|
SendTestMessage(msg, true);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not
|
|
/// depending on whether or not the message should be received by the consumer.
|
|
///
|
|
/// Any exceptions raised by the connection will cause an Assert failure exception to be raised.
|
|
/// </summary>
|
|
///
|
|
/// <param name="msgSend">The message to send.</param>
|
|
/// <param name="shouldPass">A flag to indicate whether or not the message should be received by the consumer.</param>
|
|
private void SendTestMessage(IMessage msgSend, bool shouldPass)
|
|
{
|
|
_publisher.Send(msgSend);
|
|
_evt.WaitOne(TIMEOUT, true);
|
|
|
|
// Check that an exception other than not routable was raised in which case re-raise it as a test error.
|
|
if (_lastException != null && !(_lastException.InnerException is AMQUndeliveredException))
|
|
{
|
|
Assert.Fail("Exception {0} was raised by the broker connection.", _lastException);
|
|
}
|
|
// Check that a message was returned if the test is expecting the message to pass.
|
|
else if (shouldPass)
|
|
{
|
|
Assert.IsNotNull(_lastMessage, "Did not get a matching message from the headers exchange.");
|
|
}
|
|
// Check that a not routable exception was raised if the test is expecting the message to fail.
|
|
else if (_lastException != null && _lastException.InnerException is AMQUndeliveredException)
|
|
{
|
|
Assert.IsNull(_lastMessage, "Message could not be routed so consumer should not have received it.");
|
|
}
|
|
// The broker did not respond within the test timeout so fail the test.
|
|
else
|
|
{
|
|
Assert.Fail("The test timed out without a response from the broker.");
|
|
}
|
|
}
|
|
|
|
/// <summary> Returns a field table containing patterns to match the test header exchange against. </summary>
|
|
///
|
|
/// <returns> A field table containing test patterns. </returns>
|
|
private FieldTable CreatePatternAsFieldTable()
|
|
{
|
|
FieldTable matchTable = new FieldTable();
|
|
|
|
matchTable["match1"] = "foo";
|
|
matchTable["match2"] = "";
|
|
matchTable["x-match"] = "all";
|
|
|
|
return matchTable;
|
|
}
|
|
}
|
|
}
|