1 /*
  2  * Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package jdk.internal.misc;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.time.Duration;
 30 import java.util.Objects;
 31 import java.util.Set;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.TimeoutException;
 34 import java.util.concurrent.locks.LockSupport;
 35 import java.util.stream.Stream;
 36 import jdk.internal.access.JavaLangAccess;
 37 import jdk.internal.access.SharedSecrets;
 38 import jdk.internal.vm.ThreadContainer;
 39 import jdk.internal.vm.ThreadContainers;
 40 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 41 
 42 /**
 43  * A grouping of threads that typically run closely related tasks. Threads started
 44  * in a flock remain in the flock until they terminate.
 45  *
 46  * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock,
 47  * the {@link #start(Thread) start} method to start a thread in the flock, and the
 48  * {@link #close() close} method to close the flock. The {@code close} waits for all
 49  * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used
 50  * to wait for all threads to finish without closing the flock. The {@link #wakeup()}
 51  * method will cause {@code awaitAll} method to complete early, which can be used to
 52  * support cancellation in higher-level APIs. ThreadFlock also defines the {@link
 53  * #shutdown() shutdown} method to prevent new threads from starting while allowing
 54  * existing threads in the flock to continue.
 55  *
 56  * <p> Thread flocks are intended to be used in a <em>structured manner</em>. The
 57  * thread that opens a new flock is the {@link #owner() owner}. The owner closes the
 58  * flock when done, failure to do so may result in a resource leak. The {@code open}
 59  * and {@code close} should be matched to avoid closing an <em>enclosing</em> flock
 60  * while a <em>nested</em> flock is open. A ThreadFlock can be used with the
 61  * try-with-resources construct if required but more likely, the close method of a
 62  * higher-level API that implements {@link AutoCloseable} will close the flock.
 63  *
 64  * <p> Thread flocks are conceptually nodes in a tree. A thread {@code T} started in
 65  * flock "A" may itself open a new flock "B", implicitly forming a tree where flock
 66  * "A" is the parent of flock "B". When nested, say where thread {@code T} opens
 67  * flock "B" and then invokes a method that opens flock "C", then the enclosing
 68  * flock "B" is conceptually the parent of the nested flock "C". ThreadFlock does
 69  * not define APIs that exposes the tree structure. It does define the {@link
 70  * #containsThread(Thread) containsThread} method to test if a flock contains a
 71  * thread, a test that is equivalent to testing membership of flocks in the tree.
 72  * The {@code start} and {@code shutdown} methods are confined to the flock
 73  * owner or threads contained in the flock. The confinement check is equivalent to
 74  * invoking the {@code containsThread} method to test if the caller is contained
 75  * in the flock.
 76  *
 77  * <p> Unless otherwise specified, passing a {@code null} argument to a method
 78  * in this class will cause a {@link NullPointerException} to be thrown.
 79  */
 80 public class ThreadFlock implements AutoCloseable {
 81     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 82     private static final VarHandle THREAD_COUNT;
 83     private static final VarHandle PERMIT;
 84     static {
 85         try {
 86             MethodHandles.Lookup l = MethodHandles.lookup();
 87             THREAD_COUNT = l.findVarHandle(ThreadFlock.class, "threadCount", int.class);
 88             PERMIT = l.findVarHandle(ThreadFlock.class, "permit", boolean.class);
 89         } catch (Exception e) {
 90             throw new InternalError(e);
 91         }
 92     }
 93 
 94     private final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 95 
 96     // thread count, need to re-examine contention once API is stable
 97     private volatile int threadCount;
 98 
 99     private final String name;
100     private final Object scopeLocalBindings;
101     private final ThreadContainerImpl container; // encapsulate for now
102 
103     // state
104     private volatile boolean shutdown;
105     private volatile boolean closed;
106 
107     // set by wakeup, cleared by awaitAll
108     private volatile boolean permit;
109 
110     ThreadFlock(String name) {
111         this.name = name;
112         this.scopeLocalBindings = JLA.scopeLocalBindings();
113         this.container = new ThreadContainerImpl(this);
114     }
115 
116     private long threadCount() {
117         return threadCount;
118     }
119 
120     private Object scopeLocalBindings() {
121         return scopeLocalBindings;
122     }
123 
124     private void incrementThreadCount() {
125         THREAD_COUNT.getAndAdd(this, 1);
126     }
127 
128     /**
129      * Decrement the thread count. Unpark the owner if the count goes to zero.
130      */
131     private void decrementThreadCount() {
132         int count = (int) THREAD_COUNT.getAndAdd(this, -1) - 1;
133 
134         // signal owner when the count goes to zero
135         if (count == 0) {
136             LockSupport.unpark(owner());
137         }
138     }
139 
140     /**
141      * Invoked on the parent thread when starting {@code thread}.
142      */
143     private void onStart(Thread thread) {
144         incrementThreadCount();
145         boolean done = false;
146         try {
147             boolean added = threads.add(thread);
148             assert added;
149             if (shutdown)
150                 throw new IllegalStateException("Shutdown");
151             done = true;
152         } finally {
153             if (!done) {
154                 threads.remove(thread);
155                 decrementThreadCount();
156             }
157         }
158     }
159 
160     /**
161      * Invoked on the terminating thread or the parent thread when starting
162      * {@code thread} failed. This method is only called if onStart succeeded.
163      */
164     private void onExit(Thread thread) {
165         boolean removed = threads.remove(thread);
166         assert removed;
167         decrementThreadCount();
168     }
169 
170     /**
171      * Clear wakeup permit.
172      */
173     private void clearPermit() {
174         if (permit)
175             permit = false;
176     }
177 
178     /**
179      * Sets the wakeup permit to the given value, returning the previous value.
180      */
181     private boolean getAndSetPermit(boolean newValue) {
182         if (permit != newValue) {
183             return (boolean) PERMIT.getAndSet(this, newValue);
184         } else {
185             return newValue;
186         }
187     }
188 
189     /**
190      * Throws IllegalStateException if the current thread is not the owner.
191      */
192     private void ensureOwner() {
193         if (Thread.currentThread() != owner())
194             throw new IllegalStateException("Not owner");
195     }
196 
197     /**
198      * Throws IllegalStateException if the current thread is not the owner
199      * or a thread contained in the flock.
200      */
201     private void ensureOwnerOrContainsThread() {
202         Thread currentThread = Thread.currentThread();
203         if (currentThread != owner() && !containsThread(currentThread))
204             throw new IllegalStateException("Current thread not owner or thread in flock");
205     }
206 
207     /**
208      * Opens a new thread flock. The flock is owned by the current thread. It can be
209      * named to aid debugging.
210      *
211      * <p> This method captures the current thread's {@linkplain ScopeLocal scope-local}
212      * bindings for inheritance by threads created in the flock.
213      *
214      * <p> For the purposes of containment, monitoring, and debugging, the parent
215      * of the new flock is determined as follows:
216      * <ul>
217      * <li> If the current thread is the owner of open flocks then the most recently
218      * created, and open, flock is the parent of the new flock. In other words, the
219      * <em>enclosing flock</em> is the parent.
220      * <li> If the current thread is not the owner of any open flocks then the
221      * parent of the new flock is the current thread's flock. If the current thread
222      * was not started in a flock then the new flock does not have a parent.
223      * </ul>
224      *
225      * @param name the name of the flock, can be null
226      * @return a new thread flock
227      */
228     public static ThreadFlock open(String name) {
229         var flock = new ThreadFlock(name);
230         flock.container.push();
231         return flock;
232     }
233 
234     /**
235      * {@return the name of this flock or {@code null} if unnamed}
236      */
237     public String name() {
238         return name;
239     }
240 
241     /**
242      * {@return the owner of this flock}
243      */
244     public Thread owner() {
245         return container.owner();
246     }
247 
248     /**
249      * Starts the given unstarted thread in this flock.
250      *
251      * <p> The thread is started with the {@linkplain ScopeLocal scope-local} bindings
252      * that were captured when opening the flock. The bindings must match the current
253      * thread's bindings.
254      *
255      * <p> This method may only be invoked by the flock owner or threads {@linkplain
256      * #containsThread(Thread) contained} in the flock.
257      *
258      * @param thread the unstarted thread
259      * @return the thread, started
260      * @throws IllegalStateException if this flock is shutdown or closed,
261      * or the current scope-local bindings are not the same as when the flock
262      * was created
263      * @throws IllegalThreadStateException if the thread has already started,
264      * or the caller thread is not the owner or a thread contained in the flock
265      */
266     public Thread start(Thread thread) {
267         ensureOwnerOrContainsThread();
268         JLA.start(thread, container);
269         return thread;
270     }
271 
272     /**
273      * Shutdown this flock so that no new threads can be started, existing threads
274      * in the flock will continue to run. This method is a no-op if the flock is
275      * already shutdown or closed.
276      *
277      * <p> This method may only be invoked by the flock owner or threads {@linkplain
278      * #containsThread(Thread) contained} in the flock.
279      *
280      * @throws IllegalStateException if the caller thread is not the owner
281      * or a thread contained in the flock
282      */
283     public void shutdown() {
284         ensureOwnerOrContainsThread();
285         if (!shutdown) {
286             shutdown = true;
287         }
288     }
289 
290     /**
291      * Wait for all threads in the flock to finish executing their tasks. This method
292      * waits until all threads finish, the {@link #wakeup() wakeup} method is invoked,
293      * or the current thread is interrupted.
294      *
295      * <p> This method may only be invoked by the flock owner. The method trivially
296      * returns true when the flock is closed.
297      *
298      * <p> This method clears the effect of any previous invocations of the
299      * {@code wakeup} method.
300      *
301      * @return true if there are no threads in the flock, false if wakeup was invoked
302      * and there are unfinished threads
303      * @throws InterruptedException if interrupted while waiting
304      * @throws IllegalStateException if invoked by a thread that is not the owner
305      */
306     public boolean awaitAll() throws InterruptedException {
307         ensureOwner();
308 
309         if (getAndSetPermit(false))
310             return (threadCount == 0);
311 
312         while (threadCount > 0 && !permit) {
313             LockSupport.park();
314             if (Thread.interrupted())
315                 throw new InterruptedException();
316         }
317         clearPermit();
318         return (threadCount == 0);
319     }
320 
321     /**
322      * Wait, up to the given waiting timeout, for all threads in the flock to finish
323      * executing their tasks. This method waits until all threads finish, the {@link
324      * #wakeup() wakeup} method is invoked, the current thread is interrupted, or
325      * the timeout expires.
326      *
327      * <p> This method may only be invoked by the flock owner. The method trivially
328      * returns true when the flock is closed.
329      *
330      * <p> This method clears the effect of any previous invocations of the {@code wakeup}
331      * method.
332      *
333      * @param timeout the maximum duration to wait
334      * @return true if there are no threads in the flock, false if wakeup was invoked
335      * and there are unfinished threads
336      * @throws InterruptedException if interrupted while waiting
337      * @throws TimeoutException if the wait timed out
338      * @throws IllegalStateException if invoked by a thread that is not the owner
339      */
340     public boolean awaitAll(Duration timeout)
341             throws InterruptedException, TimeoutException {
342         Objects.requireNonNull(timeout);
343         ensureOwner();
344 
345         if (getAndSetPermit(false))
346             return (threadCount == 0);
347 
348         long startNanos = System.nanoTime();
349         long nanos = NANOSECONDS.convert(timeout);
350         long remainingNanos = nanos;
351         while (threadCount > 0 && remainingNanos > 0 && !permit) {
352             LockSupport.parkNanos(remainingNanos);
353             if (Thread.interrupted())
354                 throw new InterruptedException();
355             remainingNanos = nanos - (System.nanoTime() - startNanos);
356         }
357 
358         boolean done = (threadCount == 0);
359         if (!done && remainingNanos <= 0 && !permit) {
360             throw new TimeoutException();
361         } else {
362             clearPermit();
363             return done;
364         }
365     }
366 
367     /**
368      * Causes the call to {@link #awaitAll()} or {@link #awaitAll(Duration)} by the
369      * {@linkplain #owner() owner} to return immediately.
370      *
371      * <p> If the owner is blocked in {@code awaitAll} then it will return immediately.
372      * If the owner is not blocked in {@code awaitAll} then its next call to wait
373      * will return immediately. The method does nothing when the flock is closed.
374      *
375      * @throws IllegalStateException if the caller thread is not the owner
376      * or a thread contained in the flock
377      */
378     public void wakeup() {
379         ensureOwnerOrContainsThread();
380         if (!getAndSetPermit(true) && Thread.currentThread() != owner()) {
381             LockSupport.unpark(owner());
382         }
383     }
384 
385     /**
386      * Closes this flock. This method first shuts down the flock to prevent
387      * new threads from starting. It then waits for the threads in the flock
388      * to finish executing their tasks. In other words, this method blocks until
389      * all threads in the flock finish.
390      *
391      * <p> This method may only be invoked by the flock owner.
392      *
393      * <p> If interrupted then this method continues to wait until all threads
394      * finish, before completing with the interrupt status set.
395      *
396      * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If
397      * this method is called to close a flock before nested flocks are closed then it
398      * closes the nested flocks (in the reverse order that they were created in),
399      * closes this flock, and then throws {@link StructureViolationException}.
400      * Similarly, if called to close a flock that <em>encloses</em> {@linkplain
401      * ScopeLocal.Carrier#run(Runnable) operations} with scope-local bindings then
402      * it also throws {@code StructureViolationException} after closing the flock.
403      *
404      * @throws IllegalStateException if invoked by a thread that is not the owner
405      */
406     public void close() {
407         ensureOwner();
408         if (closed)
409             return;
410 
411         // shutdown, if not already shutdown
412         if (!shutdown)
413             shutdown = true;
414 
415         // wait for threads to finish
416         boolean interrupted = false;
417         try {
418             while (threadCount > 0) {
419                 LockSupport.park();
420                 if (Thread.interrupted()) {
421                     interrupted = true;
422                 }
423             }
424 
425         } finally {
426             try {
427                 container.close(); // may throw
428             } finally {
429                 closed = true;
430                 if (interrupted) Thread.currentThread().interrupt();
431             }
432         }
433     }
434 
435     /**
436      * {@return true if the flock has been {@linkplain #shutdown() shut down}}
437      */
438     public boolean isShutdown() {
439         return shutdown;
440     }
441 
442     /**
443      * {@return true if the flock has been {@linkplain #close() closed}}
444      */
445     public boolean isClosed() {
446         return closed;
447     }
448 
449     /**
450      * {@return a stream of the threads in this flock}
451      * The elements of the stream are threads that were started in this flock
452      * but have not terminated. The stream will reflect the set of threads in the
453      * flock at some point at or since the creation of the stream. It may or may
454      * not reflect changes to the set of threads subsequent to creation of the
455      * stream.
456      */
457     public Stream<Thread> threads() {
458         return threads.stream();
459     }
460 
461     /**
462      * Tests if this flock contains the given thread. This method returns {@code true}
463      * if the thread was started in this flock and has not finished. If the thread
464      * is not in this flock then it tests if the thread is in flocks owned by threads
465      * in this flock, essentially equivalent to invoking {@code containsThread} method
466      * on all flocks owned by the threads in this flock.
467      *
468      * @param thread the thread
469      * @return true if this flock contains the thread
470      */
471     public boolean containsThread(Thread thread) {
472         var c = JLA.threadContainer(thread);
473         if (c == this.container)
474             return true;
475         if (c != null && c != ThreadContainers.root()) {
476             var parent = c.parent();
477             while (parent != null) {
478                 if (parent == this.container)
479                     return true;
480                 parent = parent.parent();
481             }
482         }
483         return false;
484     }
485 
486     @Override
487     public String toString() {
488         String id = getClass().getName() + "@" + System.identityHashCode(this);
489         if (name != null) {
490             return name + "/" + id;
491         } else {
492             return id;
493         }
494     }
495 
496     /**
497      * A ThreadContainer backed by a ThreadFlock.
498      */
499     private static class ThreadContainerImpl extends ThreadContainer {
500         private final ThreadFlock flock;
501         private volatile Object key;
502         private boolean closing;
503 
504         ThreadContainerImpl(ThreadFlock flock) {
505             this.flock = flock;
506         }
507 
508         @Override
509         public ThreadContainerImpl push() {
510             // Virtual threads in the root containers are not tracked so need
511             // to register container to ensure that it is found
512             Thread thread = Thread.currentThread();
513             if (thread.isVirtual()
514                     && JLA.threadContainer(thread) == ThreadContainers.root()) {
515                 this.key = ThreadContainers.registerContainer(this);
516             }
517 
518             super.push();
519             return this;
520         }
521 
522         /**
523          * Invoked by ThreadFlock.close when closing the flock. This method pops the
524          * container from the current thread's scope stack.
525          */
526         void close() {
527             assert Thread.currentThread() == owner();
528             if (!closing) {
529                 closing = true;
530                 boolean atTop = popForcefully(); // may block
531                 Object key = this.key;
532                 if (key != null)
533                     ThreadContainers.deregisterContainer(key);
534                 if (!atTop)
535                     throw new StructureViolationException();
536             }
537         }
538 
539         /**
540          * Invoked when an enclosing scope is closing. Invokes ThreadFlock.close to
541          * close the flock. This method does not pop the container from the current
542          * thread's scope stack.
543          */
544         @Override
545         protected boolean tryClose() {
546             assert Thread.currentThread() == owner();
547             if (!closing) {
548                 closing = true;
549                 flock.close();
550                 Object key = this.key;
551                 if (key != null)
552                     ThreadContainers.deregisterContainer(key);
553                 return true;
554             } else {
555                 assert false : "Should not get there";
556                 return false;
557             }
558         }
559 
560         @Override
561         public String name() {
562             return flock.name();
563         }
564         @Override
565         public long threadCount() {
566             return flock.threadCount();
567         }
568         @Override
569         public Stream<Thread> threads() {
570             return flock.threads().filter(Thread::isAlive);
571         }
572         @Override
573         public void onStart(Thread thread) {
574             flock.onStart(thread);
575         }
576         @Override
577         public void onExit(Thread thread) {
578             flock.onExit(thread);
579         }
580         @Override
581         public String toString() {
582             return flock.toString();
583         }
584         @Override
585         public Object scopeLocalBindings() {
586             return flock.scopeLocalBindings();
587         }
588     }
589 }