< prev index next >

src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java

Print this page

        

*** 40,49 **** --- 40,51 ---- import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.concurrent.TimeUnit; + import jdk.internal.misc.Strands; + /** * Provides a framework for implementing blocking locks and related * synchronizers (semaphores, events, etc) that rely on * first-in-first-out (FIFO) wait queues. This class is designed to * be a useful basis for most kinds of synchronizers that rely on a
*** 472,485 **** * easier for isOnSyncQueue. */ volatile Node next; /** ! * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ ! volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple --- 474,487 ---- * easier for isOnSyncQueue. */ volatile Node next; /** ! * The strand that enqueued this node. Initialized on * construction and nulled out after use. */ ! volatile Object strand; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple
*** 517,533 **** Node() {} /** Constructor used by addWaiter. */ Node(Node nextWaiter) { this.nextWaiter = nextWaiter; ! THREAD.set(this, Thread.currentThread()); } /** Constructor used by addConditionWaiter. */ Node(int waitStatus) { WAITSTATUS.set(this, waitStatus); ! THREAD.set(this, Thread.currentThread()); } /** CASes waitStatus field. */ final boolean compareAndSetWaitStatus(int expect, int update) { return WAITSTATUS.compareAndSet(this, expect, update); --- 519,535 ---- Node() {} /** Constructor used by addWaiter. */ Node(Node nextWaiter) { this.nextWaiter = nextWaiter; ! STRAND.set(this, Strands.currentStrand()); } /** Constructor used by addConditionWaiter. */ Node(int waitStatus) { WAITSTATUS.set(this, waitStatus); ! STRAND.set(this, Strands.currentStrand()); } /** CASes waitStatus field. */ final boolean compareAndSetWaitStatus(int expect, int update) { return WAITSTATUS.compareAndSet(this, expect, update);
*** 543,560 **** } // VarHandle mechanics private static final VarHandle NEXT; private static final VarHandle PREV; ! private static final VarHandle THREAD; private static final VarHandle WAITSTATUS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); NEXT = l.findVarHandle(Node.class, "next", Node.class); PREV = l.findVarHandle(Node.class, "prev", Node.class); ! THREAD = l.findVarHandle(Node.class, "thread", Thread.class); WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } --- 545,562 ---- } // VarHandle mechanics private static final VarHandle NEXT; private static final VarHandle PREV; ! private static final VarHandle STRAND; private static final VarHandle WAITSTATUS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); NEXT = l.findVarHandle(Node.class, "next", Node.class); PREV = l.findVarHandle(Node.class, "prev", Node.class); ! STRAND = l.findVarHandle(Node.class, "strand", Object.class); WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } }
*** 671,681 **** * * @param node the node */ private void setHead(Node node) { head = node; ! node.thread = null; node.prev = null; } /** * Wakes up node's successor, if one exists. --- 673,683 ---- * * @param node the node */ private void setHead(Node node) { head = node; ! node.strand = null; node.prev = null; } /** * Wakes up node's successor, if one exists.
*** 704,714 **** for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) ! LockSupport.unpark(s.thread); } /** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts --- 706,716 ---- for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) ! LockSupport.unpark(s.strand); } /** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts
*** 789,799 **** private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; ! node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; --- 791,801 ---- private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; ! node.strand = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev;
*** 818,828 **** // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && ! pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) pred.compareAndSetNext(predNext, next); } else { unparkSuccessor(node); --- 820,830 ---- // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && ! pred.strand != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) pred.compareAndSetNext(predNext, next); } else { unparkSuccessor(node);
*** 1427,1474 **** * @return the first (longest-waiting) thread in the queue, or * {@code null} if no threads are currently queued */ public final Thread getFirstQueuedThread() { // handle only fast path, else relay ! return (head == tail) ? null : fullGetFirstQueuedThread(); } /** ! * Version of getFirstQueuedThread called when fastpath fails. */ ! private Thread fullGetFirstQueuedThread() { /* * The first node is normally head.next. Try to get its * thread field, ensuring consistent reads: If thread * field is nulled out or s.prev is no longer head, then * some other thread(s) concurrently performed setHead in * between some of our reads. We try this twice before * resorting to traversal. */ Node h, s; ! Thread st; if (((h = head) != null && (s = h.next) != null && ! s.prev == head && (st = s.thread) != null) || ((h = head) != null && (s = h.next) != null && ! s.prev == head && (st = s.thread) != null)) return st; /* * Head's next field might not have been set yet, or may have * been unset after setHead. So we must check to see if tail * is actually first node. If not, we continue on, safely * traversing from tail back to head to find first, * guaranteeing termination. */ ! Thread firstThread = null; for (Node p = tail; p != null && p != head; p = p.prev) { ! Thread t = p.thread; ! if (t != null) ! firstThread = t; } ! return firstThread; } /** * Returns true if the given thread is currently queued. * --- 1429,1477 ---- * @return the first (longest-waiting) thread in the queue, or * {@code null} if no threads are currently queued */ public final Thread getFirstQueuedThread() { // handle only fast path, else relay ! // FIXME: may throw ClassCastException ! return (head == tail) ? null : (Thread) fullGetFirstQueuedStrand(); } /** ! * Version of getFirstQueuedStrand called when fastpath fails. */ ! private Object fullGetFirstQueuedStrand() { /* * The first node is normally head.next. Try to get its * thread field, ensuring consistent reads: If thread * field is nulled out or s.prev is no longer head, then * some other thread(s) concurrently performed setHead in * between some of our reads. We try this twice before * resorting to traversal. */ Node h, s; ! Object st; if (((h = head) != null && (s = h.next) != null && ! s.prev == head && (st = s.strand) != null) || ((h = head) != null && (s = h.next) != null && ! s.prev == head && (st = s.strand) != null)) return st; /* * Head's next field might not have been set yet, or may have * been unset after setHead. So we must check to see if tail * is actually first node. If not, we continue on, safely * traversing from tail back to head to find first, * guaranteeing termination. */ ! Object firstStrand= null; for (Node p = tail; p != null && p != head; p = p.prev) { ! Object strand = p.strand; ! if (strand != null) ! firstStrand = strand; } ! return firstStrand; } /** * Returns true if the given thread is currently queued. *
*** 1481,1491 **** */ public final boolean isQueued(Thread thread) { if (thread == null) throw new NullPointerException(); for (Node p = tail; p != null; p = p.prev) ! if (p.thread == thread) return true; return false; } /** --- 1484,1494 ---- */ public final boolean isQueued(Thread thread) { if (thread == null) throw new NullPointerException(); for (Node p = tail; p != null; p = p.prev) ! if (p.strand == thread) return true; return false; } /**
*** 1500,1510 **** final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && ! s.thread != null; } /** * Queries whether any threads have been waiting to acquire longer * than the current thread. --- 1503,1513 ---- final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && ! s.strand != null; } /** * Queries whether any threads have been waiting to acquire longer * than the current thread.
*** 1556,1566 **** for (Node p = tail; p != h && p != null; p = p.prev) { if (p.waitStatus <= 0) s = p; } } ! if (s != null && s.thread != Thread.currentThread()) return true; } return false; } --- 1559,1569 ---- for (Node p = tail; p != h && p != null; p = p.prev) { if (p.waitStatus <= 0) s = p; } } ! if (s != null && s.strand != Strands.currentStrand()) return true; } return false; }
*** 1576,1586 **** * @return the estimated number of threads waiting to acquire */ public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { ! if (p.thread != null) ++n; } return n; } --- 1579,1589 ---- * @return the estimated number of threads waiting to acquire */ public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { ! if (p.strand != null) ++n; } return n; }
*** 1596,1608 **** * @return the collection of threads */ public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { ! Thread t = p.thread; ! if (t != null) ! list.add(t); } return list; } /** --- 1599,1611 ---- * @return the collection of threads */ public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { ! Object s = p.strand; ! if (s instanceof Thread) ! list.add((Thread)s); } return list; } /**
*** 1615,1627 **** */ public final Collection<Thread> getExclusiveQueuedThreads() { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { if (!p.isShared()) { ! Thread t = p.thread; ! if (t != null) ! list.add(t); } } return list; } --- 1618,1630 ---- */ public final Collection<Thread> getExclusiveQueuedThreads() { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { if (!p.isShared()) { ! Object s = p.strand; ! if (s instanceof Thread) ! list.add((Thread)s); } } return list; }
*** 1635,1647 **** */ public final Collection<Thread> getSharedQueuedThreads() { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { if (p.isShared()) { ! Thread t = p.thread; ! if (t != null) ! list.add(t); } } return list; } --- 1638,1650 ---- */ public final Collection<Thread> getSharedQueuedThreads() { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { if (p.isShared()) { ! Object s = p.strand; ! if (s instanceof Thread) ! list.add((Thread)s); } } return list; }
*** 1724,1734 **** * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) ! LockSupport.unpark(node.thread); return true; } /** * Transfers node, if necessary, to sync queue after a cancelled wait. --- 1727,1737 ---- * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) ! LockSupport.unpark(node.strand); return true; } /** * Transfers node, if necessary, to sync queue after a cancelled wait.
*** 2285,2297 **** if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { ! Thread t = w.thread; ! if (t != null) ! list.add(t); } } return list; } } --- 2288,2300 ---- if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { ! Object s = w.strand; ! if (s != null) ! list.add((Thread)s); } } return list; } }
< prev index next >