|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
|
|
/* |
|
* This file is available under and governed by the GNU General Public |
|
* License version 2 only, as published by the Free Software Foundation. |
|
* However, the following notice accompanied the original version of this |
|
* file: |
|
* |
|
* Written by Doug Lea, Bill Scherer, and Michael Scott with |
|
* assistance from members of JCP JSR-166 Expert Group and released to |
|
* the public domain, as explained at |
|
* http://creativecommons.org/publicdomain/zero/1.0/ |
|
*/ |
|
|
|
package java.util.concurrent; |
|
|
|
import java.lang.invoke.MethodHandles; |
|
import java.lang.invoke.VarHandle; |
|
import java.util.AbstractQueue; |
|
import java.util.Collection; |
|
import java.util.Collections; |
|
import java.util.Iterator; |
|
import java.util.Objects; |
|
import java.util.Spliterator; |
|
import java.util.Spliterators; |
|
import java.util.concurrent.locks.LockSupport; |
|
import java.util.concurrent.locks.ReentrantLock; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * A {@linkplain BlockingQueue blocking queue} in which each insert
 * operation must wait for a corresponding remove operation by another
 * thread, and vice versa.  A synchronous queue has no internal capacity,
 * not even a capacity of one: {@link #size} is always zero,
 * {@link #peek} always returns {@code null}, iteration yields nothing,
 * and an element can only be handed off while another thread is trying
 * to receive it.  This queue does not permit {@code null} elements.
 *
 * <p>The constructor accepts an optional fairness policy.  By default
 * (non-fair mode) waiting threads are matched using a LIFO dual stack;
 * with fairness set to {@code true} they are matched using a FIFO dual
 * queue.
 *
 * @param <E> the type of elements handed off through this queue
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
     * This class implements extensions of the dual stack and dual
     * queue algorithms described in "Nonblocking Concurrent Objects
     * with Condition Synchronization", by W. N. Scherer III and
     * M. L. Scott.  18th Annual Conf. on Distributed Computing,
     * Oct. 2004 (see also
     * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
     * queue for fair mode. The performance of the two is generally
     * similar. Fifo usually supports higher throughput under
     * contention but Lifo maintains higher thread locality in common
     * applications.
     *
     * A dual queue (and similarly stack) is one that at any given
     * time either holds "data" -- items provided by put operations,
     * or "requests" -- slots representing take operations, or is
     * empty. A call to "fulfill" (i.e., a call requesting an item
     * from a queue holding data or vice versa) dequeues a
     * complementary node.  The most interesting feature of these
     * queues is that any operation can figure out which mode the
     * queue is in, and act accordingly without needing locks.
     *
     * Both the queue and stack extend abstract class Transferer
     * defining the single method transfer that does a put or a
     * take. These are unified into a single method because in dual
     * data structures, the put and take operations are symmetrical,
     * so nearly all code can be combined. The resulting transfer
     * methods are on the long side, but are easier to follow than
     * they would be if broken up into nearly-duplicated parts.
     *
     * The queue and stack data structures share many conceptual
     * similarities but very few concrete details. For simplicity,
     * they are kept distinct so that they can later evolve
     * separately.
     *
     * The algorithms here differ from the versions in the above paper
     * in extending them for use in synchronous queues, as well as
     * dealing with cancellation. The main differences include:
     *
     *  1. The original algorithms used bit-marked pointers, but
     *     the ones here use mode bits in nodes, leading to a number
     *     of further adaptations.
     *  2. SynchronousQueues must block threads waiting to become
     *     fulfilled.
     *  3. Support for cancellation via timeout and interrupts,
     *     including cleaning out cancelled nodes/threads
     *     from lists to avoid garbage retention and memory depletion.
     *
     * Blocking is mainly accomplished using LockSupport park/unpark,
     * except that nodes that appear to be the next ones to become
     * fulfilled first spin a bit (on multiprocessors only). On very
     * busy synchronous queues, spinning can dramatically improve
     * throughput. And on less busy ones, the amount of spinning is
     * small enough not to be noticeable.
     *
     * Cleaning is done in different ways in queues vs stacks.  For
     * queues, we can almost always remove a node immediately in O(1)
     * time (modulo retries for consistency checks) when it is
     * cancelled. But if it may be pinned as the current tail, it must
     * wait until some subsequent cancellation. For stacks, we need a
     * potentially O(n) traversal to be sure that we can remove the
     * node, but this can run concurrently with other threads
     * accessing the stack.
     *
     * While garbage collection takes care of most node reclamation
     * issues that otherwise complicate nonblocking algorithms, care
     * is taken to "forget" references to data, other nodes, and
     * threads that might be held on to long-term by blocked
     * threads. In cases where setting to null would otherwise
     * conflict with main algorithms, this is done by changing a
     * node's link to now point to the node itself. This doesn't arise
     * much for Stack nodes (because blocked threads do not hang on to
     * old head pointers), but references in Queue nodes must be
     * aggressively forgotten to avoid reachability of everything any
     * node has ever referred to since arrival.
     *
     * The above steps improve throughput when many threads produce
     * and/or consume data. But they don't help much with
     * single-source / single-sink usages in which one side or the
     * other is always transiently blocked, and so throughput is
     * mainly a function of thread scheduling. This is not usually
     * noticeably improved with bounded short spin-waits. Instead both
     * forms of transfer try Thread.yield if apparently the sole
     * waiter. This works well when there are more tasks than cores,
     * which is expected to be the main usage context of this mode. In
     * other cases, waiters may help with some bookkeeping, then
     * park/unpark.
     */

    /**
     * Shared internal API for the dual stack and the dual queue.
     *
     * @param <E> the type of items transferred
     */
    abstract static class Transferer<E> {
        /**
         * Performs a put or a take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by a producer
         * @param timed whether this operation should give up after
         *          {@code nanos} nanoseconds
         * @param nanos the timeout, in nanoseconds; only consulted
         *          when {@code timed} is true
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }

    /**
     * Remaining-time threshold, in nanoseconds, for timed waits: a
     * timed waiter only calls parkNanos while more than this many
     * nanoseconds remain; below it the wait loop just re-checks its
     * condition until the deadline passes.
     */
    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;

    /** Dual stack; used for non-fair mode. */
    static final class TransferStack<E> extends Transferer<E> {
        /*
         * This extends Scherer-Scott dual stack algorithm, differing,
         * among other ways, by using "covering" nodes rather than
         * bit-marked pointers: Fulfilling operations push on marker
         * nodes (with FULFILLING bit set in mode) to reserve a spot
         * to match a waiting node.
         */

        /* Modes for SNodes, ORed together in node fields */

        /** Node represents a waiting consumer (transfer called with null). */
        static final int REQUEST    = 0;

        /** Node represents a waiting producer (transfer called with an item). */
        static final int DATA       = 1;

        /** Node is fulfilling another waiting DATA or REQUEST node. */
        static final int FULFILLING = 2;

        /** Returns true if m has the FULFILLING bit set. */
        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

        /** Node class for TransferStacks. */
        static final class SNode implements ForkJoinPool.ManagedBlocker {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
            // Note: item and mode fields don't need to be volatile
            // since they are always written before, and read after,
            // other volatile/atomic operations.

            SNode(Object item) {
                this.item = item;
            }

            /** CASes next from cmp to val, pre-checking with a plain read. */
            boolean casNext(SNode cmp, SNode val) {
                return cmp == next &&
                    SNEXT.compareAndSet(this, cmp, val);
            }

            /**
             * Tries to match node s to this node and, on success,
             * wakes up the waiting thread if one is recorded.
             * Fulfillers call tryMatch to identify their waiters;
             * waiters block until they have been matched.
             *
             * @param s the node to match
             * @return true if this node is (now or already) matched to s
             */
            boolean tryMatch(SNode s) {
                SNode m; Thread w;
                if ((m = match) == null) {
                    if (SMATCH.compareAndSet(this, null, s)) {
                        if ((w = waiter) != null)
                            LockSupport.unpark(w);
                        return true;
                    }
                    else
                        m = match;      // lost the race; see who won
                }
                return m == s;
            }

            /**
             * Tries to cancel a wait by matching this node to itself.
             *
             * @return true if the node was unmatched and is now cancelled
             */
            boolean tryCancel() {
                return SMATCH.compareAndSet(this, null, this);
            }

            /** Returns true if this node has been matched to itself. */
            boolean isCancelled() {
                return match == this;
            }

            /**
             * ManagedBlocker hook: blocking is unnecessary once the
             * node is matched (or cancelled) or the current thread is
             * interrupted.
             */
            @Override
            public final boolean isReleasable() {
                return match != null || Thread.currentThread().isInterrupted();
            }

            /** ManagedBlocker hook: parks until {@link #isReleasable}. */
            @Override
            public final boolean block() {
                while (!isReleasable()) LockSupport.park();
                return true;
            }

            /** Clears the waiter reference so the thread is not retained. */
            void forgetWaiter() {
                SWAITER.setOpaque(this, null);
            }

            // VarHandle mechanics
            private static final VarHandle SMATCH;
            private static final VarHandle SNEXT;
            private static final VarHandle SWAITER;
            static {
                try {
                    MethodHandles.Lookup l = MethodHandles.lookup();
                    SMATCH = l.findVarHandle(SNode.class, "match", SNode.class);
                    SNEXT = l.findVarHandle(SNode.class, "next", SNode.class);
                    SWAITER = l.findVarHandle(SNode.class, "waiter", Thread.class);
                } catch (ReflectiveOperationException e) {
                    throw new ExceptionInInitializerError(e);
                }
            }
        }

        /** The head (top) of the stack. */
        volatile SNode head;

        /** CASes head from h to nh, pre-checking with a plain read. */
        boolean casHead(SNode h, SNode nh) {
            return h == head &&
                SHEAD.compareAndSet(this, h, nh);
        }

        /**
         * Creates a node, or resets the mode and next fields of an
         * existing one.  Called only from transfer, where the node to
         * push is created lazily and reused across retries when a CAS
         * of head fails.
         *
         * @param s an existing node to reuse, or null to allocate one
         * @param e the item for a newly allocated node
         * @param next the node that will follow s on the stack
         * @param mode the mode bits to store in the node
         * @return the (possibly new) node
         */
        static SNode snode(SNode s, Object e, SNode next, int mode) {
            if (s == null) s = new SNode(e);
            s.mode = mode;
            s.next = next;
            return s;
        }

        /**
         * Puts or takes an item.
         *
         * @param e the item to hand off, or null to request one
         * @param timed whether the wait is bounded by {@code nanos}
         * @param nanos the timeout in nanoseconds when {@code timed}
         * @return the item transferred, or null on timeout/interrupt
         */
        @Override
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0L) {     // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        long deadline = timed ? System.nanoTime() + nanos : 0L;
                        Thread w = Thread.currentThread();
                        int stat = -1; // -1: may yield, +1: park, else 0
                        SNode m;                    // await fulfill or cancel
                        while ((m = s.match) == null) {
                            if ((timed &&
                                 (nanos = deadline - System.nanoTime()) <= 0) ||
                                w.isInterrupted()) {
                                if (s.tryCancel()) {
                                    clean(s);       // wait cancelled
                                    return null;
                                }
                            } else if ((m = s.match) != null) {
                                break;              // recheck
                            } else if (stat <= 0) {
                                if (stat < 0 && h == null && head == s) {
                                    stat = 0;       // yield once if was empty
                                    Thread.yield();
                                } else {
                                    stat = 1;
                                    s.waiter = w;   // enable signal
                                }
                            } else if (!timed) {
                                LockSupport.setCurrentBlocker(this);
                                try {
                                    ForkJoinPool.managedBlock(s);
                                } catch (InterruptedException cannotHappen) { }
                                LockSupport.setCurrentBlocker(null);
                            } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
                                LockSupport.parkNanos(this, nanos);
                        }
                        if (stat == 1)
                            s.forgetWaiter();
                        Object result = (mode == REQUEST) ? m.item : s.item;
                        if (h != null && h.next == s)
                            casHead(h, s.next);     // help fulfiller
                        return (E) result;
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

        /**
         * Unlinks the cancelled node s from the stack.
         *
         * @param s the cancelled node
         */
        void clean(SNode s) {
            s.item = null;   // forget item
            s.forgetWaiter();

            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past. We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */

            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }

        // VarHandle mechanics
        private static final VarHandle SHEAD;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                SHEAD = l.findVarHandle(TransferStack.class, "head", SNode.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    }

    /** Dual queue; used for fair mode. */
    static final class TransferQueue<E> extends Transferer<E> {
        /*
         * This extends Scherer-Scott dual queue algorithm, differing,
         * among other ways, by using modes within nodes rather than
         * marked pointers. The algorithm is a little simpler than
         * that for stacks because fulfillers do not need explicit
         * nodes, and matching is done by CAS'ing QNode.item field
         * from non-null to null (for put) or vice versa (for take).
         */

        /** Node class for TransferQueue. */
        static final class QNode implements ForkJoinPool.ManagedBlocker {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

            /** CASes next from cmp to val, pre-checking with a plain read. */
            boolean casNext(QNode cmp, QNode val) {
                return next == cmp &&
                    QNEXT.compareAndSet(this, cmp, val);
            }

            /** CASes item from cmp to val, pre-checking with a plain read. */
            boolean casItem(Object cmp, Object val) {
                return item == cmp &&
                    QITEM.compareAndSet(this, cmp, val);
            }

            /**
             * Tries to cancel by CAS'ing the item from cmp to a
             * reference to this node itself.
             *
             * @param cmp the item value the waiter started with
             * @return true if the node is now cancelled
             */
            boolean tryCancel(Object cmp) {
                return QITEM.compareAndSet(this, cmp, this);
            }

            /** Returns true if the item field refers to this node. */
            boolean isCancelled() {
                return item == this;
            }

            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer was self-linked by an
             * advanceHead operation.
             */
            boolean isOffList() {
                return next == this;
            }

            /** Clears the waiter reference so the thread is not retained. */
            void forgetWaiter() {
                QWAITER.setOpaque(this, null);
            }

            /**
             * Returns true if this node's item has flipped to the
             * complementary state (null for data nodes, non-null for
             * request nodes) or the node has been cancelled.
             */
            boolean isFulfilled() {
                Object x;
                return isData == ((x = item) == null) || x == this;
            }

            /**
             * ManagedBlocker hook: blocking is unnecessary once the
             * node is fulfilled or cancelled, or the current thread
             * is interrupted.
             */
            @Override
            public final boolean isReleasable() {
                Object x;
                return isData == ((x = item) == null) || x == this ||
                    Thread.currentThread().isInterrupted();
            }

            /** ManagedBlocker hook: parks until {@link #isReleasable}. */
            @Override
            public final boolean block() {
                while (!isReleasable()) LockSupport.park();
                return true;
            }

            // VarHandle mechanics
            private static final VarHandle QITEM;
            private static final VarHandle QNEXT;
            private static final VarHandle QWAITER;
            static {
                try {
                    MethodHandles.Lookup l = MethodHandles.lookup();
                    QITEM = l.findVarHandle(QNode.class, "item", Object.class);
                    QNEXT = l.findVarHandle(QNode.class, "next", QNode.class);
                    QWAITER = l.findVarHandle(QNode.class, "waiter", Thread.class);
                } catch (ReflectiveOperationException e) {
                    throw new ExceptionInInitializerError(e);
                }
            }
        }

        /** Head of queue; always a dummy node whose successor is first. */
        transient volatile QNode head;

        /** Tail of queue. */
        transient volatile QNode tail;

        /**
         * Predecessor of a cancelled node that could not be unlinked
         * at cancellation time because it appeared to be the tail;
         * a later call to clean retries the unlink through this
         * reference.
         */
        transient volatile QNode cleanMe;

        /** Creates an empty queue consisting of a single dummy node. */
        TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

        /**
         * Tries to CAS nh in as the new head; on success, self-links
         * the old head's next pointer so that it is recognisably off
         * the list and does not retain its successors.
         */
        void advanceHead(QNode h, QNode nh) {
            if (h == head &&
                QHEAD.compareAndSet(this, h, nh))
                h.next = h; // forget old next
        }

        /** Tries to CAS nt in as the new tail. */
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                QTAIL.compareAndSet(this, t, nt);
        }

        /** Tries to CAS the cleanMe slot from cmp to val. */
        boolean casCleanMe(QNode cmp, QNode val) {
            return cleanMe == cmp &&
                QCLEANME.compareAndSet(this, cmp, val);
        }

        /**
         * Puts or takes an item.
         *
         * @param e the item to hand off, or null to request one
         * @param timed whether the wait is bounded by {@code nanos}
         * @param nanos the timeout in nanoseconds when {@code timed}
         * @return the item transferred, or null on timeout/interrupt
         */
        @Override
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null;                  // constructed/reused as needed
            boolean isData = (e != null);
            for (;;) {
                QNode t = tail, h = head, m, tn;         // m is node to fulfill
                if (t == null || h == null)
                    ;                                  // inconsistent
                else if (h == t || t.isData == isData) { // empty or same-mode
                    if (t != tail)                     // inconsistent
                        ;
                    else if ((tn = t.next) != null)    // lagging tail
                        advanceTail(t, tn);
                    else if (timed && nanos <= 0L)     // can't wait
                        return null;
                    else if (t.casNext(null, (s != null) ? s :
                                       (s = new QNode(e, isData)))) {
                        advanceTail(t, s);
                        long deadline = timed ? System.nanoTime() + nanos : 0L;
                        Thread w = Thread.currentThread();
                        int stat = -1; // same idea as TransferStack
                        Object item;
                        while ((item = s.item) == e) {
                            if ((timed &&
                                 (nanos = deadline - System.nanoTime()) <= 0) ||
                                w.isInterrupted()) {
                                if (s.tryCancel(e)) {
                                    clean(t, s);
                                    return null;
                                }
                            } else if ((item = s.item) != e) {
                                break;                   // recheck
                            } else if (stat <= 0) {
                                if (t.next == s) {
                                    if (stat < 0 && t.isFulfilled()) {
                                        stat = 0;        // yield once if first
                                        Thread.yield();
                                    }
                                    else {
                                        stat = 1;
                                        s.waiter = w;
                                    }
                                }
                            } else if (!timed) {
                                LockSupport.setCurrentBlocker(this);
                                try {
                                    ForkJoinPool.managedBlock(s);
                                } catch (InterruptedException cannotHappen) { }
                                LockSupport.setCurrentBlocker(null);
                            }
                            else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
                                LockSupport.parkNanos(this, nanos);
                        }
                        if (stat == 1)
                            s.forgetWaiter();
                        if (!s.isOffList()) {          // not already unlinked
                            advanceHead(t, s);         // unlink if head
                            if (item != null)          // and forget fields
                                s.item = s;
                        }
                        return (item != null) ? (E)item : e;
                    }

                } else if ((m = h.next) != null && t == tail && h == head) {
                    Thread waiter;
                    Object x = m.item;
                    boolean fulfilled = ((isData == (x == null)) &&
                                         x != m && m.casItem(x, e));
                    advanceHead(h, m);                    // (help) dequeue
                    if (fulfilled) {
                        if ((waiter = m.waiter) != null)
                            LockSupport.unpark(waiter);
                        return (x != null) ? (E)x : e;
                    }
                }
            }
        }

        /**
         * Gets rid of the cancelled node s, whose predecessor at
         * insertion time was pred.
         *
         * @param pred the node that preceded s when s was enqueued
         * @param s the cancelled node
         */
        void clean(QNode pred, QNode s) {
            s.forgetWaiter();
            /*
             * A node that is currently the tail cannot be unspliced
             * here (the code below only unsplices when s != t).  If s
             * cannot be removed, its predecessor is saved in cleanMe
             * instead, after first trying to unlink whatever node an
             * earlier call saved there.
             */
            while (pred.next == s) { // Return early if already unlinked
                QNode h = head;
                QNode hn = h.next;   // Absorb cancelled first node as head
                if (hn != null && hn.isCancelled()) {
                    advanceHead(h, hn);
                    continue;
                }
                QNode t = tail;      // Ensure consistent read for tail
                if (t == h)
                    return;
                QNode tn = t.next;
                if (t != tail)
                    continue;
                if (tn != null) {
                    advanceTail(t, tn);
                    continue;
                }
                if (s != t) {        // If not tail, try to unsplice
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn))
                        return;
                }
                QNode dp = cleanMe;
                if (dp != null) {    // Try unlinking previous cancelled node
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       // d unspliced
                        casCleanMe(dp, null);
                    if (dp == pred)
                        return;      // s is already saved node
                } else if (casCleanMe(null, pred))
                    return;          // Postpone cleaning s
            }
        }

        // VarHandle mechanics
        private static final VarHandle QHEAD;
        private static final VarHandle QTAIL;
        private static final VarHandle QCLEANME;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                QHEAD = l.findVarHandle(TransferQueue.class, "head",
                                        QNode.class);
                QTAIL = l.findVarHandle(TransferQueue.class, "tail",
                                        QNode.class);
                QCLEANME = l.findVarHandle(TransferQueue.class, "cleanMe",
                                           QNode.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    }

    /**
     * The transferer. Set only in the constructor and in readObject,
     * but it cannot be declared final because readObject must assign
     * it after default deserialization; it is volatile so that other
     * threads see the assigned value.
     */
    private transient volatile Transferer<E> transferer;

    /**
     * Creates a {@code SynchronousQueue} with non-fair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a {@code SynchronousQueue} with the specified fairness
     * policy.
     *
     * @param fair if true, waiting threads are matched in FIFO order
     *        (dual queue); otherwise a LIFO dual stack is used
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

    /**
     * Adds the specified element to this queue, waiting if necessary
     * for another thread to receive it.
     *
     * @param e the element to hand off
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        if (transferer.transfer(e, false, 0) == null) {
            // an untimed transfer only fails on interrupt; clear the
            // flag before reporting it as an exception
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

    /**
     * Inserts the specified element into this queue, waiting if
     * necessary up to the specified wait time for another thread to
     * receive it.
     *
     * @param e the element to hand off
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the time unit of {@code timeout}
     * @return {@code true} if successful, or {@code false} if the
     *         specified waiting time elapses before a consumer appears
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        Objects.requireNonNull(e);
        if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

    /**
     * Inserts the specified element into this queue, if another thread
     * is waiting to receive it.
     *
     * @param e the element to hand off
     * @return {@code true} if the element was handed to a waiting
     *         consumer, else {@code false}
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        Objects.requireNonNull(e);
        return transferer.transfer(e, true, 0) != null;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary for another thread to insert it.
     *
     * @return the element handed off by a producer
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        // an untimed transfer only fails on interrupt; clear the flag
        // before reporting it as an exception
        Thread.interrupted();
        throw new InterruptedException();
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary up to the specified wait time for another thread to
     * insert it.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the time unit of {@code timeout}
     * @return the element received, or {@code null} if the specified
     *         waiting time elapses before an element is present
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the element received, or {@code null} if no element is
     *         available
     */
    @Override
    public E poll() {
        return transferer.transfer(null, true, 0);
    }

    /**
     * Always returns {@code true}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return {@code true}
     */
    @Override
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return zero
     */
    @Override
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return zero
     */
    @Override
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A {@code SynchronousQueue} has no internal capacity.
     */
    @Override
    public void clear() {
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param o the element (ignored)
     * @return {@code false}
     */
    @Override
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param o the element to remove (ignored)
     * @return {@code false}
     */
    @Override
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns {@code false} unless the given collection is empty.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false} unless given collection is empty
     * @throws NullPointerException if {@code c} is null
     */
    @Override
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection (ignored)
     * @return {@code false}
     */
    @Override
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection (ignored)
     * @return {@code false}
     */
    @Override
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns {@code null}.
     * A {@code SynchronousQueue} does not return elements
     * unless actively waited on.
     *
     * @return {@code null}
     */
    @Override
    public E peek() {
        return null;
    }

    /**
     * Returns an empty iterator in which {@code hasNext} always returns
     * {@code false}.
     *
     * @return an empty iterator
     */
    @Override
    public Iterator<E> iterator() {
        return Collections.emptyIterator();
    }

    /**
     * Returns an empty spliterator in which calls to
     * {@link Spliterator#trySplit() trySplit} always return {@code null}.
     *
     * @return an empty spliterator
     */
    @Override
    public Spliterator<E> spliterator() {
        return Spliterators.emptySpliterator();
    }

    /**
     * Returns a zero-length array.
     *
     * @return a zero-length array
     */
    @Override
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroth element of the specified array to {@code null}
     * (if the array has non-zero length) and returns it.
     *
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * Always returns {@code "[]"}.
     *
     * @return {@code "[]"}
     */
    @Override
    public String toString() {
        return "[]";
    }

    /**
     * Moves every element that producers are currently offering into
     * the given collection, stopping at the first failed poll.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        Objects.requireNonNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        for (E e; (e = poll()) != null; n++)
            c.add(e);
        return n;
    }

    /**
     * Moves at most {@code maxElements} elements that producers are
     * currently offering into the given collection, stopping early at
     * the first failed poll.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        Objects.requireNonNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        for (E e; n < maxElements && (e = poll()) != null; n++)
            c.add(e);
        return n;
    }

    /*
     * To cope with serialization strategy in the 1.5 version of
     * SynchronousQueue, we declare some unused classes and fields
     * that exist solely to enable serializability across versions.
     * These fields are never used, so are initialized only if this
     * object is ever serialized or deserialized.
     */

    @SuppressWarnings("serial")
    static class WaitQueue implements java.io.Serializable { }
    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;

    /**
     * Saves this queue to a stream (that is, serializes it).  The
     * fairness policy is encoded through the concrete types of the
     * placeholder fields written by default serialization.
     *
     * @param s the stream
     * @throws java.io.IOException if an I/O error occurs
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        boolean fair = transferer instanceof TransferQueue;
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
        s.defaultWriteObject();
    }

    /**
     * Reconstitutes this queue from a stream (that is, deserializes
     * it), recreating the transferer that matches the fairness policy
     * recorded by {@code writeObject}.
     *
     * @param s the stream
     * @throws ClassNotFoundException if the class of a serialized
     *         object could not be found
     * @throws java.io.IOException if an I/O error occurs
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        if (waitingProducers instanceof FifoWaitQueue)
            transferer = new TransferQueue<E>();
        else
            transferer = new TransferStack<E>();
    }

    static {
        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport by forcing the class to be loaded up front.
        Class<?> ensureLoaded = LockSupport.class;
    }
}