1 /*
   2  * Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.time.Instant;
  33 import java.util.Objects;
  34 import java.util.Optional;
  35 import java.util.Set;
  36 import java.util.concurrent.locks.ReentrantLock;
  37 import java.util.function.Function;
  38 import jdk.internal.misc.ThreadFlock;
  39 import jdk.internal.javac.PreviewFeature;
  40 
  41 /**
  42  * A basic API for <em>structured concurrency</em>. StructuredExecutor supports cases
  43  * where a task splits into several concurrent sub-tasks to be executed in their own
  44  * threads and where the sub-tasks must complete before the main task can continue.
  45  *
  46  * <p> <b>StructuredExecutor is work-in-progress.</b>
  47  *
  48  * <p> StructuredExecutor defines the {@link #open() open} method to open a new executor,
  49  * the {@link #fork(Callable) fork} method to start a thread to execute a task, the {@link
  50  * #join() join} method to wait for all threads to finish, and the {@link #close() close}
  51  * method to close the executor. The API is intended to be used with the {@code
  52  * try-with-resources} construct. The intention is that code in the <em>block</em> uses
  53  * the {@code fork} method to fork threads to execute the sub-tasks, wait for the threads
  54  * to finish with the {@code join} method, and then <em>process the results</em>.
  55  * Processing of results may include handling or re-throwing of exceptions.
  56  * <pre>{@code
  57  *         try (var executor = StructuredExecutor.open()) {
  58  *             Future<String> future1 = executor.fork(task1);
  59  *             Future<String> future2 = executor.fork(task2);
  60  *
  61  *             executor.join();
  62  *
  63  *             ... process results/exceptions ...
  64  *
  65  *         }
  66  * }</pre>
  67  * To ensure correct usage, the {@code join} and {@code close} methods may only be invoked
  68  * by the <em>executor owner</em> (the thread that opened the executor), and the {@code close}
  69  * method throws an exception after closing if the owner did not invoke the {@code join}
  70  * method.
  71  *
  72  * <p> StructuredExecutor defines the {@link #shutdown() shutdown} method to shut down an
  73  * executor without closing it. Shutdown is useful for cases where a task completes with
  74  * a result (or exception) and the results of other unfinished tasks are no longer needed.
  75  * Invoking {@code shutdown} while the owner is waiting in the {@code join} method will cause
  76  * the {@code join} to wakeup. It also interrupts all unfinished threads and prevents new
  77  * threads from starting in the executor.
  78  *
  79  * <p> StructuredExecutor defines the 2-arg {@link #fork(Callable, CompletionHandler) fork}
  80  * method that executes a {@link CompletionHandler CompletionHandler} after a task completes.
  81  * The completion handler can be used to implement policy, collect results and/or exceptions,
  82  * and provide an API that makes available the outcome to the main task to process after the
  83  * {@code join} method.
  84  * <pre>{@code
  85  *         try (var executor = StructuredExecutor.open()) {
  86  *
  87  *             MyHandler<String> handler = ...
  88  *
  89  *             Future<String> future1 = executor.fork(task1, handler);
  90  *             Future<String> future2 = executor.fork(task2, handler);
  91  *
  92  *             executor.join();
  93  *
  94  *             ... invoke handler methods to examine outcome, process results/exceptions, ...
  95  *
  96  *         }
  97  * }</pre>
  98  *
  99  * <p> StructuredExecutor defines two completion handlers that implement policy for two
 100  * common cases:
 101  * <ol>
 102  *   <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the first result and shuts
 103  *   down the executor to interrupt unfinished threads and wakeup the owner. This handler
 104  *   is intended for cases where the result of any task will do ("invoke any") and where the
 105  *   results of other unfinished tasks are no longer needed. It defines methods to get the
 106  *   first result or throw an exception if all tasks fail.
 107  *   <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the first exception and shuts
 108  *   down the executor. This handler is intended for cases where the results of all tasks
 109  *   are required ("invoke all"); if any task fails then the results of other unfinished tasks
 110  *   are no longer needed. It defines methods to throw an exception if any of the tasks fail.
 111  * </ol>
 112  *
 113  * <p> The following are two examples that use the built-in completion handlers. In both
 114  * cases, a pair of tasks are forked to fetch resources from two URL locations "left" and
 115  * "right". The first example creates a ShutdownOnSuccess object to capture the result of
 116  * the first task to complete normally, cancelling the other by way of shutting down the
 117  * executor. The main task waits in {@code join} until either task completes with a result
 118  * or both tasks fail. It invokes the handler's {@link ShutdownOnSuccess#result(Function)
 119  * result(Function)} method to get the captured result. If both tasks fail then this
 120  * method throws WebApplicationException with the exception from one of the tasks as the
 121  * cause.
 122  * <pre>{@code
 123  *         try (var executor = StructuredExecutor.open()) {
 124  *             var handler = new ShutdownOnSuccess<String>();
 125  *
 126  *             executor.fork(() -> fetch(left), handler);
 127  *             executor.fork(() -> fetch(right), handler);
 128  *
 129  *             executor.join();
 130  *
 131  *             String result = handler.result(e -> new WebApplicationException(e));
 132  *
 133  *             :
 134  *         }
 135  * }</pre>
 136  * The second example creates a ShutdownOnFailure object to capture the exception of
 137  * the first task to fail, cancelling the other by way of shutting down the executor. The
 138  * main task waits in {@link #joinUntil(Instant)} until both tasks complete with a result,
 139  * either fails, or a deadline is reached. It invokes the handler's {@link
 140  * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception
 141  * when either task fails. This method is a no-op if no tasks fail. The main task uses
 142  * {@code Future}'s {@link Future#resultNow() resultNow()} method to retrieve the results.
 143  *
 144  * <pre>{@code
 145  *        Instant deadline = ...
 146  *
 147  *        try (var executor = StructuredExecutor.open()) {
 148  *             var handler = new ShutdownOnFailure();
 149  *
 150  *             Future<String> future1 = executor.fork(() -> query(left), handler);
 151  *             Future<String> future2 = executor.fork(() -> query(right), handler);
 152  *
 153  *             executor.joinUntil(deadline);
 154  *
 155  *             handler.throwIfFailed(e -> new WebApplicationException(e));
 156  *
 157  *             // both tasks completed successfully
 158  *             String result = Stream.of(future1, future2)
 159  *                 .map(Future::resultNow)
 160  *                 .collect(Collectors.joining(", ", "{ ", " }"));
 161  *
 162  *             :
 163  *         }
 164  * }</pre>
 165  *
 166  * <p> A StructuredExecutor is conceptually a node in a tree. A thread started in executor
 167  * "A" may itself open a new executor "B", implicitly forming a tree where executor "A" is
 168  * the parent of executor "B". When nested, say where a thread opens executor "B" and then
 169  * invokes a method that opens executor "C", then the enclosing executor "B" is conceptually
 170  * the parent of the nested executor "C". The tree structure supports the inheritance of
 171  * {@linkplain ScopeLocal scope-local} bindings. It also supports confinement checks.
 172  * The phrase "threads contained in the executor" in method descriptions means threads in
 173  * executors in the tree. StructuredExecutor does not define APIs that expose the tree
 174  * structure at this time.
 175  *
 176  * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
 177  * or method in this class will cause a {@link NullPointerException} to be thrown.
 178  *
 179  * @since 99
 180  */
 181 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 182 public class StructuredExecutor implements Executor, AutoCloseable {
 183     private static final VarHandle FUTURES;
 184     static {
 185         try {
 186             MethodHandles.Lookup l = MethodHandles.lookup();
 187             FUTURES = l.findVarHandle(StructuredExecutor.class, "futures", Set.class);
 188         } catch (Exception e) {
 189             throw new InternalError(e);
 190         }
 191     }
 192 
 193     private final ThreadFactory factory;
 194     private final ThreadFlock flock;
 195     private final ReentrantLock shutdownLock = new ReentrantLock();
 196 
 197     // the set of "tracked" Future objects, created lazily
 198     private volatile Set<Future<?>> futures;
 199 
 200     // set to true when owner calls join
 201     private boolean joinInvoked;
 202 
 203     // states: OPEN -> SHUTDOWN -> CLOSED
 204     private static final int OPEN     = 0;
 205     private static final int SHUTDOWN = 1;
 206     private static final int CLOSED   = 2;
 207     private volatile int state;
 208 
 209     StructuredExecutor(String name, ThreadFactory factory) {
 210         this.factory = Objects.requireNonNull(factory);
 211         this.flock = ThreadFlock.open(name);
 212     }
 213 
 214     /**
 215      * Throws IllegalStateException if the current thread is not the owner.
 216      */
 217     private void ensureOwner() {
 218         if (Thread.currentThread() != flock.owner())
 219             throw new IllegalStateException("Not owner");
 220     }
 221 
 222     /**
 223      * Throws IllegalStateException if the current thread is not the owner
 224      * or a thread contained in the tree.
 225      */
 226     private void ensureOwnerOrContainsThread() {
 227         Thread currentThread = Thread.currentThread();
 228         if (currentThread != flock.owner() && !flock.containsThread(currentThread))
 229             throw new IllegalStateException("Current thread not owner or thread in executor");
 230     }
 231 
 232     /**
 233      * Tests if the executor is shutdown.
 234      */
 235     private boolean isShutdown() {
 236         return state >= SHUTDOWN;
 237     }
 238 
 239     /**
 240      * Track the given Future.
 241      */
 242     private void track(Future<?> future) {
 243         // create the set of Futures if not already created
 244         Set<Future<?>> futures = this.futures;
 245         if (futures == null) {
 246             futures = ConcurrentHashMap.newKeySet();
 247             if (!FUTURES.compareAndSet(this, null, futures)) {
 248                 // lost the race
 249                 futures = this.futures;
 250             }
 251         }
 252         futures.add(future);
 253     }
 254 
 255     /**
 256      * Stop tracking the Future.
 257      */
 258     private void untrack(Future<?> future) {
 259         assert futures != null;
 260         futures.remove(future);
 261     }
 262 
 263     /**
 264      * Opens a new StructuredExecutor that creates threads with given thread factory
 265      * to run tasks. The executor is owned by the current thread. The executor is
 266      * optionally named.
 267      *
 268      * <p> This method captures the current thread's {@linkplain ScopeLocal scope-local}
 269      * bindings for inheritance by threads created in the executor.
 270      *
 271      * <p> For the purposes of confinement and inheritance of scope-local bindings, the
 272      * parent of the executor is determined as follows:
 273      * <ul>
 274      * <li> If the current thread is the owner of open executors then the most recently
 275      * created, and open, executor is the parent of the new executor. In other words, the
 276      * <em>enclosing executor</em> is the parent.
 277      * <li> If the current thread is not the owner of any open executors then the
 278      * parent of the new executor is the current thread's executor. If the current thread
 279      * was not started in an executor then the new executor does not have a parent.
 280      * </ul>
 281      *
 282      * @param name the name of the executor, can be null
 283      * @param factory the thread factory
 284      * @return a new StructuredExecutor
 285      */
 286     public static StructuredExecutor open(String name, ThreadFactory factory) {
 287         return new StructuredExecutor(name, factory);
 288     }
 289 
 290     /**
 291      * Opens a new StructuredExecutor that creates virtual threads to run tasks.
 292      *
 293      * <p> This method is equivalent to invoking {@link #open(String, ThreadFactory)}
 294      * with the given name and a thread factory that creates virtual threads.
 295      *
 296      * @param name the name of the executor
 297      * @return a new StructuredExecutor
 298      */
 299     public static StructuredExecutor open(String name) {
 300         ThreadFactory factory = Thread.ofVirtual().factory();
 301         return new StructuredExecutor(Objects.requireNonNull(name), factory);
 302     }
 303 
 304     /**
 305      * Opens a new StructuredExecutor that creates virtual threads to run tasks.
 306      * The executor is unnamed.
 307      *
 308      * <p> This method is equivalent to invoking {@link #open(String, ThreadFactory)}
 309      * with a name of {@code null} and a thread factory that creates virtual threads.
 310      *
 311      * @return a new StructuredExecutor
 312      */
 313     public static StructuredExecutor open() {
 314         ThreadFactory factory = Thread.ofVirtual().factory();
 315         return new StructuredExecutor(null, factory);
 316     }
 317 
 318     /**
 319      * Starts a new thread to run the given task. If handler is non-null then it is
 320      * invoked if the task completes before the executor is shutdown.
 321      */
 322     private <V> Future<V> spawn(Callable<? extends V> task,
 323                                 CompletionHandler<? super V> handler) {
 324         Objects.requireNonNull(task);
 325 
 326         // create future
 327         var future = new FutureImpl<V>(this, task, handler);
 328 
 329         // check state before creating thread
 330         int s = state;
 331         if (s >= SHUTDOWN) {
 332             // the executor is closed, shutdown, or in the process of shutting down
 333             if (state == CLOSED)
 334                 throw new IllegalStateException("Executor is closed");
 335             future.cancel(false);
 336             return future;
 337         }
 338 
 339         // create thread
 340         Thread thread = factory.newThread(future);
 341         if (thread == null)
 342             throw new RejectedExecutionException();
 343 
 344         // attempt to start the thread
 345         try {
 346             flock.start(thread);
 347         } catch (IllegalStateException e) {
 348             if (flock.isShutdown()) {
 349                 // the executor is closed, shutdown, or in the process of shutting down
 350                 if (state == CLOSED)
 351                     throw new IllegalStateException("Executor is closed");
 352                 future.cancel(false);
 353             } else {
 354                 // scope-locals don't match
 355                 throw e;
 356             }
 357         }
 358 
 359         return future;
 360     }
 361 
 362     /**
 363      * Starts a new thread to run the given task.
 364      *
 365      * <p> The thread inherits the current thread's {@linkplain ScopeLocal scope-local}
 366      * bindings and must match the bindings captured when the executor was created.
 367      *
 368      * <p> If this executor is {@linkplain #shutdown() shutdown} (or in the process of
 369      * shutting down) then this method returns a Future representing a {@link
 370      * Future.State#CANCELLED cancelled} task that was not run.
 371      *
 372      * <p> This method may only be invoked by the executor owner or threads contained
 373      * in the executor.
 374      *
 375      * @param task the task to run
 376      * @param <V> the task return type
 377      * @return a future
 378      * @throws IllegalStateException if this executor is closed, the current
 379      * scope-local bindings are not the same as when the executor was created,
 380      * or the caller thread is not the owner or a thread contained in the executor
 381      * @throws RejectedExecutionException if the thread factory rejected creating a
 382      * thread to run the task
 383      */
 384     public <V> Future<V> fork(Callable<? extends V> task) {
 385         return spawn(task, null);
 386     }
 387 
 388     /**
 389      * Starts a new thread to run the given task and a completion handler when the task
 390      * completes.
 391      *
 392      * <p> The thread inherits the current thread's {@linkplain ScopeLocal scope-local}
 393      * bindings and must match the bindings captured when the executor was created.
 394      *
 395      * <p> The completion handler's {@link CompletionHandler#handle(StructuredExecutor, Future)
 396      * handle} method is invoked if the task completes before the executor is {@link
 397      * #shutdown() shutdown}. If the executor shuts down at or around the same time that
 398      * the task completes then the completion handler may or may not be invoked.
 399      * The {@link CompletionHandler#compose(CompletionHandler, CompletionHandler) compose}
 400      * method can be used to compose more than one handler where required. The {@code handle}
 401      * method is run by the thread when the task completes with a result or exception. If
 402      * the {@link Future#cancel(boolean) Future.cancel} is used to cancel a task, and before
 403      * the executor is shutdown, then the {@code handle} method is run by the thread that
 404      * invokes {@code cancel}.
 405      *
 406      * <p> If this executor is {@linkplain #shutdown() shutdown} (or in the process of
 407      * shutting down) then this method returns a Future representing a {@link
 408      * Future.State#CANCELLED cancelled} task that was not run.
 409      *
 410      * <p> This method may only be invoked by the executor owner or threads contained
 411      * in the executor.
 412      *
 413      * @param task the task to run
 414      * @param handler the completion handler to run when the task completes
 415      * @param <V> the task return type
 416      * @return a future
 417      *
 418      * @throws IllegalStateException if this executor is closed, the current
 419      * scope-local bindings are not the same as when the executor was created,
 420      * or the caller thread is not the owner or a thread contained in the executor
 421      * @throws RejectedExecutionException if the thread factory rejected creating a
 422      * thread to run the task
 423      */
 424     public <V> Future<V> fork(Callable<? extends V> task,
 425                               CompletionHandler<? super V> handler) {
 426         return spawn(task, Objects.requireNonNull(handler));
 427     }
 428 
 429     /**
 430      * Starts a new thread in this executor to run the given task.
 431      *
 432      * <p> The thread inherits the current thread's {@linkplain ScopeLocal scope-local}
 433      * bindings and must match the bindings captured when the executor was created.
 434      *
 435      * <p> If this executor is {@linkplain #shutdown() shutdown} (or in the process of
 436      * shutting down) then the task does not run.
 437      *
 438      * <p> This method may only be invoked by the executor owner or threads contained
 439      * in the executor.
 440      *
 441      * @param task the task to run
 442      * @throws IllegalStateException if this executor is closed, the current
 443      * scope-local bindings are not the same as when the executor was created,
 444      * or the caller thread is not the owner or a thread contained in the executor
 445      * @throws RejectedExecutionException if the thread factory rejected creating a
 446      * thread to run the task
 447      */
 448     @Override
 449     public void execute(Runnable task) {
 450         spawn(Executors.callable(task), null);
 451     }
 452 
 453     /**
 454      * Wait for all threads to finish or the executor to shutdown.
 455      */
 456     private void implJoin(Duration timeout)
 457         throws InterruptedException, TimeoutException
 458     {
 459         ensureOwner();
 460         joinInvoked = true;
 461         int s = state;
 462         if (s >= SHUTDOWN) {
 463             if (s == CLOSED)
 464                 throw new IllegalStateException("Executor is closed");
 465             return;
 466         }
 467 
 468         // wait for all threads, wakeup, interrupt, or timeout
 469         if (timeout != null) {
 470             flock.awaitAll(timeout);
 471         } else {
 472             flock.awaitAll();
 473         }
 474     }
 475 
 476     /**
 477      * Wait for all unfinished threads or the executor to shutdown. This method waits
 478      * until all threads in the executor finish their tasks (including {@linkplain
 479      * #fork(Callable, CompletionHandler) completion handlers}), the {@link #shutdown()
 480      * shutdown} method is invoked to shut down the executor, or the current thread is
 481      * interrupted.
 482      *
 483      * <p> This method may only be invoked by the executor owner.
 484      *
 485      * @throws IllegalStateException if this executor is closed or the caller thread
 486      * is not the owner
 487      * @throws InterruptedException if interrupted while waiting
 488      */
 489     public void join() throws InterruptedException {
 490         try {
 491             implJoin(null);
 492         } catch (TimeoutException e) {
 493             throw new InternalError();
 494         }
 495     }
 496 
 497     /**
 498      * Wait for all unfinished threads or the executor to shutdown, up to the given
 499      * deadline. This method waits until all threads in the executor finish their
 500      * tasks (including {@linkplain #fork(Callable, CompletionHandler) completion
 501      * handlers}), the {@link #shutdown() shutdown} method is invoked to shut down
 502      * the executor, the current thread is interrupted, or the deadline is reached.
 503      *
 504      * <p> This method may only be invoked by the executor owner.
 505      *
 506      * @param deadline the deadline
 507      * @throws IllegalStateException if this executor is closed or the caller thread
 508      * is not the owner
 509      * @throws InterruptedException if interrupted while waiting
 510      * @throws TimeoutException if the deadline is reached while waiting
 511      */
 512     public void joinUntil(Instant deadline)
 513         throws InterruptedException, TimeoutException
 514     {
 515         Duration timeout = Duration.between(Instant.now(), deadline);
 516         implJoin(timeout);
 517     }
 518 
 519     /**
 520      * Cancel all tracked Future objects.
 521      */
 522     private void cancelTrackedFutures() {
 523         Set<Future<?>> futures = this.futures;
 524         if (futures != null) {
 525             futures.forEach(f -> f.cancel(false));
 526         }
 527     }
 528 
 529     /**
 530      * Interrupt all unfinished threads.
 531      */
 532     private void implInterruptAll() {
 533         flock.threads().forEach(t -> {
 534             if (t != Thread.currentThread()) {
 535                 t.interrupt();
 536             }
 537         });
 538     }
 539 
 540     @SuppressWarnings("removal")
 541     private void interruptAll() {
 542         if (System.getSecurityManager() == null) {
 543             implInterruptAll();
 544         } else {
 545             PrivilegedAction<Void> pa = () -> {
 546                 implInterruptAll();
 547                 return null;
 548             };
 549             AccessController.doPrivileged(pa);
 550         }
 551     }
 552 
 553     /**
 554      * Shutdown the executor if not already shutdown. Return true if this method
 555      * shutdowns the executor, false if already shutdown.
 556      */
 557     private boolean implShutdown() {
 558         if (state < SHUTDOWN) {
 559             shutdownLock.lock();
 560             try {
 561                 if (state < SHUTDOWN) {
 562 
 563                     // prevent new threads from starting
 564                     flock.shutdown();
 565 
 566                     // wakeup any threads waiting in Future::get
 567                     cancelTrackedFutures();
 568 
 569                     // interrupt all unfinished threads
 570                     interruptAll();
 571 
 572                     state = SHUTDOWN;
 573                     return true;
 574                 }
 575             } finally {
 576                 shutdownLock.unlock();
 577             }
 578         }
 579         assert state >= SHUTDOWN;
 580         return false;
 581     }
 582 
 583     /**
 584      * Shutdown the executor without closing it. Shutting down an executor prevents new
 585      * threads from starting, interrupts all unfinished threads, and causes the
 586      * {@link #join() join} method to wakeup. Shutdown is useful for cases where the
 587      * results of unfinished tasks are no longer needed.
 588      *
 589      * <p> More specifically, this method:
 590      * <ul>
 591      * <li> {@linkplain Future#cancel(boolean) Cancels} the tasks that have threads
 592      * {@linkplain Future#get() waiting} on a result so that the waiting threads wakeup.
 593      * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the
 594      * executor (except the current thread).
 595      * <li> Wakes up the owner if it is waiting in {@link #join()} or {@link
 596      * #joinUntil(Instant)}. If the owner is not waiting then its next call to {@code
 597      * join} or {@code joinUntil} will return immediately.
 598      * </ul>
 599      *
 600      * <p> When this method completes then the Future objects for all tasks will be
 601      * {@linkplain Future#isDone() done}, normally or abnormally. There may still be
 602      * threads that have not finished because they are executing code that did not
 603      * respond (or respond promptly) to thread interrupt. This method does not wait
 604      * for these threads. When the owner invokes the {@link #close() close} method
 605      * to close the executor then it will wait for the remaining threads to finish.
 606      *
 607      * <p> This method may only be invoked by the executor owner or threads contained
 608      * in the executor.
 609      *
 610      * @throws IllegalStateException if this executor is closed, or the caller thread
 611      * is not the owner or a thread contained in the executor
 612      */
 613     public void shutdown() {
 614         ensureOwnerOrContainsThread();
 615         if (state == CLOSED)
 616             throw new IllegalStateException("Executor is closed");
 617         if (implShutdown())
 618             flock.wakeup();
 619     }
 620 
 621     /**
 622      * Closes this executor.
 623      *
 624      * <p> This method first shuts down the executor (as if by invoking the {@link
 625      * #shutdown() shutdown} method). It then waits for the threads executing any
 626      * unfinished tasks to finish. If interrupted then this method will continue to
 627      * wait for the threads to finish before completing with the interrupt status set.
 628      *
 629      * <p> This method may only be invoked by the executor owner.
 630      *
 631      * <p> A StructuredExecutor is intended to be used in a <em>structured manner</em>. If
 632      * this method is called to close an executor before nested executors are closed then
 633      * it closes the underlying construct of each nested executor (in the reverse order
 634      * that they were created in), closes this executor, and then throws {@link
 635      * StructureViolationException}.
 636      *
 637      * Similarly, if called to close an executor that <em>encloses</em> {@linkplain
 638      * ScopeLocal.Carrier#run(Runnable) operations} with scope-local bindings then
 639      * it also throws {@code StructureViolationException} after closing the executor.
 640      *
 641      * @throws IllegalStateException if invoked by a thread that is not the owner,
 642      * or thrown after closing the executor if the owner did not join the executor
 643      */
 644     @Override
 645     public void close() {
 646         ensureOwner();
 647         if (state == CLOSED)
 648             return;
 649 
 650         try {
 651             implShutdown();
 652             flock.close();
 653         } finally {
 654             state = CLOSED;
 655         }
 656 
 657         if (!joinInvoked) {
 658             throw new IllegalStateException("Owner did not invoke join or joinUntil");
 659         }
 660     }
 661 
 662     @Override
 663     public String toString() {
 664         StringBuilder sb = new StringBuilder();
 665         String name = flock.name();
 666         if (name != null) {
 667             sb.append(name);
 668             sb.append('/');
 669         }
 670         String id = getClass().getName() + "@" + System.identityHashCode(this);
 671         sb.append(id);
 672         int s = state;
 673         if (s == CLOSED)
 674             sb.append("/closed");
 675         else if (s == SHUTDOWN)
 676             sb.append("/shutdown");
 677         return sb.toString();
 678     }
 679 
 680     /**
 681      * The Future implementation returned by the fork methods. Most methods are
 682      * overridden to support cancellation when the executor is shutdown.
 683      * The blocking get methods register the Future with the executor so that they
 684      * are cancelled when the executor shuts down.
 685      */
 686     private class FutureImpl<V> extends FutureTask<V> {
 687         private final StructuredExecutor executor;
 688         private final CompletionHandler<? super V> handler;
 689 
 690         @SuppressWarnings("unchecked")
 691         FutureImpl(StructuredExecutor executor,
 692                    Callable<? extends V> task,
 693                    CompletionHandler<? super V> handler) {
 694             super((Callable<V>) task);
 695             this.executor = executor;
 696             this.handler = handler;
 697         }
 698 
 699         @Override
 700         @SuppressWarnings("unchecked")
 701         protected void done() {
 702             if (handler != null && !executor.isShutdown()) {
 703                 var handler = (CompletionHandler<Object>) this.handler;
 704                 var future = (Future<Object>) this;
 705                 handler.handle(executor, future);
 706             }
 707         }
 708 
 709         private void cancelIfShutdown() {
 710             if (executor.isShutdown() && !super.isDone()) {
 711                 super.cancel(false);
 712             }
 713         }
 714 
 715         @Override
 716         public boolean isDone() {
 717             cancelIfShutdown();
 718             return super.isDone();
 719         }
 720 
 721         @Override
 722         public boolean isCancelled() {
 723             cancelIfShutdown();
 724             return super.isCancelled();
 725         }
 726 
 727         @Override
 728         public boolean cancel(boolean mayInterruptIfRunning) {
 729             cancelIfShutdown();
 730             return super.cancel(mayInterruptIfRunning);
 731         }
 732 
 733         @Override
 734         public V get() throws InterruptedException, ExecutionException {
 735             if (super.isDone())
 736                 return super.get();
 737             executor.track(this);
 738             try {
 739                 cancelIfShutdown();
 740                 return super.get();
 741             } finally {
 742                 executor.untrack(this);
 743             }
 744         }
 745 
 746         @Override
 747         public V get(long timeout, TimeUnit unit)
 748                 throws InterruptedException, ExecutionException, TimeoutException {
 749             Objects.requireNonNull(unit);
 750             if (super.isDone())
 751                 return super.get();
 752             executor.track(this);
 753             try {
 754                 cancelIfShutdown();
 755                 return super.get(timeout, unit);
 756             } finally {
 757                 executor.untrack(this);
 758             }
 759         }
 760 
 761         @Override
 762         public V resultNow() {
 763             cancelIfShutdown();
 764             return super.resultNow();
 765         }
 766 
 767         @Override
 768         public Throwable exceptionNow() {
 769             cancelIfShutdown();
 770             return super.exceptionNow();
 771         }
 772 
 773         @Override
 774         public State state() {
 775             cancelIfShutdown();
 776             return super.state();
 777         }
 778 
 779         @Override
 780         public String toString() {
 781             cancelIfShutdown();
 782             return super.toString();
 783         }
 784     }
 785 
 786     /**
 787      * An operation that is invoked after a task completes. A CompletionHandler is specified
 788      * to the {@link #fork(Callable, CompletionHandler) fork} method to execute after a task
 789      * completes.
 790      *
 791      * <p> A CompletionHandler implements a policy on how tasks that complete normally and
 792      * abnormally are handled. It may, for example, collect the results of tasks that complete
 793      * with a result and ignore tasks that fail. It may collect exceptions when tasks fail. It
 794      * may invoke the {@link #shutdown() shutdown} method to shut down the executor and
 795      * cause {@link #join() join} to wakeup when some condition arises.
 796      *
 797      * <p> A CompletionHandler will typically define methods to make available results, state
 798      * or outcome to code that executes after the {@code join} method. A completion handler
 799      * that collects results and ignores tasks that fail may define a method that returns
 800      * a possibly empty collection of the results. A completion handler that implements
 801      * a policy to shut down the executor when a task fails may define a method to retrieve
 802      * the exception of the first task to fail.
 803      *
 804      * <p> The {@link #compose(CompletionHandler, CompletionHandler) compose} method may be
 805      * used to compose more than one completion handler if required.
 806      *
 807      * <p> Unless otherwise specified, passing a {@code null} argument a method
 808      * in this class will cause a {@link NullPointerException} to be thrown.
 809      *
 810      * @param <V> the result type
 811      * @since 99
 812      */
 813     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 814     public interface CompletionHandler<V> {
 815         /**
 816          * Performs an action on a completed task.
 817          *
 818          * @param executor the executor
 819          * @param future the Future for the completed task
 820          * @throws IllegalArgumentException if the task has not completed
 821          */
 822         void handle(StructuredExecutor executor, Future<V> future);
 823 
 824         /**
 825          * Returns a composed {@code CompletionHandler} that performs, in sequence, a
 826          * {@code first} handler followed by a {@code second} handler. If performing
 827          * either handler throws an exception, it is relayed to the caller of the
 828          * composed handler. If performing the first handler throws an exception,
 829          * the {@code second} handler will not be performed.
 830          *
 831          * @param first the first handler
 832          * @param second the second handler
 833          * @param <V> the result type
 834          * @return a composed CompletionHandler that performs in sequence the first
 835          * and second handlers
 836          */
 837         static <V> CompletionHandler<V> compose(CompletionHandler<? extends V> first,
 838                                                 CompletionHandler<? extends V> second) {
 839             Objects.requireNonNull(first);
 840             Objects.requireNonNull(second);
 841             @SuppressWarnings("unchecked")
 842             var handler1 = (CompletionHandler<V>) first;
 843             @SuppressWarnings("unchecked")
 844             var handler2 = (CompletionHandler<V>) second;
 845             return (e, f) -> {
 846                 handler1.handle(e, f);
 847                 handler2.handle(e, f);
 848             };
 849         }
 850     }
 851 
 852     /**
 853      * A CompletionHandler that captures the result of the first task to complete
 854      * successfully. Once captured, the handler {@linkplain #shutdown() shuts down}
 855      * the executor to interrupt unfinished threads and wakeup the owner. This handler
 856      * is intended for cases where the result of any task will do ("invoke any") and
 857      * where the results of other unfinished tasks are no longer needed.
 858      *
 859      * <p> Unless otherwise specified, passing a {@code null} argument a method
 860      * in this class will cause a {@link NullPointerException} to be thrown.
 861      *
 862      * @param <V> the result type
 863      * @since 99
 864      */
 865     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 866     public static final class ShutdownOnSuccess<V> implements CompletionHandler<V> {
 867         private static final VarHandle FIRST_SUCCESS;
 868         private static final VarHandle FIRST_FAILED;
 869         private static final VarHandle FIRST_CANCELLED;
 870         static {
 871             try {
 872                 MethodHandles.Lookup l = MethodHandles.lookup();
 873                 FIRST_SUCCESS = l.findVarHandle(ShutdownOnSuccess.class, "firstSuccess", Future.class);
 874                 FIRST_FAILED = l.findVarHandle(ShutdownOnSuccess.class, "firstFailed", Future.class);
 875                 FIRST_CANCELLED = l.findVarHandle(ShutdownOnSuccess.class, "firstCancelled", Future.class);
 876             } catch (Exception e) {
 877                 throw new InternalError(e);
 878             }
 879         }
 880         private volatile Future<V> firstSuccess;
 881         private volatile Future<V> firstFailed;
 882         private volatile Future<V> firstCancelled;
 883 
 884         /**
 885          * Creates a new ShutdownOnSuccess object.
 886          */
 887         public ShutdownOnSuccess() { }
 888 
 889         /**
 890          * Shutdown the given executor when invoked for the first time with a task
 891          * that completed with a result.
 892          *
 893          * @param executor the executor
 894          * @param future the completed task
 895          * @throws IllegalArgumentException {@inheritDoc}
 896          * @see #shutdown()
 897          * @see Future.State#SUCCESS
 898          */
 899         @Override
 900         public void handle(StructuredExecutor executor, Future<V> future) {
 901             Objects.requireNonNull(executor);
 902             switch (future.state()) {
 903                 case RUNNING -> throw new IllegalArgumentException("Task is not completed");
 904                 case SUCCESS -> {
 905                     // capture first task to complete normally
 906                     if (firstSuccess == null
 907                             && FIRST_SUCCESS.compareAndSet(this, null, future)) {
 908                         executor.shutdown();
 909                     }
 910                 }
 911                 case FAILED -> {
 912                     // capture first task to complete with an exception
 913                     if (firstSuccess == null && firstFailed == null) {
 914                         FIRST_FAILED.compareAndSet(this, null, future);
 915                     }
 916                 }
 917                 case CANCELLED ->  {
 918                     // capture the first cancelled task
 919                     if (firstSuccess == null && firstFailed == null && firstCancelled == null) {
 920                         FIRST_CANCELLED.compareAndSet(this, null, future);
 921                     }
 922                 }
 923             }
 924         }
 925 
 926         /**
 927          * {@return the result of the first task that completed with a result}
 928          *
 929          * <p> When no task completed with a result but a task completed with an exception
 930          * then {@code ExecutionException} is thrown with the exception as the {@linkplain
 931          * Throwable#getCause() cause}. If only cancelled tasks were notified to the {@code
 932          * handle} method then {@code CancellationException} is thrown.
 933          *
 934          * @throws ExecutionException if no tasks completed with a result but a task
 935          * completed with an exception
 936          * @throws CancellationException if all tasks were cancelled
 937          * @throws IllegalStateException if the handle method was not invoked with a
 938          * completed task
 939          */
 940         public V result() throws ExecutionException {
 941             Future<V> f = firstSuccess;
 942             if (f != null)
 943                 return f.resultNow();
 944             if ((f = firstFailed) != null)
 945                 throw new ExecutionException(f.exceptionNow());
 946             if (firstCancelled != null)
 947                 throw new CancellationException();
 948             throw new IllegalStateException("No completed tasks");
 949         }
 950 
 951         /**
 952          * Returns the result of the first task that completed with a result, otherwise
 953          * throws an exception produced by the given exception supplying function.
 954          *
 955          * <p> When no task completed with a result but a task completed with an
 956          * exception then the exception supplying function is invoked with the
 957          * exception. If only cancelled tasks were notified to the {@code handle}
 958          * method then the exception supplying function is invoked with a
 959          * {@code CancellationException}.
 960          *
 961          * @param esf the exception supplying function
 962          * @param <X> type of the exception to be thrown
 963          * @return the result of the first task that completed with a result
 964          * @throws X if no task completed with a result
 965          * @throws IllegalStateException if the handle method was not invoked with a
 966          * completed task
 967          */
 968         public <X extends Throwable> V result(Function<Throwable, ? extends X> esf) throws X {
 969             Objects.requireNonNull(esf);
 970             Future<V> f = firstSuccess;
 971             if (f != null)
 972                 return f.resultNow();
 973             Throwable throwable = null;
 974             if ((f = firstFailed) != null) {
 975                 throwable = f.exceptionNow();
 976             } else if (firstCancelled != null) {
 977                 throwable = new CancellationException();
 978             }
 979             if (throwable != null) {
 980                 X ex = esf.apply(throwable);
 981                 Objects.requireNonNull(ex, "esf returned null");
 982                 throw ex;
 983             }
 984             throw new IllegalStateException("No tasks completed");
 985         }
 986     }
 987 
 988     /**
 989      * A CompletionHandler that captures the exception of the first task to complete
 990      * abnormally. Once captured, the handler {@linkplain #shutdown() shuts down} the
 991      * executor to interrupt unfinished threads and wakeup the owner. This handler is
 992      * intended for cases where the results for all tasks are required ("invoke all");
 993      * if any task fails then the results of other unfinished tasks are no longer needed.
 994      *
 995      * <p> Unless otherwise specified, passing a {@code null} argument a method
 996      * in this class will cause a {@link NullPointerException} to be thrown.
 997      *
 998      * @since 99
 999      */
1000     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
1001     public static final class ShutdownOnFailure implements CompletionHandler<Object> {
1002         private static final VarHandle FIRST_FAILED;
1003         private static final VarHandle FIRST_CANCELLED;
1004         static {
1005             try {
1006                 MethodHandles.Lookup l = MethodHandles.lookup();
1007                 FIRST_FAILED = l.findVarHandle(ShutdownOnFailure.class, "firstFailed", Future.class);
1008                 FIRST_CANCELLED = l.findVarHandle(ShutdownOnFailure.class, "fistCancelled", Future.class);
1009             } catch (Exception e) {
1010                 throw new InternalError(e);
1011             }
1012         }
1013         private volatile Future<Object> firstFailed;
1014         private volatile Future<Object> fistCancelled;
1015 
1016         /**
1017          * Creates a new ShutdownOnFailure object.
1018          */
1019         public ShutdownOnFailure() { }
1020 
1021         /**
1022          * Shutdown the given executor when invoked for the first time with a task
1023          * that completed abnormally (exception or cancelled).
1024          *
1025          * @param executor the executor
1026          * @param future the completed task
1027          * @throws IllegalArgumentException {@inheritDoc}
1028          * @see #shutdown()
1029          * @see Future.State#FAILED
1030          * @see Future.State#CANCELLED
1031          */
1032         @Override
1033         public void handle(StructuredExecutor executor, Future<Object> future) {
1034             Objects.requireNonNull(executor);
1035             switch (future.state()) {
1036                 case RUNNING -> throw new IllegalArgumentException("Task is not completed");
1037                 case SUCCESS -> { }
1038                 case FAILED -> {
1039                     if (firstFailed == null
1040                             && FIRST_FAILED.compareAndSet(this, null, future)) {
1041                         executor.shutdown();
1042                     }
1043                 }
1044                 case CANCELLED -> {
1045                     if (firstFailed == null && fistCancelled == null
1046                             && FIRST_CANCELLED.compareAndSet(this, null, future)) {
1047                         executor.shutdown();
1048                     }
1049                 }
1050             }
1051         }
1052 
1053         /**
1054          * Returns the exception for the first task that completed with an exception.
1055          * If no task completed with an exception but cancelled tasks were notified
1056          * to the {@code handle} method then a {@code CancellationException} is returned.
1057          * If no tasks completed abnormally then an empty {@code Optional} is returned.
1058          *
1059          * @return the exception for a task that completed abnormally or an empty
1060          * optional if no tasks completed abnormally
1061          */
1062         public Optional<Throwable> exception() {
1063             Future<Object> f = firstFailed;
1064             if (f != null)
1065                 return Optional.of(f.exceptionNow());
1066             if (fistCancelled != null)
1067                 return Optional.of(new CancellationException());
1068             return Optional.empty();
1069         }
1070 
1071         /**
1072          * Throws if a task completed abnormally. If any task completed with an
1073          * exception then {@code ExecutionException} is thrown with the exception of
1074          * the first task to fail as the {@linkplain Throwable#getCause() cause}.
1075          * If no task completed with an exception but cancelled tasks were notified
1076          * to the {@code handle} method then {@code CancellationException} is thrown.
1077          * This method does nothing if no tasks completed abnormally.
1078          *
1079          * @throws ExecutionException if a task completed with an exception
1080          * @throws CancellationException if no tasks completed with an exception but
1081          * tasks were cancelled
1082          */
1083         public void throwIfFailed() throws ExecutionException {
1084             Future<Object> f = firstFailed;
1085             if (f != null)
1086                 throw new ExecutionException(f.exceptionNow());
1087             if (fistCancelled != null)
1088                 throw new CancellationException();
1089         }
1090 
1091         /**
1092          * Throws the exception produced by the given exception supplying function if
1093          * a task completed abnormally. If any task completed with an exception then
1094          * the function is invoked with the exception of the first task to fail.
1095          * If no task completed with an exception but cancelled tasks were notified to
1096          * the {@code handle} method then the function is called with a {@code
1097          * CancellationException}. The exception returned by the function is thrown.
1098          * This method does nothing if no tasks completed abnormally.
1099          *
1100          * @param esf the exception supplying function
1101          * @param <X> type of the exception to be thrown
1102          * @throws X produced by the exception supplying function
1103          */
1104         public <X extends Throwable>
1105         void throwIfFailed(Function<Throwable, ? extends X> esf) throws X {
1106             Objects.requireNonNull(esf);
1107             Throwable throwable = null;
1108             Future<Object> f = firstFailed;
1109             if (f != null) {
1110                 throwable = f.exceptionNow();
1111             } else if (fistCancelled != null) {
1112                 throwable = new CancellationException();
1113             }
1114             if (throwable != null) {
1115                 X ex = esf.apply(throwable);
1116                 Objects.requireNonNull(ex, "esf returned null");
1117                 throw ex;
1118             }
1119         }
1120     }
1121 }