1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea, Bill Scherer, and Michael Scott with
  32  * assistance from members of JCP JSR-166 Expert Group and released to
  33  * the public domain, as explained at
  34  * http://creativecommons.org/publicdomain/zero/1.0/
  35  */
  36 
  37 package java.util.concurrent;
  38 
  39 import java.lang.invoke.MethodHandles;
  40 import java.lang.invoke.VarHandle;
  41 import java.util.AbstractQueue;
  42 import java.util.Collection;
  43 import java.util.Collections;
  44 import java.util.Iterator;
  45 import java.util.Objects;
  46 import java.util.Spliterator;
  47 import java.util.Spliterators;
  48 import java.util.concurrent.locks.LockSupport;
  49 import java.util.concurrent.locks.ReentrantLock;
  50 
  51 import jdk.internal.misc.Strands;
  52 
  53 /**
  54  * A {@linkplain BlockingQueue blocking queue} in which each insert
  55  * operation must wait for a corresponding remove operation by another
  56  * thread, and vice versa.  A synchronous queue does not have any
  57  * internal capacity, not even a capacity of one.  You cannot
  58  * {@code peek} at a synchronous queue because an element is only
  59  * present when you try to remove it; you cannot insert an element
  60  * (using any method) unless another thread is trying to remove it;
  61  * you cannot iterate as there is nothing to iterate.  The
  62  * <em>head</em> of the queue is the element that the first queued
  63  * inserting thread is trying to add to the queue; if there is no such
  64  * queued thread then no element is available for removal and
  65  * {@code poll()} will return {@code null}.  For purposes of other
  66  * {@code Collection} methods (for example {@code contains}), a
  67  * {@code SynchronousQueue} acts as an empty collection.  This queue
  68  * does not permit {@code null} elements.
  69  *
  70  * <p>Synchronous queues are similar to rendezvous channels used in
  71  * CSP and Ada. They are well suited for handoff designs, in which an
  72  * object running in one thread must sync up with an object running
  73  * in another thread in order to hand it some information, event, or
  74  * task.
  75  *
  76  * <p>This class supports an optional fairness policy for ordering
  77  * waiting producer and consumer threads.  By default, this ordering
  78  * is not guaranteed. However, a queue constructed with fairness set
  79  * to {@code true} grants threads access in FIFO order.
  80  *
  81  * <p>This class and its iterator implement all of the <em>optional</em>
  82  * methods of the {@link Collection} and {@link Iterator} interfaces.
  83  *
  84  * <p>This class is a member of the
  85  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
  86  * Java Collections Framework</a>.
  87  *
  88  * @since 1.5
  89  * @author Doug Lea and Bill Scherer and Michael Scott
  90  * @param <E> the type of elements held in this queue
  91  */
  92 public class SynchronousQueue<E> extends AbstractQueue<E>
  93     implements BlockingQueue<E>, java.io.Serializable {
  94     private static final long serialVersionUID = -3223113410248163686L;
  95 
  96     /*
  97      * This class implements extensions of the dual stack and dual
  98      * queue algorithms described in "Nonblocking Concurrent Objects
  99      * with Condition Synchronization", by W. N. Scherer III and
 100      * M. L. Scott.  18th Annual Conf. on Distributed Computing,
 101      * Oct. 2004 (see also
 102      * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
 103      * The (Lifo) stack is used for non-fair mode, and the (Fifo)
 104      * queue for fair mode. The performance of the two is generally
 105      * similar. Fifo usually supports higher throughput under
 106      * contention but Lifo maintains higher thread locality in common
 107      * applications.
 108      *
 109      * A dual queue (and similarly stack) is one that at any given
 110      * time either holds "data" -- items provided by put operations,
 111      * or "requests" -- slots representing take operations, or is
 112      * empty. A call to "fulfill" (i.e., a call requesting an item
 113      * from a queue holding data or vice versa) dequeues a
 114      * complementary node.  The most interesting feature of these
 115      * queues is that any operation can figure out which mode the
 116      * queue is in, and act accordingly without needing locks.
 117      *
 118      * Both the queue and stack extend abstract class Transferer
 119      * defining the single method transfer that does a put or a
 120      * take. These are unified into a single method because in dual
 121      * data structures, the put and take operations are symmetrical,
 122      * so nearly all code can be combined. The resulting transfer
 123      * methods are on the long side, but are easier to follow than
 124      * they would be if broken up into nearly-duplicated parts.
 125      *
 126      * The queue and stack data structures share many conceptual
 127      * similarities but very few concrete details. For simplicity,
 128      * they are kept distinct so that they can later evolve
 129      * separately.
 130      *
 131      * The algorithms here differ from the versions in the above paper
 132      * in extending them for use in synchronous queues, as well as
 133      * dealing with cancellation. The main differences include:
 134      *
 135      *  1. The original algorithms used bit-marked pointers, but
 136      *     the ones here use mode bits in nodes, leading to a number
 137      *     of further adaptations.
 138      *  2. SynchronousQueues must block threads waiting to become
 139      *     fulfilled.
 140      *  3. Support for cancellation via timeout and interrupts,
 141      *     including cleaning out cancelled nodes/threads
 142      *     from lists to avoid garbage retention and memory depletion.
 143      *
 144      * Blocking is mainly accomplished using LockSupport park/unpark,
 145      * except that nodes that appear to be the next ones to become
 146      * fulfilled first spin a bit (on multiprocessors only). On very
 147      * busy synchronous queues, spinning can dramatically improve
 148      * throughput. And on less busy ones, the amount of spinning is
 149      * small enough not to be noticeable.
 150      *
 151      * Cleaning is done in different ways in queues vs stacks.  For
 152      * queues, we can almost always remove a node immediately in O(1)
 153      * time (modulo retries for consistency checks) when it is
 154      * cancelled. But if it may be pinned as the current tail, it must
 155      * wait until some subsequent cancellation. For stacks, we need a
 156      * potentially O(n) traversal to be sure that we can remove the
 157      * node, but this can run concurrently with other threads
 158      * accessing the stack.
 159      *
 160      * While garbage collection takes care of most node reclamation
 161      * issues that otherwise complicate nonblocking algorithms, care
 162      * is taken to "forget" references to data, other nodes, and
 163      * threads that might be held on to long-term by blocked
 164      * threads. In cases where setting to null would otherwise
 165      * conflict with main algorithms, this is done by changing a
 166      * node's link to now point to the node itself. This doesn't arise
 167      * much for Stack nodes (because blocked threads do not hang on to
 168      * old head pointers), but references in Queue nodes must be
 169      * aggressively forgotten to avoid reachability of everything any
 170      * node has ever referred to since arrival.
 171      */
 172 
 173     /**
 174      * Shared internal API for dual stacks and queues.
 175      */
 176     abstract static class Transferer<E> {
 177         /**
 178          * Performs a put or take.
 179          *
 180          * @param e if non-null, the item to be handed to a consumer;
 181          *          if null, requests that transfer return an item
 182          *          offered by producer.
 183          * @param timed if this operation should timeout
 184          * @param nanos the timeout, in nanoseconds
 185          * @return if non-null, the item provided or received; if null,
 186          *         the operation failed due to timeout or interrupt --
 187          *         the caller can distinguish which of these occurred
 188          *         by checking Thread.interrupted.
 189          */
 190         abstract E transfer(E e, boolean timed, long nanos);
 191     }
 192 
 193     /**
 194      * The number of times to spin before blocking in timed waits.
 195      * The value is empirically derived -- it works well across a
 196      * variety of processors and OSes. Empirically, the best value
 197      * seems not to vary with number of CPUs (beyond 2) so is just
 198      * a constant.
 199      */
 200     static final int MAX_TIMED_SPINS =
 201         (Runtime.getRuntime().availableProcessors() < 2) ? 0 : 32;
 202 
 203     /**
 204      * The number of times to spin before blocking in untimed waits.
 205      * This is greater than timed value because untimed waits spin
 206      * faster since they don't need to check times on each spin.
 207      */
 208     static final int MAX_UNTIMED_SPINS = MAX_TIMED_SPINS * 16;
 209 
 210     /**
 211      * The number of nanoseconds for which it is faster to spin
 212      * rather than to use timed park. A rough estimate suffices.
 213      */
 214     static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
 215 
 216     /** Dual stack */
 217     static final class TransferStack<E> extends Transferer<E> {
 218         /*
 219          * This extends Scherer-Scott dual stack algorithm, differing,
 220          * among other ways, by using "covering" nodes rather than
 221          * bit-marked pointers: Fulfilling operations push on marker
 222          * nodes (with FULFILLING bit set in mode) to reserve a spot
 223          * to match a waiting node.
 224          */
 225 
 226         /* Modes for SNodes, ORed together in node fields */
 227         /** Node represents an unfulfilled consumer */
 228         static final int REQUEST    = 0;
 229         /** Node represents an unfulfilled producer */
 230         static final int DATA       = 1;
 231         /** Node is fulfilling another unfulfilled DATA or REQUEST */
 232         static final int FULFILLING = 2;
 233 
 234         /** Returns true if m has fulfilling bit set. */
 235         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
 236 
 237         /** Node class for TransferStacks. */
 238         static final class SNode {
 239             volatile SNode next;        // next node in stack
 240             volatile SNode match;       // the node matched to this
 241             volatile Object waiter;     // to control park/unpark
 242             Object item;                // data; or null for REQUESTs
 243             int mode;
 244             // Note: item and mode fields don't need to be volatile
 245             // since they are always written before, and read after,
 246             // other volatile/atomic operations.
 247 
 248             SNode(Object item) {
 249                 this.item = item;
 250             }
 251 
 252             boolean casNext(SNode cmp, SNode val) {
 253                 return cmp == next &&
 254                     SNEXT.compareAndSet(this, cmp, val);
 255             }
 256 
 257             /**
 258              * Tries to match node s to this node, if so, waking up thread.
 259              * Fulfillers call tryMatch to identify their waiters.
 260              * Waiters block until they have been matched.
 261              *
 262              * @param s the node to match
 263              * @return true if successfully matched to s
 264              */
 265             boolean tryMatch(SNode s) {
 266                 if (match == null &&
 267                     SMATCH.compareAndSet(this, null, s)) {
 268                     Object w = waiter;
 269                     if (w != null) {    // waiters need at most one unpark
 270                         waiter = null;
 271                         LockSupport.unpark(w);
 272                     }
 273                     return true;
 274                 }
 275                 return match == s;
 276             }
 277 
 278             /**
 279              * Tries to cancel a wait by matching node to itself.
 280              */
 281             void tryCancel() {
 282                 SMATCH.compareAndSet(this, null, this);
 283             }
 284 
 285             boolean isCancelled() {
 286                 return match == this;
 287             }
 288 
 289             // VarHandle mechanics
 290             private static final VarHandle SMATCH;
 291             private static final VarHandle SNEXT;
 292             static {
 293                 try {
 294                     MethodHandles.Lookup l = MethodHandles.lookup();
 295                     SMATCH = l.findVarHandle(SNode.class, "match", SNode.class);
 296                     SNEXT = l.findVarHandle(SNode.class, "next", SNode.class);
 297                 } catch (ReflectiveOperationException e) {
 298                     throw new ExceptionInInitializerError(e);
 299                 }
 300             }
 301         }
 302 
 303         /** The head (top) of the stack */
 304         volatile SNode head;
 305 
 306         boolean casHead(SNode h, SNode nh) {
 307             return h == head &&
 308                 SHEAD.compareAndSet(this, h, nh);
 309         }
 310 
 311         /**
 312          * Creates or resets fields of a node. Called only from transfer
 313          * where the node to push on stack is lazily created and
 314          * reused when possible to help reduce intervals between reads
 315          * and CASes of head and to avoid surges of garbage when CASes
 316          * to push nodes fail due to contention.
 317          */
 318         static SNode snode(SNode s, Object e, SNode next, int mode) {
 319             if (s == null) s = new SNode(e);
 320             s.mode = mode;
 321             s.next = next;
 322             return s;
 323         }
 324 
 325         /**
 326          * Puts or takes an item.
 327          */
 328         @SuppressWarnings("unchecked")
 329         E transfer(E e, boolean timed, long nanos) {
 330             /*
 331              * Basic algorithm is to loop trying one of three actions:
 332              *
 333              * 1. If apparently empty or already containing nodes of same
 334              *    mode, try to push node on stack and wait for a match,
 335              *    returning it, or null if cancelled.
 336              *
 337              * 2. If apparently containing node of complementary mode,
 338              *    try to push a fulfilling node on to stack, match
 339              *    with corresponding waiting node, pop both from
 340              *    stack, and return matched item. The matching or
 341              *    unlinking might not actually be necessary because of
 342              *    other threads performing action 3:
 343              *
 344              * 3. If top of stack already holds another fulfilling node,
 345              *    help it out by doing its match and/or pop
 346              *    operations, and then continue. The code for helping
 347              *    is essentially the same as for fulfilling, except
 348              *    that it doesn't return the item.
 349              */
 350 
 351             SNode s = null; // constructed/reused as needed
 352             int mode = (e == null) ? REQUEST : DATA;
 353 
 354             for (;;) {
 355                 SNode h = head;
 356                 if (h == null || h.mode == mode) {  // empty or same-mode
 357                     if (timed && nanos <= 0L) {     // can't wait
 358                         if (h != null && h.isCancelled())
 359                             casHead(h, h.next);     // pop cancelled node
 360                         else
 361                             return null;
 362                     } else if (casHead(h, s = snode(s, e, h, mode))) {
 363                         SNode m = awaitFulfill(s, timed, nanos);
 364                         if (m == s) {               // wait was cancelled
 365                             clean(s);
 366                             return null;
 367                         }
 368                         if ((h = head) != null && h.next == s)
 369                             casHead(h, s.next);     // help s's fulfiller
 370                         return (E) ((mode == REQUEST) ? m.item : s.item);
 371                     }
 372                 } else if (!isFulfilling(h.mode)) { // try to fulfill
 373                     if (h.isCancelled())            // already cancelled
 374                         casHead(h, h.next);         // pop and retry
 375                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
 376                         for (;;) { // loop until matched or waiters disappear
 377                             SNode m = s.next;       // m is s's match
 378                             if (m == null) {        // all waiters are gone
 379                                 casHead(s, null);   // pop fulfill node
 380                                 s = null;           // use new node next time
 381                                 break;              // restart main loop
 382                             }
 383                             SNode mn = m.next;
 384                             if (m.tryMatch(s)) {
 385                                 casHead(s, mn);     // pop both s and m
 386                                 return (E) ((mode == REQUEST) ? m.item : s.item);
 387                             } else                  // lost match
 388                                 s.casNext(m, mn);   // help unlink
 389                         }
 390                     }
 391                 } else {                            // help a fulfiller
 392                     SNode m = h.next;               // m is h's match
 393                     if (m == null)                  // waiter is gone
 394                         casHead(h, null);           // pop fulfilling node
 395                     else {
 396                         SNode mn = m.next;
 397                         if (m.tryMatch(h))          // help match
 398                             casHead(h, mn);         // pop both h and m
 399                         else                        // lost match
 400                             h.casNext(m, mn);       // help unlink
 401                     }
 402                 }
 403             }
 404         }
 405 
 406         /**
 407          * Spins/blocks until node s is matched by a fulfill operation.
 408          *
 409          * @param s the waiting node
 410          * @param timed true if timed wait
 411          * @param nanos timeout value
 412          * @return matched node, or s if cancelled
 413          */
 414         SNode awaitFulfill(SNode s, boolean timed, long nanos) {
 415             /*
 416              * When a node/thread is about to block, it sets its waiter
 417              * field and then rechecks state at least one more time
 418              * before actually parking, thus covering race vs
 419              * fulfiller noticing that waiter is non-null so should be
 420              * woken.
 421              *
 422              * When invoked by nodes that appear at the point of call
 423              * to be at the head of the stack, calls to park are
 424              * preceded by spins to avoid blocking when producers and
 425              * consumers are arriving very close in time.  This can
 426              * happen enough to bother only on multiprocessors.
 427              *
 428              * The order of checks for returning out of main loop
 429              * reflects fact that interrupts have precedence over
 430              * normal returns, which have precedence over
 431              * timeouts. (So, on timeout, one last check for match is
 432              * done before giving up.) Except that calls from untimed
 433              * SynchronousQueue.{poll/offer} don't check interrupts
 434              * and don't wait at all, so are trapped in transfer
 435              * method rather than calling awaitFulfill.
 436              */
 437             final long deadline = timed ? System.nanoTime() + nanos : 0L;
 438             Object w = Strands.currentStrand();
 439             int spins = shouldSpin(s)
 440                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
 441                 : 0;
 442             for (;;) {
 443                 if (Strands.isInterrupted())
 444                     s.tryCancel();
 445                 SNode m = s.match;
 446                 if (m != null)
 447                     return m;
 448                 if (timed) {
 449                     nanos = deadline - System.nanoTime();
 450                     if (nanos <= 0L) {
 451                         s.tryCancel();
 452                         continue;
 453                     }
 454                 }
 455                 if (spins > 0) {
 456                     Thread.onSpinWait();
 457                     spins = shouldSpin(s) ? (spins - 1) : 0;
 458                 }
 459                 else if (s.waiter == null)
 460                     s.waiter = w; // establish waiter so can park next iter
 461                 else if (!timed)
 462                     LockSupport.park(this);
 463                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
 464                     LockSupport.parkNanos(this, nanos);
 465             }
 466         }
 467 
 468         /**
 469          * Returns true if node s is at head or there is an active
 470          * fulfiller.
 471          */
 472         boolean shouldSpin(SNode s) {
 473             SNode h = head;
 474             return (h == s || h == null || isFulfilling(h.mode));
 475         }
 476 
 477         /**
 478          * Unlinks s from the stack.
 479          */
 480         void clean(SNode s) {
 481             s.item = null;   // forget item
 482             s.waiter = null; // forget thread
 483 
 484             /*
 485              * At worst we may need to traverse entire stack to unlink
 486              * s. If there are multiple concurrent calls to clean, we
 487              * might not see s if another thread has already removed
 488              * it. But we can stop when we see any node known to
 489              * follow s. We use s.next unless it too is cancelled, in
 490              * which case we try the node one past. We don't check any
 491              * further because we don't want to doubly traverse just to
 492              * find sentinel.
 493              */
 494 
 495             SNode past = s.next;
 496             if (past != null && past.isCancelled())
 497                 past = past.next;
 498 
 499             // Absorb cancelled nodes at head
 500             SNode p;
 501             while ((p = head) != null && p != past && p.isCancelled())
 502                 casHead(p, p.next);
 503 
 504             // Unsplice embedded nodes
 505             while (p != null && p != past) {
 506                 SNode n = p.next;
 507                 if (n != null && n.isCancelled())
 508                     p.casNext(n, n.next);
 509                 else
 510                     p = n;
 511             }
 512         }
 513 
 514         // VarHandle mechanics
 515         private static final VarHandle SHEAD;
 516         static {
 517             try {
 518                 MethodHandles.Lookup l = MethodHandles.lookup();
 519                 SHEAD = l.findVarHandle(TransferStack.class, "head", SNode.class);
 520             } catch (ReflectiveOperationException e) {
 521                 throw new ExceptionInInitializerError(e);
 522             }
 523         }
 524     }
 525 
 526     /** Dual Queue */
 527     static final class TransferQueue<E> extends Transferer<E> {
 528         /*
 529          * This extends Scherer-Scott dual queue algorithm, differing,
 530          * among other ways, by using modes within nodes rather than
 531          * marked pointers. The algorithm is a little simpler than
 532          * that for stacks because fulfillers do not need explicit
 533          * nodes, and matching is done by CAS'ing QNode.item field
 534          * from non-null to null (for put) or vice versa (for take).
 535          */
 536 
 537         /** Node class for TransferQueue. */
 538         static final class QNode {
 539             volatile QNode next;          // next node in queue
 540             volatile Object item;         // CAS'ed to or from null
 541             volatile Object waiter;       // to control park/unpark
 542             final boolean isData;
 543 
 544             QNode(Object item, boolean isData) {
 545                 this.item = item;
 546                 this.isData = isData;
 547             }
 548 
 549             boolean casNext(QNode cmp, QNode val) {
 550                 return next == cmp &&
 551                     QNEXT.compareAndSet(this, cmp, val);
 552             }
 553 
 554             boolean casItem(Object cmp, Object val) {
 555                 return item == cmp &&
 556                     QITEM.compareAndSet(this, cmp, val);
 557             }
 558 
 559             /**
 560              * Tries to cancel by CAS'ing ref to this as item.
 561              */
 562             void tryCancel(Object cmp) {
 563                 QITEM.compareAndSet(this, cmp, this);
 564             }
 565 
 566             boolean isCancelled() {
 567                 return item == this;
 568             }
 569 
 570             /**
 571              * Returns true if this node is known to be off the queue
 572              * because its next pointer has been forgotten due to
 573              * an advanceHead operation.
 574              */
 575             boolean isOffList() {
 576                 return next == this;
 577             }
 578 
 579             // VarHandle mechanics
 580             private static final VarHandle QITEM;
 581             private static final VarHandle QNEXT;
 582             static {
 583                 try {
 584                     MethodHandles.Lookup l = MethodHandles.lookup();
 585                     QITEM = l.findVarHandle(QNode.class, "item", Object.class);
 586                     QNEXT = l.findVarHandle(QNode.class, "next", QNode.class);
 587                 } catch (ReflectiveOperationException e) {
 588                     throw new ExceptionInInitializerError(e);
 589                 }
 590             }
 591         }
 592 
 593         /** Head of queue */
 594         transient volatile QNode head;
 595         /** Tail of queue */
 596         transient volatile QNode tail;
 597         /**
 598          * Reference to a cancelled node that might not yet have been
 599          * unlinked from queue because it was the last inserted node
 600          * when it was cancelled.
 601          */
 602         transient volatile QNode cleanMe;
 603 
 604         TransferQueue() {
 605             QNode h = new QNode(null, false); // initialize to dummy node.
 606             head = h;
 607             tail = h;
 608         }
 609 
 610         /**
 611          * Tries to cas nh as new head; if successful, unlink
 612          * old head's next node to avoid garbage retention.
 613          */
 614         void advanceHead(QNode h, QNode nh) {
 615             if (h == head &&
 616                 QHEAD.compareAndSet(this, h, nh))
 617                 h.next = h; // forget old next
 618         }
 619 
 620         /**
 621          * Tries to cas nt as new tail.
 622          */
 623         void advanceTail(QNode t, QNode nt) {
 624             if (tail == t)
 625                 QTAIL.compareAndSet(this, t, nt);
 626         }
 627 
 628         /**
 629          * Tries to CAS cleanMe slot.
 630          */
 631         boolean casCleanMe(QNode cmp, QNode val) {
 632             return cleanMe == cmp &&
 633                 QCLEANME.compareAndSet(this, cmp, val);
 634         }
 635 
 636         /**
 637          * Puts or takes an item.
 638          */
        @SuppressWarnings("unchecked") // covers the (E)x casts on matched items below
        E transfer(E e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            // A non-null e makes this a data-carrying request; a null e
            // makes it a request for data (put passes its element, take
            // passes null).
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    // A timed call with no time left gives up here instead
                    // of enqueuing. The non-blocking offer(e)/poll() reach
                    // this path because they pass timed == true, nanos == 0.
                    if (timed && nanos <= 0L)       // can't wait
                        return null;
                    // s is kept across failed link attempts, so at most one
                    // node is allocated per call.
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    // x is the value the fulfiller CAS'ed into s.item: a
                    // non-null x is the item received by this call; a null
                    // x means this call's own e was taken, so e is returned
                    // as the non-null success value.
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    // x is what m held before the CAS above: a non-null x is
                    // the item of the waiting node, which this call returns;
                    // a null x means m was waiting for data and now holds e.
                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }
 724 
        /**
         * Spins/blocks until node s is fulfilled.
         *
         * <p>The loop ends as soon as {@code s.item} no longer equals
         * {@code e}; the new value is returned as-is. Interruption and
         * timeout do not return directly: they call {@code s.tryCancel(e)}
         * and let the next read of {@code s.item} decide the outcome.
         *
         * @param s the waiting node
         * @param e the comparison value for checking match
         * @param timed true if timed wait
         * @param nanos timeout value in nanoseconds, measured from the
         *        moment of this call; only consulted when {@code timed}
         * @return matched item, or s if cancelled
         */
        Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            // Absolute deadline on the System.nanoTime() clock; left at 0
            // and never read when the wait is untimed.
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // Value later stored in s.waiter; transfer() hands that field to
            // LockSupport.unpark. Strands is declared outside this chunk --
            // presumably this identifies the current thread or fiber.
            Object w = Strands.currentStrand();
            // Spin only when s is the node directly after head; any other
            // position gets zero spins.
            int spins = (head.next == s)
                ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
                : 0;
            for (;;) {
                // tryCancel is declared on QNode outside this chunk; judging
                // by the @return contract it presumably swaps the item from
                // e to s itself. The result is observed via s.item below.
                if (Strands.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)                 // item changed: matched or cancelled
                    return x;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        // Out of time: attempt to cancel, then loop to
                        // re-read s.item rather than assuming the attempt
                        // took effect.
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0) {
                    --spins;
                    Thread.onSpinWait();
                }
                // Record the waiter first; parking only happens on a later
                // iteration, after s.item has been re-checked.
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                // With SPIN_FOR_TIMEOUT_THRESHOLD nanos or less remaining,
                // no park is issued and the loop simply iterates again.
                else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanos);
            }
        }
 766 
        /**
         * Gets rid of cancelled node s with original predecessor pred.
         *
         * <p>If s cannot be unlinked right away, its predecessor is
         * remembered in {@code cleanMe} so that a later call can finish
         * the job.
         *
         * @param pred the node s was linked after when it was enqueued
         *        (transfer passes the tail it appended to)
         * @param s the cancelled node to unlink
         */
        void clean(QNode pred, QNode s) {
            s.waiter = null; // forget thread
            /*
             * At any given time, exactly one node on list cannot be
             * deleted -- the last inserted node. To accommodate this,
             * if we cannot delete s, we save its predecessor as
             * "cleanMe", deleting the previously saved version
             * first. At least one of node s or the node previously
             * saved can always be deleted, so this always terminates.
             */
            while (pred.next == s) { // Return early if already unlinked
                QNode h = head;
                QNode hn = h.next;   // Absorb cancelled first node as head
                if (hn != null && hn.isCancelled()) {
                    advanceHead(h, hn);
                    continue;
                }
                QNode t = tail;      // Ensure consistent read for tail
                if (t == h)          // head == tail: queue is empty
                    return;
                QNode tn = t.next;
                if (t != tail)       // tail moved while reading; retry
                    continue;
                if (tn != null) {    // tail is lagging; help advance it
                    advanceTail(t, tn);
                    continue;
                }
                if (s != t) {        // If not tail, try to unsplice
                    QNode sn = s.next;
                    // sn == s: s points at itself, the same shape the
                    // cleanMe branch below treats as "off list".
                    if (sn == s || pred.casNext(s, sn))
                        return;
                }
                // Either s is the tail or the unsplice CAS failed: fall
                // back to the deferred-cleanup slot.
                QNode dp = cleanMe;
                if (dp != null) {    // Try unlinking previous cancelled node
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       // d unspliced
                        casCleanMe(dp, null);
                    if (dp == pred)
                        return;      // s is already saved node
                } else if (casCleanMe(null, pred))
                    return;          // Postpone cleaning s
            }
        }
 820 
 821         // VarHandle mechanics
 822         private static final VarHandle QHEAD;
 823         private static final VarHandle QTAIL;
 824         private static final VarHandle QCLEANME;
 825         static {
 826             try {
 827                 MethodHandles.Lookup l = MethodHandles.lookup();
 828                 QHEAD = l.findVarHandle(TransferQueue.class, "head",
 829                                         QNode.class);
 830                 QTAIL = l.findVarHandle(TransferQueue.class, "tail",
 831                                         QNode.class);
 832                 QCLEANME = l.findVarHandle(TransferQueue.class, "cleanMe",
 833                                            QNode.class);
 834             } catch (ReflectiveOperationException e) {
 835                 throw new ExceptionInInitializerError(e);
 836             }
 837         }
 838     }
 839 
    /**
     * The transferer. Assigned in the constructor and assigned again by
     * {@code readObject} after deserialization, which is why it cannot
     * be declared as final without further complicating serialization.
     * Since this is accessed only at most once per public method, there
     * isn't a noticeable performance penalty for using volatile
     * instead of final here.
     */
    private transient volatile Transferer<E> transferer;
 848 
    /**
     * Creates a {@code SynchronousQueue} with nonfair access policy.
     * Equivalent to calling {@code SynchronousQueue(false)}.
     */
    public SynchronousQueue() {
        this(false); // delegate with fair == false
    }
 855 
 856     /**
 857      * Creates a {@code SynchronousQueue} with the specified fairness policy.
 858      *
 859      * @param fair if true, waiting threads contend in FIFO order for
 860      *        access; otherwise the order is unspecified.
 861      */
 862     public SynchronousQueue(boolean fair) {
 863         transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
 864     }
 865 
 866     /**
 867      * Adds the specified element to this queue, waiting if necessary for
 868      * another thread to receive it.
 869      *
 870      * @throws InterruptedException {@inheritDoc}
 871      * @throws NullPointerException {@inheritDoc}
 872      */
 873     public void put(E e) throws InterruptedException {
 874         if (e == null) throw new NullPointerException();
 875         if (transferer.transfer(e, false, 0) == null) {
 876             Thread.interrupted();
 877             throw new InterruptedException();
 878         }
 879     }
 880 
 881     /**
 882      * Inserts the specified element into this queue, waiting if necessary
 883      * up to the specified wait time for another thread to receive it.
 884      *
 885      * @return {@code true} if successful, or {@code false} if the
 886      *         specified waiting time elapses before a consumer appears
 887      * @throws InterruptedException {@inheritDoc}
 888      * @throws NullPointerException {@inheritDoc}
 889      */
 890     public boolean offer(E e, long timeout, TimeUnit unit)
 891         throws InterruptedException {
 892         if (e == null) throw new NullPointerException();
 893         if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
 894             return true;
 895         if (!Thread.interrupted())
 896             return false;
 897         throw new InterruptedException();
 898     }
 899 
 900     /**
 901      * Inserts the specified element into this queue, if another thread is
 902      * waiting to receive it.
 903      *
 904      * @param e the element to add
 905      * @return {@code true} if the element was added to this queue, else
 906      *         {@code false}
 907      * @throws NullPointerException if the specified element is null
 908      */
 909     public boolean offer(E e) {
 910         if (e == null) throw new NullPointerException();
 911         return transferer.transfer(e, true, 0) != null;
 912     }
 913 
 914     /**
 915      * Retrieves and removes the head of this queue, waiting if necessary
 916      * for another thread to insert it.
 917      *
 918      * @return the head of this queue
 919      * @throws InterruptedException {@inheritDoc}
 920      */
 921     public E take() throws InterruptedException {
 922         E e = transferer.transfer(null, false, 0);
 923         if (e != null)
 924             return e;
 925         Thread.interrupted();
 926         throw new InterruptedException();
 927     }
 928 
 929     /**
 930      * Retrieves and removes the head of this queue, waiting
 931      * if necessary up to the specified wait time, for another thread
 932      * to insert it.
 933      *
 934      * @return the head of this queue, or {@code null} if the
 935      *         specified waiting time elapses before an element is present
 936      * @throws InterruptedException {@inheritDoc}
 937      */
 938     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 939         E e = transferer.transfer(null, true, unit.toNanos(timeout));
 940         if (e != null || !Thread.interrupted())
 941             return e;
 942         throw new InterruptedException();
 943     }
 944 
    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or {@code null} if no
     *         element is available
     */
    public E poll() {
        // Timed transfer with a zero timeout and no element to hand over.
        return transferer.transfer(null, true, 0);
    }
 955 
    /**
     * Always returns {@code true}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return {@code true}
     */
    public boolean isEmpty() {
        return true; // constant: independent of any waiting threads
    }
 965 
    /**
     * Always returns zero.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return zero
     */
    public int size() {
        return 0; // constant: independent of any waiting threads
    }
 975 
    /**
     * Always returns zero.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return zero
     */
    public int remainingCapacity() {
        return 0; // constant: there is never room to store an element
    }
 985 
    /**
     * Does nothing.
     * A {@code SynchronousQueue} has no internal capacity.
     */
    public void clear() {
        // Intentionally empty: there are no stored elements to discard.
    }
 992 
    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param o the element
     * @return {@code false}
     */
    public boolean contains(Object o) {
        return false; // constant: the argument is not inspected
    }
1003 
    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param o the element to remove
     * @return {@code false}
     */
    public boolean remove(Object o) {
        return false; // constant: the argument is not inspected
    }
1014 
    /**
     * Returns {@code false} unless the given collection is empty.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false} unless given collection is empty
     * @throws NullPointerException if the specified collection is null
     */
    public boolean containsAll(Collection<?> c) {
        // Only an empty collection is "contained"; c is dereferenced, so a
        // null argument fails here with a NullPointerException.
        return c.isEmpty();
    }
1025 
    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false}
     */
    public boolean removeAll(Collection<?> c) {
        return false; // constant: the argument is not inspected
    }
1036 
    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false}
     */
    public boolean retainAll(Collection<?> c) {
        return false; // constant: the argument is not inspected
    }
1047 
    /**
     * Always returns {@code null}.
     * A {@code SynchronousQueue} does not return elements
     * unless actively waited on.
     *
     * @return {@code null}
     */
    public E peek() {
        return null; // constant: the transferer is not consulted
    }
1058 
    /**
     * Returns an empty iterator in which {@code hasNext} always returns
     * {@code false}.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        // Obtained from Collections.emptyIterator(); no queue state is read.
        return Collections.emptyIterator();
    }
1068 
    /**
     * Returns an empty spliterator in which calls to
     * {@link Spliterator#trySplit() trySplit} always return {@code null}.
     *
     * @return an empty spliterator
     * @since 1.8
     */
    public Spliterator<E> spliterator() {
        // Obtained from Spliterators.emptySpliterator(); no queue state is read.
        return Spliterators.emptySpliterator();
    }
1079 
    /**
     * Returns a zero-length array.
     * A new array is allocated on every call.
     *
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }
1087 
1088     /**
1089      * Sets the zeroth element of the specified array to {@code null}
1090      * (if the array has non-zero length) and returns it.
1091      *
1092      * @param a the array
1093      * @return the specified array
1094      * @throws NullPointerException if the specified array is null
1095      */
1096     public <T> T[] toArray(T[] a) {
1097         if (a.length > 0)
1098             a[0] = null;
1099         return a;
1100     }
1101 
    /**
     * Always returns {@code "[]"}.
     * @return {@code "[]"}
     */
    public String toString() {
        return "[]"; // constant: no queue state is read
    }
1109 
1110     /**
1111      * @throws UnsupportedOperationException {@inheritDoc}
1112      * @throws ClassCastException            {@inheritDoc}
1113      * @throws NullPointerException          {@inheritDoc}
1114      * @throws IllegalArgumentException      {@inheritDoc}
1115      */
1116     public int drainTo(Collection<? super E> c) {
1117         Objects.requireNonNull(c);
1118         if (c == this)
1119             throw new IllegalArgumentException();
1120         int n = 0;
1121         for (E e; (e = poll()) != null; n++)
1122             c.add(e);
1123         return n;
1124     }
1125 
1126     /**
1127      * @throws UnsupportedOperationException {@inheritDoc}
1128      * @throws ClassCastException            {@inheritDoc}
1129      * @throws NullPointerException          {@inheritDoc}
1130      * @throws IllegalArgumentException      {@inheritDoc}
1131      */
1132     public int drainTo(Collection<? super E> c, int maxElements) {
1133         Objects.requireNonNull(c);
1134         if (c == this)
1135             throw new IllegalArgumentException();
1136         int n = 0;
1137         for (E e; n < maxElements && (e = poll()) != null; n++)
1138             c.add(e);
1139         return n;
1140     }
1141 
    /*
     * To cope with serialization strategy in the 1.5 version of
     * SynchronousQueue, we declare some unused classes and fields
     * that exist solely to enable serializability across versions.
     * These fields are never used, so are initialized only if this
     * object is ever serialized or deserialized.
     */

    // Empty marker types: they declare no members other than a
    // serialVersionUID.
    @SuppressWarnings("serial")
    static class WaitQueue implements java.io.Serializable { }
    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
    // Populated by writeObject just before the default fields are written.
    // readObject inspects only waitingProducers (Fifo vs. Lifo) to decide
    // which transferer to rebuild.
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;
1161 
1162     /**
1163      * Saves this queue to a stream (that is, serializes it).
1164      * @param s the stream
1165      * @throws java.io.IOException if an I/O error occurs
1166      */
1167     private void writeObject(java.io.ObjectOutputStream s)
1168         throws java.io.IOException {
1169         boolean fair = transferer instanceof TransferQueue;
1170         if (fair) {
1171             qlock = new ReentrantLock(true);
1172             waitingProducers = new FifoWaitQueue();
1173             waitingConsumers = new FifoWaitQueue();
1174         }
1175         else {
1176             qlock = new ReentrantLock();
1177             waitingProducers = new LifoWaitQueue();
1178             waitingConsumers = new LifoWaitQueue();
1179         }
1180         s.defaultWriteObject();
1181     }
1182 
1183     /**
1184      * Reconstitutes this queue from a stream (that is, deserializes it).
1185      * @param s the stream
1186      * @throws ClassNotFoundException if the class of a serialized object
1187      *         could not be found
1188      * @throws java.io.IOException if an I/O error occurs
1189      */
1190     private void readObject(java.io.ObjectInputStream s)
1191         throws java.io.IOException, ClassNotFoundException {
1192         s.defaultReadObject();
1193         if (waitingProducers instanceof FifoWaitQueue)
1194             transferer = new TransferQueue<E>();
1195         else
1196             transferer = new TransferStack<E>();
1197     }
1198 
    static {
        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        // The local is deliberately never read; naming the class literal
        // is the whole purpose of this block.
        Class<?> ensureLoaded = LockSupport.class;
    }
1204 }