14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package jdk.internal.misc;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.time.Duration;
30 import java.util.Objects;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.locks.LockSupport;
35 import java.util.stream.Stream;
36 import jdk.internal.access.JavaLangAccess;
37 import jdk.internal.access.SharedSecrets;
38 import jdk.internal.vm.ScopedValueContainer;
39 import jdk.internal.vm.ThreadContainer;
40 import jdk.internal.vm.ThreadContainers;
41 import static java.util.concurrent.TimeUnit.NANOSECONDS;
42
43 /**
44 * A grouping of threads that typically run closely related tasks. Threads started
45 * in a flock remain in the flock until they terminate.
46 *
47 * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock,
48 * the {@link #start(Thread) start} method to start a thread in the flock, and the
49 * {@link #close() close} method to close the flock. The {@code close} waits for all
50 * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used
51 * to wait for all threads to finish without closing the flock. The {@link #wakeup()}
52 * method will cause {@code awaitAll} method to complete early, which can be used to
53 * support cancellation in higher-level APIs. ThreadFlock also defines the {@link
70 * not define APIs that exposes the tree structure. It does define the {@link
71 * #containsThread(Thread) containsThread} method to test if a flock contains a
72 * thread, a test that is equivalent to testing membership of flocks in the tree.
73 * The {@code start} and {@code shutdown} methods are confined to the flock
74 * owner or threads contained in the flock. The confinement check is equivalent to
75 * invoking the {@code containsThread} method to test if the caller is contained
76 * in the flock.
77 *
78 * <p> Unless otherwise specified, passing a {@code null} argument to a method
79 * in this class will cause a {@link NullPointerException} to be thrown.
80 */
81 public class ThreadFlock implements AutoCloseable {
82 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
83 private static final VarHandle THREAD_COUNT;
84 private static final VarHandle PERMIT;
85 static {
86 try {
87 MethodHandles.Lookup l = MethodHandles.lookup();
88 THREAD_COUNT = l.findVarHandle(ThreadFlock.class, "threadCount", int.class);
89 PERMIT = l.findVarHandle(ThreadFlock.class, "permit", boolean.class);
90 Unsafe.getUnsafe().ensureClassInitialized(StructureViolationExceptions.class);
91 } catch (Exception e) {
92 throw new InternalError(e);
93 }
94 }
95
96 private final Set<Thread> threads = ConcurrentHashMap.newKeySet();
97
98 // thread count, need to re-examine contention once API is stable
99 private volatile int threadCount;
100
101 private final String name;
102 private final ScopedValueContainer.BindingsSnapshot scopedValueBindings;
103 private final ThreadContainerImpl container; // encapsulate for now
104
105 // state
106 private volatile boolean shutdown;
107 private volatile boolean closed;
108
109 // set by wakeup, cleared by awaitAll
110 private volatile boolean permit;
245 */
246 public Thread owner() {
247 return container.owner();
248 }
249
250 /**
251 * Starts the given unstarted thread in this flock.
252 *
253 * <p> The thread is started with the scoped value bindings that were captured
254 * when opening the flock. The bindings must match the current thread's bindings.
255 *
256 * <p> This method may only be invoked by the flock owner or threads {@linkplain
257 * #containsThread(Thread) contained} in the flock.
258 *
259 * @param thread the unstarted thread
260 * @return the thread, started
261 * @throws IllegalStateException if this flock is shutdown or closed
262 * @throws IllegalThreadStateException if the given thread was already started
263 * @throws WrongThreadException if the current thread is not the owner or a thread
264 * contained in the flock
265 * @throws jdk.incubator.concurrent.StructureViolationException if the current
266 * scoped value bindings are not the same as when the flock was created
267 */
268 public Thread start(Thread thread) {
269 ensureOwnerOrContainsThread();
270 JLA.start(thread, container);
271 return thread;
272 }
273
274 /**
275 * Shutdown this flock so that no new threads can be started, existing threads
276 * in the flock will continue to run. This method is a no-op if the flock is
277 * already shutdown or closed.
278 *
279 * <p> This method may only be invoked by the flock owner or threads {@linkplain
280 * #containsThread(Thread) contained} in the flock.
281 *
282 * @throws WrongThreadException if the current thread is not the owner or a thread
283 * contained in the flock
284 */
285 public void shutdown() {
286 ensureOwnerOrContainsThread();
388 * Closes this flock. This method first shuts down the flock to prevent
389 * new threads from starting. It then waits for the threads in the flock
390 * to finish executing their tasks. In other words, this method blocks until
391 * all threads in the flock finish.
392 *
393 * <p> This method may only be invoked by the flock owner.
394 *
395 * <p> If interrupted then this method continues to wait until all threads
396 * finish, before completing with the interrupt status set.
397 *
398 * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If
399 * this method is called to close a flock before nested flocks are closed then it
400 * closes the nested flocks (in the reverse order that they were created in),
401 * closes this flock, and then throws {@code StructureViolationException}.
402 * Similarly, if this method is called to close a thread flock while executing with
403 * scoped value bindings, and the thread flock was created before the scoped values
404 * were bound, then {@code StructureViolationException} is thrown after closing the
405 * thread flock.
406 *
407 * @throws WrongThreadException if invoked by a thread that is not the owner
408 * @throws jdk.incubator.concurrent.StructureViolationException if a structure
409 * violation was detected
410 */
411 public void close() {
412 ensureOwner();
413 if (closed)
414 return;
415
416 // shutdown, if not already shutdown
417 if (!shutdown)
418 shutdown = true;
419
420 // wait for threads to finish
421 boolean interrupted = false;
422 try {
423 while (threadCount > 0) {
424 LockSupport.park();
425 if (Thread.interrupted()) {
426 interrupted = true;
427 }
428 }
429
496 } else {
497 return id;
498 }
499 }
500
501 /**
502 * A ThreadContainer backed by a ThreadFlock.
503 */
504 private static class ThreadContainerImpl extends ThreadContainer {
505 private final ThreadFlock flock;
506 private volatile Object key;
507 private boolean closing;
508
509 ThreadContainerImpl(ThreadFlock flock) {
510 super(/*shared*/ false);
511 this.flock = flock;
512 }
513
514 @Override
515 public ThreadContainerImpl push() {
516 // Virtual threads in the root containers are not tracked so need
517 // to register container to ensure that it is found
518 Thread thread = Thread.currentThread();
519 if (thread.isVirtual()
520 && JLA.threadContainer(thread) == ThreadContainers.root()) {
521 this.key = ThreadContainers.registerContainer(this);
522 }
523
524 super.push();
525 return this;
526 }
527
528 /**
529 * Invoked by ThreadFlock.close when closing the flock. This method pops the
530 * container from the current thread's scope stack.
531 */
532 void close() {
533 assert Thread.currentThread() == owner();
534 if (!closing) {
535 closing = true;
536 boolean atTop = popForcefully(); // may block
537 Object key = this.key;
538 if (key != null)
539 ThreadContainers.deregisterContainer(key);
540 if (!atTop)
541 StructureViolationExceptions.throwException();
542 }
543 }
544
545 /**
546 * Invoked when an enclosing scope is closing. Invokes ThreadFlock.close to
547 * close the flock. This method does not pop the container from the current
548 * thread's scope stack.
549 */
550 @Override
551 protected boolean tryClose() {
552 assert Thread.currentThread() == owner();
553 if (!closing) {
554 closing = true;
555 flock.close();
556 Object key = this.key;
557 if (key != null)
558 ThreadContainers.deregisterContainer(key);
559 return true;
560 } else {
561 assert false : "Should not get there";
562 return false;
563 }
564 }
565
566 @Override
567 public long threadCount() {
568 return flock.threadCount();
569 }
570 @Override
571 public Stream<Thread> threads() {
572 return flock.threads().filter(Thread::isAlive);
573 }
574 @Override
575 public void onStart(Thread thread) {
576 flock.onStart(thread);
577 }
578 @Override
579 public void onExit(Thread thread) {
580 flock.onExit(thread);
581 }
582 @Override
583 public String toString() {
584 return flock.toString();
585 }
586 @Override
587 public ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
588 return flock.scopedValueBindings();
589 }
590 }
591 }
|
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package jdk.internal.misc;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.time.Duration;
30 import java.util.Objects;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.StructureViolationException;
35 import java.util.concurrent.locks.LockSupport;
36 import java.util.stream.Stream;
37 import jdk.internal.access.JavaLangAccess;
38 import jdk.internal.access.SharedSecrets;
39 import jdk.internal.vm.ScopedValueContainer;
40 import jdk.internal.vm.ThreadContainer;
41 import jdk.internal.vm.ThreadContainers;
42 import static java.util.concurrent.TimeUnit.NANOSECONDS;
43
44 /**
45 * A grouping of threads that typically run closely related tasks. Threads started
46 * in a flock remain in the flock until they terminate.
47 *
48 * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock,
49 * the {@link #start(Thread) start} method to start a thread in the flock, and the
50 * {@link #close() close} method to close the flock. The {@code close} waits for all
51 * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used
52 * to wait for all threads to finish without closing the flock. The {@link #wakeup()}
53 * method will cause {@code awaitAll} method to complete early, which can be used to
54 * support cancellation in higher-level APIs. ThreadFlock also defines the {@link
71 * not define APIs that exposes the tree structure. It does define the {@link
72 * #containsThread(Thread) containsThread} method to test if a flock contains a
73 * thread, a test that is equivalent to testing membership of flocks in the tree.
74 * The {@code start} and {@code shutdown} methods are confined to the flock
75 * owner or threads contained in the flock. The confinement check is equivalent to
76 * invoking the {@code containsThread} method to test if the caller is contained
77 * in the flock.
78 *
79 * <p> Unless otherwise specified, passing a {@code null} argument to a method
80 * in this class will cause a {@link NullPointerException} to be thrown.
81 */
82 public class ThreadFlock implements AutoCloseable {
83 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
84 private static final VarHandle THREAD_COUNT;
85 private static final VarHandle PERMIT;
86 static {
87 try {
88 MethodHandles.Lookup l = MethodHandles.lookup();
89 THREAD_COUNT = l.findVarHandle(ThreadFlock.class, "threadCount", int.class);
90 PERMIT = l.findVarHandle(ThreadFlock.class, "permit", boolean.class);
91 } catch (Exception e) {
92 throw new InternalError(e);
93 }
94 }
95
96 private final Set<Thread> threads = ConcurrentHashMap.newKeySet();
97
98 // thread count, need to re-examine contention once API is stable
99 private volatile int threadCount;
100
101 private final String name;
102 private final ScopedValueContainer.BindingsSnapshot scopedValueBindings;
103 private final ThreadContainerImpl container; // encapsulate for now
104
105 // state
106 private volatile boolean shutdown;
107 private volatile boolean closed;
108
109 // set by wakeup, cleared by awaitAll
110 private volatile boolean permit;
245 */
246 public Thread owner() {
247 return container.owner();
248 }
249
250 /**
251 * Starts the given unstarted thread in this flock.
252 *
253 * <p> The thread is started with the scoped value bindings that were captured
254 * when opening the flock. The bindings must match the current thread's bindings.
255 *
256 * <p> This method may only be invoked by the flock owner or threads {@linkplain
257 * #containsThread(Thread) contained} in the flock.
258 *
259 * @param thread the unstarted thread
260 * @return the thread, started
261 * @throws IllegalStateException if this flock is shutdown or closed
262 * @throws IllegalThreadStateException if the given thread was already started
263 * @throws WrongThreadException if the current thread is not the owner or a thread
264 * contained in the flock
265 * @throws StructureViolationException if the current scoped value bindings are
266 * not the same as when the flock was created
267 */
268 public Thread start(Thread thread) {
269 ensureOwnerOrContainsThread();
270 JLA.start(thread, container);
271 return thread;
272 }
273
274 /**
275 * Shutdown this flock so that no new threads can be started, existing threads
276 * in the flock will continue to run. This method is a no-op if the flock is
277 * already shutdown or closed.
278 *
279 * <p> This method may only be invoked by the flock owner or threads {@linkplain
280 * #containsThread(Thread) contained} in the flock.
281 *
282 * @throws WrongThreadException if the current thread is not the owner or a thread
283 * contained in the flock
284 */
285 public void shutdown() {
286 ensureOwnerOrContainsThread();
388 * Closes this flock. This method first shuts down the flock to prevent
389 * new threads from starting. It then waits for the threads in the flock
390 * to finish executing their tasks. In other words, this method blocks until
391 * all threads in the flock finish.
392 *
393 * <p> This method may only be invoked by the flock owner.
394 *
395 * <p> If interrupted then this method continues to wait until all threads
396 * finish, before completing with the interrupt status set.
397 *
398 * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If
399 * this method is called to close a flock before nested flocks are closed then it
400 * closes the nested flocks (in the reverse order that they were created in),
401 * closes this flock, and then throws {@code StructureViolationException}.
402 * Similarly, if this method is called to close a thread flock while executing with
403 * scoped value bindings, and the thread flock was created before the scoped values
404 * were bound, then {@code StructureViolationException} is thrown after closing the
405 * thread flock.
406 *
407 * @throws WrongThreadException if invoked by a thread that is not the owner
408 * @throws StructureViolationException if a structure violation was detected
409 */
410 public void close() {
411 ensureOwner();
412 if (closed)
413 return;
414
415 // shutdown, if not already shutdown
416 if (!shutdown)
417 shutdown = true;
418
419 // wait for threads to finish
420 boolean interrupted = false;
421 try {
422 while (threadCount > 0) {
423 LockSupport.park();
424 if (Thread.interrupted()) {
425 interrupted = true;
426 }
427 }
428
495 } else {
496 return id;
497 }
498 }
499
500 /**
501 * A ThreadContainer backed by a ThreadFlock.
502 */
503 private static class ThreadContainerImpl extends ThreadContainer {
504 private final ThreadFlock flock;
505 private volatile Object key;
506 private boolean closing;
507
508 ThreadContainerImpl(ThreadFlock flock) {
509 super(/*shared*/ false);
510 this.flock = flock;
511 }
512
513 @Override
514 public ThreadContainerImpl push() {
515 // Virtual threads in the root containers may not be tracked so need
516 // to register container to ensure that it is found
517 if (!ThreadContainers.trackAllThreads()) {
518 Thread thread = Thread.currentThread();
519 if (thread.isVirtual()
520 && JLA.threadContainer(thread) == ThreadContainers.root()) {
521 this.key = ThreadContainers.registerContainer(this);
522 }
523 }
524 super.push();
525 return this;
526 }
527
528 /**
529 * Invoked by ThreadFlock.close when closing the flock. This method pops the
530 * container from the current thread's scope stack.
531 */
532 void close() {
533 assert Thread.currentThread() == owner();
534 if (!closing) {
535 closing = true;
536 boolean atTop = popForcefully(); // may block
537 Object key = this.key;
538 if (key != null)
539 ThreadContainers.deregisterContainer(key);
540 if (!atTop)
541 throw new StructureViolationException();
542 }
543 }
544
545 /**
546 * Invoked when an enclosing scope is closing. Invokes ThreadFlock.close to
547 * close the flock. This method does not pop the container from the current
548 * thread's scope stack.
549 */
550 @Override
551 protected boolean tryClose() {
552 assert Thread.currentThread() == owner();
553 if (!closing) {
554 closing = true;
555 flock.close();
556 Object key = this.key;
557 if (key != null)
558 ThreadContainers.deregisterContainer(key);
559 return true;
560 } else {
561 assert false : "Should not get there";
562 return false;
563 }
564 }
565
566 @Override
567 public String name() {
568 return flock.name();
569 }
570 @Override
571 public long threadCount() {
572 return flock.threadCount();
573 }
574 @Override
575 public Stream<Thread> threads() {
576 return flock.threads().filter(Thread::isAlive);
577 }
578 @Override
579 public void onStart(Thread thread) {
580 flock.onStart(thread);
581 }
582 @Override
583 public void onExit(Thread thread) {
584 flock.onExit(thread);
585 }
586 @Override
587 public ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
588 return flock.scopedValueBindings();
589 }
590 }
591 }
|