1 /*
2 * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package jdk.internal.misc;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.time.Duration;
30 import java.util.Objects;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.StructureViolationException;
35 import java.util.concurrent.locks.LockSupport;
36 import java.util.stream.Stream;
37 import jdk.internal.access.JavaLangAccess;
38 import jdk.internal.access.SharedSecrets;
39 import jdk.internal.vm.ScopedValueContainer;
40 import jdk.internal.vm.ThreadContainer;
41 import jdk.internal.vm.ThreadContainers;
42 import static java.util.concurrent.TimeUnit.NANOSECONDS;
43
44 /**
45 * A grouping of threads that typically run closely related tasks. Threads started
46 * in a flock remain in the flock until they terminate.
47 *
48 * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock,
49 * the {@link #start(Thread) start} method to start a thread in the flock, and the
50 * {@link #close() close} method to close the flock. The {@code close} waits for all
51 * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used
52 * to wait for all threads to finish without closing the flock. The {@link #wakeup()}
53 * method will cause {@code awaitAll} method to complete early, which can be used to
54 * support cancellation in higher-level APIs. ThreadFlock also defines the {@link
55 * #shutdown() shutdown} method to prevent new threads from starting while allowing
56 * existing threads in the flock to continue.
57 *
58 * <p> Thread flocks are intended to be used in a <em>structured manner</em>. The
59 * thread that opens a new flock is the {@link #owner() owner}. The owner closes the
60 * flock when done, failure to do so may result in a resource leak. The {@code open}
61 * and {@code close} should be matched to avoid closing an <em>enclosing</em> flock
62 * while a <em>nested</em> flock is open. A ThreadFlock can be used with the
63 * try-with-resources construct if required but more likely, the close method of a
64 * higher-level API that implements {@link AutoCloseable} will close the flock.
65 *
66 * <p> Thread flocks are conceptually nodes in a tree. A thread {@code T} started in
67 * flock "A" may itself open a new flock "B", implicitly forming a tree where flock
68 * "A" is the parent of flock "B". When nested, say where thread {@code T} opens
69 * flock "B" and then invokes a method that opens flock "C", then the enclosing
70 * flock "B" is conceptually the parent of the nested flock "C". ThreadFlock does
71 * not define APIs that exposes the tree structure. It does define the {@link
72 * #containsThread(Thread) containsThread} method to test if a flock contains a
73 * thread, a test that is equivalent to testing membership of flocks in the tree.
74 * The {@code start} and {@code shutdown} methods are confined to the flock
75 * owner or threads contained in the flock. The confinement check is equivalent to
76 * invoking the {@code containsThread} method to test if the caller is contained
77 * in the flock.
78 *
79 * <p> Unless otherwise specified, passing a {@code null} argument to a method
80 * in this class will cause a {@link NullPointerException} to be thrown.
81 */
82 public class ThreadFlock implements AutoCloseable {
83 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
84 private static final VarHandle THREAD_COUNT;
85 private static final VarHandle PERMIT;
86 static {
87 try {
88 MethodHandles.Lookup l = MethodHandles.lookup();
89 THREAD_COUNT = l.findVarHandle(ThreadFlock.class, "threadCount", int.class);
90 PERMIT = l.findVarHandle(ThreadFlock.class, "permit", boolean.class);
91 } catch (Exception e) {
92 throw new InternalError(e);
93 }
94 }
95
96 private final Set<Thread> threads = ConcurrentHashMap.newKeySet();
97
98 // thread count, need to re-examine contention once API is stable
99 private volatile int threadCount;
100
101 private final String name;
102 private final ScopedValueContainer.BindingsSnapshot scopedValueBindings;
103 private final ThreadContainerImpl container; // encapsulate for now
104
105 // state
106 private volatile boolean shutdown;
107 private volatile boolean closed;
108
109 // set by wakeup, cleared by awaitAll
110 private volatile boolean permit;
111
112 ThreadFlock(String name) {
113 this.name = name;
114 this.scopedValueBindings = ScopedValueContainer.captureBindings();
115 this.container = new ThreadContainerImpl(this);
116 }
117
118 private long threadCount() {
119 return threadCount;
120 }
121
122 private ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
123 return scopedValueBindings;
124 }
125
126 private void incrementThreadCount() {
127 THREAD_COUNT.getAndAdd(this, 1);
128 }
129
130 /**
131 * Decrement the thread count. Unpark the owner if the count goes to zero.
132 */
133 private void decrementThreadCount() {
134 int count = (int) THREAD_COUNT.getAndAdd(this, -1) - 1;
135
136 // signal owner when the count goes to zero
137 if (count == 0) {
138 LockSupport.unpark(owner());
139 }
140 }
141
142 /**
143 * Invoked on the parent thread when starting {@code thread}.
144 */
145 private void onStart(Thread thread) {
146 incrementThreadCount();
147 boolean done = false;
148 try {
149 boolean added = threads.add(thread);
150 assert added;
151 if (shutdown)
152 throw new IllegalStateException("Shutdown");
153 done = true;
154 } finally {
155 if (!done) {
156 threads.remove(thread);
157 decrementThreadCount();
158 }
159 }
160 }
161
162 /**
163 * Invoked on the terminating thread or the parent thread when starting
164 * {@code thread} failed. This method is only called if onStart succeeded.
165 */
166 private void onExit(Thread thread) {
167 boolean removed = threads.remove(thread);
168 assert removed;
169 decrementThreadCount();
170 }
171
172 /**
173 * Clear wakeup permit.
174 */
175 private void clearPermit() {
176 if (permit)
177 permit = false;
178 }
179
180 /**
181 * Sets the wakeup permit to the given value, returning the previous value.
182 */
183 private boolean getAndSetPermit(boolean newValue) {
184 if (permit != newValue) {
185 return (boolean) PERMIT.getAndSet(this, newValue);
186 } else {
187 return newValue;
188 }
189 }
190
191 /**
192 * Throws WrongThreadException if the current thread is not the owner.
193 */
194 private void ensureOwner() {
195 if (Thread.currentThread() != owner())
196 throw new WrongThreadException("Current thread not owner");
197 }
198
199 /**
200 * Throws WrongThreadException if the current thread is not the owner
201 * or a thread contained in the flock.
202 */
203 private void ensureOwnerOrContainsThread() {
204 Thread currentThread = Thread.currentThread();
205 if (currentThread != owner() && !containsThread(currentThread))
206 throw new WrongThreadException("Current thread not owner or thread in flock");
207 }
208
209 /**
210 * Opens a new thread flock. The flock is owned by the current thread. It can be
211 * named to aid debugging.
212 *
213 * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
214 * bindings for inheritance by threads created in the flock.
215 *
216 * <p> For the purposes of containment, monitoring, and debugging, the parent
217 * of the new flock is determined as follows:
218 * <ul>
219 * <li> If the current thread is the owner of open flocks then the most recently
220 * created, and open, flock is the parent of the new flock. In other words, the
221 * <em>enclosing flock</em> is the parent.
222 * <li> If the current thread is not the owner of any open flocks then the
223 * parent of the new flock is the current thread's flock. If the current thread
224 * was not started in a flock then the new flock does not have a parent.
225 * </ul>
226 *
227 * @param name the name of the flock, can be null
228 * @return a new thread flock
229 */
230 public static ThreadFlock open(String name) {
231 var flock = new ThreadFlock(name);
232 flock.container.push();
233 return flock;
234 }
235
236 /**
237 * {@return the name of this flock or {@code null} if unnamed}
238 */
239 public String name() {
240 return name;
241 }
242
243 /**
244 * {@return the owner of this flock}
245 */
246 public Thread owner() {
247 return container.owner();
248 }
249
250 /**
251 * Starts the given unstarted thread in this flock.
252 *
253 * <p> The thread is started with the scoped value bindings that were captured
254 * when opening the flock. The bindings must match the current thread's bindings.
255 *
256 * <p> This method may only be invoked by the flock owner or threads {@linkplain
257 * #containsThread(Thread) contained} in the flock.
258 *
259 * @param thread the unstarted thread
260 * @return the thread, started
261 * @throws IllegalStateException if this flock is shutdown or closed
262 * @throws IllegalThreadStateException if the given thread was already started
263 * @throws WrongThreadException if the current thread is not the owner or a thread
264 * contained in the flock
265 * @throws StructureViolationException if the current scoped value bindings are
266 * not the same as when the flock was created
267 */
268 public Thread start(Thread thread) {
269 ensureOwnerOrContainsThread();
270 JLA.start(thread, container);
271 return thread;
272 }
273
274 /**
275 * Shutdown this flock so that no new threads can be started, existing threads
276 * in the flock will continue to run. This method is a no-op if the flock is
277 * already shutdown or closed.
278 */
279 public void shutdown() {
280 if (!shutdown) {
281 shutdown = true;
282 }
283 }
284
285 /**
286 * Wait for all threads in the flock to finish executing their tasks. This method
287 * waits until all threads finish, the {@link #wakeup() wakeup} method is invoked,
288 * or the current thread is interrupted.
289 *
290 * <p> This method may only be invoked by the flock owner. The method trivially
291 * returns true when the flock is closed.
292 *
293 * <p> This method clears the effect of any previous invocations of the
294 * {@code wakeup} method.
295 *
296 * @return true if there are no threads in the flock, false if wakeup was invoked
297 * and there are unfinished threads
298 * @throws InterruptedException if interrupted while waiting
299 * @throws WrongThreadException if invoked by a thread that is not the owner
300 */
301 public boolean awaitAll() throws InterruptedException {
302 ensureOwner();
303
304 if (getAndSetPermit(false))
305 return (threadCount == 0);
306
307 while (threadCount > 0 && !permit) {
308 LockSupport.park();
309 if (Thread.interrupted())
310 throw new InterruptedException();
311 }
312 clearPermit();
313 return (threadCount == 0);
314 }
315
316 /**
317 * Wait, up to the given waiting timeout, for all threads in the flock to finish
318 * executing their tasks. This method waits until all threads finish, the {@link
319 * #wakeup() wakeup} method is invoked, the current thread is interrupted, or
320 * the timeout expires.
321 *
322 * <p> This method may only be invoked by the flock owner. The method trivially
323 * returns true when the flock is closed.
324 *
325 * <p> This method clears the effect of any previous invocations of the {@code wakeup}
326 * method.
327 *
328 * @param timeout the maximum duration to wait
329 * @return true if there are no threads in the flock, false if wakeup was invoked
330 * and there are unfinished threads
331 * @throws InterruptedException if interrupted while waiting
332 * @throws TimeoutException if the wait timed out
333 * @throws WrongThreadException if invoked by a thread that is not the owner
334 */
335 public boolean awaitAll(Duration timeout)
336 throws InterruptedException, TimeoutException {
337 Objects.requireNonNull(timeout);
338 ensureOwner();
339
340 if (getAndSetPermit(false))
341 return (threadCount == 0);
342
343 long startNanos = System.nanoTime();
344 long nanos = NANOSECONDS.convert(timeout);
345 long remainingNanos = nanos;
346 while (threadCount > 0 && remainingNanos > 0 && !permit) {
347 LockSupport.parkNanos(remainingNanos);
348 if (Thread.interrupted())
349 throw new InterruptedException();
350 remainingNanos = nanos - (System.nanoTime() - startNanos);
351 }
352
353 boolean done = (threadCount == 0);
354 if (!done && remainingNanos <= 0 && !permit) {
355 throw new TimeoutException();
356 } else {
357 clearPermit();
358 return done;
359 }
360 }
361
362 /**
363 * Causes the call to {@link #awaitAll()} or {@link #awaitAll(Duration)} by the
364 * {@linkplain #owner() owner} to return immediately.
365 *
366 * <p> If the owner is blocked in {@code awaitAll} then it will return immediately.
367 * If the owner is not blocked in {@code awaitAll} then its next call to wait
368 * will return immediately. The method does nothing when the flock is closed.
369 */
370 public void wakeup() {
371 if (!getAndSetPermit(true) && Thread.currentThread() != owner()) {
372 LockSupport.unpark(owner());
373 }
374 }
375
376 /**
377 * Closes this flock. This method first shuts down the flock to prevent
378 * new threads from starting. It then waits for the threads in the flock
379 * to finish executing their tasks. In other words, this method blocks until
380 * all threads in the flock finish.
381 *
382 * <p> This method may only be invoked by the flock owner.
383 *
384 * <p> If interrupted then this method continues to wait until all threads
385 * finish, before completing with the interrupt status set.
386 *
387 * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If
388 * this method is called to close a flock before nested flocks are closed then it
389 * closes the nested flocks (in the reverse order that they were created in),
390 * closes this flock, and then throws {@code StructureViolationException}.
391 * Similarly, if this method is called to close a thread flock while executing with
392 * scoped value bindings, and the thread flock was created before the scoped values
393 * were bound, then {@code StructureViolationException} is thrown after closing the
394 * thread flock.
395 *
396 * @throws WrongThreadException if invoked by a thread that is not the owner
397 * @throws StructureViolationException if a structure violation was detected
398 */
399 public void close() {
400 ensureOwner();
401 if (closed)
402 return;
403
404 // shutdown, if not already shutdown
405 if (!shutdown)
406 shutdown = true;
407
408 // wait for threads to finish
409 boolean interrupted = false;
410 try {
411 while (threadCount > 0) {
412 LockSupport.park();
413 if (Thread.interrupted()) {
414 interrupted = true;
415 }
416 }
417
418 } finally {
419 try {
420 container.close(); // may throw
421 } finally {
422 closed = true;
423 if (interrupted) Thread.currentThread().interrupt();
424 }
425 }
426 }
427
428 /**
429 * {@return true if the flock has been {@linkplain #shutdown() shut down}}
430 */
431 public boolean isShutdown() {
432 return shutdown;
433 }
434
435 /**
436 * {@return true if the flock has been {@linkplain #close() closed}}
437 */
438 public boolean isClosed() {
439 return closed;
440 }
441
442 /**
443 * {@return a stream of the threads in this flock}
444 * The elements of the stream are threads that were started in this flock
445 * but have not terminated. The stream will reflect the set of threads in the
446 * flock at some point at or since the creation of the stream. It may or may
447 * not reflect changes to the set of threads subsequent to creation of the
448 * stream.
449 */
450 public Stream<Thread> threads() {
451 return threads.stream();
452 }
453
454 /**
455 * Tests if this flock contains the given thread. This method returns {@code true}
456 * if the thread was started in this flock and has not finished. If the thread
457 * is not in this flock then it tests if the thread is in flocks owned by threads
458 * in this flock, essentially equivalent to invoking {@code containsThread} method
459 * on all flocks owned by the threads in this flock.
460 *
461 * @param thread the thread
462 * @return true if this flock contains the thread
463 */
464 public boolean containsThread(Thread thread) {
465 var c = JLA.threadContainer(thread);
466 if (c == this.container)
467 return true;
468 if (c != null && c != ThreadContainers.root()) {
469 var parent = c.parent();
470 while (parent != null) {
471 if (parent == this.container)
472 return true;
473 parent = parent.parent();
474 }
475 }
476 return false;
477 }
478
479 @Override
480 public String toString() {
481 String id = Objects.toIdentityString(this);
482 if (name != null) {
483 return name + "/" + id;
484 } else {
485 return id;
486 }
487 }
488
489 /**
490 * A ThreadContainer backed by a ThreadFlock.
491 */
492 private static class ThreadContainerImpl extends ThreadContainer {
493 private final ThreadFlock flock;
494 private volatile Object key;
495 private boolean closing;
496
497 ThreadContainerImpl(ThreadFlock flock) {
498 super(/*shared*/ false);
499 this.flock = flock;
500 }
501
502 @Override
503 public ThreadContainerImpl push() {
504 // Virtual threads in the root containers may not be tracked so need
505 // to register container to ensure that it is found
506 if (!ThreadContainers.trackAllThreads()) {
507 Thread thread = Thread.currentThread();
508 if (thread.isVirtual()
509 && JLA.threadContainer(thread) == ThreadContainers.root()) {
510 this.key = ThreadContainers.registerContainer(this);
511 }
512 }
513 super.push();
514 return this;
515 }
516
517 /**
518 * Invoked by ThreadFlock.close when closing the flock. This method pops the
519 * container from the current thread's scope stack.
520 */
521 void close() {
522 assert Thread.currentThread() == owner();
523 if (!closing) {
524 closing = true;
525 boolean atTop = popForcefully(); // may block
526 Object key = this.key;
527 if (key != null)
528 ThreadContainers.deregisterContainer(key);
529 if (!atTop)
530 throw new StructureViolationException();
531 }
532 }
533
534 /**
535 * Invoked when an enclosing scope is closing. Invokes ThreadFlock.close to
536 * close the flock. This method does not pop the container from the current
537 * thread's scope stack.
538 */
539 @Override
540 protected boolean tryClose() {
541 assert Thread.currentThread() == owner();
542 if (!closing) {
543 closing = true;
544 flock.close();
545 Object key = this.key;
546 if (key != null)
547 ThreadContainers.deregisterContainer(key);
548 return true;
549 } else {
550 assert false : "Should not get there";
551 return false;
552 }
553 }
554
555 @Override
556 public String name() {
557 return flock.name();
558 }
559 @Override
560 public long threadCount() {
561 return flock.threadCount();
562 }
563 @Override
564 public Stream<Thread> threads() {
565 return flock.threads().filter(Thread::isAlive);
566 }
567 @Override
568 public void onStart(Thread thread) {
569 flock.onStart(thread);
570 }
571 @Override
572 public void onExit(Thread thread) {
573 flock.onExit(thread);
574 }
575 @Override
576 public ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
577 return flock.scopedValueBindings();
578 }
579 }
580 }
--- EOF ---