|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  */ | 
|  |  | 
|  | /* | 
|  |  * This file is available under and governed by the GNU General Public | 
|  |  * License version 2 only, as published by the Free Software Foundation. | 
|  |  * However, the following notice accompanied the original version of this | 
|  |  * file: | 
|  |  * | 
|  |  * Written by Doug Lea, Bill Scherer, and Michael Scott with | 
|  |  * assistance from members of JCP JSR-166 Expert Group and released to | 
|  |  * the public domain, as explained at | 
|  |  * http://creativecommons.org/publicdomain/zero/1.0/ | 
|  |  */ | 
|  |  | 
|  | package java.util.concurrent; | 
|  |  | 
|  | import java.lang.invoke.MethodHandles; | 
|  | import java.lang.invoke.VarHandle; | 
|  | import java.util.AbstractQueue; | 
|  | import java.util.Collection; | 
|  | import java.util.Collections; | 
|  | import java.util.Iterator; | 
|  | import java.util.Objects; | 
|  | import java.util.Spliterator; | 
|  | import java.util.Spliterators; | 
|  | import java.util.concurrent.locks.LockSupport; | 
|  | import java.util.concurrent.locks.ReentrantLock; | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  */ | 
|  | public class SynchronousQueue<E> extends AbstractQueue<E> | 
|  |     implements BlockingQueue<E>, java.io.Serializable { | 
|  |     private static final long serialVersionUID = -3223113410248163686L; | 
|  |  | 
|  |     /* | 
|  |      * This class implements extensions of the dual stack and dual | 
|  |      * queue algorithms described in "Nonblocking Concurrent Objects | 
|  |      * with Condition Synchronization", by W. N. Scherer III and | 
|  |      * M. L. Scott.  18th Annual Conf. on Distributed Computing, | 
|  |      * Oct. 2004 (see also | 
|  |      * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html). | 
|  |      * The (Lifo) stack is used for non-fair mode, and the (Fifo) | 
|  |      * queue for fair mode. The performance of the two is generally | 
|  |      * similar. Fifo usually supports higher throughput under | 
|  |      * contention but Lifo maintains higher thread locality in common | 
|  |      * applications. | 
|  |      * | 
|  |      * A dual queue (and similarly stack) is one that at any given | 
|  |      * time either holds "data" -- items provided by put operations, | 
|  |      * or "requests" -- slots representing take operations, or is | 
|  |      * empty. A call to "fulfill" (i.e., a call requesting an item | 
|  |      * from a queue holding data or vice versa) dequeues a | 
|  |      * complementary node.  The most interesting feature of these | 
|  |      * queues is that any operation can figure out which mode the | 
|  |      * queue is in, and act accordingly without needing locks. | 
|  |      * | 
|  |      * Both the queue and stack extend abstract class Transferer | 
|  |      * defining the single method transfer that does a put or a | 
|  |      * take. These are unified into a single method because in dual | 
|  |      * data structures, the put and take operations are symmetrical, | 
|  |      * so nearly all code can be combined. The resulting transfer | 
|  |      * methods are on the long side, but are easier to follow than | 
|  |      * they would be if broken up into nearly-duplicated parts. | 
|  |      * | 
|  |      * The queue and stack data structures share many conceptual | 
|  |      * similarities but very few concrete details. For simplicity, | 
|  |      * they are kept distinct so that they can later evolve | 
|  |      * separately. | 
|  |      * | 
|  |      * The algorithms here differ from the versions in the above paper | 
|  |      * in extending them for use in synchronous queues, as well as | 
|  |      * dealing with cancellation. The main differences include: | 
|  |      * | 
|  |      *  1. The original algorithms used bit-marked pointers, but | 
|  |      *     the ones here use mode bits in nodes, leading to a number | 
|  |      *     of further adaptations. | 
|  |      *  2. SynchronousQueues must block threads waiting to become | 
|  |      *     fulfilled. | 
|  |      *  3. Support for cancellation via timeout and interrupts, | 
|  |      *     including cleaning out cancelled nodes/threads | 
|  |      *     from lists to avoid garbage retention and memory depletion. | 
|  |      * | 
|  |      * Blocking is mainly accomplished using LockSupport park/unpark, | 
|  |      * except that nodes that appear to be the next ones to become | 
|  |      * fulfilled first spin a bit (on multiprocessors only). On very | 
|  |      * busy synchronous queues, spinning can dramatically improve | 
|  |      * throughput. And on less busy ones, the amount of spinning is | 
|  |      * small enough not to be noticeable. | 
|  |      * | 
|  |      * Cleaning is done in different ways in queues vs stacks.  For | 
|  |      * queues, we can almost always remove a node immediately in O(1) | 
|  |      * time (modulo retries for consistency checks) when it is | 
|  |      * cancelled. But if it may be pinned as the current tail, it must | 
|  |      * wait until some subsequent cancellation. For stacks, we need a | 
|  |      * potentially O(n) traversal to be sure that we can remove the | 
|  |      * node, but this can run concurrently with other threads | 
|  |      * accessing the stack. | 
|  |      * | 
|  |      * While garbage collection takes care of most node reclamation | 
|  |      * issues that otherwise complicate nonblocking algorithms, care | 
|  |      * is taken to "forget" references to data, other nodes, and | 
|  |      * threads that might be held on to long-term by blocked | 
|  |      * threads. In cases where setting to null would otherwise | 
|  |      * conflict with main algorithms, this is done by changing a | 
|  |      * node's link to now point to the node itself. This doesn't arise | 
|  |      * much for Stack nodes (because blocked threads do not hang on to | 
|  |      * old head pointers), but references in Queue nodes must be | 
|  |      * aggressively forgotten to avoid reachability of everything any | 
|  |      * node has ever referred to since arrival. | 
|  |      * | 
|  |      * The above steps improve throughput when many threads produce | 
|  |      * and/or consume data. But they don't help much with | 
|  |      * single-source / single-sink usages in which one side or the | 
|  |      * other is always transiently blocked, and so throughput is | 
|  |      * mainly a function of thread scheduling. This is not usually | 
|  |      * noticeably improved with bounded short spin-waits. Instead both | 
|  |      * forms of transfer try Thread.yield if apparently the sole | 
|  |      * waiter. This works well when there are more tasks that cores, | 
|  |      * which is expected to be the main usage context of this mode. In | 
|  |      * other cases, waiters may help with some bookkeeping, then | 
|  |      * park/unpark. | 
|  |      */ | 
|  |  | 
|  |      | 
|  |  | 
|  |      */ | 
|  |     abstract static class Transferer<E> { | 
|  |          | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |          */ | 
|  |         abstract E transfer(E e, boolean timed, long nanos); | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L; | 
|  |  | 
|  |      | 
|  |     static final class TransferStack<E> extends Transferer<E> { | 
|  |         /* | 
|  |          * This extends Scherer-Scott dual stack algorithm, differing, | 
|  |          * among other ways, by using "covering" nodes rather than | 
|  |          * bit-marked pointers: Fulfilling operations push on marker | 
|  |          * nodes (with FULFILLING bit set in mode) to reserve a spot | 
|  |          * to match a waiting node. | 
|  |          */ | 
|  |  | 
|  |         /* Modes for SNodes, ORed together in node fields */ | 
|  |          | 
|  |         static final int REQUEST    = 0; | 
|  |          | 
|  |         static final int DATA       = 1; | 
|  |          | 
|  |         static final int FULFILLING = 2; | 
|  |  | 
|  |          | 
|  |         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } | 
|  |  | 
|  |          | 
|  |         static final class SNode implements ForkJoinPool.ManagedBlocker { | 
|  |             volatile SNode next;         | 
|  |             volatile SNode match;        | 
|  |             volatile Thread waiter;      | 
|  |             Object item;                 | 
|  |             int mode; | 
|  |             // Note: item and mode fields don't need to be volatile | 
|  |             // since they are always written before, and read after, | 
|  |             // other volatile/atomic operations. | 
|  |  | 
|  |             SNode(Object item) { | 
|  |                 this.item = item; | 
|  |             } | 
|  |  | 
|  |             boolean casNext(SNode cmp, SNode val) { | 
|  |                 return cmp == next && | 
|  |                     SNEXT.compareAndSet(this, cmp, val); | 
|  |             } | 
|  |  | 
|  |              | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |              */ | 
|  |             boolean tryMatch(SNode s) { | 
|  |                 SNode m; Thread w; | 
|  |                 if ((m = match) == null) { | 
|  |                     if (SMATCH.compareAndSet(this, null, s)) { | 
|  |                         if ((w = waiter) != null) | 
|  |                             LockSupport.unpark(w); | 
|  |                         return true; | 
|  |                     } | 
|  |                     else | 
|  |                         m = match; | 
|  |                 } | 
|  |                 return m == s; | 
|  |             } | 
|  |  | 
|  |              | 
|  |  | 
|  |              */ | 
|  |             boolean tryCancel() { | 
|  |                 return SMATCH.compareAndSet(this, null, this); | 
|  |             } | 
|  |  | 
|  |             boolean isCancelled() { | 
|  |                 return match == this; | 
|  |             } | 
|  |  | 
|  |             public final boolean isReleasable() { | 
|  |                 return match != null || Thread.currentThread().isInterrupted(); | 
|  |             } | 
|  |  | 
|  |             public final boolean block() { | 
|  |                 while (!isReleasable()) LockSupport.park(); | 
|  |                 return true; | 
|  |             } | 
|  |  | 
|  |             void forgetWaiter() { | 
|  |                 SWAITER.setOpaque(this, null); | 
|  |             } | 
|  |  | 
|  |              | 
|  |             private static final VarHandle SMATCH; | 
|  |             private static final VarHandle SNEXT; | 
|  |             private static final VarHandle SWAITER; | 
|  |             static { | 
|  |                 try { | 
|  |                     MethodHandles.Lookup l = MethodHandles.lookup(); | 
|  |                     SMATCH = l.findVarHandle(SNode.class, "match", SNode.class); | 
|  |                     SNEXT = l.findVarHandle(SNode.class, "next", SNode.class); | 
|  |                     SWAITER = l.findVarHandle(SNode.class, "waiter", Thread.class); | 
|  |                 } catch (ReflectiveOperationException e) { | 
|  |                     throw new ExceptionInInitializerError(e); | 
|  |                 } | 
|  |             } | 
|  |         } | 
|  |  | 
|  |          | 
|  |         volatile SNode head; | 
|  |  | 
|  |         boolean casHead(SNode h, SNode nh) { | 
|  |             return h == head && | 
|  |                 SHEAD.compareAndSet(this, h, nh); | 
|  |         } | 
|  |  | 
|  |          | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |          */ | 
|  |         static SNode snode(SNode s, Object e, SNode next, int mode) { | 
|  |             if (s == null) s = new SNode(e); | 
|  |             s.mode = mode; | 
|  |             s.next = next; | 
|  |             return s; | 
|  |         } | 
|  |  | 
|  |          | 
|  |  | 
|  |          */ | 
|  |         @SuppressWarnings("unchecked") | 
|  |         E transfer(E e, boolean timed, long nanos) { | 
|  |             /* | 
|  |              * Basic algorithm is to loop trying one of three actions: | 
|  |              * | 
|  |              * 1. If apparently empty or already containing nodes of same | 
|  |              *    mode, try to push node on stack and wait for a match, | 
|  |              *    returning it, or null if cancelled. | 
|  |              * | 
|  |              * 2. If apparently containing node of complementary mode, | 
|  |              *    try to push a fulfilling node on to stack, match | 
|  |              *    with corresponding waiting node, pop both from | 
|  |              *    stack, and return matched item. The matching or | 
|  |              *    unlinking might not actually be necessary because of | 
|  |              *    other threads performing action 3: | 
|  |              * | 
|  |              * 3. If top of stack already holds another fulfilling node, | 
|  |              *    help it out by doing its match and/or pop | 
|  |              *    operations, and then continue. The code for helping | 
|  |              *    is essentially the same as for fulfilling, except | 
|  |              *    that it doesn't return the item. | 
|  |              */ | 
|  |  | 
|  |             SNode s = null;  | 
|  |             int mode = (e == null) ? REQUEST : DATA; | 
|  |  | 
|  |             for (;;) { | 
|  |                 SNode h = head; | 
|  |                 if (h == null || h.mode == mode) {   | 
|  |                     if (timed && nanos <= 0L) {      | 
|  |                         if (h != null && h.isCancelled()) | 
|  |                             casHead(h, h.next);      | 
|  |                         else | 
|  |                             return null; | 
|  |                     } else if (casHead(h, s = snode(s, e, h, mode))) { | 
|  |                         long deadline = timed ? System.nanoTime() + nanos : 0L; | 
|  |                         Thread w = Thread.currentThread(); | 
|  |                         int stat = -1;  | 
|  |                         SNode m;                     | 
|  |                         while ((m = s.match) == null) { | 
|  |                             if ((timed && | 
|  |                                  (nanos = deadline - System.nanoTime()) <= 0) || | 
|  |                                 w.isInterrupted()) { | 
|  |                                 if (s.tryCancel()) { | 
|  |                                     clean(s);        | 
|  |                                     return null; | 
|  |                                 } | 
|  |                             } else if ((m = s.match) != null) { | 
|  |                                 break;               | 
|  |                             } else if (stat <= 0) { | 
|  |                                 if (stat < 0 && h == null && head == s) { | 
|  |                                     stat = 0;        | 
|  |                                     Thread.yield(); | 
|  |                                 } else { | 
|  |                                     stat = 1; | 
|  |                                     s.waiter = w;    | 
|  |                                 } | 
|  |                             } else if (!timed) { | 
|  |                                 LockSupport.setCurrentBlocker(this); | 
|  |                                 try { | 
|  |                                     ForkJoinPool.managedBlock(s); | 
|  |                                 } catch (InterruptedException cannotHappen) { } | 
|  |                                 LockSupport.setCurrentBlocker(null); | 
|  |                             } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) | 
|  |                                 LockSupport.parkNanos(this, nanos); | 
|  |                         } | 
|  |                         if (stat == 1) | 
|  |                             s.forgetWaiter(); | 
|  |                         Object result = (mode == REQUEST) ? m.item : s.item; | 
|  |                         if (h != null && h.next == s) | 
|  |                             casHead(h, s.next);      | 
|  |                         return (E) result; | 
|  |                     } | 
|  |                 } else if (!isFulfilling(h.mode)) {  | 
|  |                     if (h.isCancelled())             | 
|  |                         casHead(h, h.next);          | 
|  |                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { | 
|  |                         for (;;) { // loop until matched or waiters disappear | 
|  |                             SNode m = s.next;        | 
|  |                             if (m == null) {        // all waiters are gone | 
|  |                                 casHead(s, null);    | 
|  |                                 s = null;            | 
|  |                                 break;               | 
|  |                             } | 
|  |                             SNode mn = m.next; | 
|  |                             if (m.tryMatch(s)) { | 
|  |                                 casHead(s, mn);      | 
|  |                                 return (E) ((mode == REQUEST) ? m.item : s.item); | 
|  |                             } else                   | 
|  |                                 s.casNext(m, mn);    | 
|  |                         } | 
|  |                     } | 
|  |                 } else {                            // help a fulfiller | 
|  |                     SNode m = h.next;                | 
|  |                     if (m == null)                   | 
|  |                         casHead(h, null);            | 
|  |                     else { | 
|  |                         SNode mn = m.next; | 
|  |                         if (m.tryMatch(h))           | 
|  |                             casHead(h, mn);          | 
|  |                         else                         | 
|  |                             h.casNext(m, mn);        | 
|  |                     } | 
|  |                 } | 
|  |             } | 
|  |         } | 
|  |  | 
|  |          | 
|  |  | 
|  |          */ | 
|  |         void clean(SNode s) { | 
|  |             s.item = null;    | 
|  |             s.forgetWaiter(); | 
|  |  | 
|  |             /* | 
|  |              * At worst we may need to traverse entire stack to unlink | 
|  |              * s. If there are multiple concurrent calls to clean, we | 
|  |              * might not see s if another thread has already removed | 
|  |              * it. But we can stop when we see any node known to | 
|  |              * follow s. We use s.next unless it too is cancelled, in | 
|  |              * which case we try the node one past. We don't check any | 
|  |              * further because we don't want to doubly traverse just to | 
|  |              * find sentinel. | 
|  |              */ | 
|  |  | 
|  |             SNode past = s.next; | 
|  |             if (past != null && past.isCancelled()) | 
|  |                 past = past.next; | 
|  |  | 
|  |              | 
|  |             SNode p; | 
|  |             while ((p = head) != null && p != past && p.isCancelled()) | 
|  |                 casHead(p, p.next); | 
|  |  | 
|  |              | 
|  |             while (p != null && p != past) { | 
|  |                 SNode n = p.next; | 
|  |                 if (n != null && n.isCancelled()) | 
|  |                     p.casNext(n, n.next); | 
|  |                 else | 
|  |                     p = n; | 
|  |             } | 
|  |         } | 
|  |  | 
|  |          | 
|  |         private static final VarHandle SHEAD; | 
|  |         static { | 
|  |             try { | 
|  |                 MethodHandles.Lookup l = MethodHandles.lookup(); | 
|  |                 SHEAD = l.findVarHandle(TransferStack.class, "head", SNode.class); | 
|  |             } catch (ReflectiveOperationException e) { | 
|  |                 throw new ExceptionInInitializerError(e); | 
|  |             } | 
|  |         } | 
|  |     } | 
|  |  | 
|  |      | 
|  |     static final class TransferQueue<E> extends Transferer<E> { | 
|  |         /* | 
|  |          * This extends Scherer-Scott dual queue algorithm, differing, | 
|  |          * among other ways, by using modes within nodes rather than | 
|  |          * marked pointers. The algorithm is a little simpler than | 
|  |          * that for stacks because fulfillers do not need explicit | 
|  |          * nodes, and matching is done by CAS'ing QNode.item field | 
|  |          * from non-null to null (for put) or vice versa (for take). | 
|  |          */ | 
|  |  | 
|  |          | 
|  |         static final class QNode implements ForkJoinPool.ManagedBlocker { | 
|  |             volatile QNode next;           | 
|  |             volatile Object item;          | 
|  |             volatile Thread waiter;        | 
|  |             final boolean isData; | 
|  |  | 
|  |             QNode(Object item, boolean isData) { | 
|  |                 this.item = item; | 
|  |                 this.isData = isData; | 
|  |             } | 
|  |  | 
|  |             boolean casNext(QNode cmp, QNode val) { | 
|  |                 return next == cmp && | 
|  |                     QNEXT.compareAndSet(this, cmp, val); | 
|  |             } | 
|  |  | 
|  |             boolean casItem(Object cmp, Object val) { | 
|  |                 return item == cmp && | 
|  |                     QITEM.compareAndSet(this, cmp, val); | 
|  |             } | 
|  |  | 
|  |              | 
|  |  | 
|  |              */ | 
|  |             boolean tryCancel(Object cmp) { | 
|  |                 return QITEM.compareAndSet(this, cmp, this); | 
|  |             } | 
|  |  | 
|  |             boolean isCancelled() { | 
|  |                 return item == this; | 
|  |             } | 
|  |  | 
|  |              | 
|  |  | 
|  |  | 
|  |  | 
|  |              */ | 
|  |             boolean isOffList() { | 
|  |                 return next == this; | 
|  |             } | 
|  |  | 
|  |             void forgetWaiter() { | 
|  |                 QWAITER.setOpaque(this, null); | 
|  |             } | 
|  |  | 
|  |             boolean isFulfilled() { | 
|  |                 Object x; | 
|  |                 return isData == ((x = item) == null) || x == this; | 
|  |             } | 
|  |  | 
|  |             public final boolean isReleasable() { | 
|  |                 Object x; | 
|  |                 return isData == ((x = item) == null) || x == this || | 
|  |                     Thread.currentThread().isInterrupted(); | 
|  |             } | 
|  |  | 
|  |             public final boolean block() { | 
|  |                 while (!isReleasable()) LockSupport.park(); | 
|  |                 return true; | 
|  |             } | 
|  |  | 
|  |              | 
|  |             private static final VarHandle QITEM; | 
|  |             private static final VarHandle QNEXT; | 
|  |             private static final VarHandle QWAITER; | 
|  |             static { | 
|  |                 try { | 
|  |                     MethodHandles.Lookup l = MethodHandles.lookup(); | 
|  |                     QITEM = l.findVarHandle(QNode.class, "item", Object.class); | 
|  |                     QNEXT = l.findVarHandle(QNode.class, "next", QNode.class); | 
|  |                     QWAITER = l.findVarHandle(QNode.class, "waiter", Thread.class); | 
|  |                 } catch (ReflectiveOperationException e) { | 
|  |                     throw new ExceptionInInitializerError(e); | 
|  |                 } | 
|  |             } | 
|  |         } | 
|  |  | 
|  |          | 
|  |         transient volatile QNode head; | 
|  |          | 
|  |         transient volatile QNode tail; | 
|  |          | 
|  |  | 
|  |  | 
|  |  | 
|  |          */ | 
|  |         transient volatile QNode cleanMe; | 
|  |  | 
|  |         TransferQueue() { | 
|  |             QNode h = new QNode(null, false);  | 
|  |             head = h; | 
|  |             tail = h; | 
|  |         } | 
|  |  | 
|  |          | 
|  |  | 
|  |  | 
|  |          */ | 
|  |         void advanceHead(QNode h, QNode nh) { | 
|  |             if (h == head && | 
|  |                 QHEAD.compareAndSet(this, h, nh)) | 
|  |                 h.next = h;  | 
|  |         } | 
|  |  | 
|  |          | 
|  |  | 
|  |          */ | 
|  |         void advanceTail(QNode t, QNode nt) { | 
|  |             if (tail == t) | 
|  |                 QTAIL.compareAndSet(this, t, nt); | 
|  |         } | 
|  |  | 
|  |          | 
|  |  | 
|  |          */ | 
|  |         boolean casCleanMe(QNode cmp, QNode val) { | 
|  |             return cleanMe == cmp && | 
|  |                 QCLEANME.compareAndSet(this, cmp, val); | 
|  |         } | 
|  |  | 
|  |          | 
|  |  | 
|  |          */ | 
|  |         @SuppressWarnings("unchecked") | 
|  |         E transfer(E e, boolean timed, long nanos) { | 
|  |             /* Basic algorithm is to loop trying to take either of | 
|  |              * two actions: | 
|  |              * | 
|  |              * 1. If queue apparently empty or holding same-mode nodes, | 
|  |              *    try to add node to queue of waiters, wait to be | 
|  |              *    fulfilled (or cancelled) and return matching item. | 
|  |              * | 
|  |              * 2. If queue apparently contains waiting items, and this | 
|  |              *    call is of complementary mode, try to fulfill by CAS'ing | 
|  |              *    item field of waiting node and dequeuing it, and then | 
|  |              *    returning matching item. | 
|  |              * | 
|  |              * In each case, along the way, check for and try to help | 
|  |              * advance head and tail on behalf of other stalled/slow | 
|  |              * threads. | 
|  |              * | 
|  |              * The loop starts off with a null check guarding against | 
|  |              * seeing uninitialized head or tail values. This never | 
|  |              * happens in current SynchronousQueue, but could if | 
|  |              * callers held non-volatile/final ref to the | 
|  |              * transferer. The check is here anyway because it places | 
|  |              * null checks at top of loop, which is usually faster | 
|  |              * than having them implicitly interspersed. | 
|  |              */ | 
|  |  | 
|  |             QNode s = null;                   | 
|  |             boolean isData = (e != null); | 
|  |             for (;;) { | 
|  |                 QNode t = tail, h = head, m, tn;          | 
|  |                 if (t == null || h == null) | 
|  |                     ;                                     | 
|  |                 else if (h == t || t.isData == isData) {  | 
|  |                     if (t != tail)                        | 
|  |                         ; | 
|  |                     else if ((tn = t.next) != null)       | 
|  |                         advanceTail(t, tn); | 
|  |                     else if (timed && nanos <= 0L)        | 
|  |                         return null; | 
|  |                     else if (t.casNext(null, (s != null) ? s : | 
|  |                                        (s = new QNode(e, isData)))) { | 
|  |                         advanceTail(t, s); | 
|  |                         long deadline = timed ? System.nanoTime() + nanos : 0L; | 
|  |                         Thread w = Thread.currentThread(); | 
|  |                         int stat = -1;  | 
|  |                         Object item; | 
|  |                         while ((item = s.item) == e) { | 
|  |                             if ((timed && | 
|  |                                  (nanos = deadline - System.nanoTime()) <= 0) || | 
|  |                                 w.isInterrupted()) { | 
|  |                                 if (s.tryCancel(e)) { | 
|  |                                     clean(t, s); | 
|  |                                     return null; | 
|  |                                 } | 
|  |                             } else if ((item = s.item) != e) { | 
|  |                                 break;                    | 
|  |                             } else if (stat <= 0) { | 
|  |                                 if (t.next == s) { | 
|  |                                     if (stat < 0 && t.isFulfilled()) { | 
|  |                                         stat = 0;         | 
|  |                                         Thread.yield(); | 
|  |                                     } | 
|  |                                     else { | 
|  |                                         stat = 1; | 
|  |                                         s.waiter = w; | 
|  |                                     } | 
|  |                                 } | 
|  |                             } else if (!timed) { | 
|  |                                 LockSupport.setCurrentBlocker(this); | 
|  |                                 try { | 
|  |                                     ForkJoinPool.managedBlock(s); | 
|  |                                 } catch (InterruptedException cannotHappen) { } | 
|  |                                 LockSupport.setCurrentBlocker(null); | 
|  |                             } | 
|  |                             else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) | 
|  |                                 LockSupport.parkNanos(this, nanos); | 
|  |                         } | 
|  |                         if (stat == 1) | 
|  |                             s.forgetWaiter(); | 
|  |                         if (!s.isOffList()) {            // not already unlinked | 
|  |                             advanceHead(t, s);            | 
|  |                             if (item != null)             | 
|  |                                 s.item = s; | 
|  |                         } | 
|  |                         return (item != null) ? (E)item : e; | 
|  |                     } | 
|  |  | 
|  |                 } else if ((m = h.next) != null && t == tail && h == head) { | 
|  |                     Thread waiter; | 
|  |                     Object x = m.item; | 
|  |                     boolean fulfilled = ((isData == (x == null)) && | 
|  |                                          x != m && m.casItem(x, e)); | 
|  |                     advanceHead(h, m);                     | 
|  |                     if (fulfilled) { | 
|  |                         if ((waiter = m.waiter) != null) | 
|  |                             LockSupport.unpark(waiter); | 
|  |                         return (x != null) ? (E)x : e; | 
|  |                     } | 
|  |                 } | 
|  |             } | 
|  |         } | 
|  |  | 
|  |          | 
|  |  | 
|  |          */ | 
|  |         void clean(QNode pred, QNode s) { | 
|  |             s.forgetWaiter(); | 
|  |              | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |              */ | 
|  |             while (pred.next == s) {  | 
|  |                 QNode h = head; | 
|  |                 QNode hn = h.next;    | 
|  |                 if (hn != null && hn.isCancelled()) { | 
|  |                     advanceHead(h, hn); | 
|  |                     continue; | 
|  |                 } | 
|  |                 QNode t = tail;       | 
|  |                 if (t == h) | 
|  |                     return; | 
|  |                 QNode tn = t.next; | 
|  |                 if (t != tail) | 
|  |                     continue; | 
|  |                 if (tn != null) { | 
|  |                     advanceTail(t, tn); | 
|  |                     continue; | 
|  |                 } | 
|  |                 if (s != t) {         | 
|  |                     QNode sn = s.next; | 
|  |                     if (sn == s || pred.casNext(s, sn)) | 
|  |                         return; | 
|  |                 } | 
|  |                 QNode dp = cleanMe; | 
|  |                 if (dp != null) {     | 
|  |                     QNode d = dp.next; | 
|  |                     QNode dn; | 
|  |                     if (d == null ||                | 
|  |                         d == dp ||                  | 
|  |                         !d.isCancelled() ||         | 
|  |                         (d != t &&                  | 
|  |                          (dn = d.next) != null &&   | 
|  |                          dn != d &&                 | 
|  |                          dp.casNext(d, dn)))        | 
|  |                         casCleanMe(dp, null); | 
|  |                     if (dp == pred) | 
|  |                         return;       | 
|  |                 } else if (casCleanMe(null, pred)) | 
|  |                     return;           | 
|  |             } | 
|  |         } | 
|  |  | 
|  |          | 
|  |         private static final VarHandle QHEAD; | 
|  |         private static final VarHandle QTAIL; | 
|  |         private static final VarHandle QCLEANME; | 
|  |         static { | 
|  |             try { | 
|  |                 MethodHandles.Lookup l = MethodHandles.lookup(); | 
|  |                 QHEAD = l.findVarHandle(TransferQueue.class, "head", | 
|  |                                         QNode.class); | 
|  |                 QTAIL = l.findVarHandle(TransferQueue.class, "tail", | 
|  |                                         QNode.class); | 
|  |                 QCLEANME = l.findVarHandle(TransferQueue.class, "cleanMe", | 
|  |                                            QNode.class); | 
|  |             } catch (ReflectiveOperationException e) { | 
|  |                 throw new ExceptionInInitializerError(e); | 
|  |             } | 
|  |         } | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     private transient volatile Transferer<E> transferer; | 
|  |  | 
|  |      | 
|  |  | 
|  |      */ | 
|  |     public SynchronousQueue() { | 
|  |         this(false); | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public SynchronousQueue(boolean fair) { | 
|  |         transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public void put(E e) throws InterruptedException { | 
|  |         if (e == null) throw new NullPointerException(); | 
|  |         if (transferer.transfer(e, false, 0) == null) { | 
|  |             Thread.interrupted(); | 
|  |             throw new InterruptedException(); | 
|  |         } | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public boolean offer(E e, long timeout, TimeUnit unit) | 
|  |         throws InterruptedException { | 
|  |         if (e == null) throw new NullPointerException(); | 
|  |         if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) | 
|  |             return true; | 
|  |         if (!Thread.interrupted()) | 
|  |             return false; | 
|  |         throw new InterruptedException(); | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public boolean offer(E e) { | 
|  |         if (e == null) throw new NullPointerException(); | 
|  |         return transferer.transfer(e, true, 0) != null; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public E take() throws InterruptedException { | 
|  |         E e = transferer.transfer(null, false, 0); | 
|  |         if (e != null) | 
|  |             return e; | 
|  |         Thread.interrupted(); | 
|  |         throw new InterruptedException(); | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public E poll(long timeout, TimeUnit unit) throws InterruptedException { | 
|  |         E e = transferer.transfer(null, true, unit.toNanos(timeout)); | 
|  |         if (e != null || !Thread.interrupted()) | 
|  |             return e; | 
|  |         throw new InterruptedException(); | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public E poll() { | 
|  |         return transferer.transfer(null, true, 0); | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public boolean isEmpty() { | 
|  |         return true; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public int size() { | 
|  |         return 0; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public int remainingCapacity() { | 
|  |         return 0; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public void clear() { | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public boolean contains(Object o) { | 
|  |         return false; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public boolean remove(Object o) { | 
|  |         return false; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public boolean containsAll(Collection<?> c) { | 
|  |         return c.isEmpty(); | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public boolean removeAll(Collection<?> c) { | 
|  |         return false; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public boolean retainAll(Collection<?> c) { | 
|  |         return false; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public E peek() { | 
|  |         return null; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public Iterator<E> iterator() { | 
|  |         return Collections.emptyIterator(); | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public Spliterator<E> spliterator() { | 
|  |         return Spliterators.emptySpliterator(); | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public Object[] toArray() { | 
|  |         return new Object[0]; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public <T> T[] toArray(T[] a) { | 
|  |         if (a.length > 0) | 
|  |             a[0] = null; | 
|  |         return a; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public String toString() { | 
|  |         return "[]"; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public int drainTo(Collection<? super E> c) { | 
|  |         Objects.requireNonNull(c); | 
|  |         if (c == this) | 
|  |             throw new IllegalArgumentException(); | 
|  |         int n = 0; | 
|  |         for (E e; (e = poll()) != null; n++) | 
|  |             c.add(e); | 
|  |         return n; | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     public int drainTo(Collection<? super E> c, int maxElements) { | 
|  |         Objects.requireNonNull(c); | 
|  |         if (c == this) | 
|  |             throw new IllegalArgumentException(); | 
|  |         int n = 0; | 
|  |         for (E e; n < maxElements && (e = poll()) != null; n++) | 
|  |             c.add(e); | 
|  |         return n; | 
|  |     } | 
|  |  | 
|  |     /* | 
|  |      * To cope with serialization strategy in the 1.5 version of | 
|  |      * SynchronousQueue, we declare some unused classes and fields | 
|  |      * that exist solely to enable serializability across versions. | 
|  |      * These fields are never used, so are initialized only if this | 
|  |      * object is ever serialized or deserialized. | 
|  |      */ | 
|  |  | 
|  |     @SuppressWarnings("serial") | 
|  |     static class WaitQueue implements java.io.Serializable { } | 
|  |     static class LifoWaitQueue extends WaitQueue { | 
|  |         private static final long serialVersionUID = -3633113410248163686L; | 
|  |     } | 
|  |     static class FifoWaitQueue extends WaitQueue { | 
|  |         private static final long serialVersionUID = -3623113410248163686L; | 
|  |     } | 
|  |     private ReentrantLock qlock; | 
|  |     private WaitQueue waitingProducers; | 
|  |     private WaitQueue waitingConsumers; | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     private void writeObject(java.io.ObjectOutputStream s) | 
|  |         throws java.io.IOException { | 
|  |         boolean fair = transferer instanceof TransferQueue; | 
|  |         if (fair) { | 
|  |             qlock = new ReentrantLock(true); | 
|  |             waitingProducers = new FifoWaitQueue(); | 
|  |             waitingConsumers = new FifoWaitQueue(); | 
|  |         } | 
|  |         else { | 
|  |             qlock = new ReentrantLock(); | 
|  |             waitingProducers = new LifoWaitQueue(); | 
|  |             waitingConsumers = new LifoWaitQueue(); | 
|  |         } | 
|  |         s.defaultWriteObject(); | 
|  |     } | 
|  |  | 
|  |      | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |      */ | 
|  |     private void readObject(java.io.ObjectInputStream s) | 
|  |         throws java.io.IOException, ClassNotFoundException { | 
|  |         s.defaultReadObject(); | 
|  |         if (waitingProducers instanceof FifoWaitQueue) | 
|  |             transferer = new TransferQueue<E>(); | 
|  |         else | 
|  |             transferer = new TransferStack<E>(); | 
|  |     } | 
|  |  | 
|  |     static { | 
|  |         // Reduce the risk of rare disastrous classloading in first call to | 
|  |          | 
|  |         Class<?> ensureLoaded = LockSupport.class; | 
|  |     } | 
|  | } |