< prev index next >

src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java

Print this page




  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent.locks;
  37 
  38 import java.lang.invoke.MethodHandles;
  39 import java.lang.invoke.VarHandle;
  40 import java.util.ArrayList;
  41 import java.util.Collection;
  42 import java.util.Date;
  43 import java.util.concurrent.TimeUnit;
  44 


  45 /**
  46  * Provides a framework for implementing blocking locks and related
  47  * synchronizers (semaphores, events, etc) that rely on
  48  * first-in-first-out (FIFO) wait queues.  This class is designed to
  49  * be a useful basis for most kinds of synchronizers that rely on a
  50  * single atomic {@code int} value to represent state. Subclasses
  51  * must define the protected methods that change this state, and which
  52  * define what that state means in terms of this object being acquired
  53  * or released.  Given these, the other methods in this class carry
  54  * out all queuing and blocking mechanics. Subclasses can maintain
  55  * other state fields, but only the atomically updated {@code int}
  56  * value manipulated using methods {@link #getState}, {@link
  57  * #setState} and {@link #compareAndSetState} is tracked with respect
  58  * to synchronization.
  59  *
  60  * <p>Subclasses should be defined as non-public internal helper
  61  * classes that are used to implement the synchronization properties
  62  * of their enclosing class.  Class
  63  * {@code AbstractQueuedSynchronizer} does not implement any
  64  * synchronization interface.  Instead it defines methods such as


 457          * cancels itself, not any other node.
 458          */
 459         volatile Node prev;
 460 
 461         /**
 462          * Link to the successor node that the current node/thread
 463          * unparks upon release. Assigned during enqueuing, adjusted
 464          * when bypassing cancelled predecessors, and nulled out (for
 465          * sake of GC) when dequeued.  The enq operation does not
 466          * assign next field of a predecessor until after attachment,
 467          * so seeing a null next field does not necessarily mean that
 468          * node is at end of queue. However, if a next field appears
 469          * to be null, we can scan prev's from the tail to
 470          * double-check.  The next field of cancelled nodes is set to
 471          * point to the node itself instead of null, to make life
 472          * easier for isOnSyncQueue.
 473          */
 474         volatile Node next;
 475 
 476         /**
 477          * The thread that enqueued this node.  Initialized on
 478          * construction and nulled out after use.
 479          */
 480         volatile Thread thread;
 481 
 482         /**
 483          * Link to next node waiting on condition, or the special
 484          * value SHARED.  Because condition queues are accessed only
 485          * when holding in exclusive mode, we just need a simple
 486          * linked queue to hold nodes while they are waiting on
 487          * conditions. They are then transferred to the queue to
 488          * re-acquire. And because conditions can only be exclusive,
 489          * we save a field by using special value to indicate shared
 490          * mode.
 491          */
 492         Node nextWaiter;
 493 
 494         /**
 495          * Returns true if node is waiting in shared mode.
 496          */
 497         final boolean isShared() {
 498             return nextWaiter == SHARED; // reference comparison against the SHARED marker kept in nextWaiter
 499         }
 500 


 502          * Returns previous node, or throws NullPointerException if null.
 503          * Use when predecessor cannot be null.  The null check could
 504          * be elided, but is present to help the VM.
 505          *
 506          * @return the predecessor of this node
 507          */
 508         final Node predecessor() {
 509             Node p = prev; // single read of the volatile prev field; the check and the return use this same value
 510             if (p == null)
 511                 throw new NullPointerException(); // thrown explicitly rather than left to a later dereference
 512             else
 513                 return p;
 514         }
 515 
 516         /** Establishes initial head or SHARED marker. */
 517         Node() {} // empty body: no field is assigned by this constructor
 518 
 519         /** Constructor used by addWaiter. */
 520         Node(Node nextWaiter) {
 521             this.nextWaiter = nextWaiter; // ordinary field write; nextWaiter is not declared volatile
 522             THREAD.set(this, Thread.currentThread()); // stores the constructing thread via VarHandle.set (plain access mode) instead of a direct write to the volatile field
 523         }
 524 
 525         /** Constructor used by addConditionWaiter. */
 526         Node(int waitStatus) {
 527             WAITSTATUS.set(this, waitStatus); // initial status written via VarHandle.set (plain access mode)
 528             THREAD.set(this, Thread.currentThread()); // stores the constructing thread the same way
 529         }
 530 
 531         /** CASes waitStatus field. */
 532         final boolean compareAndSetWaitStatus(int expect, int update) {
 533             return WAITSTATUS.compareAndSet(this, expect, update); // true only when waitStatus held expect and was atomically replaced with update
 534         }
 535 
 536         /** CASes next field. */
 537         final boolean compareAndSetNext(Node expect, Node update) {
 538             return NEXT.compareAndSet(this, expect, update); // true only when next was the same reference as expect and was atomically replaced with update
 539         }
 540 
 541         final void setPrevRelaxed(Node p) { // sets prev through the PREV VarHandle; "relaxed" matches VarHandle.set's plain access mode
 542             PREV.set(this, p); // not a volatile write, although the prev field itself is declared volatile
 543         }
 544 
 545         // VarHandle mechanics
 546         private static final VarHandle NEXT; // handle for Node.next
 547         private static final VarHandle PREV; // handle for Node.prev
 548         private static final VarHandle THREAD; // handle for Node.thread
 549         private static final VarHandle WAITSTATUS; // handle for Node.waitStatus
 550         static {
 551             try {
 552                 MethodHandles.Lookup l = MethodHandles.lookup(); // lookup object created inside Node's own static initializer
 553                 NEXT = l.findVarHandle(Node.class, "next", Node.class);
 554                 PREV = l.findVarHandle(Node.class, "prev", Node.class);
 555                 THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
 556                 WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
 557             } catch (ReflectiveOperationException e) {
 558                 throw new ExceptionInInitializerError(e); // a failed field lookup aborts class initialization, passing e along as the cause
 559             }
 560         }
 561     }
 562 
 563     /**
 564      * Head of the wait queue, lazily initialized.  Except for
 565      * initialization, it is modified only via method setHead.  Note:
 566      * If head exists, its waitStatus is guaranteed not to be
 567      * CANCELLED.
 568      */
 569     private transient volatile Node head;
 570 
 571     /**
 572      * Tail of the wait queue, lazily initialized.  Modified only via
 573      * method enq to add new wait node.
 574      */
 575     private transient volatile Node tail;


 656                 node.setPrevRelaxed(oldTail);
 657                 if (compareAndSetTail(oldTail, node)) {
 658                     oldTail.next = node;
 659                     return node;
 660                 }
 661             } else {
 662                 initializeSyncQueue();
 663             }
 664         }
 665     }
 666 
 667     /**
 668      * Sets head of queue to be node, thus dequeuing. Called only by
 669      * acquire methods.  Also nulls out unused fields for sake of GC
 670      * and to suppress unnecessary signals and traversals.
 671      *
 672      * @param node the node
 673      */
 674     private void setHead(Node node) {
 675         head = node; // volatile write of the new head happens before the two field clears below
 676         node.thread = null; // field is unused once the node is head (see method comment)
 677         node.prev = null; // drops the back-link to the previous head (GC, per the method comment)
 678     }
 679 
 680     /**
 681      * Wakes up node's successor, if one exists.
 682      *
 683      * @param node the node
 684      */
 685     private void unparkSuccessor(Node node) {
 686         /*
 687          * If status is negative (i.e., possibly needing signal) try
 688          * to clear in anticipation of signalling.  It is OK if this
 689          * fails or if status is changed by waiting thread.
 690          */
 691         int ws = node.waitStatus; // one read; this snapshot is the expected value for the CAS below
 692         if (ws < 0)
 693             node.compareAndSetWaitStatus(ws, 0); // result deliberately ignored (see comment above)
 694 
 695         /*
 696          * Thread to unpark is held in successor, which is normally
 697          * just the next node.  But if cancelled or apparently null,
 698          * traverse backwards from tail to find the actual
 699          * non-cancelled successor.
 700          */
 701         Node s = node.next;
 702         if (s == null || s.waitStatus > 0) { // waitStatus > 0 is the "cancelled" case named above
 703             s = null; // discard the unusable candidate before rescanning
 704             for (Node p = tail; p != node && p != null; p = p.prev) // no early exit: walks all the way back to node (or to a null prev)
 705                 if (p.waitStatus <= 0)
 706                     s = p; // overwritten on each hit, so s ends as the qualifying node nearest to node
 707         }
 708         if (s != null)
 709             LockSupport.unpark(s.thread); // s.thread is read without a null check; LockSupport.unpark is documented to do nothing for null
 710     }
 711 
 712     /**
 713      * Release action for shared mode -- signals successor and ensures
 714      * propagation. (Note: For exclusive mode, release just amounts
 715      * to calling unparkSuccessor of head if it needs signal.)
 716      */
 717     private void doReleaseShared() {
 718         /*
 719          * Ensure that a release propagates, even if there are other
 720          * in-progress acquires/releases.  This proceeds in the usual
 721          * way of trying to unparkSuccessor of head if it needs
 722          * signal. But if it does not, status is set to PROPAGATE to
 723          * ensure that upon release, propagation continues.
 724          * Additionally, we must loop in case a new node is added
 725          * while we are doing this. Also, unlike other uses of
 726          * unparkSuccessor, we need to know if CAS to reset status
 727          * fails, if so rechecking.
 728          */
 729         for (;;) {


 774         if (propagate > 0 || h == null || h.waitStatus < 0 ||
 775             (h = head) == null || h.waitStatus < 0) {
 776             Node s = node.next;
 777             if (s == null || s.isShared())
 778                 doReleaseShared();
 779         }
 780     }
 781 
 782     // Utilities for various versions of acquire
 783 
 784     /**
 785      * Cancels an ongoing attempt to acquire.
 786      *
 787      * @param node the node
 788      */
 789     private void cancelAcquire(Node node) {
 790         // Ignore if node doesn't exist
 791         if (node == null)
 792             return;
 793 
 794         node.thread = null; // cleared before the node is marked CANCELLED below
 795 
 796         // Skip cancelled predecessors
 797         Node pred = node.prev;
 798         while (pred.waitStatus > 0) // no null check on pred: relies on reaching a node whose waitStatus is <= 0
 799             node.prev = pred = pred.prev; // relinks node.prev on every step, not just at the end
 800 
 801         // predNext is the apparent node to unsplice. CASes below will
 802         // fail if not, in which case, we lost race vs another cancel
 803         // or signal, so no further action is necessary, although with
 804         // a possibility that a cancelled node may transiently remain
 805         // reachable.
 806         Node predNext = pred.next;
 807 
 808         // Can use unconditional write instead of CAS here.
 809         // After this atomic step, other Nodes can skip past us.
 810         // Before, we are free of interference from other threads.
 811         node.waitStatus = Node.CANCELLED;
 812 
 813         // If we are the tail, remove ourselves.
 814         if (node == tail && compareAndSetTail(node, pred)) { // the CAS is attempted only when node is currently observed as tail
 815             pred.compareAndSetNext(predNext, null); // result ignored; a failed CAS is the lost-race case described above
 816         } else {
 817             // If successor needs signal, try to set pred's next-link
 818             // so it will get one. Otherwise wake it up to propagate.
 819             int ws;
 820             if (pred != head && // 1) pred is not the current head
 821                 ((ws = pred.waitStatus) == Node.SIGNAL || // 2) pred is already SIGNAL ...
 822                  (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && //    ... or is not cancelled and was just CASed to SIGNAL
 823                 pred.thread != null) { // 3) pred still holds a thread reference
 824                 Node next = node.next;
 825                 if (next != null && next.waitStatus <= 0)
 826                     pred.compareAndSetNext(predNext, next); // splice node out by linking pred directly to next; result ignored
 827             } else {
 828                 unparkSuccessor(node); // any of the three conditions failed: wake the successor instead
 829             }
 830 
 831             node.next = node; // help GC
 832         }
 833     }
 834 
 835     /**
 836      * Checks and updates status for a node that failed to acquire.
 837      * Returns true if thread should block. This is the main signal
 838      * control in all acquire loops.  Requires that pred == node.prev.
 839      *
 840      * @param pred node's predecessor holding status
 841      * @param node the node
 842      * @return {@code true} if thread should block
 843      */


1412      *
1413      * @return {@code true} if there has ever been contention
1414      */
1415     public final boolean hasContended() {
1416         return head != null; // head is lazily initialized (see the head field comment), so non-null means the queue has been set up at some point
1417     }
1418 
1419     /**
1420      * Returns the first (longest-waiting) thread in the queue, or
1421      * {@code null} if no threads are currently queued.
1422      *
1423      * <p>In this implementation, this operation normally returns in
1424      * constant time, but may iterate upon contention if other threads are
1425      * concurrently modifying the queue.
1426      *
1427      * @return the first (longest-waiting) thread in the queue, or
1428      *         {@code null} if no threads are currently queued
1429      */
1430     public final Thread getFirstQueuedThread() {
1431         // handle only fast path, else relay
1432         return (head == tail) ? null : fullGetFirstQueuedThread(); // head == tail (including both still null) is reported as "no queued thread" without any traversal

1433     }
1434 
1435     /**
1436      * Version of getFirstQueuedThread called when fastpath fails.
1437      */
1438     private Thread fullGetFirstQueuedThread() {
1439         /*
1440          * The first node is normally head.next. Try to get its
1441          * thread field, ensuring consistent reads: If thread
1442          * field is nulled out or s.prev is no longer head, then
1443          * some other thread(s) concurrently performed setHead in
1444          * between some of our reads. We try this twice before
1445          * resorting to traversal.
1446          */
1447         Node h, s;
1448         Thread st;
1449         if (((h = head) != null && (s = h.next) != null && // first attempt
1450              s.prev == head && (st = s.thread) != null) || // compares s.prev with a fresh read of head, not with h
1451             ((h = head) != null && (s = h.next) != null && // second attempt: the identical test, evaluated only if the first failed
1452              s.prev == head && (st = s.thread) != null))
1453             return st;
1454 
1455         /*
1456          * Head's next field might not have been set yet, or may have
1457          * been unset after setHead. So we must check to see if tail
1458          * is actually first node. If not, we continue on, safely
1459          * traversing from tail back to head to find first,
1460          * guaranteeing termination.
1461          */
1462 
1463         Thread firstThread = null;
1464         for (Node p = tail; p != null && p != head; p = p.prev) { // stops at a null prev or at the current head (head re-read each iteration)
1465             Thread t = p.thread; // read once so the null check and the assignment agree
1466             if (t != null)
1467                 firstThread = t; // overwritten on each hit, so the node nearest head that still has a thread wins
1468         }
1469         return firstThread; // null when the traversal found no node with a thread
1470     }
1471 
1472     /**
1473      * Returns true if the given thread is currently queued.
1474      *
1475      * <p>This implementation traverses the queue to determine
1476      * presence of the given thread.
1477      *
1478      * @param thread the thread
1479      * @return {@code true} if the given thread is on the queue
1480      * @throws NullPointerException if the thread is null
1481      */
1482     public final boolean isQueued(Thread thread) {
1483         if (thread == null)
1484             throw new NullPointerException();
1485         for (Node p = tail; p != null; p = p.prev) // walks backwards from tail until a null prev is reached
1486             if (p.thread == thread) // reference comparison, not equals()
1487                 return true;
1488         return false;
1489     }
1490 
1491     /**
1492      * Returns {@code true} if the apparent first queued thread, if one
1493      * exists, is waiting in exclusive mode.  If this method returns
1494      * {@code true}, and the current thread is attempting to acquire in
1495      * shared mode (that is, this method is invoked from {@link
1496      * #tryAcquireShared}) then it is guaranteed that the current thread
1497      * is not the first queued thread.  Used only as a heuristic in
1498      * ReentrantReadWriteLock.
1499      */
1500     final boolean apparentlyFirstQueuedIsExclusive() {
1501         Node h, s;
1502         return (h = head) != null && // queue has a head
1503             (s = h.next)  != null && // head has an apparent successor (next link only; no tail scan)
1504             !s.isShared()         && // successor is not marked SHARED
1505             s.thread != null;        // and it still carries a thread reference
1506     }
1507 
1508     /**
1509      * Queries whether any threads have been waiting to acquire longer
1510      * than the current thread.
1511      *
1512      * <p>An invocation of this method is equivalent to (but may be
1513      * more efficient than):
1514      * <pre> {@code
1515      * getFirstQueuedThread() != Thread.currentThread()
1516      *   && hasQueuedThreads()}</pre>
1517      *
1518      * <p>Note that because cancellations due to interrupts and
1519      * timeouts may occur at any time, a {@code true} return does not
1520      * guarantee that some other thread will acquire before the current
1521      * thread.  Likewise, it is possible for another thread to win a
1522      * race to enqueue after this method has returned {@code false},
1523      * due to the queue being empty.
1524      *
1525      * <p>This method is designed to be used by a fair synchronizer to


1541      *   } else {
1542      *     // try to acquire normally
1543      *   }
1544      * }}</pre>
1545      *
1546      * @return {@code true} if there is a queued thread preceding the
1547      *         current thread, and {@code false} if the current thread
1548      *         is at the head of the queue or the queue is empty
1549      * @since 1.7
1550      */
1551     public final boolean hasQueuedPredecessors() {
1552         Node h, s;
1553         if ((h = head) != null) { // a null head falls through to "return false"
1554             if ((s = h.next) == null || s.waitStatus > 0) { // next link missing, or apparent successor has waitStatus > 0
1555                 s = null; // traverse in case of concurrent cancellation
1556                 for (Node p = tail; p != h && p != null; p = p.prev) { // walks back to the head snapshot h (or to a null prev)
1557                     if (p.waitStatus <= 0)
1558                         s = p; // overwritten on each hit, so s ends as the qualifying node nearest h
1559                 }
1560             }
1561             if (s != null && s.thread != Thread.currentThread()) // identity comparison with the calling thread
1562                 return true;
1563         }
1564         return false;
1565     }
1566 
1567     // Instrumentation and monitoring methods
1568 
1569     /**
1570      * Returns an estimate of the number of threads waiting to
1571      * acquire.  The value is only an estimate because the number of
1572      * threads may change dynamically while this method traverses
1573      * internal data structures.  This method is designed for use in
1574      * monitoring system state, not for synchronization control.
1575      *
1576      * @return the estimated number of threads waiting to acquire
1577      */
1578     public final int getQueueLength() {
1579         int n = 0;
1580         for (Node p = tail; p != null; p = p.prev) { // backwards walk from tail until a null prev
1581             if (p.thread != null) // nodes whose thread field is null are not counted
1582                 ++n;
1583         }
1584         return n;
1585     }
1586 
1587     /**
1588      * Returns a collection containing threads that may be waiting to
1589      * acquire.  Because the actual set of threads may change
1590      * dynamically while constructing this result, the returned
1591      * collection is only a best-effort estimate.  The elements of the
1592      * returned collection are in no particular order.  This method is
1593      * designed to facilitate construction of subclasses that provide
1594      * more extensive monitoring facilities.
1595      *
1596      * @return the collection of threads
1597      */
1598     public final Collection<Thread> getQueuedThreads() {
1599         ArrayList<Thread> list = new ArrayList<>(); // fresh list per call; appended in tail-to-head traversal order
1600         for (Node p = tail; p != null; p = p.prev) {
1601             Thread t = p.thread; // read once so the null check and the add use the same value
1602             if (t != null)
1603                 list.add(t);
1604         }
1605         return list;
1606     }
1607 
1608     /**
1609      * Returns a collection containing threads that may be waiting to
1610      * acquire in exclusive mode. This has the same properties
1611      * as {@link #getQueuedThreads} except that it only returns
1612      * those threads waiting due to an exclusive acquire.
1613      *
1614      * @return the collection of threads
1615      */
1616     public final Collection<Thread> getExclusiveQueuedThreads() {
1617         ArrayList<Thread> list = new ArrayList<>(); // fresh list per call; appended in tail-to-head traversal order
1618         for (Node p = tail; p != null; p = p.prev) {
1619             if (!p.isShared()) { // keep only nodes not marked SHARED
1620                 Thread t = p.thread; // read once so the null check and the add use the same value
1621                 if (t != null)
1622                     list.add(t);
1623             }
1624         }
1625         return list;
1626     }
1627 
1628     /**
1629      * Returns a collection containing threads that may be waiting to
1630      * acquire in shared mode. This has the same properties
1631      * as {@link #getQueuedThreads} except that it only returns
1632      * those threads waiting due to a shared acquire.
1633      *
1634      * @return the collection of threads
1635      */
1636     public final Collection<Thread> getSharedQueuedThreads() {
1637         ArrayList<Thread> list = new ArrayList<>(); // fresh list per call; appended in tail-to-head traversal order
1638         for (Node p = tail; p != null; p = p.prev) {
1639             if (p.isShared()) { // keep only nodes marked SHARED
1640                 Thread t = p.thread; // read once so the null check and the add use the same value
1641                 if (t != null)
1642                     list.add(t);
1643             }
1644         }
1645         return list;
1646     }
1647 
1648     /**
1649      * Returns a string identifying this synchronizer, as well as its state.
1650      * The state, in brackets, includes the String {@code "State ="}
1651      * followed by the current value of {@link #getState}, and either
1652      * {@code "nonempty"} or {@code "empty"} depending on whether the
1653      * queue is empty.
1654      *
1655      * @return a string identifying this synchronizer, as well as its state
1656      */
1657     public String toString() {
1658         return super.toString()
1659             + "[State = " + getState() + ", "
1660             + (hasQueuedThreads() ? "non" : "") + "empty queue]"; // yields "nonempty queue]" or "empty queue]" depending on hasQueuedThreads()
1661     }
1662 


1709      * @param node the node
1710      * @return true if successfully transferred (else the node was
1711      * cancelled before signal)
1712      */
1713     final boolean transferForSignal(Node node) {
1714         /*
1715          * If cannot change waitStatus, the node has been cancelled.
1716          */
1717         if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
1718             return false;
1719 
1720         /*
1721          * Splice onto queue and try to set waitStatus of predecessor to
1722          * indicate that thread is (probably) waiting. If cancelled or
1723          * attempt to set waitStatus fails, wake up to resync (in which
1724          * case the waitStatus can be transiently and harmlessly wrong).
1725          */
1726         Node p = enq(node); // p is the node whose waitStatus is updated below (the predecessor, per the comment above)
1727         int ws = p.waitStatus; // one read; this snapshot is the expected value for the CAS
1728         if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) // short-circuit: the CAS is not attempted when ws > 0
1729             LockSupport.unpark(node.thread); // wakes the transferred node's own thread, not p's
1730         return true; // true whether or not the unpark path was taken
1731     }
1732 
1733     /**
1734      * Transfers node, if necessary, to sync queue after a cancelled wait.
1735      * Returns true if thread was cancelled before being signalled.
1736      *
1737      * @param node the node
1738      * @return true if cancelled before the node was signalled
1739      */
1740     final boolean transferAfterCancelledWait(Node node) {
1741         if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
1742             enq(node);
1743             return true;
1744         }
1745         /*
1746          * If we lost out to a signal(), then we can't proceed
1747          * until it finishes its enq().  Cancelling during an
1748          * incomplete transfer is both rare and transient, so just
1749          * spin.


2270                     ++n;
2271             }
2272             return n;
2273         }
2274 
2275         /**
2276          * Returns a collection containing those threads that may be
2277          * waiting on this Condition.
2278          * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
2279          *
2280          * @return the collection of threads
2281          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2282          *         returns {@code false}
2283          */
2284         protected final Collection<Thread> getWaitingThreads() {
2285             if (!isHeldExclusively())
2286                 throw new IllegalMonitorStateException(); // checked before any traversal of the waiter list
2287             ArrayList<Thread> list = new ArrayList<>(); // fresh list per call; appended in firstWaiter-onwards order
2288             for (Node w = firstWaiter; w != null; w = w.nextWaiter) { // follows nextWaiter links, not the prev/next queue links
2289                 if (w.waitStatus == Node.CONDITION) { // nodes in any other status are left out
2290                     Thread t = w.thread; // read once so the null check and the add use the same value
2291                     if (t != null)
2292                         list.add(t);
2293                 }
2294             }
2295             return list;
2296         }
2297     }
2298 
2299     // VarHandle mechanics
2300     private static final VarHandle STATE;
2301     private static final VarHandle HEAD;
2302     private static final VarHandle TAIL;
2303 
2304     static {
2305         try {
2306             MethodHandles.Lookup l = MethodHandles.lookup();
2307             STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
2308             HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
2309             TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
2310         } catch (ReflectiveOperationException e) {
2311             throw new ExceptionInInitializerError(e);
2312         }




  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent.locks;
  37 
  38 import java.lang.invoke.MethodHandles;
  39 import java.lang.invoke.VarHandle;
  40 import java.util.ArrayList;
  41 import java.util.Collection;
  42 import java.util.Date;
  43 import java.util.concurrent.TimeUnit;
  44 
  45 import jdk.internal.misc.Strands;
  46 
  47 /**
  48  * Provides a framework for implementing blocking locks and related
  49  * synchronizers (semaphores, events, etc) that rely on
  50  * first-in-first-out (FIFO) wait queues.  This class is designed to
  51  * be a useful basis for most kinds of synchronizers that rely on a
  52  * single atomic {@code int} value to represent state. Subclasses
  53  * must define the protected methods that change this state, and which
  54  * define what that state means in terms of this object being acquired
  55  * or released.  Given these, the other methods in this class carry
  56  * out all queuing and blocking mechanics. Subclasses can maintain
  57  * other state fields, but only the atomically updated {@code int}
  58  * value manipulated using methods {@link #getState}, {@link
  59  * #setState} and {@link #compareAndSetState} is tracked with respect
  60  * to synchronization.
  61  *
  62  * <p>Subclasses should be defined as non-public internal helper
  63  * classes that are used to implement the synchronization properties
  64  * of their enclosing class.  Class
  65  * {@code AbstractQueuedSynchronizer} does not implement any
  66  * synchronization interface.  Instead it defines methods such as


 459          * cancels itself, not any other node.
 460          */
 461         volatile Node prev;
 462 
 463         /**
 464          * Link to the successor node that the current node/thread
 465          * unparks upon release. Assigned during enqueuing, adjusted
 466          * when bypassing cancelled predecessors, and nulled out (for
 467          * sake of GC) when dequeued.  The enq operation does not
 468          * assign next field of a predecessor until after attachment,
 469          * so seeing a null next field does not necessarily mean that
 470          * node is at end of queue. However, if a next field appears
 471          * to be null, we can scan prev's from the tail to
 472          * double-check.  The next field of cancelled nodes is set to
 473          * point to the node itself instead of null, to make life
 474          * easier for isOnSyncQueue.
 475          */
 476         volatile Node next;
 477 
 478         /**
 479          * The strand that enqueued this node.  Initialized on
 480          * construction and nulled out after use.
 481          */
 482         volatile Object strand;
 483 
 484         /**
 485          * Link to next node waiting on condition, or the special
 486          * value SHARED.  Because condition queues are accessed only
 487          * when holding in exclusive mode, we just need a simple
 488          * linked queue to hold nodes while they are waiting on
 489          * conditions. They are then transferred to the queue to
 490          * re-acquire. And because conditions can only be exclusive,
 491          * we save a field by using special value to indicate shared
 492          * mode.
 493          */
 494         Node nextWaiter;
 495 
 496         /**
 497          * Returns true if node is waiting in shared mode.
 498          */
 499         final boolean isShared() {
 500             return nextWaiter == SHARED; // reference comparison against the SHARED marker kept in nextWaiter
 501         }
 502 


 504          * Returns previous node, or throws NullPointerException if null.
 505          * Use when predecessor cannot be null.  The null check could
 506          * be elided, but is present to help the VM.
 507          *
 508          * @return the predecessor of this node
 509          */
 510         final Node predecessor() {
 511             Node p = prev; // single read of the volatile prev field; the check and the return use this same value
 512             if (p == null)
 513                 throw new NullPointerException(); // thrown explicitly rather than left to a later dereference
 514             else
 515                 return p;
 516         }
 517 
 518         /** Establishes initial head or SHARED marker. */
 519         Node() {} // empty body: no field is assigned by this constructor
 520 
 521         /** Constructor used by addWaiter. */
 522         Node(Node nextWaiter) {
 523             this.nextWaiter = nextWaiter; // ordinary field write; nextWaiter is not declared volatile
 524             STRAND.set(this, Strands.currentStrand()); // stores whatever Strands.currentStrand() returns (the strand field is typed Object) via VarHandle.set, plain access mode
 525         }
 526 
 527         /** Constructor used by addConditionWaiter. */
 528         Node(int waitStatus) {
 529             WAITSTATUS.set(this, waitStatus); // initial status written via VarHandle.set (plain access mode)
 530             STRAND.set(this, Strands.currentStrand()); // stores whatever Strands.currentStrand() returns, written the same way
 531         }
 532 
 533         /** CASes waitStatus field. */
 534         final boolean compareAndSetWaitStatus(int expect, int update) {
 535             return WAITSTATUS.compareAndSet(this, expect, update); // true only when waitStatus held expect and was atomically replaced with update
 536         }
 537 
 538         /** CASes next field. */
 539         final boolean compareAndSetNext(Node expect, Node update) {
 540             return NEXT.compareAndSet(this, expect, update); // true only when next was the same reference as expect and was atomically replaced with update
 541         }
 542 
 543         final void setPrevRelaxed(Node p) { // sets prev through the PREV VarHandle; "relaxed" matches VarHandle.set's plain access mode
 544             PREV.set(this, p); // not a volatile write, although the prev field itself is declared volatile
 545         }
 546 
 547         // VarHandle mechanics
 548         private static final VarHandle NEXT; // handle for Node.next
 549         private static final VarHandle PREV; // handle for Node.prev
 550         private static final VarHandle STRAND; // handle for Node.strand (declared as Object)
 551         private static final VarHandle WAITSTATUS; // handle for Node.waitStatus
 552         static {
 553             try {
 554                 MethodHandles.Lookup l = MethodHandles.lookup(); // lookup object created inside Node's own static initializer
 555                 NEXT = l.findVarHandle(Node.class, "next", Node.class);
 556                 PREV = l.findVarHandle(Node.class, "prev", Node.class);
 557                 STRAND = l.findVarHandle(Node.class, "strand", Object.class);
 558                 WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
 559             } catch (ReflectiveOperationException e) {
 560                 throw new ExceptionInInitializerError(e); // a failed field lookup aborts class initialization, passing e along as the cause
 561             }
 562         }
 563     }
 564 
 565     /**
 566      * Head of the wait queue, lazily initialized.  Except for
 567      * initialization, it is modified only via method setHead.  Note:
 568      * If head exists, its waitStatus is guaranteed not to be
 569      * CANCELLED.
 570      */
 571     private transient volatile Node head;
 572 
 573     /**
 574      * Tail of the wait queue, lazily initialized.  Modified only via
 575      * method enq to add new wait node.
          * NOTE(review): cancelAcquire also CASes tail back to the
          * predecessor when the cancelled node is last, so "only via
          * enq" covers additions, not removals.
 576      */
 577     private transient volatile Node tail;


 658                 node.setPrevRelaxed(oldTail);
 659                 if (compareAndSetTail(oldTail, node)) {
 660                     oldTail.next = node;
 661                     return node;
 662                 }
 663             } else {
 664                 initializeSyncQueue();
 665             }
 666         }
 667     }
 668 
 669     /**
 670      * Sets head of queue to be node, thus dequeuing. Called only by
 671      * acquire methods.  Also nulls out unused fields for sake of GC
 672      * and to suppress unnecessary signals and traversals.
 673      *
 674      * @param node the node
 675      */
 676     private void setHead(Node node) {
 677         head = node;
 678         node.strand = null; // unused once node is head; cleared for GC
 679         node.prev = null;   // unused once node is head; cleared for GC
 680     }
 681 
 682     /**
 683      * Wakes up node's successor, if one exists.
          * The successor is node.next when that node is present and its
          * waitStatus is not positive; otherwise it is found by scanning
          * backwards from tail.  Its strand is handed to
          * LockSupport.unpark.
 684      *
 685      * @param node the node
 686      */
 687     private void unparkSuccessor(Node node) {
 688         /*
 689          * If status is negative (i.e., possibly needing signal) try
 690          * to clear in anticipation of signalling.  It is OK if this
 691          * fails or if status is changed by waiting thread.
 692          */
 693         int ws = node.waitStatus;
 694         if (ws < 0)
 695             node.compareAndSetWaitStatus(ws, 0);
 696 
 697         /*
 698          * Strand to unpark is held in successor, which is normally
 699          * just the next node.  But if cancelled or apparently null,
 700          * traverse backwards from tail to find the actual
 701          * non-cancelled successor.
 702          */
 703         Node s = node.next;
 704         if (s == null || s.waitStatus > 0) {
 705             s = null;
                 // No early exit: each later match is closer to node, so s
                 // ends up as the qualifying node nearest to node.
 706             for (Node p = tail; p != node && p != null; p = p.prev)
 707                 if (p.waitStatus <= 0)
 708                     s = p;
 709         }
 710         if (s != null)
 711             LockSupport.unpark(s.strand);
 712     }
 713 
 714     /**
 715      * Release action for shared mode -- signals successor and ensures
 716      * propagation. (Note: For exclusive mode, release just amounts
 717      * to calling unparkSuccessor of head if it needs signal.)
 718      */
 719     private void doReleaseShared() {
 720         /*
 721          * Ensure that a release propagates, even if there are other
 722          * in-progress acquires/releases.  This proceeds in the usual
 723          * way of trying to unparkSuccessor of head if it needs
 724          * signal. But if it does not, status is set to PROPAGATE to
 725          * ensure that upon release, propagation continues.
 726          * Additionally, we must loop in case a new node is added
 727          * while we are doing this. Also, unlike other uses of
 728          * unparkSuccessor, we need to know if CAS to reset status
 729          * fails, if so rechecking.
 730          */
 731         for (;;) {


 776         if (propagate > 0 || h == null || h.waitStatus < 0 ||
 777             (h = head) == null || h.waitStatus < 0) {
 778             Node s = node.next;
 779             if (s == null || s.isShared())
 780                 doReleaseShared();
 781         }
 782     }
 783 
 784     // Utilities for various versions of acquire
 785 
 786     /**
 787      * Cancels an ongoing attempt to acquire.
 788      *
 789      * @param node the node
 790      */
 791     private void cancelAcquire(Node node) {
 792         // Ignore if node doesn't exist
 793         if (node == null)
 794             return;
 795 
             // A cancelled node no longer stands for a waiting strand.
 796         node.strand = null;
 797 
 798         // Skip cancelled predecessors
             // (waitStatus > 0); node.prev is relinked past each one.
 799         Node pred = node.prev;
 800         while (pred.waitStatus > 0)
 801             node.prev = pred = pred.prev;
 802 
 803         // predNext is the apparent node to unsplice. CASes below will
 804         // fail if not, in which case, we lost race vs another cancel
 805         // or signal, so no further action is necessary, although with
 806         // a possibility that a cancelled node may transiently remain
 807         // reachable.
 808         Node predNext = pred.next;
 809 
 810         // Can use unconditional write instead of CAS here.
 811         // After this atomic step, other Nodes can skip past us.
 812         // Before, we are free of interference from other threads.
 813         node.waitStatus = Node.CANCELLED;
 814 
 815         // If we are the tail, remove ourselves.
 816         if (node == tail && compareAndSetTail(node, pred)) {
                 // CAS result deliberately ignored (see predNext note above).
 817             pred.compareAndSetNext(predNext, null);
 818         } else {
 819             // If successor needs signal, try to set pred's next-link
 820             // so it will get one. Otherwise wake it up to propagate.
 821             int ws;
                 // Relink only when pred is not head, pred is (or can be
                 // CASed to) SIGNAL, and pred still has a strand recorded.
 822             if (pred != head &&
 823                 ((ws = pred.waitStatus) == Node.SIGNAL ||
 824                  (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
 825                 pred.strand != null) {
 826                 Node next = node.next;
 827                 if (next != null && next.waitStatus <= 0)
 828                     pred.compareAndSetNext(predNext, next);
 829             } else {
 830                 unparkSuccessor(node);
 831             }
 832 
 833             node.next = node; // help GC
 834         }
 835     }
 836 
 837     /**
 838      * Checks and updates status for a node that failed to acquire.
 839      * Returns true if thread should block. This is the main signal
 840      * control in all acquire loops.  Requires that pred == node.prev.
 841      *
 842      * @param pred node's predecessor holding status
 843      * @param node the node
 844      * @return {@code true} if thread should block
 845      */


1414      *
1415      * @return {@code true} if there has ever been contention
1416      */
1417     public final boolean hasContended() {
             // head is lazily initialized, so a non-null head means the
             // wait queue has been created at some point.
1418         return head != null;
1419     }
1420 
1421     /**
1422      * Returns the first (longest-waiting) thread in the queue, or
1423      * {@code null} if no threads are currently queued.
          * If the first queued strand is not a {@code Thread}, this
          * method also returns {@code null} instead of throwing
          * {@code ClassCastException}.
1424      *
1425      * <p>In this implementation, this operation normally returns in
1426      * constant time, but may iterate upon contention if other threads are
1427      * concurrently modifying the queue.
1428      *
1429      * @return the first (longest-waiting) thread in the queue, or
1430      *         {@code null} if no threads are currently queued
1431      */
1432     public final Thread getFirstQueuedThread() {
1433         // handle only fast path, else relay
1434         if (head == tail)
1435             return null;
             // The strand field is typed Object, so the first waiter need
             // not be a Thread.  Filter with instanceof, as
             // getQueuedThreads does, rather than casting unconditionally.
             Object first = fullGetFirstQueuedStrand();
             return (first instanceof Thread) ? (Thread) first : null;
1436     }
1437 
1438     /**
1439      * Version of getFirstQueuedThread called when fastpath fails.
          * Returns the strand held by the first queued node as an
          * {@code Object}, or {@code null} if no node with a non-null
          * strand is found.
1440      */
1441     private Object fullGetFirstQueuedStrand() {
1442         /*
1443          * The first node is normally head.next. Try to get its
1444          * strand field, ensuring consistent reads: If strand
1445          * field is nulled out or s.prev is no longer head, then
1446          * some other strand(s) concurrently performed setHead in
1447          * between some of our reads. We try this twice before
1448          * resorting to traversal.
1449          */
1450         Node h, s;
1451         Object st;
             // The two operands of || are the same check, repeated on purpose.
1452         if (((h = head) != null && (s = h.next) != null &&
1453              s.prev == head && (st = s.strand) != null) ||
1454             ((h = head) != null && (s = h.next) != null &&
1455              s.prev == head && (st = s.strand) != null))
1456             return st;
1457 
1458         /*
1459          * Head's next field might not have been set yet, or may have
1460          * been unset after setHead. So we must check to see if tail
1461          * is actually first node. If not, we continue on, safely
1462          * traversing from tail back to head to find first,
1463          * guaranteeing termination.
1464          */
1465 
1466         Object firstStrand= null;
             // No early exit: the last non-null strand seen belongs to the
             // node closest to head.
1467         for (Node p = tail; p != null && p != head; p = p.prev) {
1468             Object strand = p.strand;
1469             if (strand != null)
1470                 firstStrand = strand;
1471         }
1472         return firstStrand;
1473     }
1474 
1475     /**
1476      * Returns true if the given thread is currently queued.
1477      *
1478      * <p>This implementation traverses the queue to determine
1479      * presence of the given thread.
1480      *
1481      * @param thread the thread
1482      * @return {@code true} if the given thread is on the queue
1483      * @throws NullPointerException if the thread is null
1484      */
1485     public final boolean isQueued(Thread thread) {
1486         if (thread == null)
1487             throw new NullPointerException();
1488         Node cursor = tail;                 // scan from newest to oldest
1489         while (cursor != null && cursor.strand != thread)
1490             cursor = cursor.prev;
1491         return cursor != null;              // stopped early only on a match
1492     }
1493 
1494     /**
1495      * Returns {@code true} if the apparent first queued thread, if one
1496      * exists, is waiting in exclusive mode.  If this method returns
1497      * {@code true}, and the current thread is attempting to acquire in
1498      * shared mode (that is, this method is invoked from {@link
1499      * #tryAcquireShared}) then it is guaranteed that the current thread
1500      * is not the first queued thread.  Used only as a heuristic in
1501      * ReentrantReadWriteLock.
1502      */
1503     final boolean apparentlyFirstQueuedIsExclusive() {
1504         final Node first = head;
1505         final Node second = (first == null) ? null : first.next;
1506         // true only if head has a successor that is not shared and
1507         // still has a strand recorded
1508         return second != null && !second.isShared() && second.strand != null;
1509     }
1510 
1511     /**
1512      * Queries whether any threads have been waiting to acquire longer
1513      * than the current thread.
1514      *
1515      * <p>An invocation of this method is equivalent to (but may be
1516      * more efficient than):
1517      * <pre> {@code
1518      * getFirstQueuedThread() != Thread.currentThread()
1519      *   && hasQueuedThreads()}</pre>
1520      *
1521      * <p>Note that because cancellations due to interrupts and
1522      * timeouts may occur at any time, a {@code true} return does not
1523      * guarantee that some other thread will acquire before the current
1524      * thread.  Likewise, it is possible for another thread to win a
1525      * race to enqueue after this method has returned {@code false},
1526      * due to the queue being empty.
1527      *
1528      * <p>This method is designed to be used by a fair synchronizer to


1544      *   } else {
1545      *     // try to acquire normally
1546      *   }
1547      * }}</pre>
1548      *
1549      * @return {@code true} if there is a queued thread preceding the
1550      *         current thread, and {@code false} if the current thread
1551      *         is at the head of the queue or the queue is empty
1552      * @since 1.7
1553      */
1554     public final boolean hasQueuedPredecessors() {
1555         Node h, s;
             // A null head means the queue was never initialized.
1556         if ((h = head) != null) {
1557             if ((s = h.next) == null || s.waitStatus > 0) {
1558                 s = null; // traverse in case of concurrent cancellation
                     // Backward scan: s ends as the node nearest head whose
                     // waitStatus is not positive.
1559                 for (Node p = tail; p != h && p != null; p = p.prev) {
1560                     if (p.waitStatus <= 0)
1561                         s = p;
1562                 }
1563             }
                 // Identity comparison against the caller's own strand.
1564             if (s != null && s.strand != Strands.currentStrand())
1565                 return true;
1566         }
1567         return false;
1568     }
1569 
1570     // Instrumentation and monitoring methods
1571 
1572     /**
1573      * Returns an estimate of the number of threads waiting to
1574      * acquire.  The value is only an estimate because the number of
1575      * threads may change dynamically while this method traverses
1576      * internal data structures.  This method is designed for use in
1577      * monitoring system state, not for synchronization control.
1578      *
1579      * @return the estimated number of threads waiting to acquire
1580      */
1581     public final int getQueueLength() {
1582         // Walk backwards from tail; a node counts only while it
1583         // still has a strand recorded.
1584         int waiters = 0;
1585         for (Node cursor = tail; cursor != null; cursor = cursor.prev)
1586             waiters += (cursor.strand != null) ? 1 : 0;
1587         return waiters;
1588     }
1589 
1590     /**
1591      * Returns a collection containing threads that may be waiting to
1592      * acquire.  Because the actual set of threads may change
1593      * dynamically while constructing this result, the returned
1594      * collection is only a best-effort estimate.  The elements of the
1595      * returned collection are in no particular order.  This method is
1596      * designed to facilitate construction of subclasses that provide
1597      * more extensive monitoring facilities.
1598      *
1599      * @return the collection of threads
1600      */
1601     public final Collection<Thread> getQueuedThreads() {
1602         ArrayList<Thread> threads = new ArrayList<>();
1603         Object owner;   // per-node snapshot of the strand field
1604         for (Node cursor = tail; cursor != null; cursor = cursor.prev)
1605             // strands that are null or not Threads are skipped
1606             if ((owner = cursor.strand) instanceof Thread)
1607                 threads.add((Thread) owner);
1608         return threads;
1609     }
1610 
1611     /**
1612      * Returns a collection containing threads that may be waiting to
1613      * acquire in exclusive mode. This has the same properties
1614      * as {@link #getQueuedThreads} except that it only returns
1615      * those threads waiting due to an exclusive acquire.
1616      *
1617      * @return the collection of threads
1618      */
1619     public final Collection<Thread> getExclusiveQueuedThreads() {
1620         ArrayList<Thread> threads = new ArrayList<>();
1621         for (Node cursor = tail; cursor != null; cursor = cursor.prev) {
1622             if (cursor.isShared())
1623                 continue;                   // shared-mode waiter: not wanted here
1624             Object owner = cursor.strand;
1625             if (owner instanceof Thread)
1626                 threads.add((Thread) owner);
1627         }
1628         return threads;
1629     }
1630 
1631     /**
1632      * Returns a collection containing threads that may be waiting to
1633      * acquire in shared mode. This has the same properties
1634      * as {@link #getQueuedThreads} except that it only returns
1635      * those threads waiting due to a shared acquire.
1636      *
1637      * @return the collection of threads
1638      */
1639     public final Collection<Thread> getSharedQueuedThreads() {
1640         ArrayList<Thread> threads = new ArrayList<>();
1641         for (Node cursor = tail; cursor != null; cursor = cursor.prev) {
1642             if (!cursor.isShared())
1643                 continue;                   // exclusive-mode waiter: not wanted here
1644             Object owner = cursor.strand;
1645             if (owner instanceof Thread)
1646                 threads.add((Thread) owner);
1647         }
1648         return threads;
1649     }
1650 
1651     /**
1652      * Returns a string identifying this synchronizer, as well as its state.
1653      * The state, in brackets, includes the String {@code "State ="}
1654      * followed by the current value of {@link #getState}, and either
1655      * {@code "nonempty"} or {@code "empty"} depending on whether the
1656      * queue is empty.
1657      *
1658      * @return a string identifying this synchronizer, as well as its state
1659      */
1660     public String toString() {
1661         String self = super.toString();   // evaluated first, as before
1662         String fill = "[State = " + getState() + ", ";
1663         return self + fill + (hasQueuedThreads() ? "non" : "") + "empty queue]";
1664     }
1665 


1712      * @param node the node
1713      * @return true if successfully transferred (else the node was
1714      * cancelled before signal)
1715      */
1716     final boolean transferForSignal(Node node) {
1717         /*
1718          * If cannot change waitStatus, the node has been cancelled.
1719          */
1720         if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
1721             return false;
1722 
1723         /*
1724          * Splice onto queue and try to set waitStatus of predecessor to
1725          * indicate that thread is (probably) waiting. If cancelled or
1726          * attempt to set waitStatus fails, wake up to resync (in which
1727          * case the waitStatus can be transiently and harmlessly wrong).
1728          */
             // p is treated as node's predecessor on the sync queue.
1729         Node p = enq(node);
1730         int ws = p.waitStatus;
             // ws > 0: predecessor cancelled.  Failed CAS: its status
             // changed under us.  Either way, unpark the strand recorded
             // in node so it can resync itself.
1731         if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
1732             LockSupport.unpark(node.strand);
1733         return true;
1734     }
1735 
1736     /**
1737      * Transfers node, if necessary, to sync queue after a cancelled wait.
1738      * Returns true if thread was cancelled before being signalled.
1739      *
1740      * @param node the node
1741      * @return true if cancelled before the node was signalled
1742      */
1743     final boolean transferAfterCancelledWait(Node node) {
1744         if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
1745             enq(node);
1746             return true;
1747         }
1748         /*
1749          * If we lost out to a signal(), then we can't proceed
1750          * until it finishes its enq().  Cancelling during an
1751          * incomplete transfer is both rare and transient, so just
1752          * spin.


2273                     ++n;
2274             }
2275             return n;
2276         }
2277 
2278         /**
2279          * Returns a collection containing those threads that may be
2280          * waiting on this Condition.
              * Waiters whose strand is not a {@code Thread} are omitted.
2281          * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
2282          *
2283          * @return the collection of threads
2284          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2285          *         returns {@code false}
2286          */
2287         protected final Collection<Thread> getWaitingThreads() {
2288             if (!isHeldExclusively())
2289                 throw new IllegalMonitorStateException();
2290             ArrayList<Thread> list = new ArrayList<>();
2291             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
2292                 if (w.waitStatus == Node.CONDITION) {
2293                     Object s = w.strand;
                         // The strand field is typed Object, so filter with
                         // instanceof (as getQueuedThreads does) instead of
                         // casting every non-null value; instanceof is also
                         // false for null.
2294                     if (s instanceof Thread)
2295                         list.add((Thread)s);
2296                 }
2297             }
2298             return list;
2299         }
2300     }
2301 
2302     // VarHandle mechanics
2303     private static final VarHandle STATE;
2304     private static final VarHandle HEAD;
2305     private static final VarHandle TAIL;
2306 
2307     static {
2308         try {
2309             MethodHandles.Lookup l = MethodHandles.lookup();
2310             STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
2311             HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
2312             TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
2313         } catch (ReflectiveOperationException e) {
2314             throw new ExceptionInInitializerError(e);
2315         }


< prev index next >