1 /*
  2  * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package jdk.internal.misc;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.time.Duration;
 30 import java.util.Objects;
 31 import java.util.Set;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.TimeoutException;
 34 import java.util.concurrent.StructureViolationException;
 35 import java.util.concurrent.locks.LockSupport;
 36 import java.util.stream.Stream;
 37 import jdk.internal.access.JavaLangAccess;
 38 import jdk.internal.access.SharedSecrets;
 39 import jdk.internal.invoke.MhUtil;
 40 import jdk.internal.vm.ScopedValueContainer;
 41 import jdk.internal.vm.ThreadContainer;
 42 import jdk.internal.vm.ThreadContainers;
 43 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 44 
 45 /**
 46  * A grouping of threads that typically run closely related tasks. Threads started
 47  * in a flock remain in the flock until they terminate.
 48  *
 49  * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock,
 50  * the {@link #start(Thread) start} method to start a thread in the flock, and the
 51  * {@link #close() close} method to close the flock. The {@code close} waits for all
 52  * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used
 53  * to wait for all threads to finish without closing the flock. The {@link #wakeup()}
 54  * method will cause {@code awaitAll} method to complete early, which can be used to
 55  * support cancellation in higher-level APIs. ThreadFlock also defines the {@link
 56  * #shutdown() shutdown} method to prevent new threads from starting while allowing
 57  * existing threads in the flock to continue.
 58  *
 59  * <p> Thread flocks are intended to be used in a <em>structured manner</em>. The
 60  * thread that opens a new flock is the {@link #owner() owner}. The owner closes the
 61  * flock when done, failure to do so may result in a resource leak. The {@code open}
 62  * and {@code close} should be matched to avoid closing an <em>enclosing</em> flock
 63  * while a <em>nested</em> flock is open. A ThreadFlock can be used with the
 64  * try-with-resources construct if required but more likely, the close method of a
 65  * higher-level API that implements {@link AutoCloseable} will close the flock.
 66  *
 67  * <p> Thread flocks are conceptually nodes in a tree. A thread {@code T} started in
 68  * flock "A" may itself open a new flock "B", implicitly forming a tree where flock
 69  * "A" is the parent of flock "B". When nested, say where thread {@code T} opens
 70  * flock "B" and then invokes a method that opens flock "C", then the enclosing
 71  * flock "B" is conceptually the parent of the nested flock "C". ThreadFlock does
 72  * not define APIs that exposes the tree structure. It does define the {@link
 73  * #containsThread(Thread) containsThread} method to test if a flock contains a
 74  * thread, a test that is equivalent to testing membership of flocks in the tree.
 75  * The {@code start} and {@code shutdown} methods are confined to the flock
 76  * owner or threads contained in the flock. The confinement check is equivalent to
 77  * invoking the {@code containsThread} method to test if the caller is contained
 78  * in the flock.
 79  *
 80  * <p> Unless otherwise specified, passing a {@code null} argument to a method
 81  * in this class will cause a {@link NullPointerException} to be thrown.
 82  */
 83 public class ThreadFlock implements AutoCloseable {
 84     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 85     private static final VarHandle THREAD_COUNT;
 86     private static final VarHandle PERMIT;
 87     static {
 88         MethodHandles.Lookup l = MethodHandles.lookup();
 89         THREAD_COUNT = MhUtil.findVarHandle(l, "threadCount", int.class);
 90         PERMIT = MhUtil.findVarHandle(l, "permit", boolean.class);
 91     }
 92 
 93     private final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 94 
 95     // thread count, need to re-examine contention once API is stable
 96     private volatile int threadCount;
 97 
 98     private final String name;
 99     private final ScopedValueContainer.BindingsSnapshot scopedValueBindings;
100     private final ThreadContainerImpl container; // encapsulate for now
101 
102     // state
103     private volatile boolean shutdown;
104     private volatile boolean closed;
105 
106     // set by wakeup, cleared by awaitAll
107     private volatile boolean permit;
108 
109     ThreadFlock(String name) {
110         this.name = name;
111         this.scopedValueBindings = ScopedValueContainer.captureBindings();
112         this.container = new ThreadContainerImpl(this);
113     }
114 
115     private long threadCount() {
116         return threadCount;
117     }
118 
119     private ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
120         return scopedValueBindings;
121     }
122 
123     private void incrementThreadCount() {
124         THREAD_COUNT.getAndAdd(this, 1);
125     }
126 
127     /**
128      * Decrement the thread count. Unpark the owner if the count goes to zero.
129      */
130     private void decrementThreadCount() {
131         int count = (int) THREAD_COUNT.getAndAdd(this, -1) - 1;
132 
133         // signal owner when the count goes to zero
134         if (count == 0) {
135             LockSupport.unpark(owner());
136         }
137     }
138 
139     /**
140      * Invoked on the parent thread when starting {@code thread}.
141      */
142     private void onStart(Thread thread) {
143         incrementThreadCount();
144         boolean done = false;
145         try {
146             boolean added = threads.add(thread);
147             assert added;
148             if (shutdown)
149                 throw new IllegalStateException("Shutdown");
150             done = true;
151         } finally {
152             if (!done) {
153                 threads.remove(thread);
154                 decrementThreadCount();
155             }
156         }
157     }
158 
159     /**
160      * Invoked on the terminating thread or the parent thread when starting
161      * {@code thread} failed. This method is only called if onStart succeeded.
162      */
163     private void onExit(Thread thread) {
164         boolean removed = threads.remove(thread);
165         assert removed;
166         decrementThreadCount();
167     }
168 
169     /**
170      * Clear wakeup permit.
171      */
172     private void clearPermit() {
173         if (permit)
174             permit = false;
175     }
176 
177     /**
178      * Sets the wakeup permit to the given value, returning the previous value.
179      */
180     private boolean getAndSetPermit(boolean newValue) {
181         if (permit != newValue) {
182             return (boolean) PERMIT.getAndSet(this, newValue);
183         } else {
184             return newValue;
185         }
186     }
187 
188     /**
189      * Throws WrongThreadException if the current thread is not the owner.
190      */
191     private void ensureOwner() {
192         if (Thread.currentThread() != owner())
193             throw new WrongThreadException("Current thread not owner");
194     }
195 
196     /**
197      * Throws WrongThreadException if the current thread is not the owner
198      * or a thread contained in the flock.
199      */
200     private void ensureOwnerOrContainsThread() {
201         Thread currentThread = Thread.currentThread();
202         if (currentThread != owner() && !containsThread(currentThread))
203             throw new WrongThreadException("Current thread not owner or thread in flock");
204     }
205 
206     /**
207      * Opens a new thread flock. The flock is owned by the current thread. It can be
208      * named to aid debugging.
209      *
210      * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
211      * bindings for inheritance by threads created in the flock.
212      *
213      * <p> For the purposes of containment, monitoring, and debugging, the parent
214      * of the new flock is determined as follows:
215      * <ul>
216      * <li> If the current thread is the owner of open flocks then the most recently
217      * created, and open, flock is the parent of the new flock. In other words, the
218      * <em>enclosing flock</em> is the parent.
219      * <li> If the current thread is not the owner of any open flocks then the
220      * parent of the new flock is the current thread's flock. If the current thread
221      * was not started in a flock then the new flock does not have a parent.
222      * </ul>
223      *
224      * @param name the name of the flock, can be null
225      * @return a new thread flock
226      */
227     public static ThreadFlock open(String name) {
228         var flock = new ThreadFlock(name);
229         flock.container.push();
230         return flock;
231     }
232 
233     /**
234      * {@return the name of this flock or {@code null} if unnamed}
235      */
236     public String name() {
237         return name;
238     }
239 
240     /**
241      * {@return the owner of this flock}
242      */
243     public Thread owner() {
244         return container.owner();
245     }
246 
247     /**
248      * Starts the given unstarted thread in this flock.
249      *
250      * <p> The thread is started with the scoped value bindings that were captured
251      * when opening the flock. The bindings must match the current thread's bindings.
252      *
253      * <p> This method may only be invoked by the flock owner or threads {@linkplain
254      * #containsThread(Thread) contained} in the flock.
255      *
256      * @param thread the unstarted thread
257      * @return the thread, started
258      * @throws IllegalStateException if this flock is shutdown or closed
259      * @throws IllegalThreadStateException if the given thread was already started
260      * @throws WrongThreadException if the current thread is not the owner or a thread
261      * contained in the flock
262      * @throws StructureViolationException if the current scoped value bindings are
263      * not the same as when the flock was created
264      */
265     public Thread start(Thread thread) {
266         ensureOwnerOrContainsThread();
267         JLA.start(thread, container);
268         return thread;
269     }
270 
271     /**
272      * Shutdown this flock so that no new threads can be started, existing threads
273      * in the flock will continue to run. This method is a no-op if the flock is
274      * already shutdown or closed.
275      *
276      * <p> This method may only be invoked by the flock owner or threads {@linkplain
277      * #containsThread(Thread) contained} in the flock.
278      *
279      * @throws WrongThreadException if the current thread is not the owner or a thread
280      * contained in the flock
281      */
282     public void shutdown() {
283         ensureOwnerOrContainsThread();
284         if (!shutdown) {
285             shutdown = true;
286         }
287     }
288 
289     /**
290      * Wait for all threads in the flock to finish executing their tasks. This method
291      * waits until all threads finish, the {@link #wakeup() wakeup} method is invoked,
292      * or the current thread is interrupted.
293      *
294      * <p> This method may only be invoked by the flock owner. The method trivially
295      * returns true when the flock is closed.
296      *
297      * <p> This method clears the effect of any previous invocations of the
298      * {@code wakeup} method.
299      *
300      * @return true if there are no threads in the flock, false if wakeup was invoked
301      * and there are unfinished threads
302      * @throws InterruptedException if interrupted while waiting
303      * @throws WrongThreadException if invoked by a thread that is not the owner
304      */
305     public boolean awaitAll() throws InterruptedException {
306         ensureOwner();
307 
308         if (getAndSetPermit(false))
309             return (threadCount == 0);
310 
311         while (threadCount > 0 && !permit) {
312             LockSupport.park();
313             if (Thread.interrupted())
314                 throw new InterruptedException();
315         }
316         clearPermit();
317         return (threadCount == 0);
318     }
319 
320     /**
321      * Wait, up to the given waiting timeout, for all threads in the flock to finish
322      * executing their tasks. This method waits until all threads finish, the {@link
323      * #wakeup() wakeup} method is invoked, the current thread is interrupted, or
324      * the timeout expires.
325      *
326      * <p> This method may only be invoked by the flock owner. The method trivially
327      * returns true when the flock is closed.
328      *
329      * <p> This method clears the effect of any previous invocations of the {@code wakeup}
330      * method.
331      *
332      * @param timeout the maximum duration to wait
333      * @return true if there are no threads in the flock, false if wakeup was invoked
334      * and there are unfinished threads
335      * @throws InterruptedException if interrupted while waiting
336      * @throws TimeoutException if the wait timed out
337      * @throws WrongThreadException if invoked by a thread that is not the owner
338      */
339     public boolean awaitAll(Duration timeout)
340             throws InterruptedException, TimeoutException {
341         Objects.requireNonNull(timeout);
342         ensureOwner();
343 
344         if (getAndSetPermit(false))
345             return (threadCount == 0);
346 
347         long startNanos = System.nanoTime();
348         long nanos = NANOSECONDS.convert(timeout);
349         long remainingNanos = nanos;
350         while (threadCount > 0 && remainingNanos > 0 && !permit) {
351             LockSupport.parkNanos(remainingNanos);
352             if (Thread.interrupted())
353                 throw new InterruptedException();
354             remainingNanos = nanos - (System.nanoTime() - startNanos);
355         }
356 
357         boolean done = (threadCount == 0);
358         if (!done && remainingNanos <= 0 && !permit) {
359             throw new TimeoutException();
360         } else {
361             clearPermit();
362             return done;
363         }
364     }
365 
366     /**
367      * Causes the call to {@link #awaitAll()} or {@link #awaitAll(Duration)} by the
368      * {@linkplain #owner() owner} to return immediately.
369      *
370      * <p> If the owner is blocked in {@code awaitAll} then it will return immediately.
371      * If the owner is not blocked in {@code awaitAll} then its next call to wait
372      * will return immediately. The method does nothing when the flock is closed.
373      *
374      * @throws WrongThreadException if the current thread is not the owner or a thread
375      * contained in the flock
376      */
377     public void wakeup() {
378         ensureOwnerOrContainsThread();
379         if (!getAndSetPermit(true) && Thread.currentThread() != owner()) {
380             LockSupport.unpark(owner());
381         }
382     }
383 
384     /**
385      * Closes this flock. This method first shuts down the flock to prevent
386      * new threads from starting. It then waits for the threads in the flock
387      * to finish executing their tasks. In other words, this method blocks until
388      * all threads in the flock finish.
389      *
390      * <p> This method may only be invoked by the flock owner.
391      *
392      * <p> If interrupted then this method continues to wait until all threads
393      * finish, before completing with the interrupt status set.
394      *
395      * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If
396      * this method is called to close a flock before nested flocks are closed then it
397      * closes the nested flocks (in the reverse order that they were created in),
398      * closes this flock, and then throws {@code StructureViolationException}.
399      * Similarly, if this method is called to close a thread flock while executing with
400      * scoped value bindings, and the thread flock was created before the scoped values
401      * were bound, then {@code StructureViolationException} is thrown after closing the
402      * thread flock.
403      *
404      * @throws WrongThreadException if invoked by a thread that is not the owner
405      * @throws StructureViolationException if a structure violation was detected
406      */
407     public void close() {
408         ensureOwner();
409         if (closed)
410             return;
411 
412         // shutdown, if not already shutdown
413         if (!shutdown)
414             shutdown = true;
415 
416         // wait for threads to finish
417         boolean interrupted = false;
418         try {
419             while (threadCount > 0) {
420                 LockSupport.park();
421                 if (Thread.interrupted()) {
422                     interrupted = true;
423                 }
424             }
425 
426         } finally {
427             try {
428                 container.close(); // may throw
429             } finally {
430                 closed = true;
431                 if (interrupted) Thread.currentThread().interrupt();
432             }
433         }
434     }
435 
436     /**
437      * {@return true if the flock has been {@linkplain #shutdown() shut down}}
438      */
439     public boolean isShutdown() {
440         return shutdown;
441     }
442 
443     /**
444      * {@return true if the flock has been {@linkplain #close() closed}}
445      */
446     public boolean isClosed() {
447         return closed;
448     }
449 
450     /**
451      * {@return a stream of the threads in this flock}
452      * The elements of the stream are threads that were started in this flock
453      * but have not terminated. The stream will reflect the set of threads in the
454      * flock at some point at or since the creation of the stream. It may or may
455      * not reflect changes to the set of threads subsequent to creation of the
456      * stream.
457      */
458     public Stream<Thread> threads() {
459         return threads.stream();
460     }
461 
462     /**
463      * Tests if this flock contains the given thread. This method returns {@code true}
464      * if the thread was started in this flock and has not finished. If the thread
465      * is not in this flock then it tests if the thread is in flocks owned by threads
466      * in this flock, essentially equivalent to invoking {@code containsThread} method
467      * on all flocks owned by the threads in this flock.
468      *
469      * @param thread the thread
470      * @return true if this flock contains the thread
471      */
472     public boolean containsThread(Thread thread) {
473         var c = JLA.threadContainer(thread);
474         if (c == this.container)
475             return true;
476         if (c != null && c != ThreadContainers.root()) {
477             var parent = c.parent();
478             while (parent != null) {
479                 if (parent == this.container)
480                     return true;
481                 parent = parent.parent();
482             }
483         }
484         return false;
485     }
486 
487     @Override
488     public String toString() {
489         String id = Objects.toIdentityString(this);
490         if (name != null) {
491             return name + "/" + id;
492         } else {
493             return id;
494         }
495     }
496 
497     /**
498      * A ThreadContainer backed by a ThreadFlock.
499      */
500     private static class ThreadContainerImpl extends ThreadContainer {
501         private final ThreadFlock flock;
502         private volatile Object key;
503         private boolean closing;
504 
505         ThreadContainerImpl(ThreadFlock flock) {
506             super(/*shared*/ false);
507             this.flock = flock;
508         }
509 
510         @Override
511         public ThreadContainerImpl push() {
512             // Virtual threads in the root containers may not be tracked so need
513             // to register container to ensure that it is found
514             if (!ThreadContainers.trackAllThreads()) {
515                 Thread thread = Thread.currentThread();
516                 if (thread.isVirtual()
517                         && JLA.threadContainer(thread) == ThreadContainers.root()) {
518                     this.key = ThreadContainers.registerContainer(this);
519                 }
520             }
521             super.push();
522             return this;
523         }
524 
525         /**
526          * Invoked by ThreadFlock.close when closing the flock. This method pops the
527          * container from the current thread's scope stack.
528          */
529         void close() {
530             assert Thread.currentThread() == owner();
531             if (!closing) {
532                 closing = true;
533                 boolean atTop = popForcefully(); // may block
534                 Object key = this.key;
535                 if (key != null)
536                     ThreadContainers.deregisterContainer(key);
537                 if (!atTop)
538                     throw new StructureViolationException();
539             }
540         }
541 
542         /**
543          * Invoked when an enclosing scope is closing. Invokes ThreadFlock.close to
544          * close the flock. This method does not pop the container from the current
545          * thread's scope stack.
546          */
547         @Override
548         protected boolean tryClose() {
549             assert Thread.currentThread() == owner();
550             if (!closing) {
551                 closing = true;
552                 flock.close();
553                 Object key = this.key;
554                 if (key != null)
555                     ThreadContainers.deregisterContainer(key);
556                 return true;
557             } else {
558                 assert false : "Should not get there";
559                 return false;
560             }
561         }
562 
563         @Override
564         public String name() {
565             return flock.name();
566         }
567         @Override
568         public long threadCount() {
569             return flock.threadCount();
570         }
571         @Override
572         public Stream<Thread> threads() {
573             return flock.threads().filter(Thread::isAlive);
574         }
575         @Override
576         public void onStart(Thread thread) {
577             flock.onStart(thread);
578         }
579         @Override
580         public void onExit(Thread thread) {
581             flock.onExit(thread);
582         }
583         @Override
584         public ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
585             return flock.scopedValueBindings();
586         }
587     }
588 }