< prev index next >

src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java

Print this page

  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.time.Instant;


  33 import java.util.Objects;
  34 import java.util.Optional;
  35 import java.util.concurrent.locks.ReentrantLock;
  36 import java.util.function.Function;

  37 import java.util.function.Supplier;

  38 import jdk.internal.javac.PreviewFeature;

  39 import jdk.internal.misc.ThreadFlock;
  40 import jdk.internal.invoke.MhUtil;
  41 
  42 /**
  43  * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports
  44  * cases where a task splits into several concurrent subtasks, and where the subtasks must
  45  * complete before the main task continues. A {@code StructuredTaskScope} can be used to
  46  * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
  47  * just like that of a sequential operation in structured programming.
  48  *
  49  * <h2>Basic operation</h2>














  50  *
  51  * A {@code StructuredTaskScope} is created with one of its public constructors. It defines
  52  * the {@link #fork(Callable) fork} method to start a thread to execute a subtask, the {@link
  53  * #join() join} method to wait for all subtasks to finish, and the {@link #close() close}
  54  * method to close the task scope. The API is intended to be used with the {@code
  55  * try-with-resources} statement. The intention is that code in the try <em>block</em>
  56  * uses the {@code fork} method to fork threads to execute the subtasks, wait for the
  57  * subtasks to finish with the {@code join} method, and then <em>process the results</em>.
  58  * A call to the {@code fork} method returns a {@link Subtask Subtask} to represent
  59  * the <em>forked subtask</em>. Once {@code join} is called, the {@code Subtask} can be
  60  * used to get the result if the subtask completed successfully, or the exception if the subtask failed.
  61  * {@snippet lang=java :
  62  *     Callable<String> task1 = ...
  63  *     Callable<Integer> task2 = ...
  64  *
  65  *     try (var scope = new StructuredTaskScope<Object>()) {


  66  *
  67  *         Subtask<String> subtask1 = scope.fork(task1);   // @highlight substring="fork"
  68  *         Subtask<Integer> subtask2 = scope.fork(task2);  // @highlight substring="fork"
  69  *
  70  *         scope.join();                                   // @highlight substring="join"


  71  *
  72  *         ... process results/exceptions ...
  73  *
  74  *     } // close                                          // @highlight substring="close"
  75  * }
  76  * <p> The following example forks a collection of homogeneous subtasks, waits for all of
  77  * them to complete with the {@code join} method, and uses the {@link Subtask.State
  78  * Subtask.State} to partition the subtasks into a set of the subtasks that completed
  79  * successfully and another for the subtasks that failed.
  80  * {@snippet lang=java :
  81  *     List<Callable<String>> callables = ...
  82  *
  83  *     try (var scope = new StructuredTaskScope<String>()) {





  84  *
  85  *         List<Subtask<String>> subtasks = callables.stream().map(scope::fork).toList();



  86  *
  87  *         scope.join();
  88  *
  89  *         Map<Boolean, Set<Subtask<String>>> map = subtasks.stream()
  90  *                 .collect(Collectors.partitioningBy(h -> h.state() == Subtask.State.SUCCESS,
  91  *                                                    Collectors.toSet()));





  92  *
  93  *     } // close
  94  * }





























  95  *
  96  * <p> To ensure correct usage, the {@code join} and {@code close} methods may only be
  97  * invoked by the <em>owner</em> (the thread that opened/created the task scope), and the
  98  * {@code close} method throws an exception after closing if the owner did not invoke the
  99  * {@code join} method after forking.
 100  *
 101  * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut
 102  * down a task scope without closing it. The {@code shutdown()} method <em>cancels</em> all
 103  * unfinished subtasks by {@linkplain Thread#interrupt() interrupting} the threads. It
 104  * prevents new threads from starting in the task scope. If the owner is waiting in the
 105  * {@code join} method then it will wake up.






 106  *
 107  * <p> Shutdown is used for <em>short-circuiting</em> and allows subclasses to implement
 108  * <em>policy</em> that does not require all subtasks to finish.




 109  *
 110  * <h2>Subclasses with policies for common cases</h2>
 111  *
 112  * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for
 113  * common cases:
 114  * <ol>
 115  *   <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the result of the first
 116  *   subtask to complete successfully. Once captured, it shuts down the task scope to
 117  *   interrupt unfinished threads and wake up the owner. This class is intended for cases
 118  *   where the result of any subtask will do ("invoke any") and where there is no need to
 119  *   wait for results of other unfinished subtasks. It defines methods to get the first
 120  *   result or throw an exception if all subtasks fail.
 121  *   <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the exception of the first
 122  *   subtask to fail. Once captured, it shuts down the task scope to interrupt unfinished
 123  *   threads and wake up the owner. This class is intended for cases where the results of all
 124  *   subtasks are required ("invoke all"); if any subtask fails then the results of other
 125  *   unfinished subtasks are no longer needed. It defines methods to throw an exception if
 126  *   any of the subtasks fail.
 127  * </ol>
 128  *
 129  * <p> The following are two examples that use the two classes. In both cases, a pair of
 130  * subtasks are forked to fetch resources from two URL locations "left" and "right". The
 131  * first example creates a ShutdownOnSuccess object to capture the result of the first
 132  * subtask to complete successfully, cancelling the other by way of shutting down the task
 133  * scope. The main task waits in {@code join} until either subtask completes with a result
 134  * or both subtasks fail. It invokes the {@link ShutdownOnSuccess#result(Function)
 135  * result(Function)} method to get the captured result. If both subtasks fail then this
 136  * method throws a {@code WebApplicationException} with the exception from one of the
 137  * subtasks as the cause.
 138  * {@snippet lang=java :
 139  *     try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
 140  *
 141  *         scope.fork(() -> fetch(left));
 142  *         scope.fork(() -> fetch(right));
 143  *
 144  *         scope.join();
 145  *
 146  *         // @link regex="result(?=\()" target="ShutdownOnSuccess#result" :
 147  *         String result = scope.result(e -> new WebApplicationException(e));



 148  *
 149  *         ...
 150  *     }
 151  * }
 152  * The second example creates a ShutdownOnFailure object to capture the exception of the
 153  * first subtask to fail, cancelling the other by way of shutting down the task scope. The
 154  * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a
 155  * result, either fails, or a deadline is reached. It invokes {@link
 156  * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception
 157  * if either subtask fails. This method is a no-op if both subtasks complete successfully.
 158  * The example uses {@link Supplier#get()} to get the result of each subtask. Using
 159  * {@code Supplier} instead of {@code Subtask} is preferred for common cases where the
 160  * object returned by fork is only used to get the result of a subtask that completed
 161  * successfully.
 162  * {@snippet lang=java :
 163  *    Instant deadline = ...
 164  *
 165  *    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {


 166  *
 167  *         Supplier<String> supplier1 = scope.fork(() -> query(left));
 168  *         Supplier<String> supplier2 = scope.fork(() -> query(right));
 169  *
 170  *         scope.joinUntil(deadline);


 171  *
 172  *         // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" :
 173  *         scope.throwIfFailed(e -> new WebApplicationException(e));



 174  *
 175  *         // both subtasks completed successfully
 176  *         String result = Stream.of(supplier1, supplier2)
 177  *                 .map(Supplier::get)
 178  *                 .collect(Collectors.joining(", ", "{ ", " }"));



 179  *
 180  *         ...
 181  *     }
 182  * }






 183  *
 184  * <h2>Extending StructuredTaskScope</h2>

 185  *
 186  * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Subtask)
 187  * handleComplete} method overridden, to implement policies other than those implemented
 188  * by {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. A subclass may, for example,
 189  * collect the results of subtasks that complete successfully and ignore subtasks that
 190  * fail. It may collect exceptions when subtasks fail. It may invoke the {@link #shutdown()
 191  * shutdown} method to shut down and cause {@link #join() join} to wake up when some
 192  * condition arises.
 193  *
 194  * <p> A subclass will typically define methods to make available results, state, or other
 195  * outcome to code that executes after the {@code join} method. A subclass that collects
 196  * results and ignores subtasks that fail may define a method that returns the results.
 197  * A subclass that implements a policy to shut down when a subtask fails may define a
 198  * method to get the exception of the first subtask to fail.
 199  *
 200  * <p> The following is an example of a simple {@code StructuredTaskScope} implementation
 201  * that collects homogeneous subtasks that complete successfully. It defines the method
 202  * "{@code completedSuccessfully()}" that the main task can invoke after it joins.


 203  * {@snippet lang=java :
 204  *     class CollectingScope<T> extends StructuredTaskScope<T> {
 205  *         private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>();
 206  *
 207  *         @Override
 208  *         protected void handleComplete(Subtask<? extends T> subtask) {
 209  *             if (subtask.state() == Subtask.State.SUCCESS) {
 210  *                 subtasks.add(subtask);
 211  *             }
 212  *         }
 213  *
 214  *         @Override
 215  *         public CollectingScope<T> join() throws InterruptedException {
 216  *             super.join();
 217  *             return this;
 218  *         }
 219  *
 220  *         public Stream<Subtask<? extends T>> completedSuccessfully() {
 221  *             // @link substring="ensureOwnerAndJoined" target="ensureOwnerAndJoined" :
 222  *             super.ensureOwnerAndJoined();
 223  *             return subtasks.stream();
 224  *         }
 225  *     }
 226  * }
 227  * <p> The implementation of the {@code completedSuccessfully()} method in the example
 228  * invokes {@link #ensureOwnerAndJoined()} to ensure that the method can only be invoked
 229  * by the owner thread and only after it has joined.
 230  *
 231  * <h2><a id="TreeStructure">Tree structure</a></h2>





 232  *
 233  * Task scopes form a tree where parent-child relations are established implicitly when
 234  * opening a new task scope:
 235  * <ul>
 236  *   <li> A parent-child relation is established when a thread started in a task scope
 237  *   opens its own task scope. A thread started in task scope "A" that opens task scope
 238  *   "B" establishes a parent-child relation where task scope "A" is the parent of task
 239  *   scope "B".
 240  *   <li> A parent-child relation is established with nesting. If a thread opens task
 241  *   scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task
 242  *   scope "B" is the parent of the nested task scope "C".
 243  * </ul>
 244  *
 245  * The <i>descendants</i> of a task scope are the child task scopes that it is a parent
 246  * of, plus the descendants of the child task scopes, recursively.





 247  *
 248  * <p> The tree structure supports:
 249  * <ul>
 250  *   <li> Inheritance of {@linkplain ScopedValue scoped values} across threads.
 251  *   <li> Confinement checks. The phrase "threads contained in the task scope" in method
 252  *   descriptions means threads started in the task scope or descendant scopes.
 253  * </ul>
 254  *
 255  * <p> The following example demonstrates the inheritance of a scoped value. A scoped
 256  * value {@code USERNAME} is bound to the value "{@code duke}". A {@code StructuredTaskScope}
 257  * is created and its {@code fork} method invoked to start a thread to execute {@code
 258  * childTask}. The thread inherits the scoped value <em>bindings</em> captured when
 259  * creating the task scope. The code in {@code childTask} uses the value of the scoped
 260  * value and so reads the value "{@code duke}".

 261  * {@snippet lang=java :

 262  *     private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
 263  *
 264  *     // @link substring="runWhere" target="ScopedValue#runWhere(ScopedValue, Object, Runnable)" :
 265  *     ScopedValue.runWhere(USERNAME, "duke", () -> {
 266  *         try (var scope = new StructuredTaskScope<String>()) {
 267  *
 268  *             scope.fork(() -> childTask());           // @highlight substring="fork"
 269  *             ...
 270  *          }
 271  *     });
 272  *
 273  *     ...

 274  *
 275  *     String childTask() {
 276  *         // @link substring="get" target="ScopedValue#get()" :
 277  *         String name = USERNAME.get();   // "duke"
 278  *         ...
 279  *     }
 280  * }
 281  *
 282  * <p> {@code StructuredTaskScope} does not define APIs that expose the tree structure
 283  * at this time.



 284  *
 285  * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
 286  * or method in this class will cause a {@link NullPointerException} to be thrown.






 287  *
 288  * <h2>Memory consistency effects</h2>
 289  *
 290  * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to
 291  * {@linkplain #fork forking} of a subtask
 292  * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
 293  * <i>happen-before</i></a> any actions taken by that subtask, which in turn <i>happen-before</i>
 294  * the subtask result is {@linkplain Subtask#get() retrieved} or <i>happen-before</i> any
 295  * actions taken in a thread after {@linkplain #join() joining} of the task scope.
 296  *
 297  * @jls 17.4.5 Happens-before Order



 298  *
 299  * @param <T> the result type of tasks executed in the task scope



 300  * @since 21
 301  */
 302 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 303 public class StructuredTaskScope<T> implements AutoCloseable {
 304     private final ThreadFactory factory;   // creates the thread that runs each forked subtask (see fork)







 305     private final ThreadFlock flock;       // starts subtask threads, is awaited by join, and identifies the owner thread
 306     private final ReentrantLock shutdownLock = new ReentrantLock();   // NOTE(review): not referenced in the visible part of the file; presumably guards shutdown - confirm
 307 
 308     // states: OPEN -> SHUTDOWN -> CLOSED
 309     private static final int OPEN     = 0;   // initial state
 310     private static final int SHUTDOWN = 1;   // fork no longer creates threads once the state is >= SHUTDOWN
 311     private static final int CLOSED   = 2;   // ensureOpen() throws IllegalStateException in this state





















































































































 312 
 313     // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
 314     private volatile int state;   // default value 0 is OPEN







 315 
 316     // Counters to support checking that the task scope owner joins before processing
 317     // results and attempts join before closing the task scope. These counters are
 318     // accessed only by the owner thread, hence they are plain (non-volatile) fields.
 319     private int forkRound;         // incremented when the first subtask is forked after join
 320     private int lastJoinAttempted; // set to the current fork round when join is attempted
 321     private int lastJoinCompleted; // set to the current fork round when join completes
 322 
 323     /**
 324      * Represents a subtask forked with {@link #fork(Callable)}.






 325      * @param <T> the result type
 326      * @since 21
 327      */
 328     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 329     public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {   // sealed: SubtaskImpl is the only permitted implementation
 330         /**
 331          * {@return the value-returning task provided to the {@code fork} method}
 332          *
 333          * @apiNote Task objects with unique identity may be used for correlation by
 334          * implementations of {@link #handleComplete(Subtask) handleComplete}.
 335          */
 336         Callable<? extends T> task();
 337 
 338         /**
 339          * Represents the state of a subtask.
 340          * @see Subtask#state()
 341          * @since 21
 342          */
 343         @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 344         enum State {
 345             /**
 346              * The subtask result or exception is not available. This state indicates that
 347              * the subtask was forked but has not completed, it completed after the task
 348              * scope was {@linkplain #shutdown() shut down}, or it was forked after the
 349              * task scope was shut down.
 350              */
 351             UNAVAILABLE,
 352             /**
 353              * The subtask completed successfully with a result. The {@link Subtask#get()
 354              * Subtask.get()} method can be used to obtain the result. This is a terminal
 355              * state.
 356              */
 357             SUCCESS,
 358             /**
 359              * The subtask failed with an exception. The {@link Subtask#exception()
 360              * Subtask.exception()} method can be used to obtain the exception. This is a
 361              * terminal state.
 362              */
 363             FAILED,
 364         }
 365 
 366         /**
 367          * {@return the state of the subtask}
 368          */
 369         State state();
 370 
 371         /**
 372          * Returns the result of the subtask.




 373          *
 374          * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
 375          * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
 376          * joinUntil}) before it can obtain the result of the subtask.



 377          *
 378          * @return the possibly-null result
 379          * @throws IllegalStateException if the subtask has not completed, did not complete
 380          * successfully, or the current thread is the task scope owner and did not join
 381          * after forking
 382          * @see State#SUCCESS
 383          */
 384         T get();
 385 
 386         /**
 387          * {@return the exception thrown by the subtask}








 388          *
 389          * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
 390          * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
 391          * joinUntil}) before it can obtain the exception thrown by the subtask.
 392          *
 393          * @throws IllegalStateException if the subtask has not completed, completed with
 394          * a result, or the current thread is the task scope owner and did not join after
 395          * forking
 396          * @see State#FAILED
 397          */
 398         Throwable exception();
 399     }
 400 
 401     /**
 402      * Creates a structured task scope with the given name and thread factory. The task
 403      * scope is optionally named for the purposes of monitoring and management. The thread
 404      * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
 405      * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
 406      * current thread.














 407      *
 408      * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
 409      * bindings for inheritance by threads started in the task scope. The
 410      * <a href="#TreeStructure">Tree Structure</a> section in the class description details
 411      * how parent-child relations are established implicitly for the purpose of inheritance
 412      * of scoped value bindings.

 413      *
 414      * @param name the name of the task scope, can be null
 415      * @param factory the thread factory





























 416      */
 417     @SuppressWarnings("this-escape")   // presumably needed because `this` is passed to Objects.toIdentityString below
 418     public StructuredTaskScope(String name, ThreadFactory factory) {
 419         this.factory = Objects.requireNonNull(factory, "'factory' is null");   // reject a null factory before the flock is opened
 420         if (name == null)
 421             name = Objects.toIdentityString(this);   // unnamed scope: fall back to this object's identity string
 422         this.flock = ThreadFlock.open(name);         // the flock carries the (possibly generated) scope name



























































































































































































 423     }
 424 
 425     /**
 426      * Creates an unnamed structured task scope that creates virtual threads. The task
 427      * scope is owned by the current thread.



















 428      *
 429      * @implSpec This constructor is equivalent to invoking the 2-arg constructor with a
 430      * name of {@code null} and a thread factory that creates virtual threads.
 431      */
 432     public StructuredTaskScope() {
 433         this(null, Thread.ofVirtual().factory());   // null name (unnamed scope); factory creates virtual threads
 434     }













 435 
 436     private IllegalStateException newIllegalStateExceptionScopeClosed() {   // creates, but does not throw, the exception
 437         return new IllegalStateException("Task scope is closed");           // thrown by ensureOpen() when the state is CLOSED
 438     }





 439 
 440     private IllegalStateException newIllegalStateExceptionNoJoin() {   // creates, but does not throw, the exception
 441         return new IllegalStateException("Owner did not join after forking subtasks");   // thrown by ensureJoinedIfOwner and ensureOwnerAndJoined










 442     }
 443 
 444     /**
 445      * Throws IllegalStateException if the scope is closed, returning the state if not
 446      * closed.


 447      */
 448     private int ensureOpen() {
 449         int s = state;     // single read of the volatile field: the value checked is the value returned
 450         if (s == CLOSED)
 451             throw newIllegalStateExceptionScopeClosed();
 452         return s;          // OPEN or SHUTDOWN
 453     }
 454 
 455     /**
 456      * Throws WrongThreadException if the current thread is not the owner.
 457      */
 458     private void ensureOwner() {
 459         if (Thread.currentThread() != flock.owner())   // identity comparison against the flock's owner thread
 460             throw new WrongThreadException("Current thread not owner");


 461     }
 462 
 463     /**
 464      * Throws WrongThreadException if the current thread is not the owner
 465      * or a thread contained in the tree.



 466      */
 467     private void ensureOwnerOrContainsThread() {
 468         Thread currentThread = Thread.currentThread();
 469         if (currentThread != flock.owner() && !flock.containsThread(currentThread))   // owner is checked first; containsThread is only consulted for non-owners
 470             throw new WrongThreadException("Current thread not owner or thread in the tree");





 471     }
 472 
 473     /**
 474      * Throws IllegalStateException if the current thread is the owner, and the owner did
 475      * not join after forking a subtask in the given fork round.






























 476      */
 477     private void ensureJoinedIfOwner(int round) {
 478         if (Thread.currentThread() == flock.owner() && (round > lastJoinCompleted)) {   // non-owners never fail here; lastJoinCompleted is only read when the caller is the owner
 479             throw newIllegalStateExceptionNoJoin();
















 480         }


 481     }
 482 
 483     /**
 484      * Ensures that the current thread is the owner of this task scope and that it joined
 485      * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
 486      * forking} subtasks.


 487      *
 488      * @apiNote This method can be used by subclasses that define methods to make available
 489      * results, state, or other outcome to code intended to execute after the join method.

 490      *
 491      * @throws WrongThreadException if the current thread is not the task scope owner
 492      * @throws IllegalStateException if the task scope is open and task scope owner did
 493      * not join after forking


 494      */
 495     protected final void ensureOwnerAndJoined() {
 496         ensureOwner();                         // WrongThreadException for non-owners, checked before the join state
 497         if (forkRound > lastJoinCompleted) {   // a subtask was forked in a round that has not completed a join
 498             throw newIllegalStateExceptionNoJoin();
 499         }
 500     }
 501 
 502     /**
 503      * Invoked by a subtask when it completes successfully or fails in this task scope.
 504      * This method is not invoked if a subtask completes after the task scope is
 505      * {@linkplain #shutdown() shut down}.
 506      *
 507      * @implSpec The default implementation throws {@code NullPointerException} if the
 508      * subtask is {@code null}. It throws {@link IllegalArgumentException} if the subtask
 509      * has not completed.
 510      *
 511      * @apiNote The {@code handleComplete} method should be thread safe. It may be
 512      * invoked by several threads concurrently.


 513      *
 514      * @param subtask the subtask



 515      *
 516      * @throws IllegalArgumentException if called with a subtask that has not completed


 517      */
 518     protected void handleComplete(Subtask<? extends T> subtask) {
 519         if (subtask.state() == Subtask.State.UNAVAILABLE)   // dereferencing a null subtask here yields the documented NullPointerException
 520             throw new IllegalArgumentException();           // subtask has not completed
 521     }
 522 
 523     /**
 524      * Starts a new thread in this task scope to execute a value-returning task, thus
 525      * creating a <em>subtask</em> of this task scope.
 526      *
 527      * <p> The value-returning task is provided to this method as a {@link Callable}, the
 528      * thread executes the task's {@link Callable#call() call} method. The thread is
 529      * created with the task scope's {@link ThreadFactory}. It inherits the current thread's
 530      * {@linkplain ScopedValue scoped value} bindings. The bindings must match the bindings
 531      * captured when the task scope was created.
 532      *
 533      * <p> This method returns a {@link Subtask Subtask} to represent the <em>forked
 534      * subtask</em>. The {@code Subtask} object can be used to obtain the result when
 535      * the subtask completes successfully, or the exception when the subtask fails. To
 536      * ensure correct usage, the {@link Subtask#get() get()} and {@link Subtask#exception()
 537      * exception()} methods may only be called by the task scope owner after it has waited
 538      * for all threads to finish with the {@link #join() join} or {@link #joinUntil(Instant)}
 539      * methods. When the subtask completes, the thread invokes the {@link
 540      * #handleComplete(Subtask) handleComplete} method to consume the completed subtask.
 541      * If the task scope is {@linkplain #shutdown() shut down} before the subtask completes
 542      * then the {@code handleComplete} method will not be invoked.
 543      *
 544      * <p> If this task scope is {@linkplain #shutdown() shut down} (or in the process of
 545      * shutting down) then the subtask will not run and the {@code handleComplete} method
 546      * will not be invoked.












 547      *
 548      * <p> This method may only be invoked by the task scope owner or threads contained
 549      * in the task scope.







 550      *
 551      * @implSpec This method may be overridden for customization purposes, wrapping tasks
 552      * for example. If overridden, the subclass must invoke {@code super.fork} to start a
 553      * new thread in this task scope.
 554      *
 555      * @param task the value-returning task for the thread to execute
 556      * @param <U> the result type
 557      * @return the subtask
 558      * @throws IllegalStateException if this task scope is closed
 559      * @throws WrongThreadException if the current thread is not the task scope owner or a
 560      * thread contained in the task scope
 561      * @throws StructureViolationException if the current scoped value bindings are not
 562      * the same as when the task scope was created
 563      * @throws RejectedExecutionException if the thread factory rejected creating a
 564      * thread to run the subtask
 565      */
 566     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
 567         Objects.requireNonNull(task, "'task' is null");
 568         int s = ensureOpen();   // throws ISE if closed
 569 
 570         // when forked by the owner, the subtask is forked in the current or next round
 571         int round = -1;         // stays -1 when the caller is not the owner
 572         if (Thread.currentThread() == flock.owner()) {
 573             round = forkRound;
 574             if (forkRound == lastJoinCompleted) {
 575                 // new round if first fork after join
 576                 round++;
 577             }
 578         }
 579 
 580         SubtaskImpl<U> subtask = new SubtaskImpl<>(this, task, round);
 581         if (s < SHUTDOWN) {     // i.e. the state read above was OPEN; otherwise no thread is created
 582             // create thread to run task
 583             Thread thread = factory.newThread(subtask);
 584             if (thread == null) {
 585                 throw new RejectedExecutionException("Rejected by thread factory");
 586             }
 587 
 588             // attempt to start the thread
 589             try {
 590                 flock.start(thread);
 591             } catch (IllegalStateException e) {
 592                 // deliberately ignored: shut down by another thread, or underlying flock
 593                 // is shut down due to unstructured use; the subtask is returned without running
 594             }
 595         }
 596 
 597         // force owner to join if this is the first fork in the round; forkRound is only
 598         if (Thread.currentThread() == flock.owner() && round > forkRound) {   // advanced after the start attempt, so a rejection above leaves it unchanged
 599             forkRound = round;
 600         }
 601 
 602         // return forked subtask or a subtask that did not run
 603         return subtask;
 604     }
 605 
 606     /**
 607      * Wait for all threads to finish or the task scope to shut down.
 608      */
 609     private void implJoin(Duration timeout)
 610         throws InterruptedException, TimeoutException
 611     {
 612         ensureOwner();
 613         lastJoinAttempted = forkRound;   // recorded before the closed check, so the attempt counts even if ensureOpen() throws
 614         int s = ensureOpen();  // throws ISE if closed
 615         if (s == OPEN) {       // skipped when already shut down: there is nothing to wait for
 616             // wait for all threads, wakeup, interrupt, or timeout
 617             if (timeout != null) {
 618                 flock.awaitAll(timeout);
 619             } else {           // null timeout selects the untimed wait
 620                 flock.awaitAll();
 621             }
 622         }
 623         lastJoinCompleted = forkRound;   // reached only on normal return; an exception above leaves the round un-joined
 624     }
 625 
 626     /**
 627      * Wait for all subtasks started in this task scope to finish or the task scope to
 628      * shut down.
 629      *
 630      * <p> This method waits for all subtasks by waiting for all threads {@linkplain
 631      * #fork(Callable) started} in this task scope to finish execution. It stops waiting
 632      * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, or
 633      * the current thread is {@linkplain Thread#interrupt() interrupted}.
 634      *
 635      * <p> This method may only be invoked by the task scope owner.
 636      *
 637      * @implSpec This method may be overridden for customization purposes or to return a
 638      * more specific return type. If overridden, the subclass must invoke {@code
 639      * super.join} to ensure that the method waits for threads in this task scope to
 640      * finish.
 641      *
 642      * @return this task scope
 643      * @throws IllegalStateException if this task scope is closed
 644      * @throws WrongThreadException if the current thread is not the task scope owner
 645      * @throws InterruptedException if interrupted while waiting






 646      */
 647     public StructuredTaskScope<T> join() throws InterruptedException {
 648         try {
                 // null timeout: implJoin takes the untimed flock.awaitAll() path
 649             implJoin(null);
 650         } catch (TimeoutException e) {
                 // NOTE(review): presumably unreachable since no timeout is supplied;
                 // the caught exception is not attached as the cause of the InternalError.
 651             throw new InternalError();
 652         }
 653         return this;
 654     }
 655 
 656     /**
 657      * Wait for all subtasks started in this task scope to finish or the task scope to
 658      * shut down, up to the given deadline.





 659      *
 660      * <p> This method waits for all subtasks by waiting for all threads {@linkplain
 661      * #fork(Callable) started} in this task scope to finish execution. It stops waiting
 662      * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, the
 663      * deadline is reached, or the current thread is {@linkplain Thread#interrupt()
 664      * interrupted}.
 665      *
 666      * <p> This method may only be invoked by the task scope owner.
 667      *
 668      * @implSpec This method may be overridden for customization purposes or to return a
 669      * more specific return type. If overridden, the subclass must invoke {@code
 670      * super.joinUntil} to ensure that the method waits for threads in this task scope to
 671      * finish.
 672      *
 673      * @param deadline the deadline
 674      * @return this task scope
 675      * @throws IllegalStateException if this task scope is closed
 676      * @throws WrongThreadException if the current thread is not the task scope owner





 677      * @throws InterruptedException if interrupted while waiting
 678      * @throws TimeoutException if the deadline is reached while waiting
 679      */
 680     public StructuredTaskScope<T> joinUntil(Instant deadline)
 681         throws InterruptedException, TimeoutException
 682     {
             // The deadline is converted to a duration relative to "now" before implJoin
             // performs its owner/open checks. If the deadline has already passed the
             // duration is zero or negative and is handed to implJoin unchanged.
 683         Duration timeout = Duration.between(Instant.now(), deadline);
 684         implJoin(timeout);
 685         return this;
 686     }
 687 
 688     /**
 689      * Interrupt all unfinished threads.
 690      */
 691     private void implInterruptAll() {
             // Walk the threads reported by the flock, skipping the calling thread so
             // that it does not interrupt itself.
 692         flock.threads()
 693             .filter(t -> t != Thread.currentThread())
 694             .forEach(t -> {
                     // Best effort: anything thrown by interrupt() is swallowed per thread
                     // so that the remaining threads are still interrupted.
 695                 try {
 696                     t.interrupt();
 697                 } catch (Throwable ignore) { }
 698             });
 699     }
 700 
         // Interrupts all unfinished threads via implInterruptAll(), wrapping the call in
         // a privileged action when a security manager is installed. The "removal"
         // warnings suppressed here come from the SecurityManager/AccessController APIs.
 701     @SuppressWarnings("removal")
 702     private void interruptAll() {
 703         if (System.getSecurityManager() == null) {
                 // no security manager: call directly
 704             implInterruptAll();
 705         } else {
                 // security manager present: run the same work inside doPrivileged
 706             PrivilegedAction<Void> pa = () -> {
 707                 implInterruptAll();
 708                 return null;
 709             };
 710             AccessController.doPrivileged(pa);
 711         }
 712     }
 713 
 714     /**
 715      * Shuts down the task scope if it is not already shut down. Returns true if this
 716      * method shut down the task scope, false if it was already shut down.
 717      */
 718     private boolean implShutdown() {
             // The state check and the transition happen under shutdownLock, so only one
             // caller performs the shutdown and gets true; all others get false.
 719         shutdownLock.lock();
 720         try {
 721             if (state < SHUTDOWN) {
 722                 // prevent new threads from starting
 723                 flock.shutdown();
 724 
 725                 // set status before interrupting tasks
                     // (SubtaskImpl.run() checks isShutdown() after the task returns and
                     // discards the outcome when it is true)
 726                 state = SHUTDOWN;



 727 
 728                 // interrupt all unfinished threads
 729                 interruptAll();
 730 
 731                 return true;
 732             } else {
 733                 // already shutdown
 734                 return false;
 735             }
 736         } finally {
                 // always release the lock, including when interruptAll() throws
 737             shutdownLock.unlock();
 738         }
 739     }
 740 
 741     /**
 742      * Shut down this task scope without closing it. Shutting down a task scope prevents
 743      * new threads from starting, interrupts all unfinished threads, and causes the
 744      * {@link #join() join} method to wakeup. Shutdown is useful for cases where the
 745      * results of unfinished subtasks are no longer needed. It will typically be called
 746      * by the {@link #handleComplete(Subtask)} implementation of a subclass that
 747      * implements a policy to discard unfinished tasks once some outcome is reached.
 748      *
 749      * <p> More specifically, this method:
 750      * <ul>
 751      * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the
 752      * task scope (except the current thread).
 753      * <li> Wakes up the task scope owner if it is waiting in {@link #join()} or {@link
 754      * #joinUntil(Instant)}. If the task scope owner is not waiting then its next call to
 755      * {@code join} or {@code joinUntil} will return immediately.
 756      * </ul>
 757      *
 758      * <p> The {@linkplain Subtask.State state} of unfinished subtasks that complete at
 759      * around the time that the task scope is shutdown is not defined. A subtask that
 760      * completes successfully with a result, or fails with an exception, at around
 761      * the time that the task scope is shutdown may or may not <i>transition</i> to a
 762      * terminal state.
 763      *
 764      * <p> This method may only be invoked by the task scope owner or threads contained
 765      * in the task scope.



 766      *
 767      * @implSpec This method may be overridden for customization purposes. If overridden,
 768      * the subclass must invoke {@code super.shutdown} to ensure that the method shuts
 769      * down the task scope.

 770      *
 771      * @apiNote
 772      * There may be threads that have not finished because they are executing code that
 773      * did not respond (or respond promptly) to thread interrupt. This method does not wait
 774      * for these threads. When the owner invokes the {@link #close() close} method
 775      * to close the task scope then it will wait for the remaining threads to finish.
 776      *
 777      * @throws IllegalStateException if this task scope is closed
 778      * @throws WrongThreadException if the current thread is not the task scope owner or
 779      * a thread contained in the task scope
 780      * @see #isShutdown()
 781      */
 782     public void shutdown() {
 783         ensureOwnerOrContainsThread();
 784         int s = ensureOpen();  // throws ISE if closed
             // "s < SHUTDOWN" is an unlocked pre-check that skips the lock when the scope
             // is already shut down; implShutdown() re-checks the state under shutdownLock.
             // Only the call that actually performed the shutdown wakes up the flock.
 785         if (s < SHUTDOWN && implShutdown())
 786             flock.wakeup();
 787     }
 788 
 789     /**
 790      * {@return true if this task scope is shutdown, otherwise false}
 791      * @see #shutdown()
 792      */
 793     public final boolean isShutdown() {
             // True for SHUTDOWN and for any state ordered at or above it.
             // NOTE(review): presumably CLOSED is ordered after SHUTDOWN, so a closed
             // scope also reports true; confirm against the state constants.
 794         return state >= SHUTDOWN;
 795     }
 796 
 797     /**
 798      * Closes this task scope.
 799      *
 800      * <p> This method first shuts down the task scope (as if by invoking the {@link
 801      * #shutdown() shutdown} method). It then waits for the threads executing any
 802      * unfinished tasks to finish. If interrupted, this method will continue to wait for
 803      * the threads to finish before completing with the interrupt status set.

 804      *
 805      * <p> This method may only be invoked by the task scope owner. If the task scope
 806      * is already closed then the task scope owner invoking this method has no effect.
 807      *
 808      * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
 809      * manner</em>. If this method is called to close a task scope before nested task
 810      * scopes are closed then it closes the underlying construct of each nested task scope
 811      * (in the reverse order that they were created in), closes this task scope, and then
 812      * throws {@link StructureViolationException}.
 813      * Similarly, if this method is called to close a task scope while executing with
 814      * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
 815      * before the scoped values were bound, then {@code StructureViolationException} is
 816      * thrown after closing the task scope.
 817      * If a thread terminates without first closing task scopes that it owns then
 818      * termination will cause the underlying construct of each of its open task scopes to
 819      * be closed. Closing is performed in the reverse order that the task scopes were
 820      * created in. Thread termination may therefore be delayed when the task scope owner
 821      * has to wait for threads forked in these task scopes to finish.
 822      *
 823      * @implSpec This method may be overridden for customization purposes. If overridden,
 824      * the subclass must invoke {@code super.close} to close the task scope.
 825      *
 826      * @throws IllegalStateException thrown after closing the task scope if the task scope
 827      * owner did not attempt to join after forking
 828      * @throws WrongThreadException if the current thread is not the task scope owner
 829      * @throws StructureViolationException if a structure violation was detected
 830      */
 831     @Override
 832     public void close() {
 833         ensureOwner();
 834         int s = state;
             // Closing an already closed scope is a no-op.
 835         if (s == CLOSED)
 836             return;







 837 

 838         try {
                 // Shut down first if that has not happened yet; the boolean result of
                 // implShutdown() is not needed here. Unlike shutdown(), flock.wakeup()
                 // is not called; presumably unnecessary because the caller is the owner.
 839             if (s < SHUTDOWN)
 840                 implShutdown();
 841             flock.close();
 842         } finally {
                 // Mark the scope closed even if implShutdown() or flock.close() throws.
 843             state = CLOSED;
 844         }
 845 
 846         // throw ISE if the owner didn't attempt to join after forking
 847         if (forkRound > lastJoinAttempted) {
                 // NOTE(review): presumably set so that later join checks do not report
                 // the same missing join again; confirm against ensureJoinedIfOwner.
 848             lastJoinCompleted = forkRound;
 849             throw newIllegalStateExceptionNoJoin();
 850         }
 851     }
 852 




         // Returns the flock name, suffixed with "/shutdown" or "/closed" once the scope
         // has left the OPEN state.
 853     @Override
 854     public String toString() {
 855         String name = flock.name();
 856         return switch (state) {
 857             case OPEN     -> name;
 858             case SHUTDOWN -> name + "/shutdown";
 859             case CLOSED   -> name + "/closed";
                 // any other state value is treated as an internal error
 860             default -> throw new InternalError();
 861         };
 862     }
 863 
 864     /**
 865      * Subtask implementation, runs the task specified to the fork method.
 866      */
 867     private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
             // Shared marker stored in 'result' when the task completed successfully with
             // a null value (a null 'result' field means "no outcome recorded").
 868         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
 869 
             // Holder for outcomes that are not a plain non-null value: either the
             // null-success marker (state SUCCESS, no exception) or a failure (state
             // FAILED plus the exception thrown by the task).
 870         private record AltResult(Subtask.State state, Throwable exception) {
 871             AltResult(Subtask.State state) {
 872                 this(state, null);
 873             }
 874         }
 875 
 876         private final StructuredTaskScope<? super T> scope;
 877         private final Callable<? extends T> task;
             // fork round this subtask belongs to; passed to scope.ensureJoinedIfOwner
 878         private final int round;
             // Outcome encoding, as decoded by state()/get()/exception():
             //   null             -> UNAVAILABLE
             //   AltResult        -> RESULT_NULL (success with null) or a failure
             //   any other object -> the non-null value returned by the task
             // Within this class the field is assigned only in run().
 879         private volatile Object result;
 880 
 881         SubtaskImpl(StructuredTaskScope<? super T> scope,
 882                     Callable<? extends T> task,
 883                     int round) {
 884             this.scope = scope;
 885             this.task = task;
 886             this.round = round;
 887         }
 888 
             // Runs the task, records its outcome and notifies the scope, unless the
             // scope is already shut down by the time the task returns.
 889         @Override
 890         public void run() {
 891             T result = null;
 892             Throwable ex = null;
 893             try {
 894                 result = task.call();
 895             } catch (Throwable e) {
                     // every Throwable is captured as the task's failure, not rethrown
 896                 ex = e;
 897             }
 898 
 899             // nothing to do if task scope is shutdown
                 // (the outcome is dropped: 'result' stays null, state() keeps returning
                 // UNAVAILABLE, and handleComplete is not invoked)
 900             if (scope.isShutdown())
 901                 return;
 902 
 903             // capture result or exception, invoke handleComplete
 904             if (ex == null) {
 905                 this.result = (result != null) ? result : RESULT_NULL;
 906             } else {
 907                 this.result = new AltResult(State.FAILED, ex);
 908             }
                 // the outcome is stored before the callback, so handleComplete can read it
 909             scope.handleComplete(this);
 910         }
 911 
 912         @Override
 913         public Callable<? extends T> task() {
 914             return task;
 915         }
 916 
             // Decodes the 'result' field into a Subtask.State; see the field comment.
 917         @Override
 918         public Subtask.State state() {
                 // single volatile read; the local is used for all checks below
 919             Object result = this.result;
 920             if (result == null) {
 921                 return State.UNAVAILABLE;
 922             } else if (result instanceof AltResult alt) {
 923                 // null or failed
 924                 return alt.state();
 925             } else {
 926                 return State.SUCCESS;
 927             }
 928         }
 929 

             // Returns the task's result (possibly null) if it completed successfully,
             // otherwise throws IllegalStateException.
 930         @Override
 931         public T get() {
                 // NOTE(review): defined elsewhere; presumably rejects a call by the
                 // owner made before it joined this subtask's round.
 932             scope.ensureJoinedIfOwner(round);
 933             Object result = this.result;
 934             if (result instanceof AltResult) {
                     // a failure AltResult falls through to the throw below
 935                 if (result == RESULT_NULL) return null;
 936             } else if (result != null) {
 937                 @SuppressWarnings("unchecked")
 938                 T r = (T) result;
 939                 return r;
 940             }
 941             throw new IllegalStateException(
 942                     "Result is unavailable or subtask did not complete successfully");
 943         }
 944 
             // Returns the exception of a failed task, otherwise throws
             // IllegalStateException.
 945         @Override
 946         public Throwable exception() {
 947             scope.ensureJoinedIfOwner(round);
 948             Object result = this.result;
 949             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
 950                 return alt.exception();
 951             }
 952             throw new IllegalStateException(
 953                     "Exception is unavailable or subtask did not complete with exception");
 954         }
 955 
 956         @Override
 957         public String toString() {
 958             String stateAsString = switch (state()) {
 959                 case UNAVAILABLE -> "[Unavailable]";
 960                 case SUCCESS     -> "[Completed successfully]";
 961                 case FAILED      -> {
                         // 'result' is read again here; the cast relies on state() having
                         // just reported FAILED, which it only does for an AltResult
 962                     Throwable ex = ((AltResult) result).exception();
 963                     yield "[Failed: " + ex + "]";
 964                 }
 965             };
 966             return Objects.toIdentityString(this) + stateAsString;
 967         }
 968     }
 969 
 970     /**
 971      * A {@code StructuredTaskScope} that captures the result of the first subtask to
 972      * complete {@linkplain Subtask.State#SUCCESS successfully}. Once captured, it
 973      * {@linkplain #shutdown() shuts down} the task scope to interrupt unfinished threads
 974      * and wakeup the task scope owner. The policy implemented by this class is intended
 975      * for cases where the result of any subtask will do ("invoke any") and where the
 976      * results of other unfinished subtasks are no longer needed.
 977      *
 978      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 979      * in this class will cause a {@link NullPointerException} to be thrown.
 980      *
 981      * @apiNote This class implements a policy to shut down the task scope when a subtask
 982      * completes successfully. There shouldn't be any need to directly shut down the task
 983      * scope with the {@link #shutdown() shutdown} method.
 984      *
 985      * @param <T> the result type
 986      * @since 21
 987      */
 988     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 989     public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
             // Marker stored in firstResult when the winning subtask returned null; a null
             // firstResult field means "no result captured yet".
 990         private static final Object RESULT_NULL = new Object();
             // VarHandles used for compareAndSet on the firstResult/firstException fields
 991         private static final VarHandle FIRST_RESULT;
 992         private static final VarHandle FIRST_EXCEPTION;
 993         static {
 994             MethodHandles.Lookup l = MethodHandles.lookup();
 995             FIRST_RESULT = MhUtil.findVarHandle(l, "firstResult", Object.class);
 996             FIRST_EXCEPTION = MhUtil.findVarHandle(l, "firstException", Throwable.class);
 997         }
             // Both fields go from null to a value at most once (set only by CAS from null).
 998         private volatile Object firstResult;
 999         private volatile Throwable firstException;
1000 
1001         /**
1002          * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
1003          * The task scope is optionally named for the purposes of monitoring and management.
1004          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1005          * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope
1006          * is owned by the current thread.
1007          *
1008          * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
1009          * value} bindings for inheritance by threads started in the task scope. The
1010          * <a href="#TreeStructure">Tree Structure</a> section in the class description
1011          * details how parent-child relations are established implicitly for the purpose
1012          * of inheritance of scoped value bindings.
1013          *
1014          * @param name the name of the task scope, can be null
1015          * @param factory the thread factory
1016          */
1017         public ShutdownOnSuccess(String name, ThreadFactory factory) {
1018             super(name, factory);
1019         }
1020 
1021         /**
1022          * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
1023          *
1024          * @implSpec This constructor is equivalent to invoking the 2-arg constructor with
1025          * a name of {@code null} and a thread factory that creates virtual threads.
1026          */
1027         public ShutdownOnSuccess() {
1028             this(null, Thread.ofVirtual().factory());
1029         }
1030 
             // Captures the first successful result (and shuts the scope down), or else
             // the first exception while no result has been captured.
1031         @Override
1032         protected void handleComplete(Subtask<? extends T> subtask) {
                 // volatile read used as a fast path once a result has been captured
1033             if (firstResult != null) {
1034                 // already captured a result
1035                 return;


1036             }


1037 
1038             if (subtask.state() == Subtask.State.SUCCESS) {
1039                 // task succeeded
1040                 T result = subtask.get();
                     // null is reserved for "no result yet", so a null value is stored as
                     // the RESULT_NULL marker
1041                 Object r = (result != null) ? result : RESULT_NULL;
                     // only the thread whose CAS installs the first result shuts down
1042                 if (FIRST_RESULT.compareAndSet(this, null, r)) {
1043                     super.shutdown();
1044                 }
1045             } else if (firstException == null) {
1046                 // capture the exception thrown by the first subtask that failed
                     // (assumes the subtask is FAILED here; SubtaskImpl.run() invokes
                     // handleComplete only after storing a result or an exception)
1047                 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1048             }
1049         }


1050 
1051         /**
1052          * Wait for a subtask started in this task scope to complete {@linkplain
1053          * Subtask.State#SUCCESS successfully} or all subtasks to complete.
1054          *
1055          * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1056          * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1057          * when all threads finish, a subtask completes successfully, or the current
1058          * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting
1059          * if the {@link #shutdown() shutdown} method is invoked directly to shut down
1060          * this task scope.
1061          *
1062          * <p> This method may only be invoked by the task scope owner.
1063          *
1064          * @throws IllegalStateException {@inheritDoc}
1065          * @throws WrongThreadException {@inheritDoc}
1066          */
1067         @Override
1068         public ShutdownOnSuccess<T> join() throws InterruptedException {
                 // overridden only to narrow the return type; the waiting is done by super
1069             super.join();
1070             return this;









1071         }
1072 
1073         /**
1074          * Wait for a subtask started in this task scope to complete {@linkplain
1075          * Subtask.State#SUCCESS successfully} or all subtasks to complete, up to the
1076          * given deadline.
1077          *
1078          * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1079          * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1080          * when all threads finish, a subtask completes successfully, the deadline is
1081          * reached, or the current thread is {@linkplain Thread#interrupt() interrupted}.
1082          * It also stops waiting if the {@link #shutdown() shutdown} method is invoked
1083          * directly to shut down this task scope.
1084          *
1085          * <p> This method may only be invoked by the task scope owner.
1086          *
1087          * @throws IllegalStateException {@inheritDoc}
1088          * @throws WrongThreadException {@inheritDoc}
1089          */
1090         @Override
1091         public ShutdownOnSuccess<T> joinUntil(Instant deadline)
1092             throws InterruptedException, TimeoutException
1093         {
                 // overridden only to narrow the return type; the waiting is done by super
1094             super.joinUntil(deadline);
1095             return this;






1096         }

1097 
1098         /**
1099          * {@return the result of the first subtask that completed {@linkplain
1100          * Subtask.State#SUCCESS successfully}}
1101          *
1102          * <p> When no subtask completed successfully, but a subtask {@linkplain
1103          * Subtask.State#FAILED failed} then {@code ExecutionException} is thrown with
1104          * the subtask's exception as the {@linkplain Throwable#getCause() cause}.
1105          *
1106          * @throws ExecutionException if no subtasks completed successfully but at least
1107          * one subtask failed
1108          * @throws IllegalStateException if no subtasks completed or the task scope owner
1109          * did not join after forking
1110          * @throws WrongThreadException if the current thread is not the task scope owner
1111          */
1112         public T result() throws ExecutionException {
                 // delegates with ExecutionException(Throwable) as the exception factory
1113             return result(ExecutionException::new);
1114         }

1115 
1116         /**
1117          * Returns the result of the first subtask that completed {@linkplain
1118          * Subtask.State#SUCCESS successfully}, otherwise throws an exception produced
1119          * by the given exception supplying function.
1120          *
1121          * <p> When no subtask completed successfully, but a subtask {@linkplain
1122          * Subtask.State#FAILED failed}, then the exception supplying function is invoked
1123          * with the subtask's exception.
1124          *
1125          * @param esf the exception supplying function
1126          * @param <X> type of the exception to be thrown
1127          * @return the result of the first subtask that completed with a result
1128          *
1129          * @throws X if no subtasks completed successfully but at least one subtask failed
1130          * @throws IllegalStateException if no subtasks completed or the task scope owner
1131          * did not join after forking
1132          * @throws WrongThreadException if the current thread is not the task scope owner
1133          */
1134         public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X {
                 // the null check on esf comes before the owner/join check
1135             Objects.requireNonNull(esf);
1136             ensureOwnerAndJoined();
1137 
                 // a captured result takes precedence over any captured exception
1138             Object result = firstResult;
1139             if (result == RESULT_NULL) {
                     // marker for "subtask succeeded with null"
1140                 return null;
1141             } else if (result != null) {
1142                 @SuppressWarnings("unchecked")
1143                 T r = (T) result;
1144                 return r;
1145             }
1146 
1147             Throwable exception = firstException;
1148             if (exception != null) {
1149                 X ex = esf.apply(exception);
                     // a function that returns null results in NullPointerException
1150                 Objects.requireNonNull(ex, "esf returned null");
1151                 throw ex;


1152             }
1153 
                 // neither a result nor an exception was captured
1154             throw new IllegalStateException("No completed subtasks");
1155         }
1156     }
1157 
1158     /**
1159      * A {@code StructuredTaskScope} that captures the exception of the first subtask to
1160      * {@linkplain Subtask.State#FAILED fail}. Once captured, it {@linkplain #shutdown()
1161      * shuts down} the task scope to interrupt unfinished threads and wakeup the task
1162      * scope owner. The policy implemented by this class is intended for cases where the
1163      * results for all subtasks are required ("invoke all"); if any subtask fails then the
1164      * results of other unfinished subtasks are no longer needed.
1165      *
1166      * <p> Unless otherwise specified, passing a {@code null} argument to a method
1167      * in this class will cause a {@link NullPointerException} to be thrown.
1168      *
1169      * @apiNote This class implements a policy to shut down the task scope when a subtask
1170      * fails. There shouldn't be any need to directly shut down the task scope with the
1171      * {@link #shutdown() shutdown} method.
1172      *
1173      * @since 21
1174      */
1175     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
1176     public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
1177         private static final VarHandle FIRST_EXCEPTION =
1178                 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
1179         private volatile Throwable firstException;
1180 
1181         /**
1182          * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
1183          * The task scope is optionally named for the purposes of monitoring and management.
1184          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1185          * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope
1186          * is owned by the current thread.
1187          *
1188          * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
1189          * value} bindings for inheritance by threads started in the task scope. The
1190          * <a href="#TreeStructure">Tree Structure</a> section in the class description
1191          * details how parent-child relations are established implicitly for the purpose
1192          * of inheritance of scoped value bindings.
1193          *
1194          * @param name the name of the task scope, can be null
1195          * @param factory the thread factory
1196          */
1197         public ShutdownOnFailure(String name, ThreadFactory factory) {
1198             super(name, factory);
1199         }
1200 
1201         /**
1202          * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
1203          *
1204          * @implSpec This constructor is equivalent to invoking the 2-arg constructor with
1205          * a name of {@code null} and a thread factory that creates virtual threads.
1206          */
1207         public ShutdownOnFailure() {
1208             this(null, Thread.ofVirtual().factory());
1209         }
1210 
1211         @Override
1212         protected void handleComplete(Subtask<?> subtask) {
1213             if (subtask.state() == Subtask.State.FAILED
1214                     && firstException == null
1215                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception())) {
1216                 super.shutdown();
1217             }
1218         }
1219 
1220         /**
1221          * Wait for all subtasks started in this task scope to complete or for a subtask
1222          * to {@linkplain Subtask.State#FAILED fail}.
1223          *
1224          * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1225          * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1226          * when all threads finish, a subtask fails, or the current thread is {@linkplain
1227          * Thread#interrupt() interrupted}. It also stops waiting if the {@link #shutdown()
1228          * shutdown} method is invoked directly to shut down this task scope.
1229          *
1230          * <p> This method may only be invoked by the task scope owner.
1231          *
1232          * @throws IllegalStateException {@inheritDoc}
1233          * @throws WrongThreadException {@inheritDoc}
1234          */
1235         @Override
1236         public ShutdownOnFailure join() throws InterruptedException {
1237             super.join();
1238             return this;










1239         }
1240 
1241         /**
1242          * Wait for all subtasks started in this task scope to complete or for a subtask
1243          * to {@linkplain Subtask.State#FAILED fail}, up to the given deadline.
1244          *
1245          * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1246          * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1247          * when all threads finish, a subtask fails, the deadline is reached, or the current
1248          * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting
1249          * if the {@link #shutdown() shutdown} method is invoked directly to shut down
1250          * this task scope.
1251          *
1252          * <p> This method may only be invoked by the task scope owner.
1253          *
1254          * @throws IllegalStateException {@inheritDoc}
1255          * @throws WrongThreadException {@inheritDoc}
1256          */
1257         @Override
1258         public ShutdownOnFailure joinUntil(Instant deadline)
1259             throws InterruptedException, TimeoutException
1260         {
1261             super.joinUntil(deadline);
1262             return this;
1263         }
1264 
1265         /**
1266          * Returns the exception of the first subtask that {@linkplain Subtask.State#FAILED
1267          * failed}. If no subtasks failed then an empty {@code Optional} is returned.
1268          *
1269          * @return the exception for the first subtask to fail or an empty optional if no
1270          * subtasks failed
1271          *
1272          * @throws WrongThreadException if the current thread is not the task scope owner
1273          * @throws IllegalStateException if the task scope owner did not join after forking
1274          */
1275         public Optional<Throwable> exception() {
1276             ensureOwnerAndJoined();
1277             return Optional.ofNullable(firstException);
1278         }
1279 
1280         /**
1281          * Throws if a subtask {@linkplain Subtask.State#FAILED failed}.
1282          * If any subtask failed with an exception then {@code ExecutionException} is
1283          * thrown with the exception of the first subtask to fail as the {@linkplain
1284          * Throwable#getCause() cause}. This method does nothing if no subtasks failed.
1285          *
1286          * @throws ExecutionException if a subtask failed
1287          * @throws WrongThreadException if the current thread is not the task scope owner
1288          * @throws IllegalStateException if the task scope owner did not join after forking
1289          */
1290         public void throwIfFailed() throws ExecutionException {
                 // delegate to the function-accepting overload; the constructor reference
                 // wraps the first failure as the cause of a new ExecutionException
1291             throwIfFailed(ExecutionException::new);
1292         }

1293 
1294         /**
1295          * Throws the exception produced by the given exception supplying function if a
1296          * subtask {@linkplain Subtask.State#FAILED failed}. If any subtask failed with
1297          * an exception then the function is invoked with the exception of the first
1298          * subtask to fail. The exception returned by the function is thrown. This method
1299          * does nothing if no subtasks failed.
1300          *
1301          * @param esf the exception supplying function
1302          * @param <X> type of the exception to be thrown
1303          *
1304          * @throws X produced by the exception supplying function
1305          * @throws WrongThreadException if the current thread is not the task scope owner
1306          * @throws IllegalStateException if the task scope owner did not join after forking
1307          */
1308         public <X extends Throwable>
1309         void throwIfFailed(Function<Throwable, ? extends X> esf) throws X {
                 // note: the owner/joined check runs before esf is null-checked
1310             ensureOwnerAndJoined();
1311             Objects.requireNonNull(esf);
                 // read the field once into a local before testing and using it
1312             Throwable exception = firstException;
1313             if (exception != null) {
1314                 X ex = esf.apply(exception);
                     // a null result from esf is reported as a NullPointerException
1315                 Objects.requireNonNull(ex, "esf returned null");
1316                 throw ex;
1317             }
1318         }
1319     }
1320 }

  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.util.ArrayList;
  33 import java.util.List;
  34 import java.util.NoSuchElementException;
  35 import java.util.Objects;


  36 import java.util.function.Function;
  37 import java.util.function.Predicate;
  38 import java.util.function.Supplier;
  39 import java.util.stream.Stream;
  40 import jdk.internal.javac.PreviewFeature;
  41 import jdk.internal.misc.InnocuousThread;
  42 import jdk.internal.misc.ThreadFlock;
  43 import jdk.internal.invoke.MhUtil;
  44 
  45 /**
  46  * An API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports cases
  47  * where a main task splits into several concurrent subtasks, and where the subtasks must
  48  * complete before the main task continues. A {@code StructuredTaskScope} can be used to
  49  * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
  50  * just like that of a sequential operation in structured programming.
  51  *
  52  * <p> {@code StructuredTaskScope} defines the static method {@link #open() open} to open
  53  * a new {@code StructuredTaskScope} and the {@link #close() close} method to close it.
  54  * The API is designed to be used with the {@code try-with-resources} statement where
  55  * the {@code StructuredTaskScope} is opened as a resource and then closed automatically.
  56  * The code in the block uses the {@link #fork(Callable) fork} method to fork subtasks.
  57  * After forking, it uses the {@link #join() join} method to wait for all subtasks to
  58  * finish (or some other outcome) as a single operation. Forking a subtask starts a new
  59  * {@link Thread} to run the subtask. The thread executing the main task does not continue
  60  * beyond the {@code close} method until all threads started to execute subtasks have finished.
  61  * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may
  62  * only be invoked by the <em>owner thread</em> (the thread that opened the {@code
  63  * StructuredTaskScope}), the {@code fork} method may not be called after {@code join},
  64  * the {@code join} method may only be invoked once, and the {@code close} method throws
  65  * an exception after closing if the owner did not invoke the {@code join} method after
  66  * forking subtasks.
  67  *
  68  * <p> As a first example, consider a main task that splits into two subtasks to concurrently
  69  * fetch resources from two URL locations "left" and "right". Both subtasks may complete
  70  * successfully, one subtask may succeed and the other may fail, or both subtasks may
  71  * fail. The main task in this example is interested in the successful result from both
  72  * subtasks. It waits in the {@link #join() join} method for both subtasks to complete
  73  * successfully or for either subtask to fail.




  74  * {@snippet lang=java :
  75  *    // @link substring="open" target="#open()" :
  76  *    try (var scope = StructuredTaskScope.open()) {
  77  *
  78  *        // @link substring="fork" target="#fork(Callable)" :
  79  *        Subtask<String> subtask1 = scope.fork(() -> query(left));
  80  *        Subtask<Integer> subtask2 = scope.fork(() -> query(right));
  81  *
  82  *        // throws if either subtask fails
  83  *        scope.join();  // @link substring="join" target="#join()"
  84  *
  85  *        // both subtasks completed successfully
  86  *        // @link substring="get" target="Subtask#get()" :
  87  *        return new MyResult(subtask1.get(), subtask2.get());
  88  *
  89  *    // @link substring="close" target="#close()" :
  90  *    } // close

  91  * }






  92  *
  93  * <p> If both subtasks complete successfully then the {@code join} method completes
  94  * normally and the main task uses the {@link Subtask#get() Subtask.get()} method to get
  95  * the result of each subtask. If one of the subtasks fails then the other subtask is
  96  * cancelled (this will interrupt the thread executing the other subtask) and the {@code
  97  * join} method throws {@link FailedException} with the exception from the failed subtask
  98  * as the {@linkplain Throwable#getCause() cause}.
  99  *
 100  * <p> In the example, the subtasks produce results of different types ({@code String} and
 101  * {@code Integer}). In other cases the subtasks may all produce results of the same type.
 102  * If the example had used {@code StructuredTaskScope.<String>open()} then it could
 103  * only be used to fork subtasks that return a {@code String} result.
 104  *
 105  * <h2>Joiners</h2>
 106  *
 107  * <p> In the example above, the main task fails if any subtask fails. If all subtasks
 108  * succeed then the {@code join} method completes normally. Other policies and outcomes are
 109  * supported by creating a {@code StructuredTaskScope} with a {@link Joiner} that
 110  * implements the desired policy. A {@code Joiner} handles subtask completion and produces
 111  * the outcome for the {@link #join() join} method. In the example above, {@code join}
 112  * returns {@code null}. Depending on the {@code Joiner}, {@code join} may return a
 113  * result, a stream of elements, or some other object. The {@code Joiner} interface defines
 114  * factory methods to create {@code Joiner}s for some common cases.
 115  *
 116  * <p> A {@code Joiner} may <a id="CancelExecution"><em>cancel execution</em></a> (sometimes
 117  * called "short-circuiting") when some condition is reached that does not require the
 118  * result of subtasks that are still executing. Cancelling execution prevents new threads
 119  * from being started to execute further subtasks, {@linkplain Thread#interrupt() interrupts}
 120  * the threads executing subtasks that have not completed, and causes the {@code join}
 121  * method to wake up with the outcome (result or exception). In the above example, the
 122  * outcome is that {@code join} completes with a result of {@code null} when all subtasks
 123  * succeed. It cancels execution if any of the subtasks fail, throwing the exception from
 124  * the first subtask that fails. Other {@code Joiner} implementations may return an object
 125  * instead of {@code null} and may cancel execution or throw based on some other policy.
 126  *
 127  * <p> To allow for cancelling execution, subtasks must be coded so that they
 128  * finish as soon as possible when interrupted. Subtasks that do not respond to interrupt,
 129  * e.g. block on methods that are not interruptible, may delay the closing of a task scope
 130  * indefinitely. The {@link #close() close} method always waits for threads executing
 131  * subtasks to finish, even if execution is cancelled, so execution cannot continue beyond
 132  * the {@code close} method until the interrupted threads finish.
 133  *
 134  * <p> Now consider another example that also splits into two subtasks. In this example,
 135  * each subtask produces a {@code String} result and the main task is only interested in
 136  * the result from the first subtask to complete successfully. The example uses {@link
 137  * Joiner#anySuccessfulResultOrThrow() Joiner.anySuccessfulResultOrThrow()} to
 138  * create a {@code Joiner} that makes available the result of the first subtask to
 139  * complete successfully. The type parameter in the example is "{@code String}" so that
 140  * only subtasks that return a {@code String} can be forked.
 141  * {@snippet lang=java :
 142  *    // @link substring="open" target="#open(Joiner)" :
 143  *    try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {
 144  *
 145  *        scope.fork(callable1);
 146  *        scope.fork(callable2);
 147  *
 148  *        // throws if both subtasks fail
 149  *        String firstResult = scope.join();


 150  *
 151  *    }
 152  * }
 153  *
 154  * <p> In the example, the main task forks the two subtasks, then waits in the {@code
 155  * join} method for either subtask to complete successfully or for both subtasks to fail.
 156  * If one of the subtasks completes successfully then the {@code Joiner} causes the other
 157  * subtask to be cancelled (this will interrupt the thread executing the subtask), and
 158  * the {@code join} method returns the result from the successful subtask. Cancelling the
 159  * other subtask avoids the main task waiting for a result that it doesn't care about. If
 160  * both subtasks fail then the {@code join} method throws {@link FailedException} with the
 161  * exception from one of the subtasks as the {@linkplain Throwable#getCause() cause}.
 162  *
 163  * <p> Whether code uses the {@code Subtask} returned from {@code fork} will depend on
 164  * the {@code Joiner} and usage. Some {@code Joiner} implementations are suited to subtasks
 165  * that return results of the same type and where the {@code join} method returns a result
 166  * for the main task to use. Code that forks subtasks that return results of different
 167  * types, and uses a {@code Joiner} such as {@code Joiner.awaitAllSuccessfulOrThrow()} that
 168  * does not return a result, will use {@link Subtask#get() Subtask.get()} after joining.
 169  *
 170  * <h2>Exception handling</h2>
 171  *
 172  * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner Joiner} that
 173  * handles subtask completion and produces the outcome for the {@link #join() join} method.
 174  * In some cases, the outcome will be a result, in other cases it will be an exception.
 175  * If the outcome is an exception then the {@code join} method throws {@link
 176  * FailedException} with the exception as the {@linkplain Throwable#getCause()
 177  * cause}. For many {@code Joiner} implementations, the exception will be an exception
 178  * thrown by a subtask that failed. In the case of {@link Joiner#allSuccessfulOrThrow()
 179  * allSuccessfulOrThrow} and {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow}
 180  * for example, the exception is from the first subtask to fail.







 181  *
 182  * <p> Many of the details for how exceptions are handled will depend on usage. In some
 183  * cases it may be useful to add a {@code catch} block to catch {@code FailedException}.
 184  * The exception handling may use {@code instanceof} with pattern matching to handle
 185  * specific causes.





 186  * {@snippet lang=java :
 187  *    try (var scope = StructuredTaskScope.open()) {
 188  *
 189  *        ..

 190  *
 191  *    } catch (StructuredTaskScope.FailedException e) {
 192  *
 193  *        Throwable cause = e.getCause();
 194  *        switch (cause) {
 195  *            case IOException ioe -> ..
 196  *            default -> ..
 197  *        }
 198  *
 199  *    }

 200  * }
 201  * In other cases it may not be useful to catch {@code FailedException} but instead leave
 202  * it to propagate to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught
 203  * exception handler} for logging purposes.









 204  *
 205  * <p> For cases where a specific exception triggers the use of a default result, it
 206  * may be more appropriate to handle this in the subtask itself rather than the subtask
 207  * failing and code in the main task handling the exception.
 208  *
 209  * <h2>Configuration</h2>

 210  *
 211  * A {@code StructuredTaskScope} is opened with {@linkplain Config configuration} that
 212  * consists of a {@link ThreadFactory} to create threads, an optional name for monitoring
 213  * and management purposes, and an optional timeout.
 214  *
 215  * <p> The {@link #open()} and {@link #open(Joiner)} methods create a {@code StructuredTaskScope}
 216  * with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default
 217  * configuration has a {@code ThreadFactory} that creates unnamed
 218  * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
 219  * is unnamed for monitoring and management purposes, and has no timeout.
 220  *
 221  * <p> The 2-arg {@link #open(Joiner, Function) open} method can be used to create a
 222  * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for
 223  * the purposes of monitoring and management, or has a timeout that cancels execution if
 224  * the timeout expires before or while waiting for subtasks to complete. The {@code open}
 225  * method is called with a {@linkplain Function function} that is applied to the default
 226  * configuration and returns a {@link Config Config} for the {@code StructuredTaskScope}
 227  * under construction.
 228  *
 229  * <p> The following example opens a new {@code StructuredTaskScope} with a {@code
 230  * ThreadFactory} that creates virtual threads {@linkplain Thread#setName(String) named}
 231  * "duke-0", "duke-1" ...
 232  * {@snippet lang = java:
 233  *    // @link substring="name" target="Thread.Builder#name(String, long)" :
 234  *    ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory();
 235  *
 236  *    // @link substring="withThreadFactory" target="Config#withThreadFactory(ThreadFactory)" :
 237  *    try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
 238  *
 239  *        scope.fork( .. );   // runs in a virtual thread with name "duke-0"
 240  *        scope.fork( .. );   // runs in a virtual thread with name "duke-1"
 241  *
 242  *        scope.join();






 243  *
 244  *     }
 245  *}



 246  *
 247  * <p> A second example sets a timeout, represented by a {@link Duration}. The timeout
 248  * starts when the new task scope is opened. If the timeout expires before the {@code join}
 249  * method has completed then <a href="#CancelExecution">execution is cancelled</a>. This
 250  * interrupts the threads executing the two subtasks and causes the {@link #join() join}
 251  * method to throw {@link TimeoutException}.
 252  * {@snippet lang=java :
 253  *    Duration timeout = Duration.ofSeconds(10);

 254  *
 255  *    // @link substring="allSuccessfulOrThrow" target="Joiner#allSuccessfulOrThrow()" :
 256  *    try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
 257  *    // @link substring="withTimeout" target="Config#withTimeout(Duration)" :
 258  *                                              cf -> cf.withTimeout(timeout))) {


 259  *
 260  *        scope.fork(callable1);
 261  *        scope.fork(callable2);



 262  *
 263  *        List<String> result = scope.join()
 264  *                                   .map(Subtask::get)
 265  *                                   .toList();
 266  *
 267  *   }

 268  * }



 269  *
 270  * <h2>Inheritance of scoped value bindings</h2>
 271  *
 272  * {@link ScopedValue} supports the execution of a method with a {@code ScopedValue} bound
 273  * to a value for the bounded period of execution of the method by the <em>current thread</em>.
 274  * It allows a value to be safely and efficiently shared to methods without using method
 275  * parameters.
 276  *
 277  * <p> When used in conjunction with a {@code StructuredTaskScope}, a {@code ScopedValue}
 278  * can also safely and efficiently share a value to methods executed by subtasks forked
 279  * in the task scope. When a {@code ScopedValue} object is bound to a value in the thread
 280  * executing the main task then that binding is inherited by the threads created to
 281  * execute the subtasks. The thread executing the main task does not continue beyond the
 282  * {@link #close() close} method until all threads executing the subtasks have finished.
 283  * This ensures that the {@code ScopedValue} is not reverted to being {@linkplain
 284  * ScopedValue#isBound() unbound} (or its previous value) while subtasks are executing.
 285  * In addition to providing a safe and efficient means to inherit a value into subtasks,
 286  * the inheritance allows sequential code using {@code ScopedValue} to be refactored to use
 287  * structured concurrency.
 288  *
 289  * <p> To ensure correctness, opening a new {@code StructuredTaskScope} captures the
 290  * current thread's scoped value bindings. These are the scoped value bindings that are
 291  * inherited by the threads created to execute subtasks in the task scope. Forking a
 292  * subtask checks that the bindings in effect at the time that the subtask is forked
 293  * match the bindings when the {@code StructuredTaskScope} was created. This check ensures
 294  * that a subtask does not inherit a binding that is reverted in the main task before the
 295  * subtask has completed.
 296  *
 297  * <p> A {@code ScopedValue} that is shared across threads requires that the value be an
 298  * immutable object or for all access to the value to be appropriately synchronized.




 299  *
 300  * <p> The following example demonstrates the inheritance of scoped value bindings. The
 301  * scoped value USERNAME is bound to the value "duke" for the bounded period of a lambda
 302  * expression by the thread executing it. The code in the block opens a {@code
 303  * StructuredTaskScope} and forks two subtasks, it then waits in the {@code join} method
 304  * and aggregates the results from both subtasks. If code executed by the threads
 305  * running subtask1 and subtask2 uses {@link ScopedValue#get()} to get the value of
 306  * USERNAME, then value "duke" will be returned.
 307  * {@snippet lang=java :
 308  *     // @link substring="newInstance" target="ScopedValue#newInstance()" :
 309  *     private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
 310  *
 311  *     // @link substring="where" target="ScopedValue#where" :
 312  *     MyResult result = ScopedValue.where(USERNAME, "duke").call(() -> {

 313  *
 314  *         try (var scope = StructuredTaskScope.open()) {



 315  *
 316  *             Subtask<String> subtask1 = scope.fork( .. );    // inherits binding
 317  *             Subtask<Integer> subtask2 = scope.fork( .. );   // inherits binding
 318  *
 319  *             scope.join();
 320  *             return new MyResult(subtask1.get(), subtask2.get());
 321  *         }
 322  *
 323  *     });
 324  * }
 325  *
 326  * <p> A scoped value inherited into a subtask may be
 327  * <a href="{@docRoot}/java.base/java/lang/ScopedValue.html#rebind">rebound</a> to a new
 328  * value in the subtask for the bounded execution of some method executed in the subtask.
 329  * When the method completes, the value of the {@code ScopedValue} reverts to its previous
 330  * value, the value inherited from the thread executing the main task.
 331  *
 332  * <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}.
 333  * A main task executing in thread T1 opens a {@code StructuredTaskScope} and forks a
 334  * subtask that runs in thread T2. The scoped value bindings captured when T1 opens the
 335  * task scope are inherited into T2. The subtask (in thread T2) executes code that opens a
 336  * new {@code StructuredTaskScope} and forks a subtask that runs in thread T3. The scoped
 337  * value bindings captured when T2 opens the task scope are inherited into T3. These
 338  * include (or may be the same as) the bindings that were inherited from T1. In effect,
 339  * scoped values are inherited into a tree of subtasks, not just one level of subtask.
 340  *
 341  * <h2>Memory consistency effects</h2>
 342  *
 343  * <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to
 344  * {@linkplain #fork forking} of a subtask
 345  * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
 346  * <i>happen-before</i></a> any actions taken by that subtask, which in turn
 347  * <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}.

 348  *
 349  * <h2>General exceptions</h2>
 350  *
 351  * <p> Unless otherwise specified, passing a {@code null} argument to a method in this
 352  * class will cause a {@link NullPointerException} to be thrown.
 353  *
 354  * @param <T> the result type of tasks executed in the task scope
 355  * @param <R> the type of the result returned by the join method
 356  *
 357  * @jls 17.4.5 Happens-before Order
 358  * @since 21
 359  */
 360 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 361 public class StructuredTaskScope<T, R> implements AutoCloseable {
 362     private static final VarHandle CANCELLED;
 363     static {
             // VarHandle on the boolean field "cancelled"; cancelExecution uses it for compareAndSet
 364         MethodHandles.Lookup l = MethodHandles.lookup();
 365         CANCELLED = MhUtil.findVarHandle(l, "cancelled", boolean.class);
 366     }
 367 
 368     private final Joiner<? super T, ? extends R> joiner;   // consulted by onComplete
 369     private final ThreadFactory threadFactory;             // assigned in the constructor
 370     private final ThreadFlock flock;                       // flock.owner() is the owner thread

 371 
 372     // state, only accessed by owner thread
         // the checks in this class compare these values numerically, so their order matters
 373     private static final int ST_NEW            = 0;
 374     private static final int ST_FORKED         = 1;   // subtasks forked, need to join
 375     private static final int ST_JOIN_STARTED   = 2;   // join started, can no longer fork
 376     private static final int ST_JOIN_COMPLETED = 3;   // join completed
 377     private static final int ST_CLOSED         = 4;   // closed
 378     private int state;
 379 
 380     // timer task, only accessed by owner thread
         // set by scheduleTimeout and cancelled by cancelTimeout; null if no timer task was scheduled
 381     private Future<?> timerTask;
 382 
 383     // set or read by any thread
         // flipped from false to true by the compareAndSet in cancelExecution
 384     private volatile boolean cancelled;
 385 
 386     // set by the timer thread, read by the owner thread
 387     private volatile boolean timeoutExpired;
 388 
 389     /**
 390      * Throws WrongThreadException if the current thread is not the owner thread.
          * The owner thread is the thread returned by {@code flock.owner()}.
 391      */
 392     private void ensureOwner() {
 393         if (Thread.currentThread() != flock.owner()) {
 394             throw new WrongThreadException("Current thread not owner");
 395         }
 396     }
 397 
 398     /**
 399      * Throws IllegalStateException if already joined or task scope is closed.
          * Must be invoked by the owner thread (asserted).
 400      */
 401     private void ensureNotJoined() {
 402         assert Thread.currentThread() == flock.owner();
             // any state after ST_FORKED (ST_JOIN_STARTED, ST_JOIN_COMPLETED, ST_CLOSED) is rejected
 403         if (state > ST_FORKED) {
 404             throw new IllegalStateException("Already joined or task scope is closed");
 405         }
 406     }
 407 
 408     /**
 409      * Throws IllegalStateException if invoked by the owner thread and the owner thread
 410      * has not joined.
          * Threads other than the owner are never rejected by this check.
 411      */
 412     private void ensureJoinedIfOwner() {
             // states up to and including ST_JOIN_STARTED mean that join has not completed
 413         if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) {
 414             throw new IllegalStateException("join not called");
 415         }
 416     }
 417 
 418     /**
 419      * Interrupts all threads in this task scope, except the current thread.
          * The threads are those reported by {@code flock.threads()}.
 420      */
 421     private void implInterruptAll() {
 422         flock.threads()
 423                 .filter(t -> t != Thread.currentThread())
 424                 .forEach(t -> {
 425                     try {
 426                         t.interrupt();
                         // anything thrown by interrupt() is ignored, so the remaining threads
                         // are still interrupted
 427                     } catch (Throwable ignore) { }
 428                 });
 429     }
 430 
 431     @SuppressWarnings("removal")
 432     private void interruptAll() {
             // without a security manager, interrupt the threads directly
 433         if (System.getSecurityManager() == null) {
 434             implInterruptAll();
 435         } else {
                 // with a security manager set, run the same interrupt logic as a privileged action
 436             PrivilegedAction<Void> pa = () -> {
 437                 implInterruptAll();
 438                 return null;
 439             };
 440             AccessController.doPrivileged(pa);
 441         }
 442     }
 443 
 444     /**
 445      * Cancel execution if not already cancelled.
          * The steps below run only for the caller whose compareAndSet flips
          * {@code cancelled} from false to true.
 446      */
 447     private void cancelExecution() {
             // volatile read first, so the compareAndSet is skipped once cancelled is already set
 448         if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {
 449             // prevent new threads from starting
 450             flock.shutdown();
 451 
 452             // interrupt all unfinished threads
 453             interruptAll();
 454 
 455             // wakeup join
 456             flock.wakeup();
 457         }
 458     }
 459 
 460     /**
 461      * Schedules a task to cancel execution on timeout.
          * Must be invoked by the owner thread when no timer task is set (asserted).
          *
          * @param timeout the timeout passed to {@code TimerSupport.schedule}
 462      */
 463     private void scheduleTimeout(Duration timeout) {
 464         assert Thread.currentThread() == flock.owner() && timerTask == null;
 465         timerTask = TimerSupport.schedule(timeout, () -> {
                 // do nothing if execution was already cancelled when the timer task runs
 466             if (!cancelled) {
                     // set the flag before cancelling execution
 467                 timeoutExpired = true;
 468                 cancelExecution();
 469             }
 470         });
 471     }
 472 
 473     /**
 474      * Cancels the timer task if set.
          * Must be invoked by the owner thread (asserted).
 475      */
 476     private void cancelTimeout() {
 477         assert Thread.currentThread() == flock.owner();
 478         if (timerTask != null) {
                 // false: do not interrupt the timer task if it is already running
 479             timerTask.cancel(false);
 480         }
 481     }
 482 
 483     /**
 484      * Invoked by the thread for a subtask when the subtask completes before execution
 485      * was cancelled.
          * The subtask is passed to the joiner; if the joiner's {@code onComplete} returns
          * {@code true} then execution is cancelled.
 486      */
 487     private void onComplete(SubtaskImpl<? extends T> subtask) {
             // the subtask must already have a result or exception at this point
 488         assert subtask.state() != Subtask.State.UNAVAILABLE;
 489         if (joiner.onComplete(subtask)) {
 490             cancelExecution();
 491         }
 492     }
 493 
 494     /**
 495      * Initialize a new StructuredTaskScope.
          *
          * @param joiner the joiner stored for use by this task scope
          * @param threadFactory the thread factory stored for use by this task scope
          * @param name the name used to open the ThreadFlock; if {@code null} then the
          *        identity string of this object is used instead
 496      */
 497     @SuppressWarnings("this-escape")
 498     private StructuredTaskScope(Joiner<? super T, ? extends R> joiner,
 499                                 ThreadFactory threadFactory,
 500                                 String name) {
 501         this.joiner = joiner;
 502         this.threadFactory = threadFactory;
 503 
             // fall back to the identity string of this object when no name is given
 504         if (name == null)
 505             name = Objects.toIdentityString(this);
 506         this.flock = ThreadFlock.open(name);
 507     }


 508 
 509     /**
 510      * Represents a subtask forked with {@link #fork(Callable)} or {@link #fork(Runnable)}.
 511      *
 512      * <p> Code that forks subtasks can use the {@link #get() get()} method after {@linkplain
 513      * #join() joining} to obtain the result of a subtask that completed successfully. It
 514      * can use the {@link #exception()} method to obtain the exception thrown by a subtask
 515      * that failed.
 516      *
 517      * @param <T> the result type
 518      * @since 21
 519      */
 520     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 521     public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
             // sealed: SubtaskImpl is the only permitted implementation








 522         /**
 523          * Represents the state of a subtask.
              * {@link #SUCCESS} and {@link #FAILED} are terminal states; {@link #UNAVAILABLE}
              * means that neither a result nor an exception is available.
 524          * @see Subtask#state()
 525          * @since 21
 526          */
 527         @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 528         enum State {
 529             /**
 530              * The subtask result or exception is not available. This state indicates that
 531              * the subtask was forked but has not completed, it completed after execution
 532              * was cancelled, or it was forked after execution was cancelled (in which
 533              * case a thread was not created to execute the subtask).
 534              */
 535             UNAVAILABLE,
 536             /**
 537              * The subtask completed successfully. The {@link Subtask#get() Subtask.get()}
 538              * method can be used to get the result. This is a terminal state.

 539              */
 540             SUCCESS,
 541             /**
 542              * The subtask failed with an exception. The {@link Subtask#exception()
 543              * Subtask.exception()} method can be used to get the exception. This is a
 544              * terminal state.
 545              */
 546             FAILED,
 547         }
 548 
 549         /**
 550          * {@return the subtask state}
 551          */
 552         State state();
 553 
 554         /**
 555          * Returns the result of this subtask if it completed successfully. If
 556          * {@linkplain #fork(Callable) forked} to execute a value-returning task then the
 557          * result from the {@link Callable#call() call} method is returned. If
 558          * {@linkplain #fork(Runnable) forked} to execute a task that does not return a
 559          * result then {@code null} is returned.
 560          *
 561          * <p> Code executing in the scope owner thread can use this method to get the
 562          * result of a successful subtask only after it has {@linkplain #join() joined}.
 563          *
 564          * <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask)
 565          * onComplete} method should test that the {@linkplain #state() subtask state} is
 566          * {@link State#SUCCESS SUCCESS} before using this method to get the result.
 567          *
 568          * @return the possibly-null result
 569          * @throws IllegalStateException if the subtask has not completed, did not complete
 570          * successfully, or the current thread is the task scope owner invoking this
 571          * method before {@linkplain #join() joining}
 572          * @see State#SUCCESS
 573          */
 574         T get();
 575 
 576         /**
 577          * {@return the exception thrown by this subtask if it failed} If
 578          * {@linkplain #fork(Callable) forked} to execute a value-returning task then
 579          * the exception thrown by the {@link Callable#call() call} method is returned.
 580          * If {@linkplain #fork(Runnable) forked} to execute a task that does not return
 581          * a result then the exception thrown by the {@link Runnable#run() run} method is
 582          * returned.
 583          *
 584          * <p> Code executing in the scope owner thread can use this method to get the
 585          * exception thrown by a failed subtask only after it has {@linkplain #join() joined}.
 586          *
 587          * <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask)
 588          * onComplete} method should test that the {@linkplain #state() subtask state} is
 589          * {@link State#FAILED FAILED} before using this method to get the exception.
 590          *
 591          * @throws IllegalStateException if the subtask has not completed, completed with
 592          * a result, or the current thread is the task scope owner invoking this method
 593          * before {@linkplain #join() joining}
 594          * @see State#FAILED
 595          */
 596         Throwable exception();
 597     }
 598 
 599     /**
 600      * An object used with a {@link StructuredTaskScope} to handle subtask completion
 601      * and produce the result for a main task waiting in the {@link #join() join} method
 602      * for subtasks to complete.
 603      *
 604      * <p> Joiner defines static methods to create {@code Joiner} objects for common cases:
 605      * <ul>
 606      *   <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner}
 607      *   that yields a stream of the completed subtasks for {@code join} to return when
 608      *   all subtasks complete successfully. It cancels execution and causes {@code join}
 609      *   to throw if any subtask fails.
 610      *   <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a
 611      *   {@code Joiner} that yields the result of the first subtask to succeed. It cancels
 612      *   execution and causes {@code join} to throw if all subtasks fail.
 613      *   <li> {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a
 614      *   {@code Joiner} that waits for all successful subtasks. It cancels execution and
 615      *   causes {@code join} to throw if any subtask fails.
 616      *   <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all
 617      *   subtasks. It does not cancel execution or cause {@code join} to throw.
 618      * </ul>
 619      *
 620      * <p> In addition to the methods to create {@code Joiner} objects for common cases,
 621      * the {@link #allUntil(Predicate) allUntil(Predicate)} method is defined to create a
 622      * {@code Joiner} that yields a stream of all subtasks. It is created with a {@link
 623      * Predicate Predicate} that determines if execution should continue or be cancelled.
 624      * This {@code Joiner} can be built upon to create custom policies that cancel
 625      * execution based on some condition.
 626      *
 627      * <p> More advanced policies can be developed by implementing the {@code Joiner}
 628      * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked.
 629      * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a
 630      * result or exception. These methods return a {@code boolean} to indicate if execution
 631      * should be cancelled. These methods can be used to collect subtasks, results, or
 632      * exceptions, and control when to cancel execution. The {@link #result()} method
 633      * must be implemented to produce the result (or exception) for the {@code join}
 634      * method.
 635      *
 636      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 637      * in this class will cause a {@link NullPointerException} to be thrown.
 638      *
 639      * @implSpec Implementations of this interface must be thread safe. The {@link
 640      * #onComplete(Subtask)} method defined by this interface may be invoked by several
 641      * threads concurrently.
 642      *
 643      * @apiNote It is very important that a new {@code Joiner} object is created for each
 644      * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with
 645      * different task scopes or re-used after a task scope is closed.
 646      *
 647      * <p> Designing a {@code Joiner} should take into account the code at the use-site
 648      * where the results from the {@link StructuredTaskScope#join() join} method are
 649      * processed. It should be clear what the {@code Joiner} does vs. the application
 650      * code at the use-site. In general, the {@code Joiner} implementation is not the
 651      * place to code "business logic". A {@code Joiner} should be designed to be as
 652      * general purpose as possible.
 653      *
 654      * @param <T> the result type of tasks executed in the task scope
 655      * @param <R> the type of results returned by the join method
 656      * @since 24
 657      * @see #open(Joiner)
 658      */
 659     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 660     @FunctionalInterface
 661     public interface Joiner<T, R> {
 662 
 663         /**
 664          * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable)
 665          * fork(Runnable)} when forking a subtask. The method is invoked from the task
 666          * scope owner thread. The method is invoked before a thread is created to run
 667          * the subtask.
 668          *
 669          * @implSpec The default implementation throws {@code NullPointerException} if the
 670          * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
 671          * subtask is not in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, it
 672          * otherwise returns {@code false}.
 673          *
 674          * @apiNote This method is invoked by the {@code fork} methods. It should not be
 675          * invoked directly.
 676          *
 677          * @param subtask the subtask
 678          * @return {@code true} to cancel execution
 679          */
 680         default boolean onFork(Subtask<? extends T> subtask) {
 681             if (subtask.state() != Subtask.State.UNAVAILABLE) {  // state() also acts as the null check (NPE)
 682                 throw new IllegalArgumentException();
 683             }
 684             return false;  // default policy: forking a subtask never cancels execution
 685         }
 686 
 687         /**
 688          * Invoked by the thread started to execute a subtask after the subtask completes
 689          * successfully or fails with an exception. This method is not invoked if a
 690          * subtask completes after execution has been cancelled.
 691          *
 692          * @implSpec The default implementation throws {@code NullPointerException} if the
 693          * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
 694          * subtask is not in the {@link Subtask.State#SUCCESS SUCCESS} or {@link
 695          * Subtask.State#FAILED FAILED} state, it otherwise returns {@code false}.
 696          *
 697          * @apiNote This method is invoked by subtasks when they complete. It should not
 698          * be invoked directly.
 699          *
 700          * @param subtask the subtask
 701          * @return {@code true} to cancel execution
 702          */
 703         default boolean onComplete(Subtask<? extends T> subtask) {
 704             if (subtask.state() == Subtask.State.UNAVAILABLE) {  // subtask has not completed; a null subtask throws NPE here
 705                 throw new IllegalArgumentException();
 706             }
 707             return false;  // default policy: a completed subtask never cancels execution
 708         }
 709 
 710         /**
 711          * Invoked by {@link #join()} to produce the result (or exception) after waiting
 712          * for all subtasks to complete or execution to be cancelled. The result from this
 713          * method is returned by the {@code join} method. If this method throws, then
 714          * {@code join} throws {@link FailedException} with the exception thrown by
 715          * this method as the cause.
 716          *
 717          * <p> In normal usage, this method will be called at most once by the {@code join}
 718          * method to produce the result (or exception). The behavior of this method when
 719          * invoked directly, and invoked more than once, is not specified. Where possible,
 720          * an implementation should return an equal result (or throw the same exception)
 721          * on second or subsequent calls to produce the outcome.
 722          *
 723          * @apiNote This method is invoked by the {@code join} method. It should not be
 724          * invoked directly.
 725          *
 726          * @return the result
 727          * @throws Throwable the exception
 728          */
 729         R result() throws Throwable;
 730 
 731         /**
 732          * {@return a new Joiner object that yields a stream of all subtasks when all
 733          * subtasks complete successfully}
 734          * The {@code Joiner} <a href="StructuredTaskScope.html#CancelExecution">cancels
 735          * execution</a> and causes {@code join} to throw if any subtask fails.
 736          *
 737          * <p> If all subtasks complete successfully, the joiner's {@link Joiner#result()}
 738          * method returns a stream of all subtasks in the order that they were forked.
 739          * If any subtask failed then the {@code result} method throws the exception from
 740          * the first subtask to fail.
 741          *
 742          * @apiNote Joiners returned by this method are suited to cases where all subtasks
 743          * return a result of the same type. Joiners returned by {@link
 744          * #awaitAllSuccessfulOrThrow()} are suited to cases where the subtasks return
 745          * results of different types.
 746          *
 747          * @param <T> the result type of subtasks
 748          */
 749         static <T> Joiner<T, Stream<Subtask<T>>> allSuccessfulOrThrow() {
 750             return new AllSuccessful<>();  // new object per call: a Joiner must not be shared between task scopes
 751         }
 752 
 753         /**
 754          * {@return a new Joiner object that yields the result of any subtask that
 755          * completed successfully}
 756          * The {@code Joiner} <a href="StructuredTaskScope.html#CancelExecution">cancels
 757          * execution</a> and causes {@code join} to throw if all subtasks fail.
 758          *
 759          * <p> The joiner's {@link Joiner#result()} method returns the result of a subtask
 760          * that completed successfully. If all subtasks fail then the {@code result} method
 761          * throws the exception from one of the failed subtasks. The {@code result} method
 762          * throws {@code NoSuchElementException} if no subtasks were forked.
 763          *
 764          * @param <T> the result type of subtasks
 765          */
 766         static <T> Joiner<T, T> anySuccessfulResultOrThrow() {
 767             return new AnySuccessful<>();  // new object per call: a Joiner must not be shared between task scopes
 768         }
 769 
 770         /**
 771          * {@return a new Joiner object that waits for subtasks to complete successfully}
 772          * The {@code Joiner} <a href="StructuredTaskScope.html#CancelExecution">cancels
 773          * execution</a> and causes {@code join} to throw if any subtask fails.
 774          *
 775          * <p> The joiner's {@link Joiner#result() result} method returns {@code null}
 776          * if all subtasks complete successfully, or throws the exception from the first
 777          * subtask to fail.
 778          *
 779          * @apiNote Joiners returned by this method are suited to cases where subtasks
 780          * return results of different types. Joiners returned by {@link #allSuccessfulOrThrow()}
 781          * are suited to cases where the subtasks return a result of the same type.
 782          *
 783          * @param <T> the result type of subtasks
 784          */
 785         static <T> Joiner<T, Void> awaitAllSuccessfulOrThrow() {
 786             return new AwaitSuccessful<>();  // new object per call: a Joiner must not be shared between task scopes
 787         }
 788 
 789         /**
 790          * {@return a new Joiner object that waits for all subtasks to complete}
 791          * The {@code Joiner} does not cancel execution if a subtask fails.
 792          *
 793          * <p> The joiner's {@link Joiner#result() result} method returns {@code null}.
 794          *
 795          * @apiNote This Joiner can be useful for cases where subtasks make use of
 796          * <em>side-effects</em> rather than return results or fail with exceptions.
 797          * The {@link #fork(Runnable) fork(Runnable)} method can be used to fork subtasks
 798          * that do not return a result.
 799          *
 800          * @param <T> the result type of subtasks
 801          */
 802         static <T> Joiner<T, Void> awaitAll() {
 803             // anonymous class (not a lambda, whose instance may be reused) to ensure that new Joiner object is returned
 804             return new Joiner<T, Void>() {
 805                 @Override
 806                 public Void result() {
 807                     return null;  // nothing to yield; the inherited onFork/onComplete defaults never cancel
 808                 }
 809             };
 810         }
 811 
 812         /**
 813          * {@return a new Joiner object that yields a stream of all subtasks when all
 814          * subtasks complete or <a href="StructuredTaskScope.html#CancelExecution">
 815          * execution is cancelled</a> by a predicate}
 816          *
 817          * <p> The joiner's {@link Joiner#onComplete(Subtask)} method invokes the
 818          * predicate's {@link Predicate#test(Object) test} method with the subtask that
 819          * completed successfully or failed with an exception. If the {@code test} method
 820          * returns {@code true} then <a href="StructuredTaskScope.html#CancelExecution">
 821          * execution is cancelled</a>. The {@code test} method must be thread safe as it
 822          * may be invoked concurrently from several threads.
 823          *
 824          * <p> The joiner's {@link #result()} method returns the stream of all subtasks,
 825          * in fork order. The stream may contain subtasks that have completed
 826          * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED}
 827          * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state
 828          * if execution was cancelled before all subtasks were forked or completed.
 829          *
 830          * <p> The following example uses this method to create a {@code Joiner} that
 831          * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> when
 832          * two or more subtasks fail.
 833          * {@snippet lang=java :
 834          *     class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
 835          *         private final AtomicInteger failedCount = new AtomicInteger();
 836          *         @Override
 837          *         public boolean test(Subtask<? extends T> subtask) {
 838          *             return subtask.state() == Subtask.State.FAILED
 839          *                     && failedCount.incrementAndGet() >= 2;
 840          *         }
 841          *     }
 842          *
 843          *     var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
 844          * }
 845          *
 846          * @param isDone the predicate to evaluate completed subtasks
 847          * @param <T> the result type of subtasks
 848          */
 849         static <T> Joiner<T, Stream<Subtask<T>>> allUntil(Predicate<Subtask<? extends T>> isDone) {
 850             return new AllSubtasks<>(isDone);  // NOTE(review): isDone is not null-checked here; presumably AllSubtasks does it - confirm
 851         }
 852     }
 853 
 854     /**
 855      * Represents the configuration for a {@code StructuredTaskScope}.
 856      *
 857      * <p> The configuration for a {@code StructuredTaskScope} consists of a {@link
 858      * ThreadFactory} to create threads, an optional name for the purposes of monitoring
 859      * and management, and an optional timeout.
 860      *
 861      * <p> Creating a {@code StructuredTaskScope} with {@link #open()} or {@link #open(Joiner)}
 862      * uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default
 863      * configuration</a>. The default configuration consists of a thread factory that
 864      * creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">
 865      * virtual threads</a>, no name for monitoring and management purposes, and no timeout.
 866      *
 867      * <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, Function)
 868      * open} method allows a different configuration to be used. The function specified
 869      * to the {@code open} method is applied to the default configuration and returns the
 870      * configuration for the {@code StructuredTaskScope} under construction. The function
 871      * can use the {@code with-} prefixed methods defined here to specify the components
 872      * of the configuration to use.
 873      *
 874      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 875      * in this class will cause a {@link NullPointerException} to be thrown.
 876      *
 877      * @since 24

 878      */
 879     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 880     public sealed interface Config permits ConfigImpl {  // sealed: ConfigImpl is the only permitted implementation
 881         /**
 882          * {@return a new {@code Config} object with the given thread factory}
 883          * The other components are the same as this object. The thread factory is used by
 884          * a task scope to create threads when {@linkplain #fork(Callable) forking} subtasks.
 885          * @param threadFactory the thread factory
 886          *
 887          * @apiNote The thread factory will typically create
 888          * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
 889          * maybe with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler
 890          * uncaught exception handler}, or other properties configured.
 891          *
 892          * @see #fork(Callable)
 893          */
 894         Config withThreadFactory(ThreadFactory threadFactory);
 895 
 896         /**
 897          * {@return a new {@code Config} object with the given name}
 898          * The other components are the same as this object. A task scope is optionally
 899          * named for the purposes of monitoring and management.
 900          * @param name the name
 901          * @see StructuredTaskScope#toString()
 902          */
 903         Config withName(String name);
 904 
 905         /**
 906          * {@return a new {@code Config} object with the given timeout}
 907          * The other components are the same as this object.
 908          * @param timeout the timeout
 909          *
 910          * @apiNote Applications using deadlines, expressed as an {@link java.time.Instant},
 911          * can use {@link Duration#between Duration.between(Instant.now(), deadline)} to
 912          * compute the timeout for this method.
 913          *
 914          * @see #join()
 915          */
 916         Config withTimeout(Duration timeout);
 917     }
 918 
 919     /**
 920      * Exception thrown by {@link #join()} when the outcome is an exception rather than a
 921      * result. The cause is the exception thrown by {@link Joiner#result()}.
 922      *
 923      * @since 24
 924      */
 925     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 926     public static class FailedException extends RuntimeException {
 927         @java.io.Serial
 928         static final long serialVersionUID = -1533055100078459923L;


 929 
 930         /**
 931          * Constructs a {@code FailedException} with the specified cause.
 932          *
 933          * @param  cause the cause, can be {@code null}
 934          */
 935         public FailedException(Throwable cause) {
 936             super(cause);
 937         }
 938     }
 939 
 940     /**
 941      * Exception thrown by {@link #join()} if the task scope was created with a timeout and
 942      * the timeout expired before or while waiting in {@code join}.
 943      *
 944      * @since 24
 945      * @see Config#withTimeout(Duration)
 946      */
 947     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 948     public static class TimeoutException extends RuntimeException {
 949         @java.io.Serial
 950         static final long serialVersionUID = 705788143955048766L;
 951 
 952         /**
 953          * Constructs a {@code TimeoutException} with no detail message.
 954          */
 955         public TimeoutException() { }
 956     }
 957 
 958     /**
 959      * Opens a new structured task scope to use the given {@code Joiner} object and with
 960      * configuration that is the result of applying the given function to the
 961      * <a href="#DefaultConfiguration">default configuration</a>.
 962      *
 963      * <p> The {@code configFunction} is called with the default configuration and returns
 964      * the configuration for the new structured task scope. The function may, for example,
 965      * set the {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} or set
 966      * a {@linkplain Config#withTimeout(Duration) timeout}.
 967      *
 968      * <p> If a {@code ThreadFactory} is set then its {@link ThreadFactory#newThread(Runnable)
 969      * newThread} method will be called to create threads when {@linkplain #fork(Callable)
 970      * forking} subtasks in this task scope. If a {@code ThreadFactory} is not set then
 971      * forking subtasks will create an unnamed virtual thread for each subtask.
 972      *
 973      * <p> If a {@linkplain Config#withTimeout(Duration) timeout} is set then it starts
 974      * when the task scope is opened. If the timeout expires before the task scope has
 975      * {@linkplain #join() joined} then execution is cancelled and the {@code join} method
 976      * throws {@link TimeoutException}.
 977      *
 978      * <p> The new task scope is owned by the current thread. Only code executing in this
 979      * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or
 980      * {@linkplain #close close} the task scope.
 981      *
 982      * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
 983      * value} bindings for inheritance by threads started in the task scope.
 984      *
 985      * @param joiner the joiner
 986      * @param configFunction a function to produce the configuration
 987      * @return a new task scope
 988      * @param <T> the result type of tasks executed in the task scope
 989      * @param <R> the type of the result returned by the join method
 990      * @since 24
 991      */
 992     public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
 993                                                         Function<Config, Config> configFunction) {
 994         Objects.requireNonNull(joiner);
 995         // NPE below if configFunction is null or returns null; Config is sealed, so the cast to ConfigImpl is safe
 996         var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
 997         var scope = new StructuredTaskScope<T, R>(joiner, config.threadFactory(), config.name());
 998 
 999         // schedule timeout, if one was configured; the timeout starts now, when the task scope is opened
1000         Duration timeout = config.timeout();
1001         if (timeout != null) {
1002             boolean scheduled = false;  // flag + finally (no catch) so a scheduling failure propagates unchanged
1003             try {
1004                 scope.scheduleTimeout(timeout);
1005                 scheduled = true;
1006             } finally {
1007                 if (!scheduled) {
1008                     scope.close();  // pop if scheduling timeout failed
1009                 }
1010             }
1011         }
1012 
1013         return scope;
1014     }
1015 
1016     /**
1017      * Opens a new structured task scope to use the given {@code Joiner} object. The
1018      * task scope is created with the <a href="#DefaultConfiguration">default configuration</a>.
1019      * The default configuration has a {@code ThreadFactory} that creates unnamed
1020      * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
1021      * is unnamed for monitoring and management purposes, and has no timeout.
1022      *
1023      * @implSpec
1024      * This factory method is equivalent to invoking the 2-arg open method with the given
1025      * joiner and the {@linkplain Function#identity() identity function}.
1026      *
1027      * @param joiner the joiner
1028      * @return a new task scope
1029      * @param <T> the result type of tasks executed in the task scope
1030      * @param <R> the type of the result returned by the join method
1031      * @since 24
1032      */
1033     public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner) {
1034         return open(joiner, Function.identity());  // identity function: the default configuration is used unchanged



1035     }
1036 
1037     /**
1038      * Opens a new structured task scope that can be used to fork subtasks that return
1039      * results of any type. The {@link #join()} method waits for all subtasks to succeed
1040      * or any subtask to fail.
1041      *
1042      * <p> The {@code join} method returns {@code null} if all subtasks complete successfully.
1043      * It throws {@link FailedException} if any subtask fails, with the exception from
1044      * the first subtask to fail as the cause.
1045      *
1046      * <p> The task scope is created with the <a href="#DefaultConfiguration">default
1047      * configuration</a>. The default configuration has a {@code ThreadFactory} that creates
1048      * unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual
1049      * threads</a>, is unnamed for monitoring and management purposes, and has no timeout.
1050      *
1051      * @implSpec
1052      * This factory method is equivalent to invoking the 2-arg open method with a joiner
1053      * created with {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()}
1054      * and the {@linkplain Function#identity() identity function}.
1055      *
1056      * @param <T> the result type of subtasks
1057      * @return a new task scope
1058      * @since 24
1059      */
1060     public static <T> StructuredTaskScope<T, Void> open() {
1061         return open(Joiner.awaitAllSuccessfulOrThrow(), Function.identity());  // new joiner, default configuration

1062     }
1063 
1064     /**
1065      * Starts a new thread in this task scope to execute a value-returning task, thus
1066      * creating a <em>subtask</em>. The value-returning task is provided to this method
1067      * as a {@link Callable}, the thread executes the task's {@link Callable#call() call}
1068      * method.















1069      *
1070      * <p> This method first creates a {@link Subtask Subtask} to represent the <em>forked
1071      * subtask</em>. It invokes the joiner's {@link Joiner#onFork(Subtask) onFork} method
1072      * with the {@code Subtask} object. If {@code onFork} completes with an exception
1073      * or error then it is propagated by the {@code fork} method. If execution is
1074      * {@linkplain #isCancelled() cancelled}, or {@code onFork} returns {@code true} to
1075      * cancel execution, then this method returns the {@code Subtask} (in the {@link
1076      * Subtask.State#UNAVAILABLE UNAVAILABLE} state) without creating a thread to execute
1077      * the subtask. If execution is not cancelled then a thread is created with the
1078      * {@link ThreadFactory} configured when the task scope was created, and the thread is
1079      * started. Forking a subtask inherits the current thread's {@linkplain ScopedValue
1080      * scoped value} bindings. The bindings must match the bindings captured when the
1081      * task scope was opened. If the subtask completes (successfully or with an exception)
1082      * before execution is cancelled, then the thread invokes the joiner's
1083      * {@link Joiner#onComplete(Subtask) onComplete} method with the subtask in the
1084      * {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} state.
1085      *
1086      * <p> This method returns the {@link Subtask Subtask} object. In some usages, this
1087      * object may be used to get its result. In other cases it may be used for correlation
1088      * or just discarded. To ensure correct usage, the {@link Subtask#get() Subtask.get()}
1089      * method may only be called by the task scope owner to get the result after it has
1090      * waited for subtasks to complete with the {@link #join() join} method and the subtask
1091      * completed successfully. Similarly, the {@link Subtask#exception() Subtask.exception()}
1092      * method may only be called by the task scope owner after it has joined and the subtask
1093      * failed. If execution was cancelled before the subtask was forked, or before it
1094      * completes, then neither method can be used to obtain the outcome.
1095      *
1096      * <p> This method may only be invoked by the task scope owner.


1097      *
1098      * @param task the value-returning task for the thread to execute
1099      * @param <U> the result type
1100      * @return the subtask
1101      * @throws WrongThreadException if the current thread is not the task scope owner
1102      * @throws IllegalStateException if the owner has already {@linkplain #join() joined}
1103      * or the task scope is closed
1104      * @throws StructureViolationException if the current scoped value bindings are not
1105      * the same as when the task scope was created
1106      * @throws RejectedExecutionException if the thread factory rejected creating a
1107      * thread to run the subtask
1108      */
1109     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
1110         Objects.requireNonNull(task);
1111         ensureOwner();
1112         ensureNotJoined();
1113 
1114         var subtask = new SubtaskImpl<U>(this, task);
1115 
1116         // notify joiner, even if cancelled
1117         if (joiner.onFork(subtask)) {
1118             cancelExecution();  // the joiner asked to cancel execution


1119         }
1120 
1121         if (!cancelled) {  // once cancelled, no thread is created and the subtask is returned unstarted

1122             // create thread to run task
1123             Thread thread = threadFactory.newThread(subtask);
1124             if (thread == null) {
1125                 throw new RejectedExecutionException("Rejected by thread factory");
1126             }
1127 
1128             // attempt to start the thread
1129             try {
1130                 flock.start(thread);
1131             } catch (IllegalStateException e) {
1132                 // shutdown by another thread, or underlying flock is shutdown due
1133                 // to unstructured use; deliberately ignored, the subtask is still returned
1134             }
1135         }
1136 
1137         // force owner to join
1138         state = ST_FORKED;




1139         return subtask;
1140     }
1141 
1142     /**
1143      * Starts a new thread in this task scope to execute a task that does not return a
1144      * result, creating a <em>subtask</em>.



























1145      *
1146      * <p> This method works exactly the same as {@link #fork(Callable)} except that
1147      * the task is provided to this method as a {@link Runnable}, the thread executes
1148      * the task's {@link Runnable#run() run} method, and its result is {@code null}.

1149      *
1150      * @param task the task for the thread to execute
1151      * @return the subtask
1152      * @throws WrongThreadException if the current thread is not the task scope owner
1153      * @throws IllegalStateException if the owner has already {@linkplain #join() joined}
1154      * or the task scope is closed
1155      * @throws StructureViolationException if the current scoped value bindings are not
1156      * the same as when the task scope was created
1157      * @throws RejectedExecutionException if the thread factory rejected creating a
1158      * thread to run the subtask
1159      * @since 24
1160      */
1161     public Subtask<? extends T> fork(Runnable task) {
1162         Objects.requireNonNull(task);  // fail fast: the lambda below would only hit the null when it runs
1163         return fork(() -> { task.run(); return null; });  // adapt to a Callable whose result is null




1164     }
1165 
1166     /**
1167      * Waits for all subtasks started in this task scope to complete or execution to be
1168      * cancelled. If a {@linkplain Config#withTimeout(Duration) timeout} has been set
1169      * then execution will be cancelled if the timeout expires before or while waiting.
1170      * Once finished waiting, the {@code Joiner}'s {@link Joiner#result() result} method
1171      * is invoked to get the result or throw an exception. If the {@code result} method
1172      * throws then this method throws {@code FailedException} with the exception thrown
1173      * by the {@code result()} method as the cause.
1174      *
1175      * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1176      * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1177      * when all threads finish, the {@code Joiner}'s {@link Joiner#onFork(Subtask)
1178      * onFork} or {@link Joiner#onComplete(Subtask) onComplete} returns {@code true}
1179      * to cancel execution, the timeout (if set) expires, or the current thread is
1180      * {@linkplain Thread#interrupt() interrupted}.

1181      *
1182      * <p> This method may only be invoked by the task scope owner, and only once.



1183      *
1184      * @return the {@link Joiner#result() result}


1185      * @throws WrongThreadException if the current thread is not the task scope owner
1186      * @throws IllegalStateException if already joined or this task scope is closed
1187      * @throws FailedException if the <i>outcome</i> is an exception, thrown with the
1188      * exception from {@link Joiner#result() Joiner.result()} as the cause
1189      * @throws TimeoutException if a timeout is set and the timeout expires before or
1190      * while waiting
1191      * @throws InterruptedException if interrupted while waiting












1192      */
1193     public R join() throws InterruptedException {
1194         ensureOwner();
1195         ensureNotJoined();






1196 
1197         // join started
1198         state = ST_JOIN_STARTED;










1199 
1200         // wait for all subtasks, execution to be cancelled, or interrupt
1201         flock.awaitAll();








1202 
1203         // throw if timeout expired
1204         if (timeoutExpired) {
1205             throw new TimeoutException();
1206         }
1207         cancelTimeout();
1208 
1209         // all subtasks completed or cancelled
1210         state = ST_JOIN_COMPLETED;
1211 
1212         // invoke joiner to get result
1213         try {
1214             return joiner.result();
1215         } catch (Throwable e) {
1216             throw new FailedException(e);


1217         }
1218     }
1219 
1220     /**
1221      * {@return {@code true} if <a href="#CancelExecution">execution is cancelled</a>,
1222      * or in the process of being cancelled, otherwise {@code false}}



















1223      *
1224      * <p> Cancelling execution prevents new threads from starting in the task scope and
1225      * {@linkplain Thread#interrupt() interrupts} threads executing unfinished subtasks.
1226      * It may take some time before the interrupted threads finish execution; this
1227      * method may return {@code true} before all threads have been interrupted or before
1228      * all threads have finished.
1229      *
1230      * @apiNote A main task with a lengthy "forking phase" (the code that executes before
1231      * the main task invokes {@link #join() join}) may use this method to avoid doing work
1232      * in cases where execution was cancelled by the completion of a previously forked
1233      * subtask or timeout.
1234      *
1235      * @since 24









1236      */
1237     public boolean isCancelled() {
1238         return cancelled;











1239     }
1240 
1241     /**
1242      * Closes this task scope.
1243      *
1244      * <p> This method first <a href="#CancelExecution">cancels execution</a>, if not
1245      * already cancelled. This interrupts the threads executing unfinished subtasks. This
1246      * method then waits for all threads to finish. If interrupted while waiting then it
1247      * will continue to wait until the threads finish, before completing with the interrupt
1248      * status set.
1249      *
1250      * <p> This method may only be invoked by the task scope owner. If the task scope
1251      * is already closed then the task scope owner invoking this method has no effect.
1252      *
1253      * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
1254      * manner</em>. If this method is called to close a task scope before nested task
1255      * scopes are closed then it closes the underlying construct of each nested task scope
1256      * (in the reverse order that they were created in), closes this task scope, and then
1257      * throws {@link StructureViolationException}.
1258      * Similarly, if this method is called to close a task scope while executing with
1259      * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
1260      * before the scoped values were bound, then {@code StructureViolationException} is
1261      * thrown after closing the task scope.
1262      * If a thread terminates without first closing task scopes that it owns then
1263      * termination will cause the underlying construct of each of its open tasks scopes to
1264      * be closed. Closing is performed in the reverse order that the task scopes were
1265      * created in. Thread termination may therefore be delayed when the task scope owner
1266      * has to wait for threads forked in these task scopes to finish.
1267      *



1268      * @throws IllegalStateException thrown after closing the task scope if the task scope
1269      * owner did not attempt to join after forking
1270      * @throws WrongThreadException if the current thread is not the task scope owner
1271      * @throws StructureViolationException if a structure violation was detected
1272      */
1273     @Override
1274     public void close() {
1275         ensureOwner();
1276         int s = state;
1277         if (s == ST_CLOSED) {
1278             return;
1279         }
1280 
1281         // cancel execution if join did not complete
1282         if (s < ST_JOIN_COMPLETED) {
1283             cancelExecution();
1284             cancelTimeout();
1285         }
1286 
1287         // wait for stragglers
1288         try {


1289             flock.close();
1290         } finally {
1291             state = ST_CLOSED;
1292         }
1293 
1294         // throw ISE if the owner didn't join after forking
1295         if (s == ST_FORKED) {
1296             throw new IllegalStateException("Owner did not join after forking");

1297         }
1298     }
1299 
1300     /**
1301      * {@inheritDoc}  If a {@link Config#withName(String) name} for monitoring and
1302      * monitoring purposes has been set then the string representation includes the name.
1303      */
1304     @Override
1305     public String toString() {
1306         return flock.name();






1307     }
1308 
1309     /**
1310      * Subtask implementation, runs the task specified to the fork method.
1311      */
1312     private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
1313         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
1314 
1315         private record AltResult(Subtask.State state, Throwable exception) {
1316             AltResult(Subtask.State state) {
1317                 this(state, null);
1318             }
1319         }
1320 
1321         private final StructuredTaskScope<? super T, ?> scope;
1322         private final Callable<? extends T> task;

1323         private volatile Object result;
1324 
1325         SubtaskImpl(StructuredTaskScope<? super T, ?> scope, Callable<? extends T> task) {


1326             this.scope = scope;
1327             this.task = task;

1328         }
1329 
1330         @Override
1331         public void run() {
1332             T result = null;
1333             Throwable ex = null;
1334             try {
1335                 result = task.call();
1336             } catch (Throwable e) {
1337                 ex = e;
1338             }
1339 
1340             // nothing to do if task scope is cancelled
1341             if (scope.isCancelled())
1342                 return;
1343 
1344             // set result/exception and invoke onComplete
1345             if (ex == null) {
1346                 this.result = (result != null) ? result : RESULT_NULL;
1347             } else {
1348                 this.result = new AltResult(State.FAILED, ex);
1349             }
1350             scope.onComplete(this);





1351         }
1352 
1353         @Override
1354         public Subtask.State state() {
1355             Object result = this.result;
1356             if (result == null) {
1357                 return State.UNAVAILABLE;
1358             } else if (result instanceof AltResult alt) {
1359                 // null or failed
1360                 return alt.state();
1361             } else {
1362                 return State.SUCCESS;
1363             }
1364         }
1365 
1366 
1367         @Override
1368         public T get() {
1369             scope.ensureJoinedIfOwner();
1370             Object result = this.result;
1371             if (result instanceof AltResult) {
1372                 if (result == RESULT_NULL) return null;
1373             } else if (result != null) {
1374                 @SuppressWarnings("unchecked")
1375                 T r = (T) result;
1376                 return r;
1377             }
1378             throw new IllegalStateException(
1379                     "Result is unavailable or subtask did not complete successfully");
1380         }
1381 
1382         @Override
1383         public Throwable exception() {
1384             scope.ensureJoinedIfOwner();
1385             Object result = this.result;
1386             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
1387                 return alt.exception();
1388             }
1389             throw new IllegalStateException(
1390                     "Exception is unavailable or subtask did not complete with exception");
1391         }
1392 
1393         @Override
1394         public String toString() {
1395             String stateAsString = switch (state()) {
1396                 case UNAVAILABLE -> "[Unavailable]";
1397                 case SUCCESS     -> "[Completed successfully]";
1398                 case FAILED      -> {
1399                     Throwable ex = ((AltResult) result).exception();
1400                     yield "[Failed: " + ex + "]";
1401                 }
1402             };
1403             return Objects.toIdentityString(this) + stateAsString;
1404         }
1405     }
1406 
1407     /**
1408      * A joiner that returns a stream of all subtasks when all subtasks complete
1409      * successfully. If any subtask fails then execution is cancelled.














1410      */
1411     private static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {



1412         private static final VarHandle FIRST_EXCEPTION;
1413         static {
1414             MethodHandles.Lookup l = MethodHandles.lookup();

1415             FIRST_EXCEPTION = MhUtil.findVarHandle(l, "firstException", Throwable.class);
1416         }

1417         private volatile Throwable firstException;
1418 
1419         // list of forked subtasks, only accessed by owner thread
1420         private final List<Subtask<T>> subtasks = new ArrayList<>();
1421 
1422         @Override
1423         public boolean onFork(Subtask<? extends T> subtask) {
1424             @SuppressWarnings("unchecked")
1425             var tmp = (Subtask<T>) Objects.requireNonNull(subtask);
1426             subtasks.add(tmp);
1427             return false;









1428         }
1429 
1430         @Override
1431         public boolean onComplete(Subtask<? extends T> subtask) {
1432             return (subtask.state() == Subtask.State.FAILED)
1433                     && (firstException == null)
1434                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());



1435         }
1436 
1437         @Override
1438         public Stream<Subtask<T>> result() throws Throwable {
1439             Throwable ex = firstException;
1440             if (ex != null) {
1441                 throw ex;
1442             } else {
1443                 return subtasks.stream();
1444             }
1445         }
1446     }
1447 
1448     /**
1449      * A joiner that returns the result of the first subtask to complete successfully.
1450      * If any subtask completes successfully then execution is cancelled.
1451      */
1452     private static final class AnySuccessful<T> implements Joiner<T, T> {
1453         private static final VarHandle FIRST_SUCCESS;
1454         private static final VarHandle FIRST_EXCEPTION;
1455         static {
1456             MethodHandles.Lookup l = MethodHandles.lookup();
1457             FIRST_SUCCESS = MhUtil.findVarHandle(l, "firstSuccess", Subtask.class);
1458             FIRST_EXCEPTION = MhUtil.findVarHandle(l, "firstException", Throwable.class);
1459         }
1460         private volatile Subtask<T> firstSuccess;
1461         private volatile Throwable firstException;
1462 
















1463         @Override
1464         public boolean onComplete(Subtask<? extends T> subtask) {
1465             Objects.requireNonNull(subtask);
1466             if (firstSuccess == null) {
1467                 if (subtask.state() == Subtask.State.SUCCESS) {
1468                     // capture the first subtask that completes successfully
1469                     return FIRST_SUCCESS.compareAndSet(this, null, subtask);
1470                 } else if (firstException == null) {
1471                     // capture the exception thrown by the first task to fail
1472                     FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1473                 }
1474             }
1475             return false;
1476         }
1477 

















1478         @Override
1479         public T result() throws Throwable {
1480             Subtask<T> firstSuccess = this.firstSuccess;
1481             if (firstSuccess != null) {
1482                 return firstSuccess.get();
1483             }
1484             Throwable firstException = this.firstException;
1485             if (firstException != null) {
1486                 throw firstException;
1487             } else {
1488                 throw new NoSuchElementException("No subtasks completed");
1489             }
1490         }
1491     }
1492 
1493     /**
1494      * A joiner that that waits for all successful subtasks. If any subtask fails the
1495      * execution is cancelled.
1496      */
1497     private static final class AwaitSuccessful<T> implements Joiner<T, Void> {
1498         private static final VarHandle FIRST_EXCEPTION;
1499         static {
1500             MethodHandles.Lookup l = MethodHandles.lookup();
1501             FIRST_EXCEPTION = MhUtil.findVarHandle(l, "firstException", Throwable.class);







1502         }
1503         private volatile Throwable firstException;
1504 
1505         @Override
1506         public boolean onComplete(Subtask<? extends T> subtask) {
1507             return (subtask.state() == Subtask.State.FAILED)
1508                     && (firstException == null)
1509                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1510         }
























1511 
1512         @Override
1513         public Void result() throws Throwable {
1514             Throwable ex = firstException;
1515             if (ex != null) {
1516                 throw ex;
1517             } else {
1518                 return null;
1519             }


1520         }
1521     }
1522 
1523     /**
1524      * A joiner that returns a stream of all subtasks.














1525      */
1526     private static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
1527         private final Predicate<Subtask<? extends T>> isDone;
1528         // list of forked subtasks, only accessed by owner thread
1529         private final List<Subtask<T>> subtasks = new ArrayList<>();

1530 
1531         AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
1532             this.isDone = Objects.requireNonNull(isDone);
















1533         }
1534 
1535         @Override
1536         public boolean onFork(Subtask<? extends T> subtask) {
1537             @SuppressWarnings("unchecked")
1538             var tmp = (Subtask<T>) Objects.requireNonNull(subtask);
1539             subtasks.add(tmp);
1540             return false;


1541         }
1542 
1543         @Override
1544         public boolean onComplete(Subtask<? extends T> subtask) {
1545             return isDone.test(Objects.requireNonNull(subtask));




1546         }
1547 















1548         @Override
1549         public Stream<Subtask<T>> result() {
1550             return subtasks.stream();
1551         }
1552     }
1553 
1554     /**
1555      * Implementation of Config.
1556      */
1557     private record ConfigImpl(ThreadFactory threadFactory,
1558                               String name,
1559                               Duration timeout) implements Config {
1560         static Config defaultConfig() {
1561             return new ConfigImpl(Thread.ofVirtual().factory(), null, null);
1562         }
1563 
















1564         @Override
1565         public Config withThreadFactory(ThreadFactory threadFactory) {
1566             return new ConfigImpl(Objects.requireNonNull(threadFactory), name, timeout);



1567         }
1568 
1569         @Override
1570         public Config withName(String name) {
1571             return new ConfigImpl(threadFactory, Objects.requireNonNull(name), timeout);










1572         }
1573 
1574         @Override
1575         public Config withTimeout(Duration timeout) {
1576             return new ConfigImpl(threadFactory, name, Objects.requireNonNull(timeout));









1577         }
1578     }
1579 
1580     /**
1581      * Used to schedule a task to cancel execution when a timeout expires.
1582      */
1583     private static class TimerSupport {
1584         private static final ScheduledExecutorService DELAYED_TASK_SCHEDULER;
1585         static {
1586             ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1587                 Executors.newScheduledThreadPool(1, task -> {
1588                     Thread t = InnocuousThread.newThread("StructuredTaskScope-Timer", task);
1589                     t.setDaemon(true);
1590                     return t;
1591                 });
1592             stpe.setRemoveOnCancelPolicy(true);
1593             DELAYED_TASK_SCHEDULER = stpe;
1594         }
1595 
1596         static Future<?> schedule(Duration timeout, Runnable task) {
1597             long nanos = TimeUnit.NANOSECONDS.convert(timeout);
1598             return DELAYED_TASK_SCHEDULER.schedule(task, nanos, TimeUnit.NANOSECONDS);





1599         }
1600     }
1601 }
< prev index next >