1 /*
   2  * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.time.Instant;
  33 import java.util.Objects;
  34 import java.util.Optional;
  35 import java.util.concurrent.locks.ReentrantLock;
  36 import java.util.function.Function;
  37 import java.util.function.Supplier;
  38 import jdk.internal.javac.PreviewFeature;
  39 import jdk.internal.misc.ThreadFlock;
  40 
  41 /**
  42  * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports
  43  * cases where a task splits into several concurrent subtasks, and where the subtasks must
  44  * complete before the main task continues. A {@code StructuredTaskScope} can be used to
  45  * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
  46  * just like that of a sequential operation in structured programming.
  47  *
  48  * <h2>Basic operation</h2>
  49  *
  50  * A {@code StructuredTaskScope} is created with one of its public constructors. It defines
  51  * the {@link #fork(Callable) fork} method to start a thread to execute a subtask, the {@link
  52  * #join() join} method to wait for all subtasks to finish, and the {@link #close() close}
  53  * method to close the task scope. The API is intended to be used with the {@code
  54  * try-with-resources} statement. The intention is that code in the try <em>block</em>
  55  * uses the {@code fork} method to fork threads to execute the subtasks, wait for the
  56  * subtasks to finish with the {@code join} method, and then <em>process the results</em>.
 * A call to the {@code fork} method returns a {@link Subtask Subtask} representing
 * the <em>forked subtask</em>. Once {@code join} is called, the {@code Subtask} can be
 * used to get the result if the subtask completed successfully, or the exception if the
 * subtask failed.
  60  * {@snippet lang=java :
  61  *     Callable<String> task1 = ...
  62  *     Callable<Integer> task2 = ...
  63  *
  64  *     try (var scope = new StructuredTaskScope<Object>()) {
  65  *
  66  *         Subtask<String> subtask1 = scope.fork(task1);   // @highlight substring="fork"
  67  *         Subtask<Integer> subtask2 = scope.fork(task2);  // @highlight substring="fork"
  68  *
  69  *         scope.join();                                   // @highlight substring="join"
  70  *
  71  *         ... process results/exceptions ...
  72  *
  73  *     } // close                                          // @highlight substring="close"
  74  * }
  75  * <p> The following example forks a collection of homogeneous subtasks, waits for all of
  76  * them to complete with the {@code join} method, and uses the {@link Subtask.State
  77  * Subtask.State} to partition the subtasks into a set of the subtasks that completed
  78  * successfully and another for the subtasks that failed.
  79  * {@snippet lang=java :
  80  *     List<Callable<String>> callables = ...
  81  *
  82  *     try (var scope = new StructuredTaskScope<String>()) {
  83  *
  84  *         List<Subtask<String>> subtasks = callables.stream().map(scope::fork).toList();
  85  *
  86  *         scope.join();
  87  *
  88  *         Map<Boolean, Set<Subtask<String>>> map = subtasks.stream()
  89  *                 .collect(Collectors.partitioningBy(h -> h.state() == Subtask.State.SUCCESS,
  90  *                                                    Collectors.toSet()));
  91  *
  92  *     } // close
  93  * }
  94  *
  95  * <p> To ensure correct usage, the {@code join} and {@code close} methods may only be
  96  * invoked by the <em>owner</em> (the thread that opened/created the task scope), and the
  97  * {@code close} method throws an exception after closing if the owner did not invoke the
  98  * {@code join} method after forking.
  99  *
 100  * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut
 101  * down a task scope without closing it. The {@code shutdown()} method <em>cancels</em> all
 102  * unfinished subtasks by {@linkplain Thread#interrupt() interrupting} the threads. It
 103  * prevents new threads from starting in the task scope. If the owner is waiting in the
 104  * {@code join} method then it will wakeup.
 105  *
 * <p> Shutdown is used for <em>short-circuiting</em> and allows subclasses to implement
 * <em>policy</em> that does not require all subtasks to finish.
 108  *
 109  * <h2>Subclasses with policies for common cases</h2>
 110  *
 111  * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for
 112  * common cases:
 113  * <ol>
 114  *   <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the result of the first
 115  *   subtask to complete successfully. Once captured, it shuts down the task scope to
 116  *   interrupt unfinished threads and wakeup the owner. This class is intended for cases
 117  *   where the result of any subtask will do ("invoke any") and where there is no need to
 118  *   wait for results of other unfinished subtasks. It defines methods to get the first
 119  *   result or throw an exception if all subtasks fail.
 120  *   <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the exception of the first
 121  *   subtask to fail. Once captured, it shuts down the task scope to interrupt unfinished
 122  *   threads and wakeup the owner. This class is intended for cases where the results of all
 123  *   subtasks are required ("invoke all"); if any subtask fails then the results of other
 *   unfinished subtasks are no longer needed. It defines methods to throw an exception if
 125  *   any of the subtasks fail.
 126  * </ol>
 127  *
 128  * <p> The following are two examples that use the two classes. In both cases, a pair of
 129  * subtasks are forked to fetch resources from two URL locations "left" and "right". The
 130  * first example creates a ShutdownOnSuccess object to capture the result of the first
 131  * subtask to complete successfully, cancelling the other by way of shutting down the task
 132  * scope. The main task waits in {@code join} until either subtask completes with a result
 * or both subtasks fail. It invokes the {@link ShutdownOnSuccess#result(Function)
 * result(Function)} method to get the captured result. If both subtasks fail then this
 135  * method throws a {@code WebApplicationException} with the exception from one of the
 136  * subtasks as the cause.
 137  * {@snippet lang=java :
 138  *     try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
 139  *
 140  *         scope.fork(() -> fetch(left));
 141  *         scope.fork(() -> fetch(right));
 142  *
 143  *         scope.join();
 144  *
 145  *         // @link regex="result(?=\()" target="ShutdownOnSuccess#result" :
 146  *         String result = scope.result(e -> new WebApplicationException(e));
 147  *
 148  *         ...
 149  *     }
 150  * }
 151  * The second example creates a ShutdownOnFailure object to capture the exception of the
 152  * first subtask to fail, cancelling the other by way of shutting down the task scope. The
 153  * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a
 154  * result, either fails, or a deadline is reached. It invokes {@link
 155  * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception
 156  * if either subtask fails. This method is a no-op if both subtasks complete successfully.
 157  * The example uses {@link Supplier#get()} to get the result of each subtask. Using
 158  * {@code Supplier} instead of {@code Subtask} is preferred for common cases where the
 159  * object returned by fork is only used to get the result of a subtask that completed
 160  * successfully.
 161  * {@snippet lang=java :
 162  *    Instant deadline = ...
 163  *
 164  *    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
 165  *
 166  *         Supplier<String> supplier1 = scope.fork(() -> query(left));
 167  *         Supplier<String> supplier2 = scope.fork(() -> query(right));
 168  *
 169  *         scope.joinUntil(deadline);
 170  *
 171  *         // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" :
 172  *         scope.throwIfFailed(e -> new WebApplicationException(e));
 173  *
 174  *         // both subtasks completed successfully
 175  *         String result = Stream.of(supplier1, supplier2)
 176  *                 .map(Supplier::get)
 177  *                 .collect(Collectors.joining(", ", "{ ", " }"));
 178  *
 179  *         ...
 180  *     }
 181  * }
 182  *
 183  * <h2>Extending StructuredTaskScope</h2>
 184  *
 185  * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Subtask)
 186  * handleComplete} method overridden, to implement policies other than those implemented
 187  * by {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. A subclass may, for example,
 188  * collect the results of subtasks that complete successfully and ignore subtasks that
 189  * fail. It may collect exceptions when subtasks fail. It may invoke the {@link #shutdown()
 190  * shutdown} method to shut down and cause {@link #join() join} to wakeup when some
 191  * condition arises.
 192  *
 193  * <p> A subclass will typically define methods to make available results, state, or other
 194  * outcome to code that executes after the {@code join} method. A subclass that collects
 195  * results and ignores subtasks that fail may define a method that returns the results.
 196  * A subclass that implements a policy to shut down when a subtask fails may define a
 197  * method to get the exception of the first subtask to fail.
 198  *
 199  * <p> The following is an example of a simple {@code StructuredTaskScope} implementation
 * that collects homogeneous subtasks that complete successfully. It defines the method
 201  * "{@code completedSuccessfully()}" that the main task can invoke after it joins.
 202  * {@snippet lang=java :
 203  *     class CollectingScope<T> extends StructuredTaskScope<T> {
 204  *         private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>();
 205  *
 206  *         @Override
 207  *         protected void handleComplete(Subtask<? extends T> subtask) {
 208  *             if (subtask.state() == Subtask.State.SUCCESS) {
 209  *                 subtasks.add(subtask);
 210  *             }
 211  *         }
 212  *
 213  *         @Override
 214  *         public CollectingScope<T> join() throws InterruptedException {
 215  *             super.join();
 216  *             return this;
 217  *         }
 218  *
 219  *         public Stream<Subtask<? extends T>> completedSuccessfully() {
 220  *             // @link substring="ensureOwnerAndJoined" target="ensureOwnerAndJoined" :
 221  *             super.ensureOwnerAndJoined();
 222  *             return subtasks.stream();
 223  *         }
 224  *     }
 225  * }
 * <p> The implementation of the {@code completedSuccessfully()} method in the example
 * invokes {@link #ensureOwnerAndJoined()} to ensure that the method can only be invoked
 * by the owner thread and only after it has joined.
 229  *
 230  * <h2><a id="TreeStructure">Tree structure</a></h2>
 231  *
 232  * Task scopes form a tree where parent-child relations are established implicitly when
 233  * opening a new task scope:
 234  * <ul>
 235  *   <li> A parent-child relation is established when a thread started in a task scope
 236  *   opens its own task scope. A thread started in task scope "A" that opens task scope
 237  *   "B" establishes a parent-child relation where task scope "A" is the parent of task
 238  *   scope "B".
 239  *   <li> A parent-child relation is established with nesting. If a thread opens task
 240  *   scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task
 241  *   scope "B" is the parent of the nested task scope "C".
 242  * </ul>
 243  *
 244  * The <i>descendants</i> of a task scope are the child task scopes that it is a parent
 245  * of, plus the descendants of the child task scopes, recursively.
 246  *
 247  * <p> The tree structure supports:
 248  * <ul>
 249  *   <li> Inheritance of {@linkplain ScopedValue scoped values} across threads.
 250  *   <li> Confinement checks. The phrase "threads contained in the task scope" in method
 251  *   descriptions means threads started in the task scope or descendant scopes.
 252  * </ul>
 253  *
 254  * <p> The following example demonstrates the inheritance of a scoped value. A scoped
 255  * value {@code USERNAME} is bound to the value "{@code duke}". A {@code StructuredTaskScope}
 256  * is created and its {@code fork} method invoked to start a thread to execute {@code
 257  * childTask}. The thread inherits the scoped value <em>bindings</em> captured when
 258  * creating the task scope. The code in {@code childTask} uses the value of the scoped
 259  * value and so reads the value "{@code duke}".
 260  * {@snippet lang=java :
 261  *     private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
 262  *
 263  *     // @link substring="runWhere" target="ScopedValue#runWhere(ScopedValue, Object, Runnable)" :
 264  *     ScopedValue.runWhere(USERNAME, "duke", () -> {
 265  *         try (var scope = new StructuredTaskScope<String>()) {
 266  *
 267  *             scope.fork(() -> childTask());           // @highlight substring="fork"
 268  *             ...
 269  *          }
 270  *     });
 271  *
 272  *     ...
 273  *
 274  *     String childTask() {
 275  *         // @link substring="get" target="ScopedValue#get()" :
 276  *         String name = USERNAME.get();   // "duke"
 277  *         ...
 278  *     }
 279  * }
 280  *
 * <p> {@code StructuredTaskScope} does not define APIs that expose the tree structure
 * at this time.
 283  *
 284  * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
 285  * or method in this class will cause a {@link NullPointerException} to be thrown.
 286  *
 287  * <h2>Memory consistency effects</h2>
 288  *
 289  * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to
 290  * {@linkplain #fork forking} of a subtask
 291  * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
 292  * <i>happen-before</i></a> any actions taken by that subtask, which in turn <i>happen-before</i>
 293  * the subtask result is {@linkplain Subtask#get() retrieved} or <i>happen-before</i> any
 294  * actions taken in a thread after {@linkplain #join() joining} of the task scope.
 295  *
 296  * @jls 17.4.5 Happens-before Order
 297  *
 298  * @param <T> the result type of tasks executed in the task scope
 299  * @since 21
 300  */
 301 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 302 public class StructuredTaskScope<T> implements AutoCloseable {
 303     private final ThreadFactory factory;
 304     private final ThreadFlock flock;
 305     private final ReentrantLock shutdownLock = new ReentrantLock();
 306 
 307     // states: OPEN -> SHUTDOWN -> CLOSED
 308     private static final int OPEN     = 0;   // initial state
 309     private static final int SHUTDOWN = 1;
 310     private static final int CLOSED   = 2;
 311 
 312     // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
 313     private volatile int state;
 314 
 315     // Counters to support checking that the task scope owner joins before processing
 316     // results and attempts join before closing the task scope. These counters are
 317     // accessed only by the owner thread.
 318     private int forkRound;         // incremented when the first subtask is forked after join
 319     private int lastJoinAttempted; // set to the current fork round when join is attempted
 320     private int lastJoinCompleted; // set to the current fork round when join completes
 321 
 322     /**
 323      * Represents a subtask forked with {@link #fork(Callable)}.
 324      * @param <T> the result type
 325      * @since 21
 326      */
 327     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 328     public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
 329         /**
 330          * {@return the value returning task provided to the {@code fork} method}
 331          *
 332          * @apiNote Task objects with unique identity may be used for correlation by
 333          * implementations of {@link #handleComplete(Subtask) handleComplete}.
 334          */
 335         Callable<? extends T> task();
 336 
 337         /**
 338          * Represents the state of a subtask.
 339          * @see Subtask#state()
 340          * @since 21
 341          */
 342         @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 343         enum State {
 344             /**
 345              * The subtask result or exception is not available. This state indicates that
 346              * the subtask was forked but has not completed, it completed after the task
 347              * scope was {@linkplain #shutdown() shut down}, or it was forked after the
 348              * task scope was shut down.
 349              */
 350             UNAVAILABLE,
 351             /**
 352              * The subtask completed successfully with a result. The {@link Subtask#get()
 353              * Subtask.get()} method can be used to obtain the result. This is a terminal
 354              * state.
 355              */
 356             SUCCESS,
 357             /**
 358              * The subtask failed with an exception. The {@link Subtask#exception()
 359              * Subtask.exception()} method can be used to obtain the exception. This is a
 360              * terminal state.
 361              */
 362             FAILED,
 363         }
 364 
 365         /**
 366          * {@return the state of the subtask}
 367          */
 368         State state();
 369 
 370         /**
 371          * Returns the result of the subtask.
 372          *
 373          * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
 374          * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
 375          * joinUntil}) before it can obtain the result of the subtask.
 376          *
 377          * @return the possibly-null result
 378          * @throws IllegalStateException if the subtask has not completed, did not complete
 379          * successfully, or the current thread is the task scope owner and did not join
 380          * after forking
 381          * @see State#SUCCESS
 382          */
 383         T get();
 384 
 385         /**
 386          * {@return the exception thrown by the subtask}
 387          *
 388          * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
 389          * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
 390          * joinUntil}) before it can obtain the exception thrown by the subtask.
 391          *
 392          * @throws IllegalStateException if the subtask has not completed, completed with
 393          * a result, or the current thread is the task scope owner and did not join after
 394          * forking
 395          * @see State#FAILED
 396          */
 397         Throwable exception();
 398     }
 399 
 400     /**
 401      * Creates a structured task scope with the given name and thread factory. The task
 402      * scope is optionally named for the purposes of monitoring and management. The thread
 403      * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
 404      * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
 405      * current thread.
 406      *
 407      * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
 408      * bindings for inheritance by threads started in the task scope. The
 409      * <a href="#TreeStructure">Tree Structure</a> section in the class description details
 410      * how parent-child relations are established implicitly for the purpose of inheritance
 411      * of scoped value bindings.
 412      *
 413      * @param name the name of the task scope, can be null
 414      * @param factory the thread factory
 415      */
 416     public StructuredTaskScope(String name, ThreadFactory factory) {
 417         this.factory = Objects.requireNonNull(factory, "'factory' is null");
 418         if (name == null)
 419             name = Objects.toIdentityString(this);
 420         this.flock = ThreadFlock.open(name);
 421     }
 422 
 423     /**
 424      * Creates an unnamed structured task scope that creates virtual threads. The task
 425      * scope is owned by the current thread.
 426      *
 427      * @implSpec This constructor is equivalent to invoking the 2-arg constructor with a
 428      * name of {@code null} and a thread factory that creates virtual threads.
 429      */
 430     public StructuredTaskScope() {
 431         this(null, Thread.ofVirtual().factory());
 432     }
 433 
 434     private IllegalStateException newIllegalStateExceptionScopeClosed() {
 435         return new IllegalStateException("Task scope is closed");
 436     }
 437 
 438     private IllegalStateException newIllegalStateExceptionNoJoin() {
 439         return new IllegalStateException("Owner did not join after forking subtasks");
 440     }
 441 
 442     /**
 443      * Throws IllegalStateException if the scope is closed, returning the state if not
 444      * closed.
 445      */
 446     private int ensureOpen() {
 447         int s = state;
 448         if (s == CLOSED)
 449             throw newIllegalStateExceptionScopeClosed();
 450         return s;
 451     }
 452 
 453     /**
 454      * Throws WrongThreadException if the current thread is not the owner.
 455      */
 456     private void ensureOwner() {
 457         if (Thread.currentThread() != flock.owner())
 458             throw new WrongThreadException("Current thread not owner");
 459     }
 460 
 461     /**
 462      * Throws WrongThreadException if the current thread is not the owner
 463      * or a thread contained in the tree.
 464      */
 465     private void ensureOwnerOrContainsThread() {
 466         Thread currentThread = Thread.currentThread();
 467         if (currentThread != flock.owner() && !flock.containsThread(currentThread))
 468             throw new WrongThreadException("Current thread not owner or thread in the tree");
 469     }
 470 
 471     /**
 472      * Throws IllegalStateException if the current thread is the owner, and the owner did
 473      * not join after forking a subtask in the given fork round.
 474      */
 475     private void ensureJoinedIfOwner(int round) {
 476         if (Thread.currentThread() == flock.owner() && (round > lastJoinCompleted)) {
 477             throw newIllegalStateExceptionNoJoin();
 478         }
 479     }
 480 
 481     /**
 482      * Ensures that the current thread is the owner of this task scope and that it joined
 483      * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
 484      * forking} subtasks.
 485      *
 486      * @apiNote This method can be used by subclasses that define methods to make available
 487      * results, state, or other outcome to code intended to execute after the join method.
 488      *
 489      * @throws WrongThreadException if the current thread is not the task scope owner
 490      * @throws IllegalStateException if the task scope is open and task scope owner did
 491      * not join after forking
 492      */
 493     protected final void ensureOwnerAndJoined() {
 494         ensureOwner();
 495         if (forkRound > lastJoinCompleted) {
 496             throw newIllegalStateExceptionNoJoin();
 497         }
 498     }
 499 
 500     /**
 501      * Invoked by a subtask when it completes successfully or fails in this task scope.
 502      * This method is not invoked if a subtask completes after the task scope is
 503      * {@linkplain #shutdown() shut down}.
 504      *
 505      * @implSpec The default implementation throws {@code NullPointerException} if the
 506      * subtask is {@code null}. It throws {@link IllegalArgumentException} if the subtask
 507      * has not completed.
 508      *
 509      * @apiNote The {@code handleComplete} method should be thread safe. It may be
 510      * invoked by several threads concurrently.
 511      *
 512      * @param subtask the subtask
 513      *
 514      * @throws IllegalArgumentException if called with a subtask that has not completed
 515      */
 516     protected void handleComplete(Subtask<? extends T> subtask) {
 517         if (subtask.state() == Subtask.State.UNAVAILABLE)
 518             throw new IllegalArgumentException();
 519     }
 520 
 521     /**
 522      * Starts a new thread in this task scope to execute a value-returning task, thus
 523      * creating a <em>subtask</em> of this task scope.
 524      *
 525      * <p> The value-returning task is provided to this method as a {@link Callable}, the
 526      * thread executes the task's {@link Callable#call() call} method. The thread is
 527      * created with the task scope's {@link ThreadFactory}. It inherits the current thread's
 528      * {@linkplain ScopedValue scoped value} bindings. The bindings must match the bindings
 529      * captured when the task scope was created.
 530      *
 531      * <p> This method returns a {@link Subtask Subtask} to represent the <em>forked
 532      * subtask</em>. The {@code Subtask} object can be used to obtain the result when
 533      * the subtask completes successfully, or the exception when the subtask fails. To
 534      * ensure correct usage, the {@link Subtask#get() get()} and {@link Subtask#exception()
 535      * exception()} methods may only be called by the task scope owner after it has waited
 536      * for all threads to finish with the {@link #join() join} or {@link #joinUntil(Instant)}
 537      * methods. When the subtask completes, the thread invokes the {@link
 538      * #handleComplete(Subtask) handleComplete} method to consume the completed subtask.
 539      * If the task scope is {@linkplain #shutdown() shut down} before the subtask completes
 540      * then the {@code handleComplete} method will not be invoked.
 541      *
 542      * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process of
 543      * shutting down) then the subtask will not run and the {@code handleComplete} method
 544      * will not be invoked.
 545      *
 546      * <p> This method may only be invoked by the task scope owner or threads contained
 547      * in the task scope.
 548      *
 549      * @implSpec This method may be overridden for customization purposes, wrapping tasks
 550      * for example. If overridden, the subclass must invoke {@code super.fork} to start a
 551      * new thread in this task scope.
 552      *
 553      * @param task the value-returning task for the thread to execute
 554      * @param <U> the result type
 555      * @return the subtask
 556      * @throws IllegalStateException if this task scope is closed
 557      * @throws WrongThreadException if the current thread is not the task scope owner or a
 558      * thread contained in the task scope
 559      * @throws StructureViolationException if the current scoped value bindings are not
 560      * the same as when the task scope was created
 561      * @throws RejectedExecutionException if the thread factory rejected creating a
 562      * thread to run the subtask
 563      */
 564     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
 565         Objects.requireNonNull(task, "'task' is null");
 566         int s = ensureOpen();   // throws ISE if closed
 567 
 568         // when forked by the owner, the subtask is forked in the current or next round
 569         int round = -1;
 570         if (Thread.currentThread() == flock.owner()) {
 571             round = forkRound;
 572             if (forkRound == lastJoinCompleted) {
 573                 // new round if first fork after join
 574                 round++;
 575             }
 576         }
 577 
 578         SubtaskImpl<U> subtask = new SubtaskImpl<>(this, task, round);
 579         if (s < SHUTDOWN) {
 580             // create thread to run task
 581             Thread thread = factory.newThread(subtask);
 582             if (thread == null) {
 583                 throw new RejectedExecutionException("Rejected by thread factory");
 584             }
 585 
 586             // attempt to start the thread
 587             try {
 588                 flock.start(thread);
 589             } catch (IllegalStateException e) {
 590                 // shutdown by another thread, or underlying flock is shutdown due
 591                 // to unstructured use
 592             }
 593         }
 594 
 595         // force owner to join if this is the first fork in the round
 596         if (Thread.currentThread() == flock.owner() && round > forkRound) {
 597             forkRound = round;
 598         }
 599 
 600         // return forked subtask or a subtask that did not run
 601         return subtask;
 602     }
 603 
 604     /**
 605      * Wait for all threads to finish or the task scope to shut down.
 606      */
 607     private void implJoin(Duration timeout)
 608         throws InterruptedException, TimeoutException
 609     {
 610         ensureOwner();
 611         lastJoinAttempted = forkRound;
 612         int s = ensureOpen();  // throws ISE if closed
 613         if (s == OPEN) {
 614             // wait for all threads, wakeup, interrupt, or timeout
 615             if (timeout != null) {
 616                 flock.awaitAll(timeout);
 617             } else {
 618                 flock.awaitAll();
 619             }
 620         }
 621         lastJoinCompleted = forkRound;
 622     }
 623 
 624     /**
 625      * Wait for all subtasks started in this task scope to finish or the task scope to
 626      * shut down.
 627      *
 628      * <p> This method waits for all subtasks by waiting for all threads {@linkplain
 629      * #fork(Callable) started} in this task scope to finish execution. It stops waiting
 630      * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, or
 631      * the current thread is {@linkplain Thread#interrupt() interrupted}.
 632      *
 633      * <p> This method may only be invoked by the task scope owner.
 634      *
 635      * @implSpec This method may be overridden for customization purposes or to return a
 636      * more specific return type. If overridden, the subclass must invoke {@code
 637      * super.join} to ensure that the method waits for threads in this task scope to
 638      * finish.
 639      *
 640      * @return this task scope
 641      * @throws IllegalStateException if this task scope is closed
 642      * @throws WrongThreadException if the current thread is not the task scope owner
 643      * @throws InterruptedException if interrupted while waiting
 644      */
 645     public StructuredTaskScope<T> join() throws InterruptedException {
 646         try {
 647             implJoin(null);
 648         } catch (TimeoutException e) {
 649             throw new InternalError();
 650         }
 651         return this;
 652     }
 653 
 654     /**
 655      * Wait for all subtasks started in this task scope to finish or the task scope to
 656      * shut down, up to the given deadline.
 657      *
 658      * <p> This method waits for all subtasks by waiting for all threads {@linkplain
 659      * #fork(Callable) started} in this task scope to finish execution. It stops waiting
 660      * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, the
 661      * deadline is reached, or the current thread is {@linkplain Thread#interrupt()
 662      * interrupted}.
 663      *
 664      * <p> This method may only be invoked by the task scope owner.
 665      *
 666      * @implSpec This method may be overridden for customization purposes or to return a
 667      * more specific return type. If overridden, the subclass must invoke {@code
 668      * super.joinUntil} to ensure that the method waits for threads in this task scope to
 669      * finish.
 670      *
 671      * @param deadline the deadline
 672      * @return this task scope
 673      * @throws IllegalStateException if this task scope is closed
 674      * @throws WrongThreadException if the current thread is not the task scope owner
 675      * @throws InterruptedException if interrupted while waiting
 676      * @throws TimeoutException if the deadline is reached while waiting
 677      */
 678     public StructuredTaskScope<T> joinUntil(Instant deadline)
 679         throws InterruptedException, TimeoutException
 680     {
 681         Duration timeout = Duration.between(Instant.now(), deadline);
 682         implJoin(timeout);
 683         return this;
 684     }
 685 
 686     /**
 687      * Interrupt all unfinished threads.
 688      */
 689     private void implInterruptAll() {
 690         flock.threads()
 691             .filter(t -> t != Thread.currentThread())
 692             .forEach(t -> {
 693                 try {
 694                     t.interrupt();
 695                 } catch (Throwable ignore) { }
 696             });
 697     }
 698 
 699     @SuppressWarnings("removal")
 700     private void interruptAll() {
 701         if (System.getSecurityManager() == null) {
 702             implInterruptAll();
 703         } else {
 704             PrivilegedAction<Void> pa = () -> {
 705                 implInterruptAll();
 706                 return null;
 707             };
 708             AccessController.doPrivileged(pa);
 709         }
 710     }
 711 
 712     /**
     * Shut down the task scope if it is not already shut down. Returns true if this
     * method shut down the task scope, false if it was already shut down.
 715      */
 716     private boolean implShutdown() {
 717         shutdownLock.lock();
 718         try {
 719             if (state < SHUTDOWN) {
 720                 // prevent new threads from starting
 721                 flock.shutdown();
 722 
 723                 // set status before interrupting tasks
 724                 state = SHUTDOWN;
 725 
 726                 // interrupt all unfinished threads
 727                 interruptAll();
 728 
 729                 return true;
 730             } else {
 731                 // already shutdown
 732                 return false;
 733             }
 734         } finally {
 735             shutdownLock.unlock();
 736         }
 737     }
 738 
 739     /**
 740      * Shut down this task scope without closing it. Shutting down a task scope prevents
 741      * new threads from starting, interrupts all unfinished threads, and causes the
 742      * {@link #join() join} method to wakeup. Shutdown is useful for cases where the
 743      * results of unfinished subtasks are no longer needed. It will typically be called
 744      * by the {@link #handleComplete(Subtask)} implementation of a subclass that
 745      * implements a policy to discard unfinished tasks once some outcome is reached.
 746      *
 747      * <p> More specifically, this method:
 748      * <ul>
 749      * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the
 750      * task scope (except the current thread).
 751      * <li> Wakes up the task scope owner if it is waiting in {@link #join()} or {@link
 752      * #joinUntil(Instant)}. If the task scope owner is not waiting then its next call to
 753      * {@code join} or {@code joinUntil} will return immediately.
 754      * </ul>
 755      *
 756      * <p> The {@linkplain Subtask.State state} of unfinished subtasks that complete at
 757      * around the time that the task scope is shutdown is not defined. A subtask that
 758      * completes successfully with a result, or fails with an exception, at around
 759      * the time that the task scope is shutdown may or may not <i>transition</i> to a
 760      * terminal state.
 761      *
 762      * <p> This method may only be invoked by the task scope owner or threads contained
 763      * in the task scope.
 764      *
 765      * @implSpec This method may be overridden for customization purposes. If overridden,
 766      * the subclass must invoke {@code super.shutdown} to ensure that the method shuts
 767      * down the task scope.
 768      *
 769      * @apiNote
 770      * There may be threads that have not finished because they are executing code that
 771      * did not respond (or respond promptly) to thread interrupt. This method does not wait
 772      * for these threads. When the owner invokes the {@link #close() close} method
 773      * to close the task scope then it will wait for the remaining threads to finish.
 774      *
 775      * @throws IllegalStateException if this task scope is closed
 776      * @throws WrongThreadException if the current thread is not the task scope owner or
 777      * a thread contained in the task scope
 778      * @see #isShutdown()
 779      */
 780     public void shutdown() {
 781         ensureOwnerOrContainsThread();
 782         int s = ensureOpen();  // throws ISE if closed
 783         if (s < SHUTDOWN && implShutdown())
 784             flock.wakeup();
 785     }
 786 
 787     /**
 788      * {@return true if this task scope is shutdown, otherwise false}
 789      * @see #shutdown()
 790      */
 791     public final boolean isShutdown() {
 792         return state >= SHUTDOWN;
 793     }
 794 
 795     /**
 796      * Closes this task scope.
 797      *
 798      * <p> This method first shuts down the task scope (as if by invoking the {@link
 799      * #shutdown() shutdown} method). It then waits for the threads executing any
 800      * unfinished tasks to finish. If interrupted, this method will continue to wait for
 801      * the threads to finish before completing with the interrupt status set.
 802      *
 803      * <p> This method may only be invoked by the task scope owner. If the task scope
 804      * is already closed then the task scope owner invoking this method has no effect.
 805      *
 806      * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
 807      * manner</em>. If this method is called to close a task scope before nested task
 808      * scopes are closed then it closes the underlying construct of each nested task scope
 809      * (in the reverse order that they were created in), closes this task scope, and then
 810      * throws {@link StructureViolationException}.
 811      * Similarly, if this method is called to close a task scope while executing with
 812      * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
 813      * before the scoped values were bound, then {@code StructureViolationException} is
 814      * thrown after closing the task scope.
 815      * If a thread terminates without first closing task scopes that it owns then
     * termination will cause the underlying construct of each of its open task scopes to
 817      * be closed. Closing is performed in the reverse order that the task scopes were
 818      * created in. Thread termination may therefore be delayed when the task scope owner
 819      * has to wait for threads forked in these task scopes to finish.
 820      *
 821      * @implSpec This method may be overridden for customization purposes. If overridden,
 822      * the subclass must invoke {@code super.close} to close the task scope.
 823      *
 824      * @throws IllegalStateException thrown after closing the task scope if the task scope
 825      * owner did not attempt to join after forking
 826      * @throws WrongThreadException if the current thread is not the task scope owner
 827      * @throws StructureViolationException if a structure violation was detected
 828      */
 829     @Override
 830     public void close() {
 831         ensureOwner();
 832         int s = state;
 833         if (s == CLOSED)
 834             return;
 835 
 836         try {
 837             if (s < SHUTDOWN)
 838                 implShutdown();
 839             flock.close();
 840         } finally {
 841             state = CLOSED;
 842         }
 843 
 844         // throw ISE if the owner didn't attempt to join after forking
 845         if (forkRound > lastJoinAttempted) {
 846             lastJoinCompleted = forkRound;
 847             throw newIllegalStateExceptionNoJoin();
 848         }
 849     }
 850 
 851     @Override
 852     public String toString() {
 853         String name = flock.name();
 854         return switch (state) {
 855             case OPEN     -> name;
 856             case SHUTDOWN -> name + "/shutdown";
 857             case CLOSED   -> name + "/closed";
 858             default -> throw new InternalError();
 859         };
 860     }
 861 
 862     /**
 863      * Subtask implementation, runs the task specified to the fork method.
 864      */
 865     private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
 866         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
 867 
 868         private record AltResult(Subtask.State state, Throwable exception) {
 869             AltResult(Subtask.State state) {
 870                 this(state, null);
 871             }
 872         }
 873 
 874         private final StructuredTaskScope<? super T> scope;
 875         private final Callable<? extends T> task;
 876         private final int round;
 877         private volatile Object result;
 878 
 879         SubtaskImpl(StructuredTaskScope<? super T> scope,
 880                     Callable<? extends T> task,
 881                     int round) {
 882             this.scope = scope;
 883             this.task = task;
 884             this.round = round;
 885         }
 886 
 887         @Override
 888         public void run() {
 889             T result = null;
 890             Throwable ex = null;
 891             try {
 892                 result = task.call();
 893             } catch (Throwable e) {
 894                 ex = e;
 895             }
 896 
 897             // nothing to do if task scope is shutdown
 898             if (scope.isShutdown())
 899                 return;
 900 
 901             // capture result or exception, invoke handleComplete
 902             if (ex == null) {
 903                 this.result = (result != null) ? result : RESULT_NULL;
 904             } else {
 905                 this.result = new AltResult(State.FAILED, ex);
 906             }
 907             scope.handleComplete(this);
 908         }
 909 
 910         @Override
 911         public Callable<? extends T> task() {
 912             return task;
 913         }
 914 
 915         @Override
 916         public Subtask.State state() {
 917             Object result = this.result;
 918             if (result == null) {
 919                 return State.UNAVAILABLE;
 920             } else if (result instanceof AltResult alt) {
 921                 // null or failed
 922                 return alt.state();
 923             } else {
 924                 return State.SUCCESS;
 925             }
 926         }
 927 
 928         @Override
 929         public T get() {
 930             scope.ensureJoinedIfOwner(round);
 931             Object result = this.result;
 932             if (result instanceof AltResult) {
 933                 if (result == RESULT_NULL) return null;
 934             } else if (result != null) {
 935                 @SuppressWarnings("unchecked")
 936                 T r = (T) result;
 937                 return r;
 938             }
 939             throw new IllegalStateException(
 940                     "Result is unavailable or subtask did not complete successfully");
 941         }
 942 
 943         @Override
 944         public Throwable exception() {
 945             scope.ensureJoinedIfOwner(round);
 946             Object result = this.result;
 947             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
 948                 return alt.exception();
 949             }
 950             throw new IllegalStateException(
 951                     "Exception is unavailable or subtask did not complete with exception");
 952         }
 953 
 954         @Override
 955         public String toString() {
 956             String stateAsString = switch (state()) {
 957                 case UNAVAILABLE -> "[Unavailable]";
 958                 case SUCCESS     -> "[Completed successfully]";
 959                 case FAILED      -> {
 960                     Throwable ex = ((AltResult) result).exception();
 961                     yield "[Failed: " + ex + "]";
 962                 }
 963             };
 964             return Objects.toIdentityString(this) + stateAsString;
 965         }
 966     }
 967 
 968     /**
 969      * A {@code StructuredTaskScope} that captures the result of the first subtask to
 970      * complete {@linkplain Subtask.State#SUCCESS successfully}. Once captured, it
 971      * {@linkplain #shutdown() shuts down} the task scope to interrupt unfinished threads
 972      * and wakeup the task scope owner. The policy implemented by this class is intended
 973      * for cases where the result of any subtask will do ("invoke any") and where the
 974      * results of other unfinished subtasks are no longer needed.
 975      *
 976      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 977      * in this class will cause a {@link NullPointerException} to be thrown.
 978      *
 979      * @apiNote This class implements a policy to shut down the task scope when a subtask
 980      * completes successfully. There shouldn't be any need to directly shut down the task
 981      * scope with the {@link #shutdown() shutdown} method.
 982      *
 983      * @param <T> the result type
 984      * @since 21
 985      */
 986     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 987     public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
 988         private static final Object RESULT_NULL = new Object();
 989         private static final VarHandle FIRST_RESULT;
 990         private static final VarHandle FIRST_EXCEPTION;
 991         static {
 992             try {
 993                 MethodHandles.Lookup l = MethodHandles.lookup();
 994                 FIRST_RESULT = l.findVarHandle(ShutdownOnSuccess.class, "firstResult", Object.class);
 995                 FIRST_EXCEPTION = l.findVarHandle(ShutdownOnSuccess.class, "firstException", Throwable.class);
 996             } catch (Exception e) {
 997                 throw new ExceptionInInitializerError(e);
 998             }
 999         }
1000         private volatile Object firstResult;
1001         private volatile Throwable firstException;
1002 
1003         /**
1004          * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
1005          * The task scope is optionally named for the purposes of monitoring and management.
1006          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1007          * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope
1008          * is owned by the current thread.
1009          *
1010          * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
1011          * value} bindings for inheritance by threads started in the task scope. The
1012          * <a href="#TreeStructure">Tree Structure</a> section in the class description
1013          * details how parent-child relations are established implicitly for the purpose
1014          * of inheritance of scoped value bindings.
1015          *
1016          * @param name the name of the task scope, can be null
1017          * @param factory the thread factory
1018          */
1019         public ShutdownOnSuccess(String name, ThreadFactory factory) {
1020             super(name, factory);
1021         }
1022 
1023         /**
1024          * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
1025          *
1026          * @implSpec This constructor is equivalent to invoking the 2-arg constructor with
1027          * a name of {@code null} and a thread factory that creates virtual threads.
1028          */
1029         public ShutdownOnSuccess() {
1030             this(null, Thread.ofVirtual().factory());
1031         }
1032 
1033         @Override
1034         protected void handleComplete(Subtask<? extends T> subtask) {
1035             if (firstResult != null) {
1036                 // already captured a result
1037                 return;
1038             }
1039 
1040             if (subtask.state() == Subtask.State.SUCCESS) {
1041                 // task succeeded
1042                 T result = subtask.get();
1043                 Object r = (result != null) ? result : RESULT_NULL;
1044                 if (FIRST_RESULT.compareAndSet(this, null, r)) {
1045                     super.shutdown();
1046                 }
1047             } else if (firstException == null) {
1048                 // capture the exception thrown by the first subtask that failed
1049                 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1050             }
1051         }
1052 
1053         /**
1054          * Wait for a subtask started in this task scope to complete {@linkplain
1055          * Subtask.State#SUCCESS successfully} or all subtasks to complete.
1056          *
1057          * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1058          * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1059          * when all threads finish, a subtask completes successfully, or the current
1060          * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting
1061          * if the {@link #shutdown() shutdown} method is invoked directly to shut down
1062          * this task scope.
1063          *
1064          * <p> This method may only be invoked by the task scope owner.
1065          *
1066          * @throws IllegalStateException {@inheritDoc}
1067          * @throws WrongThreadException {@inheritDoc}
1068          */
1069         @Override
1070         public ShutdownOnSuccess<T> join() throws InterruptedException {
1071             super.join();
1072             return this;
1073         }
1074 
1075         /**
1076          * Wait for a subtask started in this task scope to complete {@linkplain
1077          * Subtask.State#SUCCESS successfully} or all subtasks to complete, up to the
1078          * given deadline.
1079          *
1080          * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1081          * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1082          * when all threads finish, a subtask completes successfully, the deadline is
1083          * reached, or the current thread is {@linkplain Thread#interrupt() interrupted}.
1084          * It also stops waiting if the {@link #shutdown() shutdown} method is invoked
1085          * directly to shut down this task scope.
1086          *
1087          * <p> This method may only be invoked by the task scope owner.
1088          *
1089          * @throws IllegalStateException {@inheritDoc}
1090          * @throws WrongThreadException {@inheritDoc}
1091          */
1092         @Override
1093         public ShutdownOnSuccess<T> joinUntil(Instant deadline)
1094             throws InterruptedException, TimeoutException
1095         {
1096             super.joinUntil(deadline);
1097             return this;
1098         }
1099 
1100         /**
1101          * {@return the result of the first subtask that completed {@linkplain
1102          * Subtask.State#SUCCESS successfully}}
1103          *
1104          * <p> When no subtask completed successfully, but a subtask {@linkplain
1105          * Subtask.State#FAILED failed} then {@code ExecutionException} is thrown with
1106          * the subtask's exception as the {@linkplain Throwable#getCause() cause}.
1107          *
1108          * @throws ExecutionException if no subtasks completed successfully but at least
1109          * one subtask failed
1110          * @throws IllegalStateException if no subtasks completed or the task scope owner
1111          * did not join after forking
1112          * @throws WrongThreadException if the current thread is not the task scope owner
1113          */
1114         public T result() throws ExecutionException {
1115             return result(ExecutionException::new);
1116         }
1117 
1118         /**
1119          * Returns the result of the first subtask that completed {@linkplain
1120          * Subtask.State#SUCCESS successfully}, otherwise throws an exception produced
1121          * by the given exception supplying function.
1122          *
1123          * <p> When no subtask completed successfully, but a subtask {@linkplain
1124          * Subtask.State#FAILED failed}, then the exception supplying function is invoked
1125          * with subtask's exception.
1126          *
1127          * @param esf the exception supplying function
1128          * @param <X> type of the exception to be thrown
1129          * @return the result of the first subtask that completed with a result
1130          *
1131          * @throws X if no subtasks completed successfully but at least one subtask failed
1132          * @throws IllegalStateException if no subtasks completed or the task scope owner
1133          * did not join after forking
1134          * @throws WrongThreadException if the current thread is not the task scope owner
1135          */
1136         public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X {
1137             Objects.requireNonNull(esf);
1138             ensureOwnerAndJoined();
1139 
1140             Object result = firstResult;
1141             if (result == RESULT_NULL) {
1142                 return null;
1143             } else if (result != null) {
1144                 @SuppressWarnings("unchecked")
1145                 T r = (T) result;
1146                 return r;
1147             }
1148 
1149             Throwable exception = firstException;
1150             if (exception != null) {
1151                 X ex = esf.apply(exception);
1152                 Objects.requireNonNull(ex, "esf returned null");
1153                 throw ex;
1154             }
1155 
1156             throw new IllegalStateException("No completed subtasks");
1157         }
1158     }
1159 
1160     /**
1161      * A {@code StructuredTaskScope} that captures the exception of the first subtask to
1162      * {@linkplain Subtask.State#FAILED fail}. Once captured, it {@linkplain #shutdown()
1163      * shuts down} the task scope to interrupt unfinished threads and wakeup the task
1164      * scope owner. The policy implemented by this class is intended for cases where the
1165      * results for all subtasks are required ("invoke all"); if any subtask fails then the
1166      * results of other unfinished subtasks are no longer needed.
1167      *
1168      * <p> Unless otherwise specified, passing a {@code null} argument to a method
1169      * in this class will cause a {@link NullPointerException} to be thrown.
1170      *
1171      * @apiNote This class implements a policy to shut down the task scope when a subtask
1172      * fails. There shouldn't be any need to directly shut down the task scope with the
1173      * {@link #shutdown() shutdown} method.
1174      *
1175      * @since 21
1176      */
1177     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
1178     public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
1179         private static final VarHandle FIRST_EXCEPTION;
1180         static {
1181             try {
1182                 MethodHandles.Lookup l = MethodHandles.lookup();
1183                 FIRST_EXCEPTION = l.findVarHandle(ShutdownOnFailure.class, "firstException", Throwable.class);
1184             } catch (Exception e) {
1185                 throw new ExceptionInInitializerError(e);
1186             }
1187         }
1188         private volatile Throwable firstException;
1189 
1190         /**
1191          * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
1192          * The task scope is optionally named for the purposes of monitoring and management.
1193          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1194          * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope
1195          * is owned by the current thread.
1196          *
1197          * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
1198          * value} bindings for inheritance by threads started in the task scope. The
1199          * <a href="#TreeStructure">Tree Structure</a> section in the class description
1200          * details how parent-child relations are established implicitly for the purpose
1201          * of inheritance of scoped value bindings.
1202          *
1203          * @param name the name of the task scope, can be null
1204          * @param factory the thread factory
1205          */
1206         public ShutdownOnFailure(String name, ThreadFactory factory) {
1207             super(name, factory);
1208         }
1209 
1210         /**
1211          * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
1212          *
1213          * @implSpec This constructor is equivalent to invoking the 2-arg constructor with
1214          * a name of {@code null} and a thread factory that creates virtual threads.
1215          */
1216         public ShutdownOnFailure() {
1217             this(null, Thread.ofVirtual().factory());
1218         }
1219 
1220         @Override
1221         protected void handleComplete(Subtask<?> subtask) {
1222             if (subtask.state() == Subtask.State.FAILED
1223                     && firstException == null
1224                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception())) {
1225                 super.shutdown();
1226             }
1227         }
1228 
1229         /**
1230          * Wait for all subtasks started in this task scope to complete or for a subtask
1231          * to {@linkplain Subtask.State#FAILED fail}.
1232          *
1233          * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1234          * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1235          * when all threads finish, a subtask fails, or the current thread is {@linkplain
1236          * Thread#interrupt() interrupted}. It also stops waiting if the {@link #shutdown()
1237          * shutdown} method is invoked directly to shut down this task scope.
1238          *
1239          * <p> This method may only be invoked by the task scope owner.
1240          *
1241          * @throws IllegalStateException {@inheritDoc}
1242          * @throws WrongThreadException {@inheritDoc}
1243          */
1244         @Override
1245         public ShutdownOnFailure join() throws InterruptedException {
1246             super.join();
1247             return this;
1248         }
1249 
1250         /**
1251          * Wait for all subtasks started in this task scope to complete or for a subtask
1252          * to {@linkplain Subtask.State#FAILED fail}, up to the given deadline.
1253          *
1254          * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1255          * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1256          * when all threads finish, a subtask fails, the deadline is reached, or the current
1257          * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting
1258          * if the {@link #shutdown() shutdown} method is invoked directly to shut down
1259          * this task scope.
1260          *
1261          * <p> This method may only be invoked by the task scope owner.
1262          *
1263          * @throws IllegalStateException {@inheritDoc}
1264          * @throws WrongThreadException {@inheritDoc}
1265          */
1266         @Override
1267         public ShutdownOnFailure joinUntil(Instant deadline)
1268             throws InterruptedException, TimeoutException
1269         {
1270             super.joinUntil(deadline);
1271             return this;
1272         }
1273 
1274         /**
1275          * Returns the exception of the first subtask that {@linkplain Subtask.State#FAILED
1276          * failed}. If no subtasks failed then an empty {@code Optional} is returned.
1277          *
1278          * @return the exception for the first subtask to fail or an empty optional if no
1279          * subtasks failed
1280          *
1281          * @throws WrongThreadException if the current thread is not the task scope owner
1282          * @throws IllegalStateException if the task scope owner did not join after forking
1283          */
1284         public Optional<Throwable> exception() {
1285             ensureOwnerAndJoined();
1286             return Optional.ofNullable(firstException);
1287         }
1288 
1289         /**
1290          * Throws if a subtask {@linkplain Subtask.State#FAILED failed}.
1291          * If any subtask failed with an exception then {@code ExecutionException} is
1292          * thrown with the exception of the first subtask to fail as the {@linkplain
1293          * Throwable#getCause() cause}. This method does nothing if no subtasks failed.
1294          *
1295          * @throws ExecutionException if a subtask failed
1296          * @throws WrongThreadException if the current thread is not the task scope owner
1297          * @throws IllegalStateException if the task scope owner did not join after forking
1298          */
1299         public void throwIfFailed() throws ExecutionException {
1300             throwIfFailed(ExecutionException::new);
1301         }
1302 
1303         /**
1304          * Throws the exception produced by the given exception supplying function if a
1305          * subtask {@linkplain Subtask.State#FAILED failed}. If any subtask failed with
1306          * an exception then the function is invoked with the exception of the first
1307          * subtask to fail. The exception returned by the function is thrown. This method
1308          * does nothing if no subtasks failed.
1309          *
1310          * @param esf the exception supplying function
1311          * @param <X> type of the exception to be thrown
1312          *
1313          * @throws X produced by the exception supplying function
1314          * @throws WrongThreadException if the current thread is not the task scope owner
1315          * @throws IllegalStateException if the task scope owner did not join after forking
1316          */
1317         public <X extends Throwable>
1318         void throwIfFailed(Function<Throwable, ? extends X> esf) throws X {
1319             ensureOwnerAndJoined();
1320             Objects.requireNonNull(esf);
1321             Throwable exception = firstException;
1322             if (exception != null) {
1323                 X ex = esf.apply(exception);
1324                 Objects.requireNonNull(ex, "esf returned null");
1325                 throw ex;
1326             }
1327         }
1328     }
1329 }