< prev index next >

src/java.base/share/classes/jdk/internal/misc/ThreadFlock.java

Print this page

 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package jdk.internal.misc;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.time.Duration;
 30 import java.util.Objects;
 31 import java.util.Set;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.TimeoutException;
 34 import java.util.concurrent.locks.LockSupport;
 35 import java.util.stream.Stream;
 36 import jdk.internal.access.JavaLangAccess;
 37 import jdk.internal.access.SharedSecrets;
 38 import jdk.internal.vm.ExtentLocalContainer;
 39 import jdk.internal.vm.ThreadContainer;
 40 import jdk.internal.vm.ThreadContainers;
 41 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 42 
 43 /**
 44  * A grouping of threads that typically run closely related tasks. Threads started
 45  * in a flock remain in the flock until they terminate.
 46  *
 47  * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock,
 48  * the {@link #start(Thread) start} method to start a thread in the flock, and the
 49  * {@link #close() close} method to close the flock. The {@code close} method waits for all
 50  * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used
 51  * to wait for all threads to finish without closing the flock. The {@link #wakeup()}
 52  * method will cause the {@code awaitAll} method to complete early, which can be used to
 53  * support cancellation in higher-level APIs. ThreadFlock also defines the {@link
 54  * #shutdown() shutdown} method to prevent new threads from starting while allowing
 55  * existing threads in the flock to continue.
 56  *
 57  * <p> Thread flocks are intended to be used in a <em>structured manner</em>. The
 58  * thread that opens a new flock is the {@link #owner() owner}. The owner closes the

 82     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 83     private static final VarHandle THREAD_COUNT;  // VarHandle for the threadCount field
 84     private static final VarHandle PERMIT;  // VarHandle for the permit field
 85     static {
 86         try {
 87             MethodHandles.Lookup l = MethodHandles.lookup();
 88             THREAD_COUNT = l.findVarHandle(ThreadFlock.class, "threadCount", int.class);
 89             PERMIT = l.findVarHandle(ThreadFlock.class, "permit", boolean.class);
 90             Unsafe.getUnsafe().ensureClassInitialized(StructureViolationExceptions.class);  // initialize this class eagerly, as part of ThreadFlock's own static initialization
 91         } catch (Exception e) {
 92             throw new InternalError(e);  // a failed lookup is not recoverable here; the cause is preserved
 93         }
 94     }
 95 
 96     private final Set<Thread> threads = ConcurrentHashMap.newKeySet();  // concurrent set; presumably the threads started in this flock (populated outside this excerpt) - confirm
 97 
 98     // thread count, need to re-examine contention once API is stable
 99     private volatile int threadCount;  // the visible code updates it only through THREAD_COUNT.getAndAdd
100 
101     private final String name;  // may be null, see open(String)
102     private final ExtentLocalContainer.BindingsSnapshot extentLocalBindings;  // assigned once, in the constructor
103     private final ThreadContainerImpl container; // encapsulate for now
104 
105     // state
106     private volatile boolean shutdown;
107     private volatile boolean closed;
108 
109     // set by wakeup, cleared by awaitAll
110     private volatile boolean permit;
111 
112     ThreadFlock(String name) {  // not public; open(String) is the public way to create a flock
113         this.name = name;  // stored as given, null is allowed
114         this.extentLocalBindings = ExtentLocalContainer.captureBindings();  // snapshot is taken at construction time
115         this.container = new ThreadContainerImpl(this);  // the container keeps a reference back to this flock
116     }
117 
118     private long threadCount() {  // returns the current thread count, widened from int to long
119         return threadCount;  // plain read of the volatile field
120     }
121 
122     private ExtentLocalContainer.BindingsSnapshot extentLocalBindings() {  // returns the snapshot assigned in the constructor
123         return extentLocalBindings;
124     }
125 
126     private void incrementThreadCount() {  // atomically adds one to threadCount
127         THREAD_COUNT.getAndAdd(this, 1);  // the previous value returned by getAndAdd is not needed
128     }
129 
130     /**
131      * Decrement the thread count. Unpark the owner if the count goes to zero.
132      */
133     private void decrementThreadCount() {
134         int count = (int) THREAD_COUNT.getAndAdd(this, -1) - 1;  // getAndAdd returns the previous value, so subtract 1 to get the new count
135 
136         // signal owner when the count goes to zero
137         if (count == 0) {
138             LockSupport.unpark(owner());  // close() parks the owner while threadCount > 0
139         }
140     }
141 
142     /**
143      * Invoked on the parent thread when starting {@code thread}.

193      */
194     private void ensureOwner() {  // guard for owner-only operations; close() calls it first
195         if (Thread.currentThread() != owner())  // identity comparison against the owning thread
196             throw new WrongThreadException("Current thread not owner");
197     }
198 
199     /**
200      * Throws WrongThreadException if the current thread is not the owner
201      * or a thread contained in the flock.
202      */
203     private void ensureOwnerOrContainsThread() {
204         Thread currentThread = Thread.currentThread();
205         if (currentThread != owner() && !containsThread(currentThread))  // owner identity is tested first; containsThread is only evaluated for non-owners
206             throw new WrongThreadException("Current thread not owner or thread in flock");
207     }
208 
209     /**
210      * Opens a new thread flock. The flock is owned by the current thread. It can be
211      * named to aid debugging.
212      *
213      * <p> This method captures the current thread's {@linkplain ExtentLocal extent-local}
214      * bindings for inheritance by threads created in the flock.
215      *
216      * <p> For the purposes of containment, monitoring, and debugging, the parent
217      * of the new flock is determined as follows:
218      * <ul>
219      * <li> If the current thread is the owner of open flocks then the most recently
220      * created, and open, flock is the parent of the new flock. In other words, the
221      * <em>enclosing flock</em> is the parent.
222      * <li> If the current thread is not the owner of any open flocks then the
223      * parent of the new flock is the current thread's flock. If the current thread
224      * was not started in a flock then the new flock does not have a parent.
225      * </ul>
226      *
227      * @param name the name of the flock, can be null
228      * @return a new thread flock
229      */
230     public static ThreadFlock open(String name) {
231         var flock = new ThreadFlock(name);  // the constructor captures the bindings snapshot
232         flock.container.push();  // push() is defined outside this excerpt; presumably registers the container with the current thread as owner - confirm
233         return flock;
234     }
235 
236     /**
237      * {@return the name of this flock or {@code null} if unnamed}
238      */
239     public String name() {
240         return name;  // exactly the value given to the constructor; no default is substituted
241     }
242 
243     /**
244      * {@return the owner of this flock}
245      */
246     public Thread owner() {
247         return container.owner();  // delegates to the container
248     }
249 
250     /**
251      * Starts the given unstarted thread in this flock.
252      *
253      * <p> The thread is started with the extent-local bindings that were captured
254      * when opening the flock. The bindings must match the current thread's bindings.
255      *
256      * <p> This method may only be invoked by the flock owner or threads {@linkplain
257      * #containsThread(Thread) contained} in the flock.
258      *
259      * @param thread the unstarted thread
260      * @return the thread, started
261      * @throws IllegalStateException if this flock is shutdown or closed
262      * @throws IllegalThreadStateException if the given thread was already started
263      * @throws WrongThreadException if the current thread is not the owner or a thread
264      * contained in the flock
265      * @throws jdk.incubator.concurrent.StructureViolationException if the current
266      * extent-local bindings are not the same as when the flock was created
267      */
268     public Thread start(Thread thread) {
269         ensureOwnerOrContainsThread();  // the caller check runs before any attempt to start the thread
270         JLA.start(thread, container);  // passes the thread together with this flock's container; the shutdown/closed checks are not in this method - presumably in the start path outside this excerpt
271         return thread;  // the same instance that was passed in
272     }
273 
274     /**
275      * Shuts down this flock so that no new threads can be started; existing threads
276      * in the flock will continue to run. This method is a no-op if the flock is
277      * already shut down or closed.
278      *
279      * <p> This method may only be invoked by the flock owner or threads {@linkplain
280      * #containsThread(Thread) contained} in the flock.
281      *
282      * @throws WrongThreadException if the current thread is not the owner or a thread
283      * contained in the flock
284      */
285     public void shutdown() {
286         ensureOwnerOrContainsThread();

381         ensureOwnerOrContainsThread();
382         if (!getAndSetPermit(true) && Thread.currentThread() != owner()) {
383             LockSupport.unpark(owner());
384         }
385     }
386 
387     /**
388      * Closes this flock. This method first shuts down the flock to prevent
389      * new threads from starting. It then waits for the threads in the flock
390      * to finish executing their tasks. In other words, this method blocks until
391      * all threads in the flock finish.
392      *
393      * <p> This method may only be invoked by the flock owner.
394      *
395      * <p> If interrupted then this method continues to wait until all threads
396      * finish, before completing with the interrupt status set.
397      *
398      * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If
399      * this method is called to close a flock before nested flocks are closed then it
400      * closes the nested flocks (in the reverse order that they were created in),
401      * closes this flock, and then throws {@link
402      * jdk.incubator.concurrent.StructureViolationException}.
403      * Similarly, if called to close a flock that <em>encloses</em> {@linkplain
404      * jdk.incubator.concurrent.ExtentLocal.Carrier#run(Runnable) operations} with
405      * extent-local bindings then it also throws {@code StructureViolationException}
406      * after closing the flock.
407      *
408      * @throws WrongThreadException if invoked by a thread that is not the owner
409      * @throws jdk.incubator.concurrent.StructureViolationException if a structure
410      * violation was detected
411      */
412     public void close() {
413         ensureOwner();
414         if (closed)
415             return;
416 
417         // shutdown, if not already shutdown
418         if (!shutdown)
419             shutdown = true;
420 
421         // wait for threads to finish
422         boolean interrupted = false;
423         try {
424             while (threadCount > 0) {
425                 LockSupport.park();
426                 if (Thread.interrupted()) {

568         public long threadCount() {
569             return flock.threadCount();  // delegates to ThreadFlock's private threadCount()
570         }
571         @Override
572         public Stream<Thread> threads() {
573             return flock.threads().filter(Thread::isAlive);  // drops threads for which isAlive() is false
574         }
575         @Override
576         public void onStart(Thread thread) {
577             flock.onStart(thread);  // forwards to the flock; ThreadFlock.onStart is defined outside this excerpt
578         }
579         @Override
580         public void onExit(Thread thread) {
581             flock.onExit(thread);  // forwards to the flock; ThreadFlock.onExit is defined outside this excerpt
582         }
583         @Override
584         public String toString() {
585             return flock.toString();  // the container reports the same string as its flock
586         }
587         @Override
588         public ExtentLocalContainer.BindingsSnapshot extentLocalBindings() {
589             return flock.extentLocalBindings();  // the snapshot the flock captured in its constructor
590         }
591     }
592 }

 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package jdk.internal.misc;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.time.Duration;
 30 import java.util.Objects;
 31 import java.util.Set;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.TimeoutException;
 34 import java.util.concurrent.locks.LockSupport;
 35 import java.util.stream.Stream;
 36 import jdk.internal.access.JavaLangAccess;
 37 import jdk.internal.access.SharedSecrets;
 38 import jdk.internal.vm.ScopedValueContainer;
 39 import jdk.internal.vm.ThreadContainer;
 40 import jdk.internal.vm.ThreadContainers;
 41 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 42 
 43 /**
 44  * A grouping of threads that typically run closely related tasks. Threads started
 45  * in a flock remain in the flock until they terminate.
 46  *
 47  * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock,
 48  * the {@link #start(Thread) start} method to start a thread in the flock, and the
 49  * {@link #close() close} method to close the flock. The {@code close} method waits for all
 50  * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used
 51  * to wait for all threads to finish without closing the flock. The {@link #wakeup()}
 52  * method will cause the {@code awaitAll} method to complete early, which can be used to
 53  * support cancellation in higher-level APIs. ThreadFlock also defines the {@link
 54  * #shutdown() shutdown} method to prevent new threads from starting while allowing
 55  * existing threads in the flock to continue.
 56  *
 57  * <p> Thread flocks are intended to be used in a <em>structured manner</em>. The
 58  * thread that opens a new flock is the {@link #owner() owner}. The owner closes the

 82     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 83     private static final VarHandle THREAD_COUNT;  // VarHandle for the threadCount field
 84     private static final VarHandle PERMIT;  // VarHandle for the permit field
 85     static {
 86         try {
 87             MethodHandles.Lookup l = MethodHandles.lookup();
 88             THREAD_COUNT = l.findVarHandle(ThreadFlock.class, "threadCount", int.class);
 89             PERMIT = l.findVarHandle(ThreadFlock.class, "permit", boolean.class);
 90             Unsafe.getUnsafe().ensureClassInitialized(StructureViolationExceptions.class);  // initialize this class eagerly, as part of ThreadFlock's own static initialization
 91         } catch (Exception e) {
 92             throw new InternalError(e);  // a failed lookup is not recoverable here; the cause is preserved
 93         }
 94     }
 95 
 96     private final Set<Thread> threads = ConcurrentHashMap.newKeySet();  // concurrent set; presumably the threads started in this flock (populated outside this excerpt) - confirm
 97 
 98     // thread count, need to re-examine contention once API is stable
 99     private volatile int threadCount;  // the visible code updates it only through THREAD_COUNT.getAndAdd
100 
101     private final String name;  // may be null, see open(String)
102     private final ScopedValueContainer.BindingsSnapshot scopedValueBindings;  // assigned once, in the constructor
103     private final ThreadContainerImpl container; // encapsulate for now
104 
105     // state
106     private volatile boolean shutdown;
107     private volatile boolean closed;
108 
109     // set by wakeup, cleared by awaitAll
110     private volatile boolean permit;
111 
112     ThreadFlock(String name) {  // not public; open(String) is the public way to create a flock
113         this.name = name;  // stored as given, null is allowed
114         this.scopedValueBindings = ScopedValueContainer.captureBindings();  // snapshot is taken at construction time
115         this.container = new ThreadContainerImpl(this);  // the container keeps a reference back to this flock
116     }
117 
118     private long threadCount() {  // returns the current thread count, widened from int to long
119         return threadCount;  // plain read of the volatile field
120     }
121 
122     private ScopedValueContainer.BindingsSnapshot scopedValueBindings() {  // returns the snapshot assigned in the constructor
123         return scopedValueBindings;
124     }
125 
126     private void incrementThreadCount() {  // atomically adds one to threadCount
127         THREAD_COUNT.getAndAdd(this, 1);  // the previous value returned by getAndAdd is not needed
128     }
129 
130     /**
131      * Decrement the thread count. Unpark the owner if the count goes to zero.
132      */
133     private void decrementThreadCount() {
134         int count = (int) THREAD_COUNT.getAndAdd(this, -1) - 1;  // getAndAdd returns the previous value, so subtract 1 to get the new count
135 
136         // signal owner when the count goes to zero
137         if (count == 0) {
138             LockSupport.unpark(owner());  // close() parks the owner while threadCount > 0
139         }
140     }
141 
142     /**
143      * Invoked on the parent thread when starting {@code thread}.

193      */
194     private void ensureOwner() {  // guard for owner-only operations; close() calls it first
195         if (Thread.currentThread() != owner())  // identity comparison against the owning thread
196             throw new WrongThreadException("Current thread not owner");
197     }
198 
199     /**
200      * Throws WrongThreadException if the current thread is not the owner
201      * or a thread contained in the flock.
202      */
203     private void ensureOwnerOrContainsThread() {
204         Thread currentThread = Thread.currentThread();
205         if (currentThread != owner() && !containsThread(currentThread))  // owner identity is tested first; containsThread is only evaluated for non-owners
206             throw new WrongThreadException("Current thread not owner or thread in flock");
207     }
208 
209     /**
210      * Opens a new thread flock. The flock is owned by the current thread. It can be
211      * named to aid debugging.
212      *
213      * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
214      * bindings for inheritance by threads created in the flock.
215      *
216      * <p> For the purposes of containment, monitoring, and debugging, the parent
217      * of the new flock is determined as follows:
218      * <ul>
219      * <li> If the current thread is the owner of open flocks then the most recently
220      * created, and open, flock is the parent of the new flock. In other words, the
221      * <em>enclosing flock</em> is the parent.
222      * <li> If the current thread is not the owner of any open flocks then the
223      * parent of the new flock is the current thread's flock. If the current thread
224      * was not started in a flock then the new flock does not have a parent.
225      * </ul>
226      *
227      * @param name the name of the flock, can be null
228      * @return a new thread flock
229      */
230     public static ThreadFlock open(String name) {
231         var flock = new ThreadFlock(name);  // the constructor captures the bindings snapshot
232         flock.container.push();  // push() is defined outside this excerpt; presumably registers the container with the current thread as owner - confirm
233         return flock;
234     }
235 
236     /**
237      * {@return the name of this flock or {@code null} if unnamed}
238      */
239     public String name() {
240         return name;  // exactly the value given to the constructor; no default is substituted
241     }
242 
243     /**
244      * {@return the owner of this flock}
245      */
246     public Thread owner() {
247         return container.owner();  // delegates to the container
248     }
249 
250     /**
251      * Starts the given unstarted thread in this flock.
252      *
253      * <p> The thread is started with the scoped value bindings that were captured
254      * when opening the flock. The bindings must match the current thread's bindings.
255      *
256      * <p> This method may only be invoked by the flock owner or threads {@linkplain
257      * #containsThread(Thread) contained} in the flock.
258      *
259      * @param thread the unstarted thread
260      * @return the thread, started
261      * @throws IllegalStateException if this flock is shutdown or closed
262      * @throws IllegalThreadStateException if the given thread was already started
263      * @throws WrongThreadException if the current thread is not the owner or a thread
264      * contained in the flock
265      * @throws jdk.incubator.concurrent.StructureViolationException if the current
266      * scoped value bindings are not the same as when the flock was created
267      */
268     public Thread start(Thread thread) {
269         ensureOwnerOrContainsThread();  // the caller check runs before any attempt to start the thread
270         JLA.start(thread, container);  // passes the thread together with this flock's container; the shutdown/closed checks are not in this method - presumably in the start path outside this excerpt
271         return thread;  // the same instance that was passed in
272     }
273 
274     /**
275      * Shuts down this flock so that no new threads can be started; existing threads
276      * in the flock will continue to run. This method is a no-op if the flock is
277      * already shut down or closed.
278      *
279      * <p> This method may only be invoked by the flock owner or threads {@linkplain
280      * #containsThread(Thread) contained} in the flock.
281      *
282      * @throws WrongThreadException if the current thread is not the owner or a thread
283      * contained in the flock
284      */
285     public void shutdown() {
286         ensureOwnerOrContainsThread();

381         ensureOwnerOrContainsThread();
382         if (!getAndSetPermit(true) && Thread.currentThread() != owner()) {
383             LockSupport.unpark(owner());
384         }
385     }
386 
387     /**
388      * Closes this flock. This method first shuts down the flock to prevent
389      * new threads from starting. It then waits for the threads in the flock
390      * to finish executing their tasks. In other words, this method blocks until
391      * all threads in the flock finish.
392      *
393      * <p> This method may only be invoked by the flock owner.
394      *
395      * <p> If interrupted then this method continues to wait until all threads
396      * finish, before completing with the interrupt status set.
397      *
398      * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If
399      * this method is called to close a flock before nested flocks are closed then it
400      * closes the nested flocks (in the reverse order that they were created in),
401      * closes this flock, and then throws {@code StructureViolationException}.
402      * Similarly, if this method is called to close a thread flock while executing with
403      * scoped value bindings, and the thread flock was created before the scoped values
404      * were bound, then {@code StructureViolationException} is thrown after closing the
405      * thread flock.

406      *
407      * @throws WrongThreadException if invoked by a thread that is not the owner
408      * @throws jdk.incubator.concurrent.StructureViolationException if a structure
409      * violation was detected
410      */
411     public void close() {
412         ensureOwner();
413         if (closed)
414             return;
415 
416         // shutdown, if not already shutdown
417         if (!shutdown)
418             shutdown = true;
419 
420         // wait for threads to finish
421         boolean interrupted = false;
422         try {
423             while (threadCount > 0) {
424                 LockSupport.park();
425                 if (Thread.interrupted()) {

567         public long threadCount() {
568             return flock.threadCount();  // delegates to ThreadFlock's private threadCount()
569         }
570         @Override
571         public Stream<Thread> threads() {
572             return flock.threads().filter(Thread::isAlive);  // drops threads for which isAlive() is false
573         }
574         @Override
575         public void onStart(Thread thread) {
576             flock.onStart(thread);  // forwards to the flock; ThreadFlock.onStart is defined outside this excerpt
577         }
578         @Override
579         public void onExit(Thread thread) {
580             flock.onExit(thread);  // forwards to the flock; ThreadFlock.onExit is defined outside this excerpt
581         }
582         @Override
583         public String toString() {
584             return flock.toString();  // the container reports the same string as its flock
585         }
586         @Override
587         public ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
588             return flock.scopedValueBindings();  // the snapshot the flock captured in its constructor
589         }
590     }
591 }
< prev index next >