150 lines
5.2 KiB
Text
150 lines
5.2 KiB
Text
/*
|
|
*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*
|
|
*/
|
|
using System;
|
|
using System.Threading;
|
|
using log4net;
|
|
using NUnit.Framework;
|
|
using Apache.Qpid.Messaging;
|
|
|
|
namespace Apache.Qpid.Integration.Tests.testcases
|
|
{
|
|
[TestFixture, Category("Integration")]
|
|
public class ServiceProvidingClient : BaseMessagingTestFixture
|
|
{
|
|
private static ILog _logger = LogManager.GetLogger(typeof(ServiceProvidingClient));
|
|
|
|
private int _messageCount;
|
|
|
|
private string _replyToExchangeName;
|
|
private string _replyToRoutingKey;
|
|
const int PACK = 100;
|
|
|
|
private IMessagePublisher _destinationPublisher;
|
|
private IMessageConsumer _consumer;
|
|
|
|
private string _serviceName = "ServiceQ1";
|
|
|
|
private string _selector = null;
|
|
|
|
[SetUp]
|
|
public override void Init()
|
|
{
|
|
base.Init();
|
|
|
|
_logger.Info("Starting...");
|
|
_logger.Info("Service (queue) name is '" + _serviceName + "'...");
|
|
|
|
_connection.ExceptionListener = new ExceptionListenerDelegate(OnConnectionException);
|
|
|
|
_logger.Info("Message selector is <" + _selector + ">...");
|
|
|
|
_channel.DeclareQueue(_serviceName, false, false, false);
|
|
|
|
_consumer = _channel.CreateConsumerBuilder(_serviceName)
|
|
.WithPrefetchLow(100)
|
|
.WithPrefetchHigh(500)
|
|
.WithNoLocal(true)
|
|
.Create();
|
|
_consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
|
|
}
|
|
|
|
public override void Shutdown()
|
|
{
|
|
_consumer.Dispose();
|
|
base.Shutdown();
|
|
}
|
|
|
|
private void OnConnectionException(Exception e)
|
|
{
|
|
_logger.Info("Connection exception occurred", e);
|
|
// XXX: Test still doesn't shutdown when broker terminates. Is there no heartbeat?
|
|
}
|
|
|
|
[Test]
|
|
public void Test()
|
|
{
|
|
_connection.Start();
|
|
_logger.Info("Waiting...");
|
|
|
|
ServiceRequestingClient client = new ServiceRequestingClient();
|
|
client.Init();
|
|
client.SendMessages();
|
|
}
|
|
|
|
private void OnMessage(IMessage message)
|
|
{
|
|
// _logger.Info("Got message '" + message + "'");
|
|
|
|
ITextMessage tm = (ITextMessage)message;
|
|
|
|
try
|
|
{
|
|
string replyToExchangeName = tm.ReplyToExchangeName;
|
|
string replyToRoutingKey = tm.ReplyToRoutingKey;
|
|
|
|
_replyToExchangeName = replyToExchangeName;
|
|
_replyToRoutingKey = replyToRoutingKey;
|
|
_logger.Debug("About to create a producer");
|
|
|
|
// Console.WriteLine("ReplyTo.ExchangeName = " + _replyToExchangeName);
|
|
// Console.WriteLine("ReplyTo.RoutingKey = " + _replyToRoutingKey);
|
|
|
|
_destinationPublisher = _channel.CreatePublisherBuilder()
|
|
.WithExchangeName(_replyToExchangeName)
|
|
.WithRoutingKey(_replyToRoutingKey)
|
|
.WithDeliveryMode(DeliveryMode.NonPersistent)
|
|
.Create();
|
|
_destinationPublisher.DisableMessageTimestamp = true;
|
|
_logger.Debug("After create a producer");
|
|
}
|
|
catch (QpidException e)
|
|
{
|
|
_logger.Error("Error creating destination", e);
|
|
throw e;
|
|
}
|
|
_messageCount++;
|
|
if (_messageCount % PACK == 0)
|
|
{
|
|
_logger.Info("Received message total: " + _messageCount);
|
|
_logger.Info(string.Format("Sending response to '{0}:{1}'",
|
|
_replyToExchangeName, _replyToRoutingKey));
|
|
}
|
|
|
|
try
|
|
{
|
|
String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text;
|
|
ITextMessage msg = _channel.CreateTextMessage(payload);
|
|
if ( tm.Headers.Contains("timeSent") )
|
|
{
|
|
msg.Headers["timeSent"] = tm.Headers["timeSent"];
|
|
}
|
|
_destinationPublisher.Send(msg);
|
|
} catch ( QpidException e )
|
|
{
|
|
_logger.Error("Error sending message: " + e, e);
|
|
throw e;
|
|
} finally
|
|
{
|
|
_destinationPublisher.Dispose();
|
|
}
|
|
}
|
|
}
|
|
}
|