1 /*
  2  * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package jdk.internal.misc;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.time.Duration;
 30 import java.util.Objects;
 31 import java.util.Set;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.TimeoutException;
 34 import java.util.concurrent.StructureViolationException;
 35 import java.util.concurrent.locks.LockSupport;
 36 import java.util.stream.Stream;
 37 import jdk.internal.access.JavaLangAccess;
 38 import jdk.internal.access.SharedSecrets;
 39 import jdk.internal.vm.ScopedValueContainer;
 40 import jdk.internal.vm.ThreadContainer;
 41 import jdk.internal.vm.ThreadContainers;
 42 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 43 
 44 /**
 45  * A grouping of threads that typically run closely related tasks. Threads started
 46  * in a flock remain in the flock until they terminate.
 47  *
 48  * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock,
 49  * the {@link #start(Thread) start} method to start a thread in the flock, and the
 50  * {@link #close() close} method to close the flock. The {@code close} waits for all
 51  * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used
 52  * to wait for all threads to finish without closing the flock. The {@link #wakeup()}
 53  * method will cause {@code awaitAll} method to complete early, which can be used to
 54  * support cancellation in higher-level APIs. ThreadFlock also defines the {@link
 55  * #shutdown() shutdown} method to prevent new threads from starting while allowing
 56  * existing threads in the flock to continue.
 57  *
 58  * <p> Thread flocks are intended to be used in a <em>structured manner</em>. The
 59  * thread that opens a new flock is the {@link #owner() owner}. The owner closes the
 60  * flock when done, failure to do so may result in a resource leak. The {@code open}
 61  * and {@code close} should be matched to avoid closing an <em>enclosing</em> flock
 62  * while a <em>nested</em> flock is open. A ThreadFlock can be used with the
 63  * try-with-resources construct if required but more likely, the close method of a
 64  * higher-level API that implements {@link AutoCloseable} will close the flock.
 65  *
 66  * <p> Thread flocks are conceptually nodes in a tree. A thread {@code T} started in
 67  * flock "A" may itself open a new flock "B", implicitly forming a tree where flock
 68  * "A" is the parent of flock "B". When nested, say where thread {@code T} opens
 69  * flock "B" and then invokes a method that opens flock "C", then the enclosing
 70  * flock "B" is conceptually the parent of the nested flock "C". ThreadFlock does
 71  * not define APIs that exposes the tree structure. It does define the {@link
 72  * #containsThread(Thread) containsThread} method to test if a flock contains a
 73  * thread, a test that is equivalent to testing membership of flocks in the tree.
 74  * The {@code start} and {@code shutdown} methods are confined to the flock
 75  * owner or threads contained in the flock. The confinement check is equivalent to
 76  * invoking the {@code containsThread} method to test if the caller is contained
 77  * in the flock.
 78  *
 79  * <p> Unless otherwise specified, passing a {@code null} argument to a method
 80  * in this class will cause a {@link NullPointerException} to be thrown.
 81  */
 82 public class ThreadFlock implements AutoCloseable {
 83     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 84     private static final VarHandle THREAD_COUNT;
 85     private static final VarHandle PERMIT;
 86     static {
 87         try {
 88             MethodHandles.Lookup l = MethodHandles.lookup();
 89             THREAD_COUNT = l.findVarHandle(ThreadFlock.class, "threadCount", int.class);
 90             PERMIT = l.findVarHandle(ThreadFlock.class, "permit", boolean.class);
 91         } catch (Exception e) {
 92             throw new InternalError(e);
 93         }
 94     }
 95 
 96     private final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 97 
 98     // thread count, need to re-examine contention once API is stable
 99     private volatile int threadCount;
100 
101     private final String name;
102     private final ScopedValueContainer.BindingsSnapshot scopedValueBindings;
103     private final ThreadContainerImpl container; // encapsulate for now
104 
105     // state
106     private volatile boolean shutdown;
107     private volatile boolean closed;
108 
109     // set by wakeup, cleared by awaitAll
110     private volatile boolean permit;
111 
112     ThreadFlock(String name) {
113         this.name = name;
114         this.scopedValueBindings = ScopedValueContainer.captureBindings();
115         this.container = new ThreadContainerImpl(this);
116     }
117 
118     private long threadCount() {
119         return threadCount;
120     }
121 
122     private ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
123         return scopedValueBindings;
124     }
125 
126     private void incrementThreadCount() {
127         THREAD_COUNT.getAndAdd(this, 1);
128     }
129 
130     /**
131      * Decrement the thread count. Unpark the owner if the count goes to zero.
132      */
133     private void decrementThreadCount() {
134         int count = (int) THREAD_COUNT.getAndAdd(this, -1) - 1;
135 
136         // signal owner when the count goes to zero
137         if (count == 0) {
138             LockSupport.unpark(owner());
139         }
140     }
141 
142     /**
143      * Invoked on the parent thread when starting {@code thread}.
144      */
145     private void onStart(Thread thread) {
146         incrementThreadCount();
147         boolean done = false;
148         try {
149             boolean added = threads.add(thread);
150             assert added;
151             if (shutdown)
152                 throw new IllegalStateException("Shutdown");
153             done = true;
154         } finally {
155             if (!done) {
156                 threads.remove(thread);
157                 decrementThreadCount();
158             }
159         }
160     }
161 
162     /**
163      * Invoked on the terminating thread or the parent thread when starting
164      * {@code thread} failed. This method is only called if onStart succeeded.
165      */
166     private void onExit(Thread thread) {
167         boolean removed = threads.remove(thread);
168         assert removed;
169         decrementThreadCount();
170     }
171 
172     /**
173      * Clear wakeup permit.
174      */
175     private void clearPermit() {
176         if (permit)
177             permit = false;
178     }
179 
180     /**
181      * Sets the wakeup permit to the given value, returning the previous value.
182      */
183     private boolean getAndSetPermit(boolean newValue) {
184         if (permit != newValue) {
185             return (boolean) PERMIT.getAndSet(this, newValue);
186         } else {
187             return newValue;
188         }
189     }
190 
191     /**
192      * Throws WrongThreadException if the current thread is not the owner.
193      */
194     private void ensureOwner() {
195         if (Thread.currentThread() != owner())
196             throw new WrongThreadException("Current thread not owner");
197     }
198 
199     /**
200      * Throws WrongThreadException if the current thread is not the owner
201      * or a thread contained in the flock.
202      */
203     private void ensureOwnerOrContainsThread() {
204         Thread currentThread = Thread.currentThread();
205         if (currentThread != owner() && !containsThread(currentThread))
206             throw new WrongThreadException("Current thread not owner or thread in flock");
207     }
208 
209     /**
210      * Opens a new thread flock. The flock is owned by the current thread. It can be
211      * named to aid debugging.
212      *
213      * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
214      * bindings for inheritance by threads created in the flock.
215      *
216      * <p> For the purposes of containment, monitoring, and debugging, the parent
217      * of the new flock is determined as follows:
218      * <ul>
219      * <li> If the current thread is the owner of open flocks then the most recently
220      * created, and open, flock is the parent of the new flock. In other words, the
221      * <em>enclosing flock</em> is the parent.
222      * <li> If the current thread is not the owner of any open flocks then the
223      * parent of the new flock is the current thread's flock. If the current thread
224      * was not started in a flock then the new flock does not have a parent.
225      * </ul>
226      *
227      * @param name the name of the flock, can be null
228      * @return a new thread flock
229      */
230     public static ThreadFlock open(String name) {
231         var flock = new ThreadFlock(name);
232         flock.container.push();
233         return flock;
234     }
235 
236     /**
237      * {@return the name of this flock or {@code null} if unnamed}
238      */
239     public String name() {
240         return name;
241     }
242 
243     /**
244      * {@return the owner of this flock}
245      */
246     public Thread owner() {
247         return container.owner();
248     }
249 
250     /**
251      * Starts the given unstarted thread in this flock.
252      *
253      * <p> The thread is started with the scoped value bindings that were captured
254      * when opening the flock. The bindings must match the current thread's bindings.
255      *
256      * <p> This method may only be invoked by the flock owner or threads {@linkplain
257      * #containsThread(Thread) contained} in the flock.
258      *
259      * @param thread the unstarted thread
260      * @return the thread, started
261      * @throws IllegalStateException if this flock is shutdown or closed
262      * @throws IllegalThreadStateException if the given thread was already started
263      * @throws WrongThreadException if the current thread is not the owner or a thread
264      * contained in the flock
265      * @throws StructureViolationException if the current scoped value bindings are
266      * not the same as when the flock was created
267      */
268     public Thread start(Thread thread) {
269         ensureOwnerOrContainsThread();
270         JLA.start(thread, container);
271         return thread;
272     }
273 
274     /**
275      * Shutdown this flock so that no new threads can be started, existing threads
276      * in the flock will continue to run. This method is a no-op if the flock is
277      * already shutdown or closed.
278      *
279      * <p> This method may only be invoked by the flock owner or threads {@linkplain
280      * #containsThread(Thread) contained} in the flock.
281      *
282      * @throws WrongThreadException if the current thread is not the owner or a thread
283      * contained in the flock
284      */
285     public void shutdown() {
286         ensureOwnerOrContainsThread();
287         if (!shutdown) {
288             shutdown = true;
289         }
290     }
291 
292     /**
293      * Wait for all threads in the flock to finish executing their tasks. This method
294      * waits until all threads finish, the {@link #wakeup() wakeup} method is invoked,
295      * or the current thread is interrupted.
296      *
297      * <p> This method may only be invoked by the flock owner. The method trivially
298      * returns true when the flock is closed.
299      *
300      * <p> This method clears the effect of any previous invocations of the
301      * {@code wakeup} method.
302      *
303      * @return true if there are no threads in the flock, false if wakeup was invoked
304      * and there are unfinished threads
305      * @throws InterruptedException if interrupted while waiting
306      * @throws WrongThreadException if invoked by a thread that is not the owner
307      */
308     public boolean awaitAll() throws InterruptedException {
309         ensureOwner();
310 
311         if (getAndSetPermit(false))
312             return (threadCount == 0);
313 
314         while (threadCount > 0 && !permit) {
315             LockSupport.park();
316             if (Thread.interrupted())
317                 throw new InterruptedException();
318         }
319         clearPermit();
320         return (threadCount == 0);
321     }
322 
323     /**
324      * Wait, up to the given waiting timeout, for all threads in the flock to finish
325      * executing their tasks. This method waits until all threads finish, the {@link
326      * #wakeup() wakeup} method is invoked, the current thread is interrupted, or
327      * the timeout expires.
328      *
329      * <p> This method may only be invoked by the flock owner. The method trivially
330      * returns true when the flock is closed.
331      *
332      * <p> This method clears the effect of any previous invocations of the {@code wakeup}
333      * method.
334      *
335      * @param timeout the maximum duration to wait
336      * @return true if there are no threads in the flock, false if wakeup was invoked
337      * and there are unfinished threads
338      * @throws InterruptedException if interrupted while waiting
339      * @throws TimeoutException if the wait timed out
340      * @throws WrongThreadException if invoked by a thread that is not the owner
341      */
342     public boolean awaitAll(Duration timeout)
343             throws InterruptedException, TimeoutException {
344         Objects.requireNonNull(timeout);
345         ensureOwner();
346 
347         if (getAndSetPermit(false))
348             return (threadCount == 0);
349 
350         long startNanos = System.nanoTime();
351         long nanos = NANOSECONDS.convert(timeout);
352         long remainingNanos = nanos;
353         while (threadCount > 0 && remainingNanos > 0 && !permit) {
354             LockSupport.parkNanos(remainingNanos);
355             if (Thread.interrupted())
356                 throw new InterruptedException();
357             remainingNanos = nanos - (System.nanoTime() - startNanos);
358         }
359 
360         boolean done = (threadCount == 0);
361         if (!done && remainingNanos <= 0 && !permit) {
362             throw new TimeoutException();
363         } else {
364             clearPermit();
365             return done;
366         }
367     }
368 
369     /**
370      * Causes the call to {@link #awaitAll()} or {@link #awaitAll(Duration)} by the
371      * {@linkplain #owner() owner} to return immediately.
372      *
373      * <p> If the owner is blocked in {@code awaitAll} then it will return immediately.
374      * If the owner is not blocked in {@code awaitAll} then its next call to wait
375      * will return immediately. The method does nothing when the flock is closed.
376      *
377      * @throws WrongThreadException if the current thread is not the owner or a thread
378      * contained in the flock
379      */
380     public void wakeup() {
381         ensureOwnerOrContainsThread();
382         if (!getAndSetPermit(true) && Thread.currentThread() != owner()) {
383             LockSupport.unpark(owner());
384         }
385     }
386 
387     /**
388      * Closes this flock. This method first shuts down the flock to prevent
389      * new threads from starting. It then waits for the threads in the flock
390      * to finish executing their tasks. In other words, this method blocks until
391      * all threads in the flock finish.
392      *
393      * <p> This method may only be invoked by the flock owner.
394      *
395      * <p> If interrupted then this method continues to wait until all threads
396      * finish, before completing with the interrupt status set.
397      *
398      * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If
399      * this method is called to close a flock before nested flocks are closed then it
400      * closes the nested flocks (in the reverse order that they were created in),
401      * closes this flock, and then throws {@code StructureViolationException}.
402      * Similarly, if this method is called to close a thread flock while executing with
403      * scoped value bindings, and the thread flock was created before the scoped values
404      * were bound, then {@code StructureViolationException} is thrown after closing the
405      * thread flock.
406      *
407      * @throws WrongThreadException if invoked by a thread that is not the owner
408      * @throws StructureViolationException if a structure violation was detected
409      */
410     public void close() {
411         ensureOwner();
412         if (closed)
413             return;
414 
415         // shutdown, if not already shutdown
416         if (!shutdown)
417             shutdown = true;
418 
419         // wait for threads to finish
420         boolean interrupted = false;
421         try {
422             while (threadCount > 0) {
423                 LockSupport.park();
424                 if (Thread.interrupted()) {
425                     interrupted = true;
426                 }
427             }
428 
429         } finally {
430             try {
431                 container.close(); // may throw
432             } finally {
433                 closed = true;
434                 if (interrupted) Thread.currentThread().interrupt();
435             }
436         }
437     }
438 
439     /**
440      * {@return true if the flock has been {@linkplain #shutdown() shut down}}
441      */
442     public boolean isShutdown() {
443         return shutdown;
444     }
445 
446     /**
447      * {@return true if the flock has been {@linkplain #close() closed}}
448      */
449     public boolean isClosed() {
450         return closed;
451     }
452 
453     /**
454      * {@return a stream of the threads in this flock}
455      * The elements of the stream are threads that were started in this flock
456      * but have not terminated. The stream will reflect the set of threads in the
457      * flock at some point at or since the creation of the stream. It may or may
458      * not reflect changes to the set of threads subsequent to creation of the
459      * stream.
460      */
461     public Stream<Thread> threads() {
462         return threads.stream();
463     }
464 
465     /**
466      * Tests if this flock contains the given thread. This method returns {@code true}
467      * if the thread was started in this flock and has not finished. If the thread
468      * is not in this flock then it tests if the thread is in flocks owned by threads
469      * in this flock, essentially equivalent to invoking {@code containsThread} method
470      * on all flocks owned by the threads in this flock.
471      *
472      * @param thread the thread
473      * @return true if this flock contains the thread
474      */
475     public boolean containsThread(Thread thread) {
476         var c = JLA.threadContainer(thread);
477         if (c == this.container)
478             return true;
479         if (c != null && c != ThreadContainers.root()) {
480             var parent = c.parent();
481             while (parent != null) {
482                 if (parent == this.container)
483                     return true;
484                 parent = parent.parent();
485             }
486         }
487         return false;
488     }
489 
490     @Override
491     public String toString() {
492         String id = Objects.toIdentityString(this);
493         if (name != null) {
494             return name + "/" + id;
495         } else {
496             return id;
497         }
498     }
499 
500     /**
501      * A ThreadContainer backed by a ThreadFlock.
502      */
503     private static class ThreadContainerImpl extends ThreadContainer {
504         private final ThreadFlock flock;
505         private volatile Object key;
506         private boolean closing;
507 
508         ThreadContainerImpl(ThreadFlock flock) {
509             super(/*shared*/ false);
510             this.flock = flock;
511         }
512 
513         @Override
514         public ThreadContainerImpl push() {
515             // Virtual threads in the root containers may not be tracked so need
516             // to register container to ensure that it is found
517             if (!ThreadContainers.trackAllThreads()) {
518                 Thread thread = Thread.currentThread();
519                 if (thread.isVirtual()
520                         && JLA.threadContainer(thread) == ThreadContainers.root()) {
521                     this.key = ThreadContainers.registerContainer(this);
522                 }
523             }
524             super.push();
525             return this;
526         }
527 
528         /**
529          * Invoked by ThreadFlock.close when closing the flock. This method pops the
530          * container from the current thread's scope stack.
531          */
532         void close() {
533             assert Thread.currentThread() == owner();
534             if (!closing) {
535                 closing = true;
536                 boolean atTop = popForcefully(); // may block
537                 Object key = this.key;
538                 if (key != null)
539                     ThreadContainers.deregisterContainer(key);
540                 if (!atTop)
541                     throw new StructureViolationException();
542             }
543         }
544 
545         /**
546          * Invoked when an enclosing scope is closing. Invokes ThreadFlock.close to
547          * close the flock. This method does not pop the container from the current
548          * thread's scope stack.
549          */
550         @Override
551         protected boolean tryClose() {
552             assert Thread.currentThread() == owner();
553             if (!closing) {
554                 closing = true;
555                 flock.close();
556                 Object key = this.key;
557                 if (key != null)
558                     ThreadContainers.deregisterContainer(key);
559                 return true;
560             } else {
561                 assert false : "Should not get there";
562                 return false;
563             }
564         }
565 
566         @Override
567         public String name() {
568             return flock.name();
569         }
570         @Override
571         public long threadCount() {
572             return flock.threadCount();
573         }
574         @Override
575         public Stream<Thread> threads() {
576             return flock.threads().filter(Thread::isAlive);
577         }
578         @Override
579         public void onStart(Thread thread) {
580             flock.onStart(thread);
581         }
582         @Override
583         public void onExit(Thread thread) {
584             flock.onExit(thread);
585         }
586         @Override
587         public ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
588             return flock.scopedValueBindings();
589         }
590     }
591 }
--- EOF ---