121 lines
3.6 KiB
C#
121 lines
3.6 KiB
C#
/*
|
|
*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*
|
|
*/
|
|
using System;
|
|
using System.Collections;
|
|
using System.Threading;
|
|
using log4net;
|
|
using Apache.Qpid.Client.Protocol;
|
|
using Apache.Qpid.Framing;
|
|
|
|
namespace Qpid.Client.Transport.Socket.Blocking
|
|
{
|
|
public class BlockingSocketTransport : ITransport
|
|
{
|
|
// static readonly ILog _log = LogManager.GetLogger(typeof(BlockingSocketTransport));
|
|
|
|
// Configuration variables.
|
|
string _host;
|
|
int _port;
|
|
IProtocolListener _protocolListener;
|
|
|
|
// Runtime variables.
|
|
private BlockingSocketProcessor _socketProcessor;
|
|
private AmqpChannel _amqpChannel;
|
|
|
|
private ReaderRunner _readerRunner;
|
|
private Thread _readerThread;
|
|
|
|
public BlockingSocketTransport(string host, int port, AMQConnection connection)
|
|
{
|
|
_host = host;
|
|
_port = port;
|
|
_protocolListener = connection.ProtocolListener;
|
|
}
|
|
|
|
public void Open()
|
|
{
|
|
_socketProcessor = new BlockingSocketProcessor(_host, _port, _protocolListener);
|
|
_socketProcessor.Connect();
|
|
_amqpChannel = new AmqpChannel(_socketProcessor.ByteChannel);
|
|
_readerRunner = new ReaderRunner(this);
|
|
_readerThread = new Thread(new ThreadStart(_readerRunner.Run));
|
|
_readerThread.Start();
|
|
}
|
|
|
|
public string getLocalEndPoint()
|
|
{
|
|
return _socketProcessor.getLocalEndPoint();
|
|
}
|
|
|
|
public void Close()
|
|
{
|
|
StopReaderThread();
|
|
_socketProcessor.Disconnect();
|
|
}
|
|
|
|
public IProtocolChannel ProtocolChannel { get { return _amqpChannel; } }
|
|
public IProtocolWriter ProtocolWriter { get { return _amqpChannel; } }
|
|
|
|
public void StopReaderThread()
|
|
{
|
|
_readerRunner.Stop();
|
|
}
|
|
|
|
class ReaderRunner
|
|
{
|
|
BlockingSocketTransport _transport;
|
|
bool _running = true;
|
|
|
|
public ReaderRunner(BlockingSocketTransport transport)
|
|
{
|
|
_transport = transport;
|
|
}
|
|
|
|
public void Run()
|
|
{
|
|
try
|
|
{
|
|
while (_running)
|
|
{
|
|
Queue frames = _transport.ProtocolChannel.Read();
|
|
|
|
foreach (IDataBlock dataBlock in frames)
|
|
{
|
|
_transport._protocolListener.OnMessage(dataBlock);
|
|
}
|
|
}
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
_transport._protocolListener.OnException(e);
|
|
}
|
|
}
|
|
|
|
public void Stop()
|
|
{
|
|
// TODO: Check if this is thread safe. running is not volitile....
|
|
_running = false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|