1 /*
  2  * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package jdk.internal.misc;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.time.Duration;
 30 import java.util.Objects;
 31 import java.util.Set;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.TimeoutException;
 34 import java.util.concurrent.StructureViolationException;
 35 import java.util.concurrent.locks.LockSupport;
 36 import java.util.stream.Stream;
 37 import jdk.internal.access.JavaLangAccess;
 38 import jdk.internal.access.SharedSecrets;
 39 import jdk.internal.invoke.MhUtil;
 40 import jdk.internal.vm.ScopedValueContainer;
 41 import jdk.internal.vm.ThreadContainer;
 42 import jdk.internal.vm.ThreadContainers;
 43 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 44 
 45 /**
 46  * A grouping of threads that typically run closely related tasks. Threads started
 47  * in a flock remain in the flock until they terminate.
 48  *
 49  * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock,
 50  * the {@link #start(Thread) start} method to start a thread in the flock, and the
 * {@link #close() close} method to close the flock. The {@code close} method waits
 * for all threads in the flock to finish. The {@link #awaitAll() awaitAll} method may
 * be used to wait for all threads to finish without closing the flock. The
 * {@link #wakeup()} method will cause the {@code awaitAll} method to complete early,
 * which can be used to
 55  * support cancellation in higher-level APIs. ThreadFlock also defines the {@link
 56  * #shutdown() shutdown} method to prevent new threads from starting while allowing
 57  * existing threads in the flock to continue.
 58  *
 59  * <p> Thread flocks are intended to be used in a <em>structured manner</em>. The
 60  * thread that opens a new flock is the {@link #owner() owner}. The owner closes the
 61  * flock when done, failure to do so may result in a resource leak. The {@code open}
 62  * and {@code close} should be matched to avoid closing an <em>enclosing</em> flock
 63  * while a <em>nested</em> flock is open. A ThreadFlock can be used with the
 64  * try-with-resources construct if required but more likely, the close method of a
 65  * higher-level API that implements {@link AutoCloseable} will close the flock.
 66  *
 67  * <p> Thread flocks are conceptually nodes in a tree. A thread {@code T} started in
 68  * flock "A" may itself open a new flock "B", implicitly forming a tree where flock
 69  * "A" is the parent of flock "B". When nested, say where thread {@code T} opens
 70  * flock "B" and then invokes a method that opens flock "C", then the enclosing
 71  * flock "B" is conceptually the parent of the nested flock "C". ThreadFlock does
 * not define APIs that expose the tree structure. It does define the {@link
 73  * #containsThread(Thread) containsThread} method to test if a flock contains a
 74  * thread, a test that is equivalent to testing membership of flocks in the tree.
 * The {@code start} method is confined to the flock owner or threads contained
 * in the flock. The confinement check is equivalent to invoking the
 * {@code containsThread} method to test if the caller is contained in the flock.
 * The {@code shutdown} method does not perform this check.
 79  *
 80  * <p> Unless otherwise specified, passing a {@code null} argument to a method
 81  * in this class will cause a {@link NullPointerException} to be thrown.
 82  */
 83 public class ThreadFlock implements AutoCloseable {
 84     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 85     private static final VarHandle THREAD_COUNT;
 86     private static final VarHandle PERMIT;
 87     static {
 88         MethodHandles.Lookup l = MethodHandles.lookup();
 89         THREAD_COUNT = MhUtil.findVarHandle(l, "threadCount", int.class);
 90         PERMIT = MhUtil.findVarHandle(l, "permit", boolean.class);
 91     }
 92 
 93     private final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 94 
 95     // thread count, need to re-examine contention once API is stable
 96     private volatile int threadCount;
 97 
 98     private final String name;
 99     private final ScopedValueContainer.BindingsSnapshot scopedValueBindings;
100     private final ThreadContainerImpl container; // encapsulate for now
101 
102     // state
103     private volatile boolean shutdown;
104     private volatile boolean closed;
105 
106     // set by wakeup, cleared by awaitAll
107     private volatile boolean permit;
108 
109     ThreadFlock(String name) {
110         this.name = name;
111         this.scopedValueBindings = ScopedValueContainer.captureBindings();
112         this.container = new ThreadContainerImpl(this);
113     }
114 
115     private long threadCount() {
116         return threadCount;
117     }
118 
119     private ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
120         return scopedValueBindings;
121     }
122 
123     private void incrementThreadCount() {
124         THREAD_COUNT.getAndAdd(this, 1);
125     }
126 
127     /**
128      * Decrement the thread count. Unpark the owner if the count goes to zero.
129      */
130     private void decrementThreadCount() {
131         int count = (int) THREAD_COUNT.getAndAdd(this, -1) - 1;
132 
133         // signal owner when the count goes to zero
134         if (count == 0) {
135             LockSupport.unpark(owner());
136         }
137     }
138 
139     /**
140      * Invoked on the parent thread when starting {@code thread}.
141      */
142     private void onStart(Thread thread) {
143         incrementThreadCount();
144         boolean done = false;
145         try {
146             boolean added = threads.add(thread);
147             assert added;
148             if (shutdown)
149                 throw new IllegalStateException("Shutdown");
150             done = true;
151         } finally {
152             if (!done) {
153                 threads.remove(thread);
154                 decrementThreadCount();
155             }
156         }
157     }
158 
159     /**
160      * Invoked on the terminating thread or the parent thread when starting
161      * {@code thread} failed. This method is only called if onStart succeeded.
162      */
163     private void onExit(Thread thread) {
164         boolean removed = threads.remove(thread);
165         assert removed;
166         decrementThreadCount();
167     }
168 
169     /**
170      * Clear wakeup permit.
171      */
172     private void clearPermit() {
173         if (permit)
174             permit = false;
175     }
176 
177     /**
178      * Sets the wakeup permit to the given value, returning the previous value.
179      */
180     private boolean getAndSetPermit(boolean newValue) {
181         if (permit != newValue) {
182             return (boolean) PERMIT.getAndSet(this, newValue);
183         } else {
184             return newValue;
185         }
186     }
187 
188     /**
189      * Throws WrongThreadException if the current thread is not the owner.
190      */
191     private void ensureOwner() {
192         if (Thread.currentThread() != owner())
193             throw new WrongThreadException("Current thread not owner");
194     }
195 
196     /**
197      * Throws WrongThreadException if the current thread is not the owner
198      * or a thread contained in the flock.
199      */
200     private void ensureOwnerOrContainsThread() {
201         Thread currentThread = Thread.currentThread();
202         if (currentThread != owner() && !containsThread(currentThread))
203             throw new WrongThreadException("Current thread not owner or thread in flock");
204     }
205 
206     /**
207      * Opens a new thread flock. The flock is owned by the current thread. It can be
208      * named to aid debugging.
209      *
210      * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
211      * bindings for inheritance by threads created in the flock.
212      *
213      * <p> For the purposes of containment, monitoring, and debugging, the parent
214      * of the new flock is determined as follows:
215      * <ul>
216      * <li> If the current thread is the owner of open flocks then the most recently
217      * created, and open, flock is the parent of the new flock. In other words, the
218      * <em>enclosing flock</em> is the parent.
219      * <li> If the current thread is not the owner of any open flocks then the
220      * parent of the new flock is the current thread's flock. If the current thread
221      * was not started in a flock then the new flock does not have a parent.
222      * </ul>
223      *
224      * @param name the name of the flock, can be null
225      * @return a new thread flock
226      */
227     public static ThreadFlock open(String name) {
228         var flock = new ThreadFlock(name);
229         flock.container.push();
230         return flock;
231     }
232 
233     /**
234      * {@return the name of this flock or {@code null} if unnamed}
235      */
236     public String name() {
237         return name;
238     }
239 
240     /**
241      * {@return the owner of this flock}
242      */
243     public Thread owner() {
244         return container.owner();
245     }
246 
247     /**
248      * Starts the given unstarted thread in this flock.
249      *
250      * <p> The thread is started with the scoped value bindings that were captured
251      * when opening the flock. The bindings must match the current thread's bindings.
252      *
253      * <p> This method may only be invoked by the flock owner or threads {@linkplain
254      * #containsThread(Thread) contained} in the flock.
255      *
256      * @param thread the unstarted thread
257      * @return the thread, started
258      * @throws IllegalStateException if this flock is shutdown or closed
259      * @throws IllegalThreadStateException if the given thread was already started
260      * @throws WrongThreadException if the current thread is not the owner or a thread
261      * contained in the flock
262      * @throws StructureViolationException if the current scoped value bindings are
263      * not the same as when the flock was created
264      */
265     public Thread start(Thread thread) {
266         ensureOwnerOrContainsThread();
267         JLA.start(thread, container);
268         return thread;
269     }
270 
271     /**
272      * Shutdown this flock so that no new threads can be started, existing threads
273      * in the flock will continue to run. This method is a no-op if the flock is
274      * already shutdown or closed.
275      */
276     public void shutdown() {
277         if (!shutdown) {
278             shutdown = true;
279         }
280     }
281 
282     /**
283      * Wait for all threads in the flock to finish executing their tasks. This method
284      * waits until all threads finish, the {@link #wakeup() wakeup} method is invoked,
285      * or the current thread is interrupted.
286      *
287      * <p> This method may only be invoked by the flock owner. The method trivially
288      * returns true when the flock is closed.
289      *
290      * <p> This method clears the effect of any previous invocations of the
291      * {@code wakeup} method.
292      *
293      * @return true if there are no threads in the flock, false if wakeup was invoked
294      * and there are unfinished threads
295      * @throws InterruptedException if interrupted while waiting
296      * @throws WrongThreadException if invoked by a thread that is not the owner
297      */
298     public boolean awaitAll() throws InterruptedException {
299         ensureOwner();
300 
301         if (getAndSetPermit(false))
302             return (threadCount == 0);
303 
304         while (threadCount > 0 && !permit) {
305             LockSupport.park();
306             if (Thread.interrupted())
307                 throw new InterruptedException();
308         }
309         clearPermit();
310         return (threadCount == 0);
311     }
312 
313     /**
314      * Wait, up to the given waiting timeout, for all threads in the flock to finish
315      * executing their tasks. This method waits until all threads finish, the {@link
316      * #wakeup() wakeup} method is invoked, the current thread is interrupted, or
317      * the timeout expires.
318      *
319      * <p> This method may only be invoked by the flock owner. The method trivially
320      * returns true when the flock is closed.
321      *
322      * <p> This method clears the effect of any previous invocations of the {@code wakeup}
323      * method.
324      *
325      * @param timeout the maximum duration to wait
326      * @return true if there are no threads in the flock, false if wakeup was invoked
327      * and there are unfinished threads
328      * @throws InterruptedException if interrupted while waiting
329      * @throws TimeoutException if the wait timed out
330      * @throws WrongThreadException if invoked by a thread that is not the owner
331      */
332     public boolean awaitAll(Duration timeout)
333             throws InterruptedException, TimeoutException {
334         Objects.requireNonNull(timeout);
335         ensureOwner();
336 
337         if (getAndSetPermit(false))
338             return (threadCount == 0);
339 
340         long startNanos = System.nanoTime();
341         long nanos = NANOSECONDS.convert(timeout);
342         long remainingNanos = nanos;
343         while (threadCount > 0 && remainingNanos > 0 && !permit) {
344             LockSupport.parkNanos(remainingNanos);
345             if (Thread.interrupted())
346                 throw new InterruptedException();
347             remainingNanos = nanos - (System.nanoTime() - startNanos);
348         }
349 
350         boolean done = (threadCount == 0);
351         if (!done && remainingNanos <= 0 && !permit) {
352             throw new TimeoutException();
353         } else {
354             clearPermit();
355             return done;
356         }
357     }
358 
359     /**
360      * Causes the call to {@link #awaitAll()} or {@link #awaitAll(Duration)} by the
361      * {@linkplain #owner() owner} to return immediately.
362      *
363      * <p> If the owner is blocked in {@code awaitAll} then it will return immediately.
364      * If the owner is not blocked in {@code awaitAll} then its next call to wait
365      * will return immediately. The method does nothing when the flock is closed.
366      */
367     public void wakeup() {
368         if (!getAndSetPermit(true) && Thread.currentThread() != owner()) {
369             LockSupport.unpark(owner());
370         }
371     }
372 
373     /**
374      * Closes this flock. This method first shuts down the flock to prevent
375      * new threads from starting. It then waits for the threads in the flock
376      * to finish executing their tasks. In other words, this method blocks until
377      * all threads in the flock finish.
378      *
379      * <p> This method may only be invoked by the flock owner.
380      *
381      * <p> If interrupted then this method continues to wait until all threads
382      * finish, before completing with the interrupt status set.
383      *
384      * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If
385      * this method is called to close a flock before nested flocks are closed then it
386      * closes the nested flocks (in the reverse order that they were created in),
387      * closes this flock, and then throws {@code StructureViolationException}.
388      * Similarly, if this method is called to close a thread flock while executing with
389      * scoped value bindings, and the thread flock was created before the scoped values
390      * were bound, then {@code StructureViolationException} is thrown after closing the
391      * thread flock.
392      *
393      * @throws WrongThreadException if invoked by a thread that is not the owner
394      * @throws StructureViolationException if a structure violation was detected
395      */
396     public void close() {
397         ensureOwner();
398         if (closed)
399             return;
400 
401         // shutdown, if not already shutdown
402         if (!shutdown)
403             shutdown = true;
404 
405         // wait for threads to finish
406         boolean interrupted = false;
407         try {
408             while (threadCount > 0) {
409                 LockSupport.park();
410                 if (Thread.interrupted()) {
411                     interrupted = true;
412                 }
413             }
414 
415         } finally {
416             try {
417                 container.close(); // may throw
418             } finally {
419                 closed = true;
420                 if (interrupted) Thread.currentThread().interrupt();
421             }
422         }
423     }
424 
425     /**
426      * {@return true if the flock has been {@linkplain #shutdown() shut down}}
427      */
428     public boolean isShutdown() {
429         return shutdown;
430     }
431 
432     /**
433      * {@return true if the flock has been {@linkplain #close() closed}}
434      */
435     public boolean isClosed() {
436         return closed;
437     }
438 
439     /**
440      * {@return a stream of the threads in this flock}
441      * The elements of the stream are threads that were started in this flock
442      * but have not terminated. The stream will reflect the set of threads in the
443      * flock at some point at or since the creation of the stream. It may or may
444      * not reflect changes to the set of threads subsequent to creation of the
445      * stream.
446      */
447     public Stream<Thread> threads() {
448         return threads.stream();
449     }
450 
451     /**
452      * Tests if this flock contains the given thread. This method returns {@code true}
453      * if the thread was started in this flock and has not finished. If the thread
454      * is not in this flock then it tests if the thread is in flocks owned by threads
455      * in this flock, essentially equivalent to invoking {@code containsThread} method
456      * on all flocks owned by the threads in this flock.
457      *
458      * @param thread the thread
459      * @return true if this flock contains the thread
460      */
461     public boolean containsThread(Thread thread) {
462         var c = JLA.threadContainer(thread);
463         if (c == this.container)
464             return true;
465         if (c != null && c != ThreadContainers.root()) {
466             var parent = c.parent();
467             while (parent != null) {
468                 if (parent == this.container)
469                     return true;
470                 parent = parent.parent();
471             }
472         }
473         return false;
474     }
475 
476     @Override
477     public String toString() {
478         String id = Objects.toIdentityString(this);
479         if (name != null) {
480             return name + "/" + id;
481         } else {
482             return id;
483         }
484     }
485 
486     /**
487      * A ThreadContainer backed by a ThreadFlock.
488      */
489     private static class ThreadContainerImpl extends ThreadContainer {
490         private final ThreadFlock flock;
491         private volatile Object key;
492         private boolean closing;
493 
494         ThreadContainerImpl(ThreadFlock flock) {
495             super(/*shared*/ false);
496             this.flock = flock;
497         }
498 
499         @Override
500         public ThreadContainerImpl push() {
501             // Virtual threads in the root containers may not be tracked so need
502             // to register container to ensure that it is found
503             if (!ThreadContainers.trackAllThreads()) {
504                 Thread thread = Thread.currentThread();
505                 if (thread.isVirtual()
506                         && JLA.threadContainer(thread) == ThreadContainers.root()) {
507                     this.key = ThreadContainers.registerContainer(this);
508                 }
509             }
510             super.push();
511             return this;
512         }
513 
514         /**
515          * Invoked by ThreadFlock.close when closing the flock. This method pops the
516          * container from the current thread's scope stack.
517          */
518         void close() {
519             assert Thread.currentThread() == owner();
520             if (!closing) {
521                 closing = true;
522                 boolean atTop = popForcefully(); // may block
523                 Object key = this.key;
524                 if (key != null)
525                     ThreadContainers.deregisterContainer(key);
526                 if (!atTop)
527                     throw new StructureViolationException();
528             }
529         }
530 
531         /**
532          * Invoked when an enclosing scope is closing. Invokes ThreadFlock.close to
533          * close the flock. This method does not pop the container from the current
534          * thread's scope stack.
535          */
536         @Override
537         protected boolean tryClose() {
538             assert Thread.currentThread() == owner();
539             if (!closing) {
540                 closing = true;
541                 flock.close();
542                 Object key = this.key;
543                 if (key != null)
544                     ThreadContainers.deregisterContainer(key);
545                 return true;
546             } else {
547                 assert false : "Should not get there";
548                 return false;
549             }
550         }
551 
552         @Override
553         public String name() {
554             return flock.name();
555         }
556         @Override
557         public long threadCount() {
558             return flock.threadCount();
559         }
560         @Override
561         public Stream<Thread> threads() {
562             return flock.threads().filter(Thread::isAlive);
563         }
564         @Override
565         public void onStart(Thread thread) {
566             flock.onStart(thread);
567         }
568         @Override
569         public void onExit(Thread thread) {
570             flock.onExit(thread);
571         }
572         @Override
573         public ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
574             return flock.scopedValueBindings();
575         }
576     }
577 }