1 /*
  2  * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package jdk.internal.misc;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.time.Duration;
 30 import java.util.Objects;
 31 import java.util.Set;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.TimeoutException;
 34 import java.util.concurrent.StructureViolationException;
 35 import java.util.concurrent.locks.LockSupport;
 36 import java.util.stream.Stream;
 37 import jdk.internal.access.JavaLangAccess;
 38 import jdk.internal.access.SharedSecrets;
 39 import jdk.internal.vm.ScopedValueContainer;
 40 import jdk.internal.vm.ThreadContainer;
 41 import jdk.internal.vm.ThreadContainers;
 42 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 43 
 44 /**
 45  * A grouping of threads that typically run closely related tasks. Threads started
 46  * in a flock remain in the flock until they terminate.
 47  *
 48  * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock,
 49  * the {@link #start(Thread) start} method to start a thread in the flock, and the
 50  * {@link #close() close} method to close the flock. The {@code close} waits for all
 51  * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used
 52  * to wait for all threads to finish without closing the flock. The {@link #wakeup()}
 53  * method will cause {@code awaitAll} method to complete early, which can be used to
 54  * support cancellation in higher-level APIs. ThreadFlock also defines the {@link
 55  * #shutdown() shutdown} method to prevent new threads from starting while allowing
 56  * existing threads in the flock to continue.
 57  *
 58  * <p> Thread flocks are intended to be used in a <em>structured manner</em>. The
 59  * thread that opens a new flock is the {@link #owner() owner}. The owner closes the
 60  * flock when done, failure to do so may result in a resource leak. The {@code open}
 61  * and {@code close} should be matched to avoid closing an <em>enclosing</em> flock
 62  * while a <em>nested</em> flock is open. A ThreadFlock can be used with the
 63  * try-with-resources construct if required but more likely, the close method of a
 64  * higher-level API that implements {@link AutoCloseable} will close the flock.
 65  *
 66  * <p> Thread flocks are conceptually nodes in a tree. A thread {@code T} started in
 67  * flock "A" may itself open a new flock "B", implicitly forming a tree where flock
 68  * "A" is the parent of flock "B". When nested, say where thread {@code T} opens
 69  * flock "B" and then invokes a method that opens flock "C", then the enclosing
 70  * flock "B" is conceptually the parent of the nested flock "C". ThreadFlock does
 71  * not define APIs that exposes the tree structure. It does define the {@link
 72  * #containsThread(Thread) containsThread} method to test if a flock contains a
 73  * thread, a test that is equivalent to testing membership of flocks in the tree.
 * The {@code start} method is confined to the flock owner or threads contained in
 * the flock. The confinement check is equivalent to invoking the
 * {@code containsThread} method to test if the caller is contained in the flock.
 78  *
 79  * <p> Unless otherwise specified, passing a {@code null} argument to a method
 80  * in this class will cause a {@link NullPointerException} to be thrown.
 81  */
 82 public class ThreadFlock implements AutoCloseable {
 83     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 84     private static final VarHandle THREAD_COUNT;
 85     private static final VarHandle PERMIT;
 86     static {
 87         try {
 88             MethodHandles.Lookup l = MethodHandles.lookup();
 89             THREAD_COUNT = l.findVarHandle(ThreadFlock.class, "threadCount", int.class);
 90             PERMIT = l.findVarHandle(ThreadFlock.class, "permit", boolean.class);
 91         } catch (Exception e) {
 92             throw new InternalError(e);
 93         }
 94     }
 95 
 96     private final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 97 
 98     // thread count, need to re-examine contention once API is stable
 99     private volatile int threadCount;
100 
101     private final String name;
102     private final ScopedValueContainer.BindingsSnapshot scopedValueBindings;
103     private final ThreadContainerImpl container; // encapsulate for now
104 
105     // state
106     private volatile boolean shutdown;
107     private volatile boolean closed;
108 
109     // set by wakeup, cleared by awaitAll
110     private volatile boolean permit;
111 
112     ThreadFlock(String name) {
113         this.name = name;
114         this.scopedValueBindings = ScopedValueContainer.captureBindings();
115         this.container = new ThreadContainerImpl(this);
116     }
117 
118     private long threadCount() {
119         return threadCount;
120     }
121 
122     private ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
123         return scopedValueBindings;
124     }
125 
126     private void incrementThreadCount() {
127         THREAD_COUNT.getAndAdd(this, 1);
128     }
129 
130     /**
131      * Decrement the thread count. Unpark the owner if the count goes to zero.
132      */
133     private void decrementThreadCount() {
134         int count = (int) THREAD_COUNT.getAndAdd(this, -1) - 1;
135 
136         // signal owner when the count goes to zero
137         if (count == 0) {
138             LockSupport.unpark(owner());
139         }
140     }
141 
142     /**
143      * Invoked on the parent thread when starting {@code thread}.
144      */
145     private void onStart(Thread thread) {
146         incrementThreadCount();
147         boolean done = false;
148         try {
149             boolean added = threads.add(thread);
150             assert added;
151             if (shutdown)
152                 throw new IllegalStateException("Shutdown");
153             done = true;
154         } finally {
155             if (!done) {
156                 threads.remove(thread);
157                 decrementThreadCount();
158             }
159         }
160     }
161 
162     /**
163      * Invoked on the terminating thread or the parent thread when starting
164      * {@code thread} failed. This method is only called if onStart succeeded.
165      */
166     private void onExit(Thread thread) {
167         boolean removed = threads.remove(thread);
168         assert removed;
169         decrementThreadCount();
170     }
171 
172     /**
173      * Clear wakeup permit.
174      */
175     private void clearPermit() {
176         if (permit)
177             permit = false;
178     }
179 
180     /**
181      * Sets the wakeup permit to the given value, returning the previous value.
182      */
183     private boolean getAndSetPermit(boolean newValue) {
184         if (permit != newValue) {
185             return (boolean) PERMIT.getAndSet(this, newValue);
186         } else {
187             return newValue;
188         }
189     }
190 
191     /**
192      * Throws WrongThreadException if the current thread is not the owner.
193      */
194     private void ensureOwner() {
195         if (Thread.currentThread() != owner())
196             throw new WrongThreadException("Current thread not owner");
197     }
198 
199     /**
200      * Throws WrongThreadException if the current thread is not the owner
201      * or a thread contained in the flock.
202      */
203     private void ensureOwnerOrContainsThread() {
204         Thread currentThread = Thread.currentThread();
205         if (currentThread != owner() && !containsThread(currentThread))
206             throw new WrongThreadException("Current thread not owner or thread in flock");
207     }
208 
209     /**
210      * Opens a new thread flock. The flock is owned by the current thread. It can be
211      * named to aid debugging.
212      *
213      * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
214      * bindings for inheritance by threads created in the flock.
215      *
216      * <p> For the purposes of containment, monitoring, and debugging, the parent
217      * of the new flock is determined as follows:
218      * <ul>
219      * <li> If the current thread is the owner of open flocks then the most recently
220      * created, and open, flock is the parent of the new flock. In other words, the
221      * <em>enclosing flock</em> is the parent.
222      * <li> If the current thread is not the owner of any open flocks then the
223      * parent of the new flock is the current thread's flock. If the current thread
224      * was not started in a flock then the new flock does not have a parent.
225      * </ul>
226      *
227      * @param name the name of the flock, can be null
228      * @return a new thread flock
229      */
230     public static ThreadFlock open(String name) {
231         var flock = new ThreadFlock(name);
232         flock.container.push();
233         return flock;
234     }
235 
236     /**
237      * {@return the name of this flock or {@code null} if unnamed}
238      */
239     public String name() {
240         return name;
241     }
242 
243     /**
244      * {@return the owner of this flock}
245      */
246     public Thread owner() {
247         return container.owner();
248     }
249 
250     /**
251      * Starts the given unstarted thread in this flock.
252      *
253      * <p> The thread is started with the scoped value bindings that were captured
254      * when opening the flock. The bindings must match the current thread's bindings.
255      *
256      * <p> This method may only be invoked by the flock owner or threads {@linkplain
257      * #containsThread(Thread) contained} in the flock.
258      *
259      * @param thread the unstarted thread
260      * @return the thread, started
261      * @throws IllegalStateException if this flock is shutdown or closed
262      * @throws IllegalThreadStateException if the given thread was already started
263      * @throws WrongThreadException if the current thread is not the owner or a thread
264      * contained in the flock
265      * @throws StructureViolationException if the current scoped value bindings are
266      * not the same as when the flock was created
267      */
268     public Thread start(Thread thread) {
269         ensureOwnerOrContainsThread();
270         JLA.start(thread, container);
271         return thread;
272     }
273 
274     /**
275      * Shutdown this flock so that no new threads can be started, existing threads
276      * in the flock will continue to run. This method is a no-op if the flock is
277      * already shutdown or closed.






278      */
279     public void shutdown() {

280         if (!shutdown) {
281             shutdown = true;
282         }
283     }
284 
285     /**
286      * Wait for all threads in the flock to finish executing their tasks. This method
287      * waits until all threads finish, the {@link #wakeup() wakeup} method is invoked,
288      * or the current thread is interrupted.
289      *
290      * <p> This method may only be invoked by the flock owner. The method trivially
291      * returns true when the flock is closed.
292      *
293      * <p> This method clears the effect of any previous invocations of the
294      * {@code wakeup} method.
295      *
296      * @return true if there are no threads in the flock, false if wakeup was invoked
297      * and there are unfinished threads
298      * @throws InterruptedException if interrupted while waiting
299      * @throws WrongThreadException if invoked by a thread that is not the owner
300      */
301     public boolean awaitAll() throws InterruptedException {
302         ensureOwner();
303 
304         if (getAndSetPermit(false))
305             return (threadCount == 0);
306 
307         while (threadCount > 0 && !permit) {
308             LockSupport.park();
309             if (Thread.interrupted())
310                 throw new InterruptedException();
311         }
312         clearPermit();
313         return (threadCount == 0);
314     }
315 
316     /**
317      * Wait, up to the given waiting timeout, for all threads in the flock to finish
318      * executing their tasks. This method waits until all threads finish, the {@link
319      * #wakeup() wakeup} method is invoked, the current thread is interrupted, or
320      * the timeout expires.
321      *
322      * <p> This method may only be invoked by the flock owner. The method trivially
323      * returns true when the flock is closed.
324      *
325      * <p> This method clears the effect of any previous invocations of the {@code wakeup}
326      * method.
327      *
328      * @param timeout the maximum duration to wait
329      * @return true if there are no threads in the flock, false if wakeup was invoked
330      * and there are unfinished threads
331      * @throws InterruptedException if interrupted while waiting
332      * @throws TimeoutException if the wait timed out
333      * @throws WrongThreadException if invoked by a thread that is not the owner
334      */
335     public boolean awaitAll(Duration timeout)
336             throws InterruptedException, TimeoutException {
337         Objects.requireNonNull(timeout);
338         ensureOwner();
339 
340         if (getAndSetPermit(false))
341             return (threadCount == 0);
342 
343         long startNanos = System.nanoTime();
344         long nanos = NANOSECONDS.convert(timeout);
345         long remainingNanos = nanos;
346         while (threadCount > 0 && remainingNanos > 0 && !permit) {
347             LockSupport.parkNanos(remainingNanos);
348             if (Thread.interrupted())
349                 throw new InterruptedException();
350             remainingNanos = nanos - (System.nanoTime() - startNanos);
351         }
352 
353         boolean done = (threadCount == 0);
354         if (!done && remainingNanos <= 0 && !permit) {
355             throw new TimeoutException();
356         } else {
357             clearPermit();
358             return done;
359         }
360     }
361 
362     /**
363      * Causes the call to {@link #awaitAll()} or {@link #awaitAll(Duration)} by the
364      * {@linkplain #owner() owner} to return immediately.
365      *
366      * <p> If the owner is blocked in {@code awaitAll} then it will return immediately.
367      * If the owner is not blocked in {@code awaitAll} then its next call to wait
368      * will return immediately. The method does nothing when the flock is closed.



369      */
370     public void wakeup() {

371         if (!getAndSetPermit(true) && Thread.currentThread() != owner()) {
372             LockSupport.unpark(owner());
373         }
374     }
375 
376     /**
377      * Closes this flock. This method first shuts down the flock to prevent
378      * new threads from starting. It then waits for the threads in the flock
379      * to finish executing their tasks. In other words, this method blocks until
380      * all threads in the flock finish.
381      *
382      * <p> This method may only be invoked by the flock owner.
383      *
384      * <p> If interrupted then this method continues to wait until all threads
385      * finish, before completing with the interrupt status set.
386      *
387      * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If
388      * this method is called to close a flock before nested flocks are closed then it
389      * closes the nested flocks (in the reverse order that they were created in),
390      * closes this flock, and then throws {@code StructureViolationException}.
391      * Similarly, if this method is called to close a thread flock while executing with
392      * scoped value bindings, and the thread flock was created before the scoped values
393      * were bound, then {@code StructureViolationException} is thrown after closing the
394      * thread flock.
395      *
396      * @throws WrongThreadException if invoked by a thread that is not the owner
397      * @throws StructureViolationException if a structure violation was detected
398      */
399     public void close() {
400         ensureOwner();
401         if (closed)
402             return;
403 
404         // shutdown, if not already shutdown
405         if (!shutdown)
406             shutdown = true;
407 
408         // wait for threads to finish
409         boolean interrupted = false;
410         try {
411             while (threadCount > 0) {
412                 LockSupport.park();
413                 if (Thread.interrupted()) {
414                     interrupted = true;
415                 }
416             }
417 
418         } finally {
419             try {
420                 container.close(); // may throw
421             } finally {
422                 closed = true;
423                 if (interrupted) Thread.currentThread().interrupt();
424             }
425         }
426     }
427 
428     /**
429      * {@return true if the flock has been {@linkplain #shutdown() shut down}}
430      */
431     public boolean isShutdown() {
432         return shutdown;
433     }
434 
435     /**
436      * {@return true if the flock has been {@linkplain #close() closed}}
437      */
438     public boolean isClosed() {
439         return closed;
440     }
441 
442     /**
443      * {@return a stream of the threads in this flock}
444      * The elements of the stream are threads that were started in this flock
445      * but have not terminated. The stream will reflect the set of threads in the
446      * flock at some point at or since the creation of the stream. It may or may
447      * not reflect changes to the set of threads subsequent to creation of the
448      * stream.
449      */
450     public Stream<Thread> threads() {
451         return threads.stream();
452     }
453 
454     /**
455      * Tests if this flock contains the given thread. This method returns {@code true}
456      * if the thread was started in this flock and has not finished. If the thread
457      * is not in this flock then it tests if the thread is in flocks owned by threads
458      * in this flock, essentially equivalent to invoking {@code containsThread} method
459      * on all flocks owned by the threads in this flock.
460      *
461      * @param thread the thread
462      * @return true if this flock contains the thread
463      */
464     public boolean containsThread(Thread thread) {
465         var c = JLA.threadContainer(thread);
466         if (c == this.container)
467             return true;
468         if (c != null && c != ThreadContainers.root()) {
469             var parent = c.parent();
470             while (parent != null) {
471                 if (parent == this.container)
472                     return true;
473                 parent = parent.parent();
474             }
475         }
476         return false;
477     }
478 
479     @Override
480     public String toString() {
481         String id = Objects.toIdentityString(this);
482         if (name != null) {
483             return name + "/" + id;
484         } else {
485             return id;
486         }
487     }
488 
489     /**
490      * A ThreadContainer backed by a ThreadFlock.
491      */
492     private static class ThreadContainerImpl extends ThreadContainer {
493         private final ThreadFlock flock;
494         private volatile Object key;
495         private boolean closing;
496 
497         ThreadContainerImpl(ThreadFlock flock) {
498             super(/*shared*/ false);
499             this.flock = flock;
500         }
501 
502         @Override
503         public ThreadContainerImpl push() {
504             // Virtual threads in the root containers may not be tracked so need
505             // to register container to ensure that it is found
506             if (!ThreadContainers.trackAllThreads()) {
507                 Thread thread = Thread.currentThread();
508                 if (thread.isVirtual()
509                         && JLA.threadContainer(thread) == ThreadContainers.root()) {
510                     this.key = ThreadContainers.registerContainer(this);
511                 }
512             }
513             super.push();
514             return this;
515         }
516 
517         /**
518          * Invoked by ThreadFlock.close when closing the flock. This method pops the
519          * container from the current thread's scope stack.
520          */
521         void close() {
522             assert Thread.currentThread() == owner();
523             if (!closing) {
524                 closing = true;
525                 boolean atTop = popForcefully(); // may block
526                 Object key = this.key;
527                 if (key != null)
528                     ThreadContainers.deregisterContainer(key);
529                 if (!atTop)
530                     throw new StructureViolationException();
531             }
532         }
533 
534         /**
535          * Invoked when an enclosing scope is closing. Invokes ThreadFlock.close to
536          * close the flock. This method does not pop the container from the current
537          * thread's scope stack.
538          */
539         @Override
540         protected boolean tryClose() {
541             assert Thread.currentThread() == owner();
542             if (!closing) {
543                 closing = true;
544                 flock.close();
545                 Object key = this.key;
546                 if (key != null)
547                     ThreadContainers.deregisterContainer(key);
548                 return true;
549             } else {
550                 assert false : "Should not get there";
551                 return false;
552             }
553         }
554 
555         @Override
556         public String name() {
557             return flock.name();
558         }
559         @Override
560         public long threadCount() {
561             return flock.threadCount();
562         }
563         @Override
564         public Stream<Thread> threads() {
565             return flock.threads().filter(Thread::isAlive);
566         }
567         @Override
568         public void onStart(Thread thread) {
569             flock.onStart(thread);
570         }
571         @Override
572         public void onExit(Thread thread) {
573             flock.onExit(thread);
574         }
575         @Override
576         public ScopedValueContainer.BindingsSnapshot scopedValueBindings() {
577             return flock.scopedValueBindings();
578         }
579     }
580 }
--- EOF ---