< prev index next >

src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java

Print this page




  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent.locks;
  37 
  38 import java.lang.invoke.MethodHandles;
  39 import java.lang.invoke.VarHandle;
  40 import java.util.ArrayList;
  41 import java.util.Collection;
  42 import java.util.Date;
  43 import java.util.concurrent.TimeUnit;
  44 import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node;
  45 


  46 /**
  47  * A version of {@link AbstractQueuedSynchronizer} in
  48  * which synchronization state is maintained as a {@code long}.
  49  * This class has exactly the same structure, properties, and methods
  50  * as {@code AbstractQueuedSynchronizer} with the exception
  51  * that all state-related parameters and results are defined
  52  * as {@code long} rather than {@code int}. This class
  53  * may be useful when creating synchronizers such as
  54  * multilevel locks and barriers that require
  55  * 64 bits of state.
  56  *
  57  * <p>See {@link AbstractQueuedSynchronizer} for usage
  58  * notes and examples.
  59  *
  60  * @since 1.6
  61  * @author Doug Lea
  62  */
  63 public abstract class AbstractQueuedLongSynchronizer
  64     extends AbstractOwnableSynchronizer
  65     implements java.io.Serializable {


 176                 node.setPrevRelaxed(oldTail);
 177                 if (compareAndSetTail(oldTail, node)) {
 178                     oldTail.next = node;
 179                     return node;
 180                 }
 181             } else {
 182                 initializeSyncQueue();
 183             }
 184         }
 185     }
 186 
 187     /**
 188      * Sets head of queue to be node, thus dequeuing. Called only by
 189      * acquire methods.  Also nulls out unused fields for sake of GC
 190      * and to suppress unnecessary signals and traversals.
 191      *
 192      * @param node the node
 193      */
 194     private void setHead(Node node) {
 195         head = node;
 196         node.thread = null;
 197         node.prev = null;
 198     }
 199 
 200     /**
 201      * Wakes up node's successor, if one exists.
 202      *
 203      * @param node the node
 204      */
 205     private void unparkSuccessor(Node node) {
 206         /*
 207          * If status is negative (i.e., possibly needing signal) try
 208          * to clear in anticipation of signalling.  It is OK if this
 209          * fails or if status is changed by waiting thread.
 210          */
 211         int ws = node.waitStatus;
 212         if (ws < 0)
 213             node.compareAndSetWaitStatus(ws, 0);
 214 
 215         /*
 216          * Thread to unpark is held in successor, which is normally
 217          * just the next node.  But if cancelled or apparently null,
 218          * traverse backwards from tail to find the actual
 219          * non-cancelled successor.
 220          */
 221         Node s = node.next;
 222         if (s == null || s.waitStatus > 0) {
 223             s = null;
 224             for (Node p = tail; p != node && p != null; p = p.prev)
 225                 if (p.waitStatus <= 0)
 226                     s = p;
 227         }
 228         if (s != null)
 229             LockSupport.unpark(s.thread);
 230     }
 231 
 232     /**
 233      * Release action for shared mode -- signals successor and ensures
 234      * propagation. (Note: For exclusive mode, release just amounts
 235      * to calling unparkSuccessor of head if it needs signal.)
 236      */
 237     private void doReleaseShared() {
 238         /*
 239          * Ensure that a release propagates, even if there are other
 240          * in-progress acquires/releases.  This proceeds in the usual
 241          * way of trying to unparkSuccessor of head if it needs
 242          * signal. But if it does not, status is set to PROPAGATE to
 243          * ensure that upon release, propagation continues.
 244          * Additionally, we must loop in case a new node is added
 245          * while we are doing this. Also, unlike other uses of
 246          * unparkSuccessor, we need to know if CAS to reset status
 247          * fails, if so rechecking.
 248          */
 249         for (;;) {


 294         if (propagate > 0 || h == null || h.waitStatus < 0 ||
 295             (h = head) == null || h.waitStatus < 0) {
 296             Node s = node.next;
 297             if (s == null || s.isShared())
 298                 doReleaseShared();
 299         }
 300     }
 301 
 302     // Utilities for various versions of acquire
 303 
 304     /**
 305      * Cancels an ongoing attempt to acquire.
 306      *
 307      * @param node the node
 308      */
 309     private void cancelAcquire(Node node) {
 310         // Ignore if node doesn't exist
 311         if (node == null)
 312             return;
 313 
 314         node.thread = null;
 315 
 316         // Skip cancelled predecessors
 317         Node pred = node.prev;
 318         while (pred.waitStatus > 0)
 319             node.prev = pred = pred.prev;
 320 
 321         // predNext is the apparent node to unsplice. CASes below will
 322         // fail if not, in which case, we lost race vs another cancel
 323         // or signal, so no further action is necessary, although with
 324         // a possibility that a cancelled node may transiently remain
 325         // reachable.
 326         Node predNext = pred.next;
 327 
 328         // Can use unconditional write instead of CAS here.
 329         // After this atomic step, other Nodes can skip past us.
 330         // Before, we are free of interference from other threads.
 331         node.waitStatus = Node.CANCELLED;
 332 
 333         // If we are the tail, remove ourselves.
 334         if (node == tail && compareAndSetTail(node, pred)) {
 335             pred.compareAndSetNext(predNext, null);
 336         } else {
 337             // If successor needs signal, try to set pred's next-link
 338             // so it will get one. Otherwise wake it up to propagate.
 339             int ws;
 340             if (pred != head &&
 341                 ((ws = pred.waitStatus) == Node.SIGNAL ||
 342                  (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
 343                 pred.thread != null) {
 344                 Node next = node.next;
 345                 if (next != null && next.waitStatus <= 0)
 346                     pred.compareAndSetNext(predNext, next);
 347             } else {
 348                 unparkSuccessor(node);
 349             }
 350 
 351             node.next = node; // help GC
 352         }
 353     }
 354 
 355     /**
 356      * Checks and updates status for a node that failed to acquire.
 357      * Returns true if thread should block. This is the main signal
 358      * control in all acquire loops.  Requires that pred == node.prev.
 359      *
 360      * @param pred node's predecessor holding status
 361      * @param node the node
 362      * @return {@code true} if thread should block
 363      */


 932      *
 933      * @return {@code true} if there has ever been contention
 934      */
 935     public final boolean hasContended() {
 936         return head != null;
 937     }
 938 
 939     /**
 940      * Returns the first (longest-waiting) thread in the queue, or
 941      * {@code null} if no threads are currently queued.
 942      *
 943      * <p>In this implementation, this operation normally returns in
 944      * constant time, but may iterate upon contention if other threads are
 945      * concurrently modifying the queue.
 946      *
 947      * @return the first (longest-waiting) thread in the queue, or
 948      *         {@code null} if no threads are currently queued
 949      */
 950     public final Thread getFirstQueuedThread() {
 951         // handle only fast path, else relay
 952         return (head == tail) ? null : fullGetFirstQueuedThread();
 953     }
 954 
 955     /**
 956      * Version of getFirstQueuedThread called when fastpath fails.
 957      */
 958     private Thread fullGetFirstQueuedThread() {
 959         /*
 960          * The first node is normally head.next. Try to get its
 961          * thread field, ensuring consistent reads: If thread
 962          * field is nulled out or s.prev is no longer head, then
 963          * some other thread(s) concurrently performed setHead in
 964          * between some of our reads. We try this twice before
 965          * resorting to traversal.
 966          */
 967         Node h, s;
 968         Thread st;
 969         if (((h = head) != null && (s = h.next) != null &&
 970              s.prev == head && (st = s.thread) != null) ||
 971             ((h = head) != null && (s = h.next) != null &&
 972              s.prev == head && (st = s.thread) != null))
 973             return st;
 974 
 975         /*
 976          * Head's next field might not have been set yet, or may have
 977          * been unset after setHead. So we must check to see if tail
 978          * is actually first node. If not, we continue on, safely
 979          * traversing from tail back to head to find first,
 980          * guaranteeing termination.
 981          */
 982 
 983         Thread firstThread = null;
 984         for (Node p = tail; p != null && p != head; p = p.prev) {
 985             Thread t = p.thread;
 986             if (t != null)
 987                 firstThread = t;
 988         }
 989         return firstThread;
 990     }
 991 
 992     /**
 993      * Returns true if the given thread is currently queued.
 994      *
 995      * <p>This implementation traverses the queue to determine
 996      * presence of the given thread.
 997      *
 998      * @param thread the thread
 999      * @return {@code true} if the given thread is on the queue
1000      * @throws NullPointerException if the thread is null
1001      */
1002     public final boolean isQueued(Thread thread) {
1003         if (thread == null)
1004             throw new NullPointerException();
1005         for (Node p = tail; p != null; p = p.prev)
1006             if (p.thread == thread)
1007                 return true;
1008         return false;
1009     }
1010 
1011     /**
1012      * Returns {@code true} if the apparent first queued thread, if one
1013      * exists, is waiting in exclusive mode.  If this method returns
1014      * {@code true}, and the current thread is attempting to acquire in
1015      * shared mode (that is, this method is invoked from {@link
1016      * #tryAcquireShared}) then it is guaranteed that the current thread
1017      * is not the first queued thread.  Used only as a heuristic in
1018      * ReentrantReadWriteLock.
1019      */
1020     final boolean apparentlyFirstQueuedIsExclusive() {
1021         Node h, s;
1022         return (h = head) != null &&
1023             (s = h.next)  != null &&
1024             !s.isShared()         &&
1025             s.thread != null;
1026     }
1027 
1028     /**
1029      * Queries whether any threads have been waiting to acquire longer
1030      * than the current thread.
1031      *
1032      * <p>An invocation of this method is equivalent to (but may be
1033      * more efficient than):
1034      * <pre> {@code
1035      * getFirstQueuedThread() != Thread.currentThread()
1036      *   && hasQueuedThreads()}</pre>
1037      *
1038      * <p>Note that because cancellations due to interrupts and
1039      * timeouts may occur at any time, a {@code true} return does not
1040      * guarantee that some other thread will acquire before the current
1041      * thread.  Likewise, it is possible for another thread to win a
1042      * race to enqueue after this method has returned {@code false},
1043      * due to the queue being empty.
1044      *
1045      * <p>This method is designed to be used by a fair synchronizer to


1061      *   } else {
1062      *     // try to acquire normally
1063      *   }
1064      * }}</pre>
1065      *
1066      * @return {@code true} if there is a queued thread preceding the
1067      *         current thread, and {@code false} if the current thread
1068      *         is at the head of the queue or the queue is empty
1069      * @since 1.7
1070      */
1071     public final boolean hasQueuedPredecessors() {
1072         Node h, s;
1073         if ((h = head) != null) {
1074             if ((s = h.next) == null || s.waitStatus > 0) {
1075                 s = null; // traverse in case of concurrent cancellation
1076                 for (Node p = tail; p != h && p != null; p = p.prev) {
1077                     if (p.waitStatus <= 0)
1078                         s = p;
1079                 }
1080             }
1081             if (s != null && s.thread != Thread.currentThread())
1082                 return true;
1083         }
1084         return false;
1085     }
1086 
1087     // Instrumentation and monitoring methods
1088 
1089     /**
1090      * Returns an estimate of the number of threads waiting to
1091      * acquire.  The value is only an estimate because the number of
1092      * threads may change dynamically while this method traverses
1093      * internal data structures.  This method is designed for use in
1094      * monitoring system state, not for synchronization control.
1095      *
1096      * @return the estimated number of threads waiting to acquire
1097      */
1098     public final int getQueueLength() {
1099         int n = 0;
1100         for (Node p = tail; p != null; p = p.prev) {
1101             if (p.thread != null)
1102                 ++n;
1103         }
1104         return n;
1105     }
1106 
1107     /**
1108      * Returns a collection containing threads that may be waiting to
1109      * acquire.  Because the actual set of threads may change
1110      * dynamically while constructing this result, the returned
1111      * collection is only a best-effort estimate.  The elements of the
1112      * returned collection are in no particular order.  This method is
1113      * designed to facilitate construction of subclasses that provide
1114      * more extensive monitoring facilities.
1115      *
1116      * @return the collection of threads
1117      */
1118     public final Collection<Thread> getQueuedThreads() {
1119         ArrayList<Thread> list = new ArrayList<>();
1120         for (Node p = tail; p != null; p = p.prev) {
1121             Thread t = p.thread;
1122             if (t != null)
1123                 list.add(t);
1124         }
1125         return list;
1126     }
1127 
1128     /**
1129      * Returns a collection containing threads that may be waiting to
1130      * acquire in exclusive mode. This has the same properties
1131      * as {@link #getQueuedThreads} except that it only returns
1132      * those threads waiting due to an exclusive acquire.
1133      *
1134      * @return the collection of threads
1135      */
1136     public final Collection<Thread> getExclusiveQueuedThreads() {
1137         ArrayList<Thread> list = new ArrayList<>();
1138         for (Node p = tail; p != null; p = p.prev) {
1139             if (!p.isShared()) {
1140                 Thread t = p.thread;
1141                 if (t != null)
1142                     list.add(t);
1143             }
1144         }
1145         return list;
1146     }
1147 
1148     /**
1149      * Returns a collection containing threads that may be waiting to
1150      * acquire in shared mode. This has the same properties
1151      * as {@link #getQueuedThreads} except that it only returns
1152      * those threads waiting due to a shared acquire.
1153      *
1154      * @return the collection of threads
1155      */
1156     public final Collection<Thread> getSharedQueuedThreads() {
1157         ArrayList<Thread> list = new ArrayList<>();
1158         for (Node p = tail; p != null; p = p.prev) {
1159             if (p.isShared()) {
1160                 Thread t = p.thread;
1161                 if (t != null)
1162                     list.add(t);
1163             }
1164         }
1165         return list;
1166     }
1167 
1168     /**
1169      * Returns a string identifying this synchronizer, as well as its state.
1170      * The state, in brackets, includes the String {@code "State ="}
1171      * followed by the current value of {@link #getState}, and either
1172      * {@code "nonempty"} or {@code "empty"} depending on whether the
1173      * queue is empty.
1174      *
1175      * @return a string identifying this synchronizer, as well as its state
1176      */
1177     public String toString() {
1178         return super.toString()
1179             + "[State = " + getState() + ", "
1180             + (hasQueuedThreads() ? "non" : "") + "empty queue]";
1181     }
1182 


1229      * @param node the node
1230      * @return true if successfully transferred (else the node was
1231      * cancelled before signal)
1232      */
1233     final boolean transferForSignal(Node node) {
1234         /*
1235          * If cannot change waitStatus, the node has been cancelled.
1236          */
1237         if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
1238             return false;
1239 
1240         /*
1241          * Splice onto queue and try to set waitStatus of predecessor to
1242          * indicate that thread is (probably) waiting. If cancelled or
1243          * attempt to set waitStatus fails, wake up to resync (in which
1244          * case the waitStatus can be transiently and harmlessly wrong).
1245          */
1246         Node p = enq(node);
1247         int ws = p.waitStatus;
1248         if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
1249             LockSupport.unpark(node.thread);
1250         return true;
1251     }
1252 
1253     /**
1254      * Transfers node, if necessary, to sync queue after a cancelled wait.
1255      * Returns true if thread was cancelled before being signalled.
1256      *
1257      * @param node the node
1258      * @return true if cancelled before the node was signalled
1259      */
1260     final boolean transferAfterCancelledWait(Node node) {
1261         if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
1262             enq(node);
1263             return true;
1264         }
1265         /*
1266          * If we lost out to a signal(), then we can't proceed
1267          * until it finishes its enq().  Cancelling during an
1268          * incomplete transfer is both rare and transient, so just
1269          * spin.


1792                     ++n;
1793             }
1794             return n;
1795         }
1796 
1797         /**
1798          * Returns a collection containing those threads that may be
1799          * waiting on this Condition.
1800          * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}.
1801          *
1802          * @return the collection of threads
1803          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1804          *         returns {@code false}
1805          */
1806         protected final Collection<Thread> getWaitingThreads() {
1807             if (!isHeldExclusively())
1808                 throw new IllegalMonitorStateException();
1809             ArrayList<Thread> list = new ArrayList<>();
1810             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1811                 if (w.waitStatus == Node.CONDITION) {
1812                     Thread t = w.thread;
1813                     if (t != null)
1814                         list.add(t);
1815                 }
1816             }
1817             return list;
1818         }
1819     }
1820 
1821     // VarHandle mechanics
1822     private static final VarHandle STATE;
1823     private static final VarHandle HEAD;
1824     private static final VarHandle TAIL;
1825 
1826     static {
1827         try {
1828             MethodHandles.Lookup l = MethodHandles.lookup();
1829             STATE = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "state", long.class);
1830             HEAD = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "head", Node.class);
1831             TAIL = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "tail", Node.class);
1832         } catch (ReflectiveOperationException e) {
1833             throw new ExceptionInInitializerError(e);
1834         }




  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent.locks;
  37 
  38 import java.lang.invoke.MethodHandles;
  39 import java.lang.invoke.VarHandle;
  40 import java.util.ArrayList;
  41 import java.util.Collection;
  42 import java.util.Date;
  43 import java.util.concurrent.TimeUnit;
  44 import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node;
  45 
  46 import jdk.internal.misc.Strands;
  47 
  48 /**
  49  * A version of {@link AbstractQueuedSynchronizer} in
  50  * which synchronization state is maintained as a {@code long}.
  51  * This class has exactly the same structure, properties, and methods
  52  * as {@code AbstractQueuedSynchronizer} with the exception
  53  * that all state-related parameters and results are defined
  54  * as {@code long} rather than {@code int}. This class
  55  * may be useful when creating synchronizers such as
  56  * multilevel locks and barriers that require
  57  * 64 bits of state.
  58  *
  59  * <p>See {@link AbstractQueuedSynchronizer} for usage
  60  * notes and examples.
  61  *
  62  * @since 1.6
  63  * @author Doug Lea
  64  */
  65 public abstract class AbstractQueuedLongSynchronizer
  66     extends AbstractOwnableSynchronizer
  67     implements java.io.Serializable {


 178                 node.setPrevRelaxed(oldTail);
 179                 if (compareAndSetTail(oldTail, node)) {
 180                     oldTail.next = node;
 181                     return node;
 182                 }
 183             } else {
 184                 initializeSyncQueue();
 185             }
 186         }
 187     }
 188 
 189     /**
 190      * Sets head of queue to be node, thus dequeuing. Called only by
 191      * acquire methods.  Also nulls out unused fields for sake of GC
 192      * and to suppress unnecessary signals and traversals.
 193      *
 194      * @param node the node
 195      */
 196     private void setHead(Node node) {
 197         head = node;
 198         node.strand = null;
 199         node.prev = null;
 200     }
 201 
 202     /**
 203      * Wakes up node's successor, if one exists.
 204      *
 205      * @param node the node
 206      */
 207     private void unparkSuccessor(Node node) {
 208         /*
 209          * If status is negative (i.e., possibly needing signal) try
 210          * to clear in anticipation of signalling.  It is OK if this
 211          * fails or if status is changed by waiting thread.
 212          */
 213         int ws = node.waitStatus;
 214         if (ws < 0)
 215             node.compareAndSetWaitStatus(ws, 0);
 216 
 217         /*
 218          * Thread to unpark is held in successor, which is normally
 219          * just the next node.  But if cancelled or apparently null,
 220          * traverse backwards from tail to find the actual
 221          * non-cancelled successor.
 222          */
 223         Node s = node.next;
 224         if (s == null || s.waitStatus > 0) {
 225             s = null;
 226             for (Node p = tail; p != node && p != null; p = p.prev)
 227                 if (p.waitStatus <= 0)
 228                     s = p;
 229         }
 230         if (s != null)
 231             LockSupport.unpark(s.strand);
 232     }
 233 
 234     /**
 235      * Release action for shared mode -- signals successor and ensures
 236      * propagation. (Note: For exclusive mode, release just amounts
 237      * to calling unparkSuccessor of head if it needs signal.)
 238      */
 239     private void doReleaseShared() {
 240         /*
 241          * Ensure that a release propagates, even if there are other
 242          * in-progress acquires/releases.  This proceeds in the usual
 243          * way of trying to unparkSuccessor of head if it needs
 244          * signal. But if it does not, status is set to PROPAGATE to
 245          * ensure that upon release, propagation continues.
 246          * Additionally, we must loop in case a new node is added
 247          * while we are doing this. Also, unlike other uses of
 248          * unparkSuccessor, we need to know if CAS to reset status
 249          * fails, if so rechecking.
 250          */
 251         for (;;) {


 296         if (propagate > 0 || h == null || h.waitStatus < 0 ||
 297             (h = head) == null || h.waitStatus < 0) {
 298             Node s = node.next;
 299             if (s == null || s.isShared())
 300                 doReleaseShared();
 301         }
 302     }
 303 
 304     // Utilities for various versions of acquire
 305 
 306     /**
 307      * Cancels an ongoing attempt to acquire.
 308      *
 309      * @param node the node
 310      */
 311     private void cancelAcquire(Node node) {
 312         // Ignore if node doesn't exist
 313         if (node == null)
 314             return;
 315 
 316         node.strand = null;
 317 
 318         // Skip cancelled predecessors
 319         Node pred = node.prev;
 320         while (pred.waitStatus > 0)
 321             node.prev = pred = pred.prev;
 322 
 323         // predNext is the apparent node to unsplice. CASes below will
 324         // fail if not, in which case, we lost race vs another cancel
 325         // or signal, so no further action is necessary, although with
 326         // a possibility that a cancelled node may transiently remain
 327         // reachable.
 328         Node predNext = pred.next;
 329 
 330         // Can use unconditional write instead of CAS here.
 331         // After this atomic step, other Nodes can skip past us.
 332         // Before, we are free of interference from other threads.
 333         node.waitStatus = Node.CANCELLED;
 334 
 335         // If we are the tail, remove ourselves.
 336         if (node == tail && compareAndSetTail(node, pred)) {
 337             pred.compareAndSetNext(predNext, null);
 338         } else {
 339             // If successor needs signal, try to set pred's next-link
 340             // so it will get one. Otherwise wake it up to propagate.
 341             int ws;
 342             if (pred != head &&
 343                 ((ws = pred.waitStatus) == Node.SIGNAL ||
 344                  (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
 345                 pred.strand != null) {
 346                 Node next = node.next;
 347                 if (next != null && next.waitStatus <= 0)
 348                     pred.compareAndSetNext(predNext, next);
 349             } else {
 350                 unparkSuccessor(node);
 351             }
 352 
 353             node.next = node; // help GC
 354         }
 355     }
 356 
 357     /**
 358      * Checks and updates status for a node that failed to acquire.
 359      * Returns true if thread should block. This is the main signal
 360      * control in all acquire loops.  Requires that pred == node.prev.
 361      *
 362      * @param pred node's predecessor holding status
 363      * @param node the node
 364      * @return {@code true} if thread should block
 365      */


 934      *
 935      * @return {@code true} if there has ever been contention
 936      */
 937     public final boolean hasContended() {
 938         return head != null;
 939     }
 940 
 941     /**
 942      * Returns the first (longest-waiting) thread in the queue, or
 943      * {@code null} if no threads are currently queued.
 944      *
 945      * <p>In this implementation, this operation normally returns in
 946      * constant time, but may iterate upon contention if other threads are
 947      * concurrently modifying the queue.
 948      *
 949      * @return the first (longest-waiting) thread in the queue, or
 950      *         {@code null} if no threads are currently queued
 951      */
 952     public final Thread getFirstQueuedThread() {
 953         // handle only fast path, else relay
 954         return (head == tail) ? null : (Thread) fullGetFirstQueuedStrand();
 955     }
 956 
 957     /**
 958      * Version of getFirstQueuedStrand called when fastpath fails.
 959      */
 960     private Object fullGetFirstQueuedStrand() {
 961         /*
 962          * The first node is normally head.next. Try to get its
 963          * thread field, ensuring consistent reads: If thread
 964          * field is nulled out or s.prev is no longer head, then
 965          * some other thread(s) concurrently performed setHead in
 966          * between some of our reads. We try this twice before
 967          * resorting to traversal.
 968          */
 969         Node h, s;
 970         Object st;
 971         if (((h = head) != null && (s = h.next) != null &&
 972              s.prev == head && (st = s.strand) != null) ||
 973             ((h = head) != null && (s = h.next) != null &&
 974              s.prev == head && (st = s.strand) != null))
 975             return st;
 976 
 977         /*
 978          * Head's next field might not have been set yet, or may have
 979          * been unset after setHead. So we must check to see if tail
 980          * is actually first node. If not, we continue on, safely
 981          * traversing from tail back to head to find first,
 982          * guaranteeing termination.
 983          */
 984 
 985         Object firstStrand = null;
 986         for (Node p = tail; p != null && p != head; p = p.prev) {
 987             Object strand = p.strand;
 988             if (strand != null)
 989                 firstStrand = strand;
 990         }
 991         return firstStrand;
 992     }
 993 
 994     /**
 995      * Returns true if the given thread is currently queued.
 996      *
 997      * <p>This implementation traverses the queue to determine
 998      * presence of the given thread.
 999      *
1000      * @param thread the thread
1001      * @return {@code true} if the given thread is on the queue
1002      * @throws NullPointerException if the thread is null
1003      */
1004     public final boolean isQueued(Thread thread) {
1005         if (thread == null)
1006             throw new NullPointerException();
1007         for (Node p = tail; p != null; p = p.prev)
1008             if (p.strand == thread)
1009                 return true;
1010         return false;
1011     }
1012 
1013     /**
1014      * Returns {@code true} if the apparent first queued thread, if one
1015      * exists, is waiting in exclusive mode.  If this method returns
1016      * {@code true}, and the current thread is attempting to acquire in
1017      * shared mode (that is, this method is invoked from {@link
1018      * #tryAcquireShared}) then it is guaranteed that the current thread
1019      * is not the first queued thread.  Used only as a heuristic in
1020      * ReentrantReadWriteLock.
1021      */
1022     final boolean apparentlyFirstQueuedIsExclusive() {
1023         Node h, s;
1024         return (h = head) != null &&
1025             (s = h.next)  != null &&
1026             !s.isShared()         &&
1027             s.strand != null;
1028     }
1029 
1030     /**
1031      * Queries whether any threads have been waiting to acquire longer
1032      * than the current thread.
1033      *
1034      * <p>An invocation of this method is equivalent to (but may be
1035      * more efficient than):
1036      * <pre> {@code
1037      * getFirstQueuedThread() != Thread.currentThread()
1038      *   && hasQueuedThreads()}</pre>
1039      *
1040      * <p>Note that because cancellations due to interrupts and
1041      * timeouts may occur at any time, a {@code true} return does not
1042      * guarantee that some other thread will acquire before the current
1043      * thread.  Likewise, it is possible for another thread to win a
1044      * race to enqueue after this method has returned {@code false},
1045      * due to the queue being empty.
1046      *
1047      * <p>This method is designed to be used by a fair synchronizer to


1063      *   } else {
1064      *     // try to acquire normally
1065      *   }
1066      * }}</pre>
1067      *
1068      * @return {@code true} if there is a queued thread preceding the
1069      *         current thread, and {@code false} if the current thread
1070      *         is at the head of the queue or the queue is empty
1071      * @since 1.7
1072      */
1073     public final boolean hasQueuedPredecessors() {
1074         Node h, s;
1075         if ((h = head) != null) {
1076             if ((s = h.next) == null || s.waitStatus > 0) {
1077                 s = null; // traverse in case of concurrent cancellation
1078                 for (Node p = tail; p != h && p != null; p = p.prev) {
1079                     if (p.waitStatus <= 0)
1080                         s = p;
1081                 }
1082             }
1083             if (s != null && s.strand != Strands.currentStrand())
1084                 return true;
1085         }
1086         return false;
1087     }
1088 
1089     // Instrumentation and monitoring methods
1090 
1091     /**
1092      * Returns an estimate of the number of threads waiting to
1093      * acquire.  The value is only an estimate because the number of
1094      * threads may change dynamically while this method traverses
1095      * internal data structures.  This method is designed for use in
1096      * monitoring system state, not for synchronization control.
1097      *
1098      * @return the estimated number of threads waiting to acquire
1099      */
1100     public final int getQueueLength() {
1101         int n = 0;
1102         for (Node p = tail; p != null; p = p.prev) {
1103             if (p.strand != null)
1104                 ++n;
1105         }
1106         return n;
1107     }
1108 
1109     /**
1110      * Returns a collection containing threads that may be waiting to
1111      * acquire.  Because the actual set of threads may change
1112      * dynamically while constructing this result, the returned
1113      * collection is only a best-effort estimate.  The elements of the
1114      * returned collection are in no particular order.  This method is
1115      * designed to facilitate construction of subclasses that provide
1116      * more extensive monitoring facilities.
1117      *
1118      * @return the collection of threads
1119      */
1120     public final Collection<Thread> getQueuedThreads() {
1121         ArrayList<Thread> list = new ArrayList<>();
1122         for (Node p = tail; p != null; p = p.prev) {
1123             Object s = p.strand;
1124             if (s instanceof Thread)
1125                 list.add((Thread)s);
1126         }
1127         return list;
1128     }
1129 
1130     /**
1131      * Returns a collection containing threads that may be waiting to
1132      * acquire in exclusive mode. This has the same properties
1133      * as {@link #getQueuedThreads} except that it only returns
1134      * those threads waiting due to an exclusive acquire.
1135      *
1136      * @return the collection of threads
1137      */
1138     public final Collection<Thread> getExclusiveQueuedThreads() {
1139         ArrayList<Thread> list = new ArrayList<>();
1140         for (Node p = tail; p != null; p = p.prev) {
1141             if (!p.isShared()) {
1142                 Object s = p.strand;
1143                 if (s instanceof Thread)
1144                     list.add((Thread)s);
1145             }
1146         }
1147         return list;
1148     }
1149 
1150     /**
1151      * Returns a collection containing threads that may be waiting to
1152      * acquire in shared mode. This has the same properties
1153      * as {@link #getQueuedThreads} except that it only returns
1154      * those threads waiting due to a shared acquire.
1155      *
1156      * @return the collection of threads
1157      */
1158     public final Collection<Thread> getSharedQueuedThreads() {
1159         ArrayList<Thread> list = new ArrayList<>();
1160         for (Node p = tail; p != null; p = p.prev) {
1161             if (p.isShared()) {
1162                 Object s = p.strand;
1163                 if (s instanceof Thread)
1164                     list.add((Thread)s);
1165             }
1166         }
1167         return list;
1168     }
1169 
1170     /**
1171      * Returns a string identifying this synchronizer, as well as its state.
1172      * The state, in brackets, includes the String {@code "State ="}
1173      * followed by the current value of {@link #getState}, and either
1174      * {@code "nonempty"} or {@code "empty"} depending on whether the
1175      * queue is empty.
1176      *
1177      * @return a string identifying this synchronizer, as well as its state
1178      */
1179     public String toString() {
1180         return super.toString()
1181             + "[State = " + getState() + ", "
1182             + (hasQueuedThreads() ? "non" : "") + "empty queue]";
1183     }
1184 


1231      * @param node the node
1232      * @return true if successfully transferred (else the node was
1233      * cancelled before signal)
1234      */
1235     final boolean transferForSignal(Node node) {
1236         /*
1237          * If cannot change waitStatus, the node has been cancelled.
1238          */
1239         if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
1240             return false;
1241 
1242         /*
1243          * Splice onto queue and try to set waitStatus of predecessor to
1244          * indicate that thread is (probably) waiting. If cancelled or
1245          * attempt to set waitStatus fails, wake up to resync (in which
1246          * case the waitStatus can be transiently and harmlessly wrong).
1247          */
1248         Node p = enq(node);
1249         int ws = p.waitStatus;
1250         if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
1251             LockSupport.unpark(node.strand);
1252         return true;
1253     }
1254 
1255     /**
1256      * Transfers node, if necessary, to sync queue after a cancelled wait.
1257      * Returns true if thread was cancelled before being signalled.
1258      *
1259      * @param node the node
1260      * @return true if cancelled before the node was signalled
1261      */
1262     final boolean transferAfterCancelledWait(Node node) {
1263         if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
1264             enq(node);
1265             return true;
1266         }
1267         /*
1268          * If we lost out to a signal(), then we can't proceed
1269          * until it finishes its enq().  Cancelling during an
1270          * incomplete transfer is both rare and transient, so just
1271          * spin.


1794                     ++n;
1795             }
1796             return n;
1797         }
1798 
1799         /**
1800          * Returns a collection containing those threads that may be
1801          * waiting on this Condition.
1802          * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}.
1803          *
1804          * @return the collection of threads
1805          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1806          *         returns {@code false}
1807          */
1808         protected final Collection<Thread> getWaitingThreads() {
1809             if (!isHeldExclusively())
1810                 throw new IllegalMonitorStateException();
1811             ArrayList<Thread> list = new ArrayList<>();
1812             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1813                 if (w.waitStatus == Node.CONDITION) {
1814                     Object s = w.strand;
1815                     if (s instanceof Thread)
1816                         list.add((Thread)s);
1817                 }
1818             }
1819             return list;
1820         }
1821     }
1822 
1823     // VarHandle mechanics
1824     private static final VarHandle STATE;
1825     private static final VarHandle HEAD;
1826     private static final VarHandle TAIL;
1827 
1828     static {
1829         try {
1830             MethodHandles.Lookup l = MethodHandles.lookup();
1831             STATE = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "state", long.class);
1832             HEAD = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "head", Node.class);
1833             TAIL = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "tail", Node.class);
1834         } catch (ReflectiveOperationException e) {
1835             throw new ExceptionInInitializerError(e);
1836         }


< prev index next >