1 /*
  2  * Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package jdk.internal.misc;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.time.Duration;
 30 import java.util.Objects;
 31 import java.util.Set;
 32 import java.util.concurrent.ConcurrentHashMap;
 33 import java.util.concurrent.TimeoutException;
 34 import java.util.concurrent.locks.LockSupport;
 35 import java.util.stream.Stream;
 36 import jdk.internal.access.JavaLangAccess;
 37 import jdk.internal.access.SharedSecrets;
 38 import jdk.internal.vm.ExtentLocalContainer;
 39 import jdk.internal.vm.ThreadContainer;
 40 import jdk.internal.vm.ThreadContainers;
 41 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 42 
 43 /**
 44  * A grouping of threads that typically run closely related tasks. Threads started
 45  * in a flock remain in the flock until they terminate.
 46  *
 47  * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock,
 48  * the {@link #start(Thread) start} method to start a thread in the flock, and the
 49  * {@link #close() close} method to close the flock. The {@code close} waits for all
 50  * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used
 51  * to wait for all threads to finish without closing the flock. The {@link #wakeup()}
 52  * method will cause {@code awaitAll} method to complete early, which can be used to
 53  * support cancellation in higher-level APIs. ThreadFlock also defines the {@link
 54  * #shutdown() shutdown} method to prevent new threads from starting while allowing
 55  * existing threads in the flock to continue.
 56  *
 57  * <p> Thread flocks are intended to be used in a <em>structured manner</em>. The
 58  * thread that opens a new flock is the {@link #owner() owner}. The owner closes the
 59  * flock when done, failure to do so may result in a resource leak. The {@code open}
 60  * and {@code close} should be matched to avoid closing an <em>enclosing</em> flock
 61  * while a <em>nested</em> flock is open. A ThreadFlock can be used with the
 62  * try-with-resources construct if required but more likely, the close method of a
 63  * higher-level API that implements {@link AutoCloseable} will close the flock.
 64  *
 65  * <p> Thread flocks are conceptually nodes in a tree. A thread {@code T} started in
 66  * flock "A" may itself open a new flock "B", implicitly forming a tree where flock
 67  * "A" is the parent of flock "B". When nested, say where thread {@code T} opens
 68  * flock "B" and then invokes a method that opens flock "C", then the enclosing
 69  * flock "B" is conceptually the parent of the nested flock "C". ThreadFlock does
 70  * not define APIs that exposes the tree structure. It does define the {@link
 71  * #containsThread(Thread) containsThread} method to test if a flock contains a
 72  * thread, a test that is equivalent to testing membership of flocks in the tree.
 73  * The {@code start} and {@code shutdown} methods are confined to the flock
 74  * owner or threads contained in the flock. The confinement check is equivalent to
 75  * invoking the {@code containsThread} method to test if the caller is contained
 76  * in the flock.
 77  *
 78  * <p> Unless otherwise specified, passing a {@code null} argument to a method
 79  * in this class will cause a {@link NullPointerException} to be thrown.
 80  */
 81 public class ThreadFlock implements AutoCloseable {
 82     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 83     private static final VarHandle THREAD_COUNT;
 84     private static final VarHandle PERMIT;
 85     static {
 86         try {
 87             MethodHandles.Lookup l = MethodHandles.lookup();
 88             THREAD_COUNT = l.findVarHandle(ThreadFlock.class, "threadCount", int.class);
 89             PERMIT = l.findVarHandle(ThreadFlock.class, "permit", boolean.class);
 90             Unsafe.getUnsafe().ensureClassInitialized(StructureViolationExceptions.class);
 91         } catch (Exception e) {
 92             throw new InternalError(e);
 93         }
 94     }
 95 
 96     private final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 97 
 98     // thread count, need to re-examine contention once API is stable
 99     private volatile int threadCount;
100 
101     private final String name;
102     private final ExtentLocalContainer.BindingsSnapshot extentLocalBindings;
103     private final ThreadContainerImpl container; // encapsulate for now
104 
105     // state
106     private volatile boolean shutdown;
107     private volatile boolean closed;
108 
109     // set by wakeup, cleared by awaitAll
110     private volatile boolean permit;
111 
112     ThreadFlock(String name) {
113         this.name = name;
114         this.extentLocalBindings = ExtentLocalContainer.captureBindings();
115         this.container = new ThreadContainerImpl(this);
116     }
117 
118     private long threadCount() {
119         return threadCount;
120     }
121 
122     private ExtentLocalContainer.BindingsSnapshot extentLocalBindings() {
123         return extentLocalBindings;
124     }
125 
126     private void incrementThreadCount() {
127         THREAD_COUNT.getAndAdd(this, 1);
128     }
129 
130     /**
131      * Decrement the thread count. Unpark the owner if the count goes to zero.
132      */
133     private void decrementThreadCount() {
134         int count = (int) THREAD_COUNT.getAndAdd(this, -1) - 1;
135 
136         // signal owner when the count goes to zero
137         if (count == 0) {
138             LockSupport.unpark(owner());
139         }
140     }
141 
142     /**
143      * Invoked on the parent thread when starting {@code thread}.
144      */
145     private void onStart(Thread thread) {
146         incrementThreadCount();
147         boolean done = false;
148         try {
149             boolean added = threads.add(thread);
150             assert added;
151             if (shutdown)
152                 throw new IllegalStateException("Shutdown");
153             done = true;
154         } finally {
155             if (!done) {
156                 threads.remove(thread);
157                 decrementThreadCount();
158             }
159         }
160     }
161 
162     /**
163      * Invoked on the terminating thread or the parent thread when starting
164      * {@code thread} failed. This method is only called if onStart succeeded.
165      */
166     private void onExit(Thread thread) {
167         boolean removed = threads.remove(thread);
168         assert removed;
169         decrementThreadCount();
170     }
171 
172     /**
173      * Clear wakeup permit.
174      */
175     private void clearPermit() {
176         if (permit)
177             permit = false;
178     }
179 
180     /**
181      * Sets the wakeup permit to the given value, returning the previous value.
182      */
183     private boolean getAndSetPermit(boolean newValue) {
184         if (permit != newValue) {
185             return (boolean) PERMIT.getAndSet(this, newValue);
186         } else {
187             return newValue;
188         }
189     }
190 
191     /**
192      * Throws WrongThreadException if the current thread is not the owner.
193      */
194     private void ensureOwner() {
195         if (Thread.currentThread() != owner())
196             throw new WrongThreadException("Current thread not owner");
197     }
198 
199     /**
200      * Throws WrongThreadException if the current thread is not the owner
201      * or a thread contained in the flock.
202      */
203     private void ensureOwnerOrContainsThread() {
204         Thread currentThread = Thread.currentThread();
205         if (currentThread != owner() && !containsThread(currentThread))
206             throw new WrongThreadException("Current thread not owner or thread in flock");
207     }
208 
209     /**
210      * Opens a new thread flock. The flock is owned by the current thread. It can be
211      * named to aid debugging.
212      *
213      * <p> This method captures the current thread's {@linkplain ExtentLocal extent-local}
214      * bindings for inheritance by threads created in the flock.
215      *
216      * <p> For the purposes of containment, monitoring, and debugging, the parent
217      * of the new flock is determined as follows:
218      * <ul>
219      * <li> If the current thread is the owner of open flocks then the most recently
220      * created, and open, flock is the parent of the new flock. In other words, the
221      * <em>enclosing flock</em> is the parent.
222      * <li> If the current thread is not the owner of any open flocks then the
223      * parent of the new flock is the current thread's flock. If the current thread
224      * was not started in a flock then the new flock does not have a parent.
225      * </ul>
226      *
227      * @param name the name of the flock, can be null
228      * @return a new thread flock
229      */
230     public static ThreadFlock open(String name) {
231         var flock = new ThreadFlock(name);
232         flock.container.push();
233         return flock;
234     }
235 
236     /**
237      * {@return the name of this flock or {@code null} if unnamed}
238      */
239     public String name() {
240         return name;
241     }
242 
243     /**
244      * {@return the owner of this flock}
245      */
246     public Thread owner() {
247         return container.owner();
248     }
249 
250     /**
251      * Starts the given unstarted thread in this flock.
252      *
253      * <p> The thread is started with the extent-local bindings that were captured
254      * when opening the flock. The bindings must match the current thread's bindings.
255      *
256      * <p> This method may only be invoked by the flock owner or threads {@linkplain
257      * #containsThread(Thread) contained} in the flock.
258      *
259      * @param thread the unstarted thread
260      * @return the thread, started
261      * @throws IllegalStateException if this flock is shutdown or closed
262      * @throws IllegalThreadStateException if the given thread was already started
263      * @throws WrongThreadException if the current thread is not the owner or a thread
264      * contained in the flock
265      * @throws jdk.incubator.concurrent.StructureViolationException if the current
266      * extent-local bindings are not the same as when the flock was created
267      */
268     public Thread start(Thread thread) {
269         ensureOwnerOrContainsThread();
270         JLA.start(thread, container);
271         return thread;
272     }
273 
274     /**
275      * Shutdown this flock so that no new threads can be started, existing threads
276      * in the flock will continue to run. This method is a no-op if the flock is
277      * already shutdown or closed.
278      *
279      * <p> This method may only be invoked by the flock owner or threads {@linkplain
280      * #containsThread(Thread) contained} in the flock.
281      *
282      * @throws WrongThreadException if the current thread is not the owner or a thread
283      * contained in the flock
284      */
285     public void shutdown() {
286         ensureOwnerOrContainsThread();
287         if (!shutdown) {
288             shutdown = true;
289         }
290     }
291 
292     /**
293      * Wait for all threads in the flock to finish executing their tasks. This method
294      * waits until all threads finish, the {@link #wakeup() wakeup} method is invoked,
295      * or the current thread is interrupted.
296      *
297      * <p> This method may only be invoked by the flock owner. The method trivially
298      * returns true when the flock is closed.
299      *
300      * <p> This method clears the effect of any previous invocations of the
301      * {@code wakeup} method.
302      *
303      * @return true if there are no threads in the flock, false if wakeup was invoked
304      * and there are unfinished threads
305      * @throws InterruptedException if interrupted while waiting
306      * @throws WrongThreadException if invoked by a thread that is not the owner
307      */
308     public boolean awaitAll() throws InterruptedException {
309         ensureOwner();
310 
311         if (getAndSetPermit(false))
312             return (threadCount == 0);
313 
314         while (threadCount > 0 && !permit) {
315             LockSupport.park();
316             if (Thread.interrupted())
317                 throw new InterruptedException();
318         }
319         clearPermit();
320         return (threadCount == 0);
321     }
322 
323     /**
324      * Wait, up to the given waiting timeout, for all threads in the flock to finish
325      * executing their tasks. This method waits until all threads finish, the {@link
326      * #wakeup() wakeup} method is invoked, the current thread is interrupted, or
327      * the timeout expires.
328      *
329      * <p> This method may only be invoked by the flock owner. The method trivially
330      * returns true when the flock is closed.
331      *
332      * <p> This method clears the effect of any previous invocations of the {@code wakeup}
333      * method.
334      *
335      * @param timeout the maximum duration to wait
336      * @return true if there are no threads in the flock, false if wakeup was invoked
337      * and there are unfinished threads
338      * @throws InterruptedException if interrupted while waiting
339      * @throws TimeoutException if the wait timed out
340      * @throws WrongThreadException if invoked by a thread that is not the owner
341      */
342     public boolean awaitAll(Duration timeout)
343             throws InterruptedException, TimeoutException {
344         Objects.requireNonNull(timeout);
345         ensureOwner();
346 
347         if (getAndSetPermit(false))
348             return (threadCount == 0);
349 
350         long startNanos = System.nanoTime();
351         long nanos = NANOSECONDS.convert(timeout);
352         long remainingNanos = nanos;
353         while (threadCount > 0 && remainingNanos > 0 && !permit) {
354             LockSupport.parkNanos(remainingNanos);
355             if (Thread.interrupted())
356                 throw new InterruptedException();
357             remainingNanos = nanos - (System.nanoTime() - startNanos);
358         }
359 
360         boolean done = (threadCount == 0);
361         if (!done && remainingNanos <= 0 && !permit) {
362             throw new TimeoutException();
363         } else {
364             clearPermit();
365             return done;
366         }
367     }
368 
369     /**
370      * Causes the call to {@link #awaitAll()} or {@link #awaitAll(Duration)} by the
371      * {@linkplain #owner() owner} to return immediately.
372      *
373      * <p> If the owner is blocked in {@code awaitAll} then it will return immediately.
374      * If the owner is not blocked in {@code awaitAll} then its next call to wait
375      * will return immediately. The method does nothing when the flock is closed.
376      *
377      * @throws WrongThreadException if the current thread is not the owner or a thread
378      * contained in the flock
379      */
380     public void wakeup() {
381         ensureOwnerOrContainsThread();
382         if (!getAndSetPermit(true) && Thread.currentThread() != owner()) {
383             LockSupport.unpark(owner());
384         }
385     }
386 
387     /**
388      * Closes this flock. This method first shuts down the flock to prevent
389      * new threads from starting. It then waits for the threads in the flock
390      * to finish executing their tasks. In other words, this method blocks until
391      * all threads in the flock finish.
392      *
393      * <p> This method may only be invoked by the flock owner.
394      *
395      * <p> If interrupted then this method continues to wait until all threads
396      * finish, before completing with the interrupt status set.
397      *
398      * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If
399      * this method is called to close a flock before nested flocks are closed then it
400      * closes the nested flocks (in the reverse order that they were created in),
401      * closes this flock, and then throws {@link
402      * jdk.incubator.concurrent.StructureViolationException}.
403      * Similarly, if called to close a flock that <em>encloses</em> {@linkplain
404      * jdk.incubator.concurrent.ExtentLocal.Carrier#run(Runnable) operations} with
405      * extent-local bindings then it also throws {@code StructureViolationException}
406      * after closing the flock.
407      *
408      * @throws WrongThreadException if invoked by a thread that is not the owner
409      * @throws jdk.incubator.concurrent.StructureViolationException if a structure
410      * violation was detected
411      */
412     public void close() {
413         ensureOwner();
414         if (closed)
415             return;
416 
417         // shutdown, if not already shutdown
418         if (!shutdown)
419             shutdown = true;
420 
421         // wait for threads to finish
422         boolean interrupted = false;
423         try {
424             while (threadCount > 0) {
425                 LockSupport.park();
426                 if (Thread.interrupted()) {
427                     interrupted = true;
428                 }
429             }
430 
431         } finally {
432             try {
433                 container.close(); // may throw
434             } finally {
435                 closed = true;
436                 if (interrupted) Thread.currentThread().interrupt();
437             }
438         }
439     }
440 
441     /**
442      * {@return true if the flock has been {@linkplain #shutdown() shut down}}
443      */
444     public boolean isShutdown() {
445         return shutdown;
446     }
447 
448     /**
449      * {@return true if the flock has been {@linkplain #close() closed}}
450      */
451     public boolean isClosed() {
452         return closed;
453     }
454 
455     /**
456      * {@return a stream of the live threads in this flock}
457      * The elements of the stream are threads that were started in this flock
458      * but have not terminated. The stream will reflect the set of threads in the
459      * flock at some point at or since the creation of the stream. It may or may
460      * not reflect changes to the set of threads subsequent to creation of the
461      * stream.
462      */
463     public Stream<Thread> threads() {
464         return threads.stream().filter(Thread::isAlive);
465     }
466 
467     /**
468      * Tests if this flock contains the given thread. This method returns {@code true}
469      * if the thread was started in this flock and has not finished. If the thread
470      * is not in this flock then it tests if the thread is in flocks owned by threads
471      * in this flock, essentially equivalent to invoking {@code containsThread} method
472      * on all flocks owned by the threads in this flock.
473      *
474      * @param thread the thread
475      * @return true if this flock contains the thread
476      */
477     public boolean containsThread(Thread thread) {
478         var c = JLA.threadContainer(thread);
479         if (c == this.container)
480             return true;
481         if (c != null && c != ThreadContainers.root()) {
482             var parent = c.parent();
483             while (parent != null) {
484                 if (parent == this.container)
485                     return true;
486                 parent = parent.parent();
487             }
488         }
489         return false;
490     }
491 
492     @Override
493     public String toString() {
494         String id = Objects.toIdentityString(this);
495         if (name != null) {
496             return name + "/" + id;
497         } else {
498             return id;
499         }
500     }
501 
502     /**
503      * A ThreadContainer backed by a ThreadFlock.
504      */
505     private static class ThreadContainerImpl extends ThreadContainer {
506         private final ThreadFlock flock;
507         private volatile Object key;
508         private boolean closing;
509 
510         ThreadContainerImpl(ThreadFlock flock) {
511             super(/*shared*/ false);
512             this.flock = flock;
513         }
514 
515         @Override
516         public ThreadContainerImpl push() {
517             // Virtual threads in the root containers are not tracked so need
518             // to register container to ensure that it is found
519             Thread thread = Thread.currentThread();
520             if (thread.isVirtual()
521                     && JLA.threadContainer(thread) == ThreadContainers.root()) {
522                 this.key = ThreadContainers.registerContainer(this);
523             }
524 
525             super.push();
526             return this;
527         }
528 
529         /**
530          * Invoked by ThreadFlock.close when closing the flock. This method pops the
531          * container from the current thread's scope stack.
532          */
533         void close() {
534             assert Thread.currentThread() == owner();
535             if (!closing) {
536                 closing = true;
537                 boolean atTop = popForcefully(); // may block
538                 Object key = this.key;
539                 if (key != null)
540                     ThreadContainers.deregisterContainer(key);
541                 if (!atTop)
542                     StructureViolationExceptions.throwException();
543             }
544         }
545 
546         /**
547          * Invoked when an enclosing scope is closing. Invokes ThreadFlock.close to
548          * close the flock. This method does not pop the container from the current
549          * thread's scope stack.
550          */
551         @Override
552         protected boolean tryClose() {
553             assert Thread.currentThread() == owner();
554             if (!closing) {
555                 closing = true;
556                 flock.close();
557                 Object key = this.key;
558                 if (key != null)
559                     ThreadContainers.deregisterContainer(key);
560                 return true;
561             } else {
562                 assert false : "Should not get there";
563                 return false;
564             }
565         }
566 
567         @Override
568         public long threadCount() {
569             return flock.threadCount();
570         }
571         @Override
572         public Stream<Thread> threads() {
573             return flock.threads().filter(Thread::isAlive);
574         }
575         @Override
576         public void onStart(Thread thread) {
577             flock.onStart(thread);
578         }
579         @Override
580         public void onExit(Thread thread) {
581             flock.onExit(thread);
582         }
583         @Override
584         public String toString() {
585             return flock.toString();
586         }
587         @Override
588         public ExtentLocalContainer.BindingsSnapshot extentLocalBindings() {
589             return flock.extentLocalBindings();
590         }
591     }
592 }