< prev index next >

src/java.base/share/classes/java/util/concurrent/SynchronousQueue.java

Print this page




  31  * Written by Doug Lea, Bill Scherer, and Michael Scott with
  32  * assistance from members of JCP JSR-166 Expert Group and released to
  33  * the public domain, as explained at
  34  * http://creativecommons.org/publicdomain/zero/1.0/
  35  */
  36 
  37 package java.util.concurrent;
  38 
  39 import java.lang.invoke.MethodHandles;
  40 import java.lang.invoke.VarHandle;
  41 import java.util.AbstractQueue;
  42 import java.util.Collection;
  43 import java.util.Collections;
  44 import java.util.Iterator;
  45 import java.util.Objects;
  46 import java.util.Spliterator;
  47 import java.util.Spliterators;
  48 import java.util.concurrent.locks.LockSupport;
  49 import java.util.concurrent.locks.ReentrantLock;
  50 


  51 /**
  52  * A {@linkplain BlockingQueue blocking queue} in which each insert
  53  * operation must wait for a corresponding remove operation by another
  54  * thread, and vice versa.  A synchronous queue does not have any
  55  * internal capacity, not even a capacity of one.  You cannot
  56  * {@code peek} at a synchronous queue because an element is only
  57  * present when you try to remove it; you cannot insert an element
  58  * (using any method) unless another thread is trying to remove it;
  59  * you cannot iterate as there is nothing to iterate.  The
  60  * <em>head</em> of the queue is the element that the first queued
  61  * inserting thread is trying to add to the queue; if there is no such
  62  * queued thread then no element is available for removal and
  63  * {@code poll()} will return {@code null}.  For purposes of other
  64  * {@code Collection} methods (for example {@code contains}), a
  65  * {@code SynchronousQueue} acts as an empty collection.  This queue
  66  * does not permit {@code null} elements.
  67  *
  68  * <p>Synchronous queues are similar to rendezvous channels used in
  69  * CSP and Ada. They are well suited for handoff designs, in which an
  70  * object running in one thread must sync up with an object running


 219          * bit-marked pointers: Fulfilling operations push on marker
 220          * nodes (with FULFILLING bit set in mode) to reserve a spot
 221          * to match a waiting node.
 222          */
 223 
 224         /* Modes for SNodes, ORed together in node fields */
 225         /** Node represents an unfulfilled consumer */
 226         static final int REQUEST    = 0;    // no mode bits set
 227         /** Node represents an unfulfilled producer */
 228         static final int DATA       = 1;    // low-order bit
 229         /** Node is fulfilling another unfulfilled DATA or REQUEST */
 230         static final int FULFILLING = 2;    // second bit; the one isFulfilling masks for
 231 
 232         /** Returns true if m has the FULFILLING bit set, regardless of its other bits. */
 233         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
 234 
 235         /** Node class for TransferStacks. */
 236         static final class SNode {
 237             volatile SNode next;        // next node in stack
 238             volatile SNode match;       // the node matched to this
 239             volatile Thread waiter;     // to control park/unpark
 240             Object item;                // data; or null for REQUESTs
 241             int mode;
 242             // Note: item and mode fields don't need to be volatile
 243             // since they are always written before, and read after,
 244             // other volatile/atomic operations.
 245 
 246             SNode(Object item) {    // only item is assigned; mode and the volatile links keep their defaults
 247                 this.item = item;
 248             }
 249 
 250             boolean casNext(SNode cmp, SNode val) {    // true only if next was swung from cmp to val
 251                 return cmp == next &&                   // volatile read first: skip the CAS when next already differs
 252                     SNEXT.compareAndSet(this, cmp, val); // SNEXT is declared outside this excerpt
 253             }
 254 
 255             /**
 256              * Tries to match node s to this node by CAS'ing match from null to s;
 257              * on success, clears any recorded waiter and unparks it.
 258              * Fulfillers call tryMatch to identify their waiters; waiters block until matched.
 259              *
 260              * @param s the node to match
 261              * @return true if this call matched s, or if match already equals s
 262              */
 263             boolean tryMatch(SNode s) {
 264                 if (match == null &&                    // cheap volatile pre-check before the CAS
 265                     SMATCH.compareAndSet(this, null, s)) {
 266                     Thread w = waiter;                  // read once into a local before clearing the field
 267                     if (w != null) {    // waiters need at most one unpark
 268                         waiter = null;
 269                         LockSupport.unpark(w);
 270                     }
 271                     return true;
 272                 }
 273                 return match == s;                      // CAS skipped or failed: still a success if s is the match
 274             }
 275 
 276             /**
 277              * Tries to cancel a wait by CAS'ing match from null to this node itself; the CAS result is ignored.
 278              */
 279             void tryCancel() {
 280                 SMATCH.compareAndSet(this, null, this); // expects null, so an existing match is not overwritten
 281             }
 282 
 283             boolean isCancelled() {
 284                 return match == this;   // self-match is the marker that tryCancel installs
 285             }
 286 


 416              * before actually parking, thus covering race vs
 417              * fulfiller noticing that waiter is non-null so should be
 418              * woken.
 419              *
 420              * When invoked by nodes that appear at the point of call
 421              * to be at the head of the stack, calls to park are
 422              * preceded by spins to avoid blocking when producers and
 423              * consumers are arriving very close in time.  This can
 424              * happen enough to bother only on multiprocessors.
 425              *
 426              * The order of checks for returning out of main loop
 427              * reflects fact that interrupts have precedence over
 428              * normal returns, which have precedence over
 429              * timeouts. (So, on timeout, one last check for match is
 430              * done before giving up.) Except that calls from untimed
 431              * SynchronousQueue.{poll/offer} don't check interrupts
 432              * and don't wait at all, so are trapped in transfer
 433              * method rather than calling awaitFulfill.
 434              */
 435             final long deadline = timed ? System.nanoTime() + nanos : 0L;
 436             Thread w = Thread.currentThread();
 437             int spins = shouldSpin(s)
 438                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
 439                 : 0;
 440             for (;;) {
 441                 if (w.isInterrupted())
 442                     s.tryCancel();
 443                 SNode m = s.match;
 444                 if (m != null)
 445                     return m;
 446                 if (timed) {
 447                     nanos = deadline - System.nanoTime();
 448                     if (nanos <= 0L) {
 449                         s.tryCancel();
 450                         continue;
 451                     }
 452                 }
 453                 if (spins > 0) {
 454                     Thread.onSpinWait();
 455                     spins = shouldSpin(s) ? (spins - 1) : 0;
 456                 }
 457                 else if (s.waiter == null)
 458                     s.waiter = w; // establish waiter so can park next iter
 459                 else if (!timed)
 460                     LockSupport.park(this);
 461                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)


 519                 throw new ExceptionInInitializerError(e);
 520             }
 521         }
 522     }
 523 
 524     /** Dual Queue */
 525     static final class TransferQueue<E> extends Transferer<E> {
 526         /*
 527          * This extends Scherer-Scott dual queue algorithm, differing,
 528          * among other ways, by using modes within nodes rather than
 529          * marked pointers. The algorithm is a little simpler than
 530          * that for stacks because fulfillers do not need explicit
 531          * nodes, and matching is done by CAS'ing QNode.item field
 532          * from non-null to null (for put) or vice versa (for take).
 533          */
 534 
 535         /** Node class for TransferQueue. */
 536         static final class QNode {
 537             volatile QNode next;          // next node in queue
 538             volatile Object item;         // CAS'ed to or from null
 539             volatile Thread waiter;       // to control park/unpark
 540             final boolean isData;
 541 
 542             QNode(Object item, boolean isData) {    // next and waiter are not assigned here
 543                 this.item = item;
 544                 this.isData = isData;               // final: fixed for the node's lifetime
 545             }
 546 
 547             boolean casNext(QNode cmp, QNode val) {    // true only if next was swung from cmp to val
 548                 return next == cmp &&                   // volatile read first: skip the CAS when next already differs
 549                     QNEXT.compareAndSet(this, cmp, val); // QNEXT is declared outside this excerpt
 550             }
 551 
 552             boolean casItem(Object cmp, Object val) {  // true only if item was swung from cmp to val
 553                 return item == cmp &&                   // volatile read first; compared by reference, not equals()
 554                     QITEM.compareAndSet(this, cmp, val); // QITEM is declared outside this excerpt
 555             }
 556 
 557             /**
 558              * Tries to cancel by CAS'ing ref to this as item.
 559              */


 715 
 716                     advanceHead(h, m);              // successfully fulfilled
 717                     LockSupport.unpark(m.waiter);
 718                     return (x != null) ? (E)x : e;
 719                 }
 720             }
 721         }
 722 
 723         /**
 724          * Spins/blocks until node s is fulfilled.
 725          *
 726          * @param s the waiting node
 727          * @param e the comparison value for checking match
 728          * @param timed true if timed wait
 729          * @param nanos timeout value
 730          * @return matched item, or s if cancelled
 731          */
 732         Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
 733             /* Same idea as TransferStack.awaitFulfill */
 734             final long deadline = timed ? System.nanoTime() + nanos : 0L;
 735             Thread w = Thread.currentThread();
 736             int spins = (head.next == s)
 737                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
 738                 : 0;
 739             for (;;) {
 740                 if (w.isInterrupted())
 741                     s.tryCancel(e);
 742                 Object x = s.item;
 743                 if (x != e)
 744                     return x;
 745                 if (timed) {
 746                     nanos = deadline - System.nanoTime();
 747                     if (nanos <= 0L) {
 748                         s.tryCancel(e);
 749                         continue;
 750                     }
 751                 }
 752                 if (spins > 0) {
 753                     --spins;
 754                     Thread.onSpinWait();
 755                 }
 756                 else if (s.waiter == null)
 757                     s.waiter = w;
 758                 else if (!timed)
 759                     LockSupport.park(this);
 760                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)




  31  * Written by Doug Lea, Bill Scherer, and Michael Scott with
  32  * assistance from members of JCP JSR-166 Expert Group and released to
  33  * the public domain, as explained at
  34  * http://creativecommons.org/publicdomain/zero/1.0/
  35  */
  36 
  37 package java.util.concurrent;
  38 
  39 import java.lang.invoke.MethodHandles;
  40 import java.lang.invoke.VarHandle;
  41 import java.util.AbstractQueue;
  42 import java.util.Collection;
  43 import java.util.Collections;
  44 import java.util.Iterator;
  45 import java.util.Objects;
  46 import java.util.Spliterator;
  47 import java.util.Spliterators;
  48 import java.util.concurrent.locks.LockSupport;
  49 import java.util.concurrent.locks.ReentrantLock;
  50 
  51 import jdk.internal.misc.Strands;
  52 
  53 /**
  54  * A {@linkplain BlockingQueue blocking queue} in which each insert
  55  * operation must wait for a corresponding remove operation by another
  56  * thread, and vice versa.  A synchronous queue does not have any
  57  * internal capacity, not even a capacity of one.  You cannot
  58  * {@code peek} at a synchronous queue because an element is only
  59  * present when you try to remove it; you cannot insert an element
  60  * (using any method) unless another thread is trying to remove it;
  61  * you cannot iterate as there is nothing to iterate.  The
  62  * <em>head</em> of the queue is the element that the first queued
  63  * inserting thread is trying to add to the queue; if there is no such
  64  * queued thread then no element is available for removal and
  65  * {@code poll()} will return {@code null}.  For purposes of other
  66  * {@code Collection} methods (for example {@code contains}), a
  67  * {@code SynchronousQueue} acts as an empty collection.  This queue
  68  * does not permit {@code null} elements.
  69  *
  70  * <p>Synchronous queues are similar to rendezvous channels used in
  71  * CSP and Ada. They are well suited for handoff designs, in which an
  72  * object running in one thread must sync up with an object running


 221          * bit-marked pointers: Fulfilling operations push on marker
 222          * nodes (with FULFILLING bit set in mode) to reserve a spot
 223          * to match a waiting node.
 224          */
 225 
 226         /* Modes for SNodes, ORed together in node fields */
 227         /** Node represents an unfulfilled consumer */
 228         static final int REQUEST    = 0;    // no mode bits set
 229         /** Node represents an unfulfilled producer */
 230         static final int DATA       = 1;    // low-order bit
 231         /** Node is fulfilling another unfulfilled DATA or REQUEST */
 232         static final int FULFILLING = 2;    // second bit; the one isFulfilling masks for
 233 
 234         /** Returns true if m has the FULFILLING bit set, regardless of its other bits. */
 235         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
 236 
 237         /** Node class for TransferStacks. */
 238         static final class SNode {
 239             volatile SNode next;        // next node in stack
 240             volatile SNode match;       // the node matched to this
 241             volatile Object waiter;     // to control park/unpark
 242             Object item;                // data; or null for REQUESTs
 243             int mode;
 244             // Note: item and mode fields don't need to be volatile
 245             // since they are always written before, and read after,
 246             // other volatile/atomic operations.
 247 
 248             SNode(Object item) {    // only item is assigned; mode and the volatile links keep their defaults
 249                 this.item = item;
 250             }
 251 
 252             boolean casNext(SNode cmp, SNode val) {    // true only if next was swung from cmp to val
 253                 return cmp == next &&                   // volatile read first: skip the CAS when next already differs
 254                     SNEXT.compareAndSet(this, cmp, val); // SNEXT is declared outside this excerpt
 255             }
 256 
 257             /**
 258              * Tries to match node s to this node by CAS'ing match from null to s;
 259              * on success, clears any recorded waiter and unparks it.
 260              * Fulfillers call tryMatch to identify their waiters; waiters block until matched.
 261              *
 262              * @param s the node to match
 263              * @return true if this call matched s, or if match already equals s
 264              */
 265             boolean tryMatch(SNode s) {
 266                 if (match == null &&                    // cheap volatile pre-check before the CAS
 267                     SMATCH.compareAndSet(this, null, s)) {
 268                     Object w = waiter;                  // waiter is typed Object in this version; read once into a local
 269                     if (w != null) {    // waiters need at most one unpark
 270                         waiter = null;
 271                         LockSupport.unpark(w);          // NOTE(review): relies on an unpark overload accepting Object - confirm
 272                     }
 273                     return true;
 274                 }
 275                 return match == s;                      // CAS skipped or failed: still a success if s is the match
 276             }
 277 
 278             /**
 279              * Tries to cancel a wait by CAS'ing match from null to this node itself; the CAS result is ignored.
 280              */
 281             void tryCancel() {
 282                 SMATCH.compareAndSet(this, null, this); // expects null, so an existing match is not overwritten
 283             }
 284 
 285             boolean isCancelled() {
 286                 return match == this;   // self-match is the marker that tryCancel installs
 287             }
 288 


 418              * before actually parking, thus covering race vs
 419              * fulfiller noticing that waiter is non-null so should be
 420              * woken.
 421              *
 422              * When invoked by nodes that appear at the point of call
 423              * to be at the head of the stack, calls to park are
 424              * preceded by spins to avoid blocking when producers and
 425              * consumers are arriving very close in time.  This can
 426              * happen enough to bother only on multiprocessors.
 427              *
 428              * The order of checks for returning out of main loop
 429              * reflects fact that interrupts have precedence over
 430              * normal returns, which have precedence over
 431              * timeouts. (So, on timeout, one last check for match is
 432              * done before giving up.) Except that calls from untimed
 433              * SynchronousQueue.{poll/offer} don't check interrupts
 434              * and don't wait at all, so are trapped in transfer
 435              * method rather than calling awaitFulfill.
 436              */
 437             final long deadline = timed ? System.nanoTime() + nanos : 0L;
 438             Object w = Strands.currentStrand();
 439             int spins = shouldSpin(s)
 440                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
 441                 : 0;
 442             for (;;) {
 443                 if (Strands.isInterrupted())
 444                     s.tryCancel();
 445                 SNode m = s.match;
 446                 if (m != null)
 447                     return m;
 448                 if (timed) {
 449                     nanos = deadline - System.nanoTime();
 450                     if (nanos <= 0L) {
 451                         s.tryCancel();
 452                         continue;
 453                     }
 454                 }
 455                 if (spins > 0) {
 456                     Thread.onSpinWait();
 457                     spins = shouldSpin(s) ? (spins - 1) : 0;
 458                 }
 459                 else if (s.waiter == null)
 460                     s.waiter = w; // establish waiter so can park next iter
 461                 else if (!timed)
 462                     LockSupport.park(this);
 463                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)


 521                 throw new ExceptionInInitializerError(e);
 522             }
 523         }
 524     }
 525 
 526     /** Dual Queue */
 527     static final class TransferQueue<E> extends Transferer<E> {
 528         /*
 529          * This extends Scherer-Scott dual queue algorithm, differing,
 530          * among other ways, by using modes within nodes rather than
 531          * marked pointers. The algorithm is a little simpler than
 532          * that for stacks because fulfillers do not need explicit
 533          * nodes, and matching is done by CAS'ing QNode.item field
 534          * from non-null to null (for put) or vice versa (for take).
 535          */
 536 
 537         /** Node class for TransferQueue. */
 538         static final class QNode {
 539             volatile QNode next;          // next node in queue
 540             volatile Object item;         // CAS'ed to or from null
 541             volatile Object waiter;       // to control park/unpark
 542             final boolean isData;
 543 
 544             QNode(Object item, boolean isData) {    // next and waiter are not assigned here
 545                 this.item = item;
 546                 this.isData = isData;               // final: fixed for the node's lifetime
 547             }
 548 
 549             boolean casNext(QNode cmp, QNode val) {    // true only if next was swung from cmp to val
 550                 return next == cmp &&                   // volatile read first: skip the CAS when next already differs
 551                     QNEXT.compareAndSet(this, cmp, val); // QNEXT is declared outside this excerpt
 552             }
 553 
 554             boolean casItem(Object cmp, Object val) {  // true only if item was swung from cmp to val
 555                 return item == cmp &&                   // volatile read first; compared by reference, not equals()
 556                     QITEM.compareAndSet(this, cmp, val); // QITEM is declared outside this excerpt
 557             }
 558 
 559             /**
 560              * Tries to cancel by CAS'ing ref to this as item.
 561              */


 717 
 718                     advanceHead(h, m);              // successfully fulfilled
 719                     LockSupport.unpark(m.waiter);
 720                     return (x != null) ? (E)x : e;
 721                 }
 722             }
 723         }
 724 
 725         /**
 726          * Spins/blocks until node s is fulfilled.
 727          *
 728          * @param s the waiting node
 729          * @param e the comparison value for checking match
 730          * @param timed true if timed wait
 731          * @param nanos timeout value
 732          * @return matched item, or s if cancelled
 733          */
 734         Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
 735             /* Same idea as TransferStack.awaitFulfill */
 736             final long deadline = timed ? System.nanoTime() + nanos : 0L;
 737             Object w = Strands.currentStrand();
 738             int spins = (head.next == s)
 739                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
 740                 : 0;
 741             for (;;) {
 742                 if (Strands.isInterrupted())
 743                     s.tryCancel(e);
 744                 Object x = s.item;
 745                 if (x != e)
 746                     return x;
 747                 if (timed) {
 748                     nanos = deadline - System.nanoTime();
 749                     if (nanos <= 0L) {
 750                         s.tryCancel(e);
 751                         continue;
 752                     }
 753                 }
 754                 if (spins > 0) {
 755                     --spins;
 756                     Thread.onSpinWait();
 757                 }
 758                 else if (s.waiter == null)
 759                     s.waiter = w;
 760                 else if (!timed)
 761                     LockSupport.park(this);
 762                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)


< prev index next >