Former-commit-id:a02aeb236c
[formerly9f19e3f712
] [formerlya02aeb236c
[formerly9f19e3f712
] [formerly06a8b51d6d
[formerly 64fa9254b946eae7e61bbc3f513b7c3696c4f54f]]] Former-commit-id:06a8b51d6d
Former-commit-id:8e80217e59
[formerly3360eb6c5f
] Former-commit-id:377dcd10b9
375 lines
11 KiB
C#
375 lines
11 KiB
C#
/*
|
|
*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*
|
|
*/
|
|
using System;
|
|
using System.Threading;
|
|
|
|
namespace Apache.Qpid.Collections
|
|
{
|
|
public class SynchronousQueue : BlockingQueue
|
|
{
|
|
/// <summary>
|
|
/// Lock protecting both wait queues
|
|
/// </summary>
|
|
// private readonly object _qlock = new object();
|
|
|
|
/// <summary>
|
|
/// Queue holding waiting puts
|
|
/// </summary>
|
|
// private readonly WaitQueue _waitingProducers;
|
|
|
|
/// <summary>
|
|
/// Queue holding waiting takes
|
|
/// </summary>
|
|
// private readonly WaitQueue _waitingConsumers;
|
|
|
|
/// <summary>
/// Abstract holder for the nodes that represent waiting puts or takes.
/// The ordering policy is supplied by the subclasses declared in this
/// file (FIFO and LIFO).
/// </summary>
internal abstract class WaitQueue
{
    /// <summary>Wraps <paramref name="x"/> in a new node, adds it to the queue and returns it.</summary>
    internal abstract Node Enq(object x);

    /// <summary>Removes the next node and returns it, or returns null when the queue is empty.</summary>
    internal abstract Node Deq();

    /// <summary>Returns true if a cancelled node might still be linked into the queue.</summary>
    internal abstract bool ShouldUnlink(Node node);

    /// <summary>Detaches a cancelled node so that it is not retained as garbage.</summary>
    internal abstract void Unlink(Node node);
}
|
|
|
|
/// <summary>
/// First-in, first-out wait queue: a singly linked list that appends at
/// <c>last</c> and removes from <c>head</c>.
/// </summary>
sealed class FifoWaitQueue : WaitQueue
{
    private Node head;
    private Node last;

    /// <summary>Appends a new node holding <paramref name="x"/> and returns it.</summary>
    internal override Node Enq(Object x)
    {
        Node added = new Node(x);
        if (last != null)
        {
            last.next = added;
        }
        else
        {
            // Empty list: the new node is both the first and the last entry.
            head = added;
        }
        last = added;
        return added;
    }

    /// <summary>Removes and returns the oldest node, or null when the queue is empty.</summary>
    internal override Node Deq()
    {
        Node first = head;
        if (first == null)
        {
            return null;
        }

        head = first.next;
        if (head == null)
        {
            // The list just became empty, so the tail reference must go too.
            last = null;
        }
        // Clear the link so ShouldUnlink can recognise a dequeued node.
        first.next = null;
        return first;
    }

    /// <summary>
    /// Returns true when <paramref name="node"/> is the tail or still has a
    /// successor, i.e. when it may still be linked into this queue.
    /// </summary>
    internal override bool ShouldUnlink(Node node)
    {
        if (node == last)
        {
            return true;
        }
        return node.next != null;
    }

    /// <summary>
    /// Walks the list from the front and splices out <paramref name="node"/>
    /// if it is found; does nothing otherwise.
    /// </summary>
    internal override void Unlink(Node node)
    {
        Node previous = null;
        for (Node current = head; current != null; previous = current, current = current.next)
        {
            if (current != node)
            {
                continue;
            }

            if (previous == null)
            {
                head = current.next;
            }
            else
            {
                previous.next = current.next;
            }

            if (last == node)
            {
                // Removing the tail: its predecessor (possibly null) becomes the tail.
                last = previous;
            }
            return;
        }
    }
}
|
|
|
|
/// <summary>
/// Last-in, first-out wait queue: a singly linked stack whose top is
/// <c>head</c>.
/// </summary>
sealed class LifoWaitQueue : WaitQueue
{
    private Node head;

    /// <summary>Pushes a new node holding <paramref name="x"/> and returns it.</summary>
    internal override Node Enq(Object x)
    {
        Node pushed = new Node(x, head);
        head = pushed;
        return pushed;
    }

    /// <summary>Pops and returns the most recently added node, or null when empty.</summary>
    internal override Node Deq()
    {
        Node top = head;
        if (top == null)
        {
            return null;
        }

        head = top.next;
        // Clear the link so ShouldUnlink can recognise a dequeued node.
        top.next = null;
        return top;
    }

    /// <summary>
    /// Returns true when <paramref name="node"/> is the top of the stack or
    /// still has a successor. A node that was already dequeued, or that is
    /// the bottom node, yields false; in the latter case at most one
    /// garbage node may be retained.
    /// </summary>
    internal override bool ShouldUnlink(Node node)
    {
        if (node == head)
        {
            return true;
        }
        return node.next != null;
    }

    /// <summary>
    /// Walks the stack from the top and splices out <paramref name="node"/>
    /// if it is found; does nothing otherwise.
    /// </summary>
    internal override void Unlink(Node node)
    {
        Node previous = null;
        for (Node current = head; current != null; previous = current, current = current.next)
        {
            if (current != node)
            {
                continue;
            }

            if (previous == null)
            {
                head = current.next;
            }
            else
            {
                previous.next = current.next;
            }
            return;
        }
    }
}
|
|
|
|
/// <summary>
/// A node holds one item and handles the waits and signals needed to set
/// or get it. The <c>state</c> field is 0 while waiting, <c>ACK</c> (1)
/// once the transfer has happened and <c>CANCEL</c> (-1) once a timed wait
/// has given up.
/// </summary>
/// <remarks>
/// The node instance itself is the monitor used for waiting and pulsing.
/// Every method that reads or writes <c>state</c> takes that monitor;
/// monitors are re-entrant, so callers that already hold it are unaffected.
/// NOTE(review): the interrupt handling of the Java original
/// (checkCancellationOnInterrupt) has not been ported. A
/// ThreadInterruptedException raised by Monitor.Wait propagates to the
/// caller without the node being marked cancelled.
/// </remarks>
internal sealed class Node
{
    /// <summary>State value meaning the transfer was acknowledged.</summary>
    private const int ACK = 1;

    /// <summary>State value meaning the waiter cancelled.</summary>
    private const int CANCEL = -1;

    /// <summary>0 = waiting, ACK = transferred, CANCEL = cancelled.</summary>
    internal int state = 0;

    /// <summary>The item being transferred.</summary>
    internal Object item;

    /// <summary>Next node in the wait queue.</summary>
    internal Node next;

    /// <summary>Creates a node with an initial item and no successor.</summary>
    internal Node(Object x)
    {
        item = x;
    }

    /// <summary>Creates a node with an initial item and successor.</summary>
    internal Node(Object x, Node n)
    {
        item = x;
        next = n;
    }

    /// <summary>
    /// Returns the item and clears the field so the node does not keep it
    /// alive. Must be called with this node's monitor held.
    /// </summary>
    private Object Extract()
    {
        Object x = item;
        item = null;
        return x;
    }

    /// <summary>
    /// Fills the slot of a waiting node with <paramref name="x"/> and wakes
    /// the waiter.
    /// </summary>
    /// <returns>
    /// true if the item was stored; false if the node was no longer waiting
    /// (already acknowledged or cancelled).
    /// </returns>
    internal bool SetItem(Object x)
    {
        lock (this)
        {
            if (state != 0)
            {
                return false;
            }
            item = x;
            state = ACK;
            Monitor.Pulse(this);
            return true;
        }
    }

    /// <summary>
    /// Takes the item out of a waiting node and wakes the waiter.
    /// </summary>
    /// <returns>
    /// The item, or null if the node was no longer waiting (already
    /// acknowledged or cancelled).
    /// </returns>
    internal Object GetItem()
    {
        // Monitor.Pulse throws SynchronizationLockException unless the
        // calling thread owns the monitor, so the lock is required here.
        lock (this)
        {
            if (state != 0)
            {
                return null;
            }
            state = ACK;
            Monitor.Pulse(this);
            return Extract();
        }
    }

    /// <summary>
    /// Blocks until the node leaves the waiting state.
    /// </summary>
    internal void WaitForTake()
    {
        // Monitor.Wait may only be called while owning the monitor.
        lock (this)
        {
            while (state == 0)
            {
                Monitor.Wait(this);
            }
        }
    }

    /// <summary>
    /// Blocks until the node leaves the waiting state, then returns the
    /// item held by the node and clears it.
    /// </summary>
    internal object WaitForPut()
    {
        lock (this)
        {
            while (state == 0)
            {
                Monitor.Wait(this);
            }
            // Extract under the same lock so the read of the item cannot
            // race with a concurrent SetItem or GetItem.
            return Extract();
        }
    }

    /// <summary>
    /// Waits up to <paramref name="nanos"/> nanoseconds for the node to
    /// leave the waiting state; cancels the node if it does not.
    /// </summary>
    /// <param name="nanos">
    /// Maximum time to wait, in nanoseconds. Values of zero or less cancel
    /// immediately if the node is still waiting.
    /// </param>
    /// <returns>
    /// true if the node was not (or is no longer) in the waiting state;
    /// false if this call cancelled it.
    /// </returns>
    private bool Attempt(long nanos)
    {
        lock (this)
        {
            if (state != 0)
            {
                return true;
            }

            if (nanos > 0)
            {
                // Wait on the node itself with a real timeout. Every pulse
                // issued by this class is preceded by a state change, so a
                // wake-up that still sees state == 0 is treated as a timeout.
                Monitor.Wait(this, ToWaitMilliseconds(nanos));
                if (state != 0)
                {
                    return true;
                }
            }

            state = CANCEL;
            Monitor.Pulse(this);
            return false;
        }
    }

    /// <summary>
    /// Converts a positive nanosecond count to the millisecond timeout
    /// accepted by Monitor.Wait, rounding up so that a sub-millisecond
    /// timeout still waits, and clamping to int.MaxValue.
    /// </summary>
    private static int ToWaitMilliseconds(long nanos)
    {
        const long nanosPerMillisecond = 1000000L;
        long millis = nanos / nanosPerMillisecond;
        if (nanos % nanosPerMillisecond != 0)
        {
            millis++;
        }
        return millis > int.MaxValue ? int.MaxValue : (int)millis;
    }

    /// <summary>
    /// Waits up to <paramref name="nanos"/> nanoseconds for the node to
    /// leave the waiting state.
    /// </summary>
    /// <returns>false if the wait timed out and the node was cancelled.</returns>
    internal bool WaitForTake(long nanos)
    {
        return Attempt(nanos);
    }

    /// <summary>
    /// Waits up to <paramref name="nanos"/> nanoseconds for the node to
    /// leave the waiting state and returns the item it then holds.
    /// </summary>
    /// <returns>
    /// null if the wait timed out and the node was cancelled; otherwise the
    /// item held by the node (which is cleared).
    /// </returns>
    internal object WaitForPut(long nanos)
    {
        // Hold the monitor across both steps so the item is extracted in
        // the same critical section that observed the state change.
        lock (this)
        {
            if (!Attempt(nanos))
            {
                return null;
            }
            return Extract();
        }
    }
}
|
|
|
|
/// <summary>
/// Creates a synchronous queue. The constructor body is still a stub:
/// <paramref name="strict"/> is not read and no state is initialised.
/// </summary>
/// <param name="strict">
/// Currently ignored.
/// NOTE(review): presumably meant to choose between FIFO and LIFO
/// ordering of waiters; confirm before implementing.
/// </param>
public SynchronousQueue(bool strict)
{
    // TODO !!!!
}
|
|
|
|
/// <summary>
/// Not implemented; always throws.
/// </summary>
/// <param name="e">The element to enqueue (unused).</param>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override bool EnqueueNoThrow(object e)
{
    throw new NotImplementedException();
}
|
|
|
|
/// <summary>
/// Not implemented; always throws.
/// </summary>
/// <param name="e">The element to enqueue (unused).</param>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override void EnqueueBlocking(object e)
{
    throw new NotImplementedException();
}
|
|
|
|
/// <summary>
/// Not implemented; always throws.
/// </summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override object DequeueBlocking()
{
    throw new NotImplementedException();
}
|
|
|
|
/// <summary>
/// Not implemented; reading this property always throws.
/// </summary>
/// <exception cref="NotImplementedException">Always thrown by the getter.</exception>
public override int RemainingCapacity
{
    get { throw new NotImplementedException(); }
}
|
|
}
|
|
}
|