< prev index next >

src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java

Print this page

   1 /*
   2  * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.time.Instant;


  33 import java.util.Objects;
  34 import java.util.Optional;
  35 import java.util.concurrent.locks.ReentrantLock;
  36 import java.util.function.Function;

  37 import java.util.function.Supplier;

  38 import jdk.internal.javac.PreviewFeature;

  39 import jdk.internal.misc.ThreadFlock;
  40 
  41 /**
  42  * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports
  43  * cases where a task splits into several concurrent subtasks, and where the subtasks must
  44  * complete before the main task continues. A {@code StructuredTaskScope} can be used to
  45  * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
  46  * just like that of a sequential operation in structured programming.
  47  *
  48  * <h2>Basic operation</h2>












  49  *
  50  * A {@code StructuredTaskScope} is created with one of its public constructors. It defines
  51  * the {@link #fork(Callable) fork} method to start a thread to execute a subtask, the {@link
  52  * #join() join} method to wait for all subtasks to finish, and the {@link #close() close}
  53  * method to close the task scope. The API is intended to be used with the {@code
  54  * try-with-resources} statement. The intention is that code in the try <em>block</em>
  55  * uses the {@code fork} method to fork threads to execute the subtasks, wait for the
  56  * subtasks to finish with the {@code join} method, and then <em>process the results</em>.
  57  * A call to the {@code fork} method returns a {@link Subtask Subtask} to represent
  58  * the <em>forked subtask</em>. Once {@code join} is called, the {@code Subtask} can be
  59  * used to get the result if it completed successfully, or the exception if it failed.
  60  * {@snippet lang=java :
  61  *     Callable<String> task1 = ...
  62  *     Callable<Integer> task2 = ...



  63  *
  64  *     try (var scope = new StructuredTaskScope<Object>()) {








  65  *
  66  *         Subtask<String> subtask1 = scope.fork(task1);   // @highlight substring="fork"
  67  *         Subtask<Integer> subtask2 = scope.fork(task2);  // @highlight substring="fork"

  68  *
  69  *         scope.join();                                   // @highlight substring="join"

  70  *
  71  *         ... process results/exceptions ...

  72  *
  73  *     } // close                                          // @highlight substring="close"
  74  * }
  75  * <p> The following example forks a collection of homogeneous subtasks, waits for all of
  76  * them to complete with the {@code join} method, and uses the {@link Subtask.State
  77  * Subtask.State} to partition the subtasks into a set of the subtasks that completed
  78  * successfully and another for the subtasks that failed.
  79  * {@snippet lang=java :
  80  *     List<Callable<String>> callables = ...
  81  *
  82  *     try (var scope = new StructuredTaskScope<String>()) {








  83  *
  84  *         List<Subtask<String>> subtasks = callables.stream().map(scope::fork).toList();









  85  *
  86  *         scope.join();

  87  *
  88  *         Map<Boolean, Set<Subtask<String>>> map = subtasks.stream()
  89  *                 .collect(Collectors.partitioningBy(h -> h.state() == Subtask.State.SUCCESS,
  90  *                                                    Collectors.toSet()));
  91  *
  92  *     } // close

  93  * }
  94  *
  95  * <p> To ensure correct usage, the {@code join} and {@code close} methods may only be
  96  * invoked by the <em>owner</em> (the thread that opened/created the task scope), and the
  97  * {@code close} method throws an exception after closing if the owner did not invoke the
  98  * {@code join} method after forking.
  99  *
 100  * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut
 101  * down a task scope without closing it. The {@code shutdown()} method <em>cancels</em> all
 102  * unfinished subtasks by {@linkplain Thread#interrupt() interrupting} the threads. It
 103  * prevents new threads from starting in the task scope. If the owner is waiting in the
 104  * {@code join} method then it will wakeup.
 105  *
 106  * <p> Shutdown is used for <em>short-circuiting</em> and allows subclasses to implement
 107  * <em>policy</em> that does not require all subtasks to finish.




 108  *
 109  * <h2>Subclasses with policies for common cases</h2>
 110  *
 111  * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for
 112  * common cases:
 113  * <ol>
 114  *   <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the result of the first
 115  *   subtask to complete successfully. Once captured, it shuts down the task scope to
 116  *   interrupt unfinished threads and wakeup the owner. This class is intended for cases
 117  *   where the result of any subtask will do ("invoke any") and where there is no need to
 118  *   wait for results of other unfinished subtasks. It defines methods to get the first
 119  *   result or throw an exception if all subtasks fail.
 120  *   <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the exception of the first
 121  *   subtask to fail. Once captured, it shuts down the task scope to interrupt unfinished
 122  *   threads and wakeup the owner. This class is intended for cases where the results of all
 123  *   subtasks are required ("invoke all"); if any subtask fails then the results of other
 124  *   unfinished subtasks are no longer needed. It defines methods to throw an exception if
 125  *   any of the subtasks fail.
 126  * </ol>
 127  *
 128  * <p> The following are two examples that use the two classes. In both cases, a pair of
 129  * subtasks are forked to fetch resources from two URL locations "left" and "right". The
 130  * first example creates a ShutdownOnSuccess object to capture the result of the first
 131  * subtask to complete successfully, cancelling the other by way of shutting down the task
 132  * scope. The main task waits in {@code join} until either subtask completes with a result
 133  * or both subtasks fail. It invokes {@link ShutdownOnSuccess#result(Function)
 134  * result(Function)} method to get the captured result. If both subtasks fail then this
 135  * method throws a {@code WebApplicationException} with the exception from one of the
 136  * subtasks as the cause.
 137  * {@snippet lang=java :
 138  *     try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
 139  *
 140  *         scope.fork(() -> fetch(left));
 141  *         scope.fork(() -> fetch(right));

 142  *
 143  *         scope.join();
 144  *
 145  *         // @link regex="result(?=\()" target="ShutdownOnSuccess#result" :
 146  *         String result = scope.result(e -> new WebApplicationException(e));

 147  *
 148  *         ...
 149  *     }
 150  * }
 151  * The second example creates a ShutdownOnFailure object to capture the exception of the
 152  * first subtask to fail, cancelling the other by way of shutting down the task scope. The
 153  * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a
 154  * result, either fails, or a deadline is reached. It invokes {@link
 155  * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception
 156  * if either subtask fails. This method is a no-op if both subtasks complete successfully.
 157  * The example uses {@link Supplier#get()} to get the result of each subtask. Using
 158  * {@code Supplier} instead of {@code Subtask} is preferred for common cases where the
 159  * object returned by fork is only used to get the result of a subtask that completed
 160  * successfully.
 161  * {@snippet lang=java :
 162  *    Instant deadline = ...
 163  *
 164  *    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {






 165  *
 166  *         Supplier<String> supplier1 = scope.fork(() -> query(left));
 167  *         Supplier<String> supplier2 = scope.fork(() -> query(right));




 168  *
 169  *         scope.joinUntil(deadline);

 170  *
 171  *         // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" :
 172  *         scope.throwIfFailed(e -> new WebApplicationException(e));
 173  *
 174  *         // both subtasks completed successfully
 175  *         String result = Stream.of(supplier1, supplier2)
 176  *                 .map(Supplier::get)
 177  *                 .collect(Collectors.joining(", ", "{ ", " }"));
 178  *
 179  *         ...
 180  *     }
 181  * }
 182  *
 183  * <h2>Extending StructuredTaskScope</h2>
 184  *
 185  * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Subtask)
 186  * handleComplete} method overridden, to implement policies other than those implemented
 187  * by {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. A subclass may, for example,
 188  * collect the results of subtasks that complete successfully and ignore subtasks that
 189  * fail. It may collect exceptions when subtasks fail. It may invoke the {@link #shutdown()
 190  * shutdown} method to shut down and cause {@link #join() join} to wakeup when some
 191  * condition arises.
 192  *
 193  * <p> A subclass will typically define methods to make available results, state, or other
 194  * outcome to code that executes after the {@code join} method. A subclass that collects
 195  * results and ignores subtasks that fail may define a method that returns the results.
 196  * A subclass that implements a policy to shut down when a subtask fails may define a
 197  * method to get the exception of the first subtask to fail.
 198  *
 199  * <p> The following is an example of a simple {@code StructuredTaskScope} implementation
 200  * that collects homogeneous subtasks that complete successfully. It defines the method
 201  * "{@code completedSuccessfully()}" that the main task can invoke after it joins.


 202  * {@snippet lang=java :
 203  *     class CollectingScope<T> extends StructuredTaskScope<T> {
 204  *         private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>();
 205  *
 206  *         @Override
 207  *         protected void handleComplete(Subtask<? extends T> subtask) {
 208  *             if (subtask.state() == Subtask.State.SUCCESS) {
 209  *                 subtasks.add(subtask);
 210  *             }
 211  *         }
 212  *
 213  *         @Override
 214  *         public CollectingScope<T> join() throws InterruptedException {
 215  *             super.join();
 216  *             return this;
 217  *         }
 218  *
 219  *         public Stream<Subtask<? extends T>> completedSuccessfully() {
 220  *             // @link substring="ensureOwnerAndJoined" target="ensureOwnerAndJoined" :
 221  *             super.ensureOwnerAndJoined();
 222  *             return subtasks.stream();
 223  *         }
 224  *     }
 225  * }
 226  * <p> The implementation of the {@code completedSuccessfully()} method in the example
 227  * invokes {@link #ensureOwnerAndJoined()} to ensure that the method can only be invoked
 228  * by the owner thread and only after it has joined.
 229  *
 230  * <h2><a id="TreeStructure">Tree structure</a></h2>





 231  *
 232  * Task scopes form a tree where parent-child relations are established implicitly when
 233  * opening a new task scope:
 234  * <ul>
 235  *   <li> A parent-child relation is established when a thread started in a task scope
 236  *   opens its own task scope. A thread started in task scope "A" that opens task scope
 237  *   "B" establishes a parent-child relation where task scope "A" is the parent of task
 238  *   scope "B".
 239  *   <li> A parent-child relation is established with nesting. If a thread opens task
 240  *   scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task
 241  *   scope "B" is the parent of the nested task scope "C".
 242  * </ul>
 243  *
 244  * The <i>descendants</i> of a task scope are the child task scopes that it is a parent
 245  * of, plus the descendants of the child task scopes, recursively.





 246  *
 247  * <p> The tree structure supports:
 248  * <ul>
 249  *   <li> Inheritance of {@linkplain ScopedValue scoped values} across threads.
 250  *   <li> Confinement checks. The phrase "threads contained in the task scope" in method
 251  *   descriptions means threads started in the task scope or descendant scopes.
 252  * </ul>
 253  *
 254  * <p> The following example demonstrates the inheritance of a scoped value. A scoped
 255  * value {@code USERNAME} is bound to the value "{@code duke}". A {@code StructuredTaskScope}
 256  * is created and its {@code fork} method invoked to start a thread to execute {@code
 257  * childTask}. The thread inherits the scoped value <em>bindings</em> captured when
 258  * creating the task scope. The code in {@code childTask} uses the value of the scoped
 259  * value and so reads the value "{@code duke}".

 260  * {@snippet lang=java :

 261  *     private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
 262  *
 263  *     // @link substring="runWhere" target="ScopedValue#runWhere(ScopedValue, Object, Runnable)" :
 264  *     ScopedValue.runWhere(USERNAME, "duke", () -> {
 265  *         try (var scope = new StructuredTaskScope<String>()) {
 266  *
 267  *             scope.fork(() -> childTask());           // @highlight substring="fork"
 268  *             ...
 269  *          }
 270  *     });
 271  *
 272  *     ...

 273  *
 274  *     String childTask() {
 275  *         // @link substring="get" target="ScopedValue#get()" :
 276  *         String name = USERNAME.get();   // "duke"
 277  *         ...
 278  *     }
 279  * }
 280  *
 281  * <p> {@code StructuredTaskScope} does not define APIs that expose the tree structure
 282  * at this time.



 283  *
 284  * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
 285  * or method in this class will cause a {@link NullPointerException} to be thrown.






 286  *
 287  * <h2>Memory consistency effects</h2>
 288  *
 289  * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to
 290  * {@linkplain #fork forking} of a subtask
 291  * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
 292  * <i>happen-before</i></a> any actions taken by that subtask, which in turn <i>happen-before</i>
 293  * the subtask result is {@linkplain Subtask#get() retrieved} or <i>happen-before</i> any
 294  * actions taken in a thread after {@linkplain #join() joining} of the task scope.
 295  *
 296  * @jls 17.4.5 Happens-before Order



 297  *
 298  * @param <T> the result type of tasks executed in the task scope



 299  * @since 21
 300  */
 301 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 302 public class StructuredTaskScope<T> implements AutoCloseable {
 303     private final ThreadFactory factory;   // creates the thread for each forked subtask (used by fork)











 304     private final ThreadFlock flock;       // opened in the constructor; fork starts threads in it, implJoin awaits it
 305     private final ReentrantLock shutdownLock = new ReentrantLock();   // NOTE(review): not used in this part of the file; presumably serializes shutdown -- confirm
 306 
 307     // states: OPEN -> SHUTDOWN -> CLOSED
 308     private static final int OPEN     = 0;   // initial state
 309     private static final int SHUTDOWN = 1;   // fork no longer starts threads once the state reaches this value
 310     private static final int CLOSED   = 2;   // ensureOpen throws IllegalStateException in this state

















































































































 311 
 312     // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
 313     private volatile int state;   // no initializer: the default value 0 is OPEN







 314 
 315     // Counters to support checking that the task scope owner joins before processing
 316     // results and attempts join before closing the task scope. These counters are
 317     // accessed only by the owner thread (fork updates forkRound, implJoin updates the other two).
 318     private int forkRound;         // incremented when the first subtask is forked after join
 319     private int lastJoinAttempted; // set to the current fork round when join is attempted
 320     private int lastJoinCompleted; // set to the current fork round when join completes
 321 
 322     /**
 323      * Represents a subtask forked with {@link #fork(Callable)}.






 324      * @param <T> the result type
 325      * @since 21
 326      */
 327     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 328     public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {   // SubtaskImpl is the only permitted implementation
 329         /**
 330          * {@return the value-returning task provided to the {@code fork} method}
 331          *
 332          * @apiNote Task objects with unique identity may be used for correlation by
 333          * implementations of {@link #handleComplete(Subtask) handleComplete}.
 334          */
 335         Callable<? extends T> task();
 336 
 337         /**
 338          * Represents the state of a subtask.
 339          * @see Subtask#state()
 340          * @since 21
 341          */
 342         @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 343         enum State {
 344             /**
 345              * The subtask result or exception is not available. This state indicates that
 346              * the subtask was forked but has not completed, it completed after the task
 347              * scope was {@linkplain #shutdown() shut down}, or it was forked after the
 348              * task scope was shut down.
 349              */
 350             UNAVAILABLE,
 351             /**
 352              * The subtask completed successfully with a result. The {@link Subtask#get()
 353              * Subtask.get()} method can be used to obtain the result. This is a terminal
 354              * state.
 355              */
 356             SUCCESS,
 357             /**
 358              * The subtask failed with an exception. The {@link Subtask#exception()
 359              * Subtask.exception()} method can be used to obtain the exception. This is a
 360              * terminal state.
 361              */
 362             FAILED,
 363         }
 364 
 365         /**
 366          * {@return the state of the subtask}
 367          */
 368         State state();
 369 
 370         /**
 371          * Returns the result of the subtask.




 372          *
 373          * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
 374          * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
 375          * joinUntil}) before it can obtain the result of the subtask.



 376          *
 377          * @return the possibly-null result
 378          * @throws IllegalStateException if the subtask has not completed, did not complete
 379          * successfully, or the current thread is the task scope owner and did not join
 380          * after forking
 381          * @see State#SUCCESS
 382          */
 383         T get();   // redeclares Supplier.get() with the stricter contract documented above
 384 
 385         /**
 386          * {@return the exception thrown by the subtask}








 387          *
 388          * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
 389          * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
 390          * joinUntil}) before it can obtain the exception thrown by the subtask.
 391          *
 392          * @throws IllegalStateException if the subtask has not completed, completed with
 393          * a result, or the current thread is the task scope owner and did not join after
 394          * forking
 395          * @see State#FAILED
 396          */
 397         Throwable exception();
 398     }
 399 
 400     /**
 401      * Creates a structured task scope with the given name and thread factory. The task
 402      * scope is optionally named for the purposes of monitoring and management. The thread
 403      * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
 404      * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
 405      * current thread.
 406      *
 407      * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
 408      * bindings for inheritance by threads started in the task scope. The
 409      * <a href="#TreeStructure">Tree Structure</a> section in the class description details
 410      * how parent-child relations are established implicitly for the purpose of inheritance
 411      * of scoped value bindings.










 412      *
 413      * @param name the name of the task scope, can be null
 414      * @param factory the thread factory
 415      */
 416     @SuppressWarnings("this-escape")   // 'this' is handed to Objects.toIdentityString below, inside the constructor
 417     public StructuredTaskScope(String name, ThreadFactory factory) {
 418         this.factory = Objects.requireNonNull(factory, "'factory' is null");   // fail fast, before the flock is opened
 419         if (name == null)
 420             name = Objects.toIdentityString(this);   // unnamed scopes fall back to this object's identity string
 421         this.flock = ThreadFlock.open(name);   // the flock is opened under the (possibly defaulted) name
 422     }
 423 
 424     /**
 425      * Creates an unnamed structured task scope that creates virtual threads. The task
 426      * scope is owned by the current thread.



















 427      *
 428      * @implSpec This constructor is equivalent to invoking the 2-arg constructor with a
 429      * name of {@code null} and a thread factory that creates virtual threads.


 430      */
 431     public StructuredTaskScope() {
 432         this(null, Thread.ofVirtual().factory());   // null name => identity-string name; factory creates virtual threads
 433     }
 434 
 435     private IllegalStateException newIllegalStateExceptionScopeClosed() {   // creates, but does not throw, the exception; ensureOpen throws it
 436         return new IllegalStateException("Task scope is closed");
 437     }




















 438 
 439     private IllegalStateException newIllegalStateExceptionNoJoin() {   // creates, but does not throw, the exception; thrown by the ensure*Joined* checks
 440         return new IllegalStateException("Owner did not join after forking subtasks");
 441     }



















 442 
 443     /**
 444      * Throws IllegalStateException if the scope is closed, returning the state if not
 445      * closed.
 446      */
 447     private int ensureOpen() {
 448         int s = state;   // single read of the volatile field, so the value checked is the value returned
 449         if (s == CLOSED)
 450             throw newIllegalStateExceptionScopeClosed();
 451         return s;        // OPEN or SHUTDOWN; callers branch on it (e.g. fork, implJoin)
 452     }










 453 
 454     /**
 455      * Throws WrongThreadException if the current thread is not the owner.
 456      */
 457     private void ensureOwner() {
 458         if (Thread.currentThread() != flock.owner())   // identity comparison with the flock's owner thread
 459             throw new WrongThreadException("Current thread not owner");
 460     }














 461 
 462     /**
 463      * Throws WrongThreadException if the current thread is not the owner
 464      * or a thread contained in the tree.
 465      */
 466     private void ensureOwnerOrContainsThread() {
 467         Thread currentThread = Thread.currentThread();
 468         if (currentThread != flock.owner() && !flock.containsThread(currentThread))   // containsThread is only consulted for non-owner threads
 469             throw new WrongThreadException("Current thread not owner or thread in the tree");
 470     }










 471 
 472     /**
 473      * Throws IllegalStateException if the current thread is the owner, and the owner did
 474      * not join after forking a subtask in the given fork round.
 475      */
 476     private void ensureJoinedIfOwner(int round) {
 477         if (Thread.currentThread() == flock.owner() && (round > lastJoinCompleted)) {   // non-owner threads always pass; a round newer than the last completed join means no join since that fork
 478             throw newIllegalStateExceptionNoJoin();










































































 479         }
 480     }
 481 
 482     /**
 483      * Ensures that the current thread is the owner of this task scope and that it joined
 484      * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
 485      * forking} subtasks.
 486      *
 487      * @apiNote This method can be used by subclasses that define methods to make available
 488      * results, state, or other outcome to code intended to execute after the join method.

 489      *
 490      * @throws WrongThreadException if the current thread is not the task scope owner
 491      * @throws IllegalStateException if the task scope is open and task scope owner did
 492      * not join after forking














 493      */
 494     protected final void ensureOwnerAndJoined() {
 495         ensureOwner();   // WrongThreadException takes precedence over the join check
 496         if (forkRound > lastJoinCompleted) {   // a fork round was started that no completed join has caught up with
 497             throw newIllegalStateExceptionNoJoin();
 498         }

































 499     }
 500 
 501     /**
 502      * Invoked by a subtask when it completes successfully or fails in this task scope.
 503      * This method is not invoked if a subtask completes after the task scope is
 504      * {@linkplain #shutdown() shut down}.





 505      *
 506      * @implSpec The default implementation throws {@code NullPointerException} if the
 507      * subtask is {@code null}. It throws {@link IllegalArgumentException} if the subtask
 508      * has not completed.
 509      *
 510      * @apiNote The {@code handleComplete} method should be thread safe. It may be
 511      * invoked by several threads concurrently.


 512      *
 513      * @param subtask the subtask


 514      *
 515      * @throws IllegalArgumentException if called with a subtask that has not completed








 516      */
 517     protected void handleComplete(Subtask<? extends T> subtask) {
 518         if (subtask.state() == Subtask.State.UNAVAILABLE)   // the dereference also produces the documented NullPointerException for a null subtask
 519             throw new IllegalArgumentException();



















 520     }
 521 
 522     /**
 523      * Starts a new thread in this task scope to execute a value-returning task, thus
 524      * creating a <em>subtask</em> of this task scope.



 525      *
 526      * <p> The value-returning task is provided to this method as a {@link Callable}, the
 527      * thread executes the task's {@link Callable#call() call} method. The thread is
 528      * created with the task scope's {@link ThreadFactory}. It inherits the current thread's
 529      * {@linkplain ScopedValue scoped value} bindings. The bindings must match the bindings
 530      * captured when the task scope was created.
 531      *
 532      * <p> This method returns a {@link Subtask Subtask} to represent the <em>forked
 533      * subtask</em>. The {@code Subtask} object can be used to obtain the result when
 534      * the subtask completes successfully, or the exception when the subtask fails. To
 535      * ensure correct usage, the {@link Subtask#get() get()} and {@link Subtask#exception()
 536      * exception()} methods may only be called by the task scope owner after it has waited
 537      * for all threads to finish with the {@link #join() join} or {@link #joinUntil(Instant)}
 538      * methods. When the subtask completes, the thread invokes the {@link
 539      * #handleComplete(Subtask) handleComplete} method to consume the completed subtask.
 540      * If the task scope is {@linkplain #shutdown() shut down} before the subtask completes
 541      * then the {@code handleComplete} method will not be invoked.





 542      *
 543      * <p> If this task scope is {@linkplain #shutdown() shut down} (or in the process of
 544      * shutting down) then the subtask will not run and the {@code handleComplete} method
 545      * will not be invoked.












 546      *
 547      * <p> This method may only be invoked by the task scope owner or threads contained
 548      * in the task scope.







 549      *
 550      * @implSpec This method may be overridden for customization purposes, wrapping tasks
 551      * for example. If overridden, the subclass must invoke {@code super.fork} to start a
 552      * new thread in this task scope.
 553      *
 554      * @param task the value-returning task for the thread to execute
 555      * @param <U> the result type
 556      * @return the subtask
 557      * @throws IllegalStateException if this task scope is closed
 558      * @throws WrongThreadException if the current thread is not the task scope owner or a
 559      * thread contained in the task scope
 560      * @throws StructureViolationException if the current scoped value bindings are not
 561      * the same as when the task scope was created
 562      * @throws RejectedExecutionException if the thread factory rejected creating a
 563      * thread to run the subtask
 564      */
 565     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
 566         Objects.requireNonNull(task, "'task' is null");
 567         int s = ensureOpen();   // throws ISE if closed
 568 
 569         // when forked by the owner, the subtask is forked in the current or next round
 570         int round = -1;         // stays -1 when the caller is not the owner
 571         if (Thread.currentThread() == flock.owner()) {
 572             round = forkRound;
 573             if (forkRound == lastJoinCompleted) {
 574                 // new round if first fork after join
 575                 round++;
 576             }
 577         }
 578 
 579         SubtaskImpl<U> subtask = new SubtaskImpl<>(this, task, round);
 580         if (s < SHUTDOWN) {     // i.e. the scope was OPEN when its state was read above






 581             // create thread to run task
 582             Thread thread = factory.newThread(subtask);
 583             if (thread == null) {
 584                 throw new RejectedExecutionException("Rejected by thread factory");
 585             }
 586 
 587             // attempt to start the thread
 588             try {
 589                 flock.start(thread);
 590             } catch (IllegalStateException e) {
 591                 // shutdown by another thread, or underlying flock is shutdown due
 592                 // to unstructured use
 593                 // (deliberately ignored: the subtask is returned without having run)
 594             }
 595         }
 596 
 597         // force owner to join if this is the first fork in the round
 598         if (Thread.currentThread() == flock.owner() && round > forkRound) {
 599             forkRound = round;  // published only after the start attempt above, so a thrown exception leaves forkRound unchanged
 600         }
 601 
 602         // return forked subtask or a subtask that did not run
 603         return subtask;
 604     }
 604 
 605     /**
 606      * Wait for all threads to finish or the task scope to shut down.
 607      */
 608     private void implJoin(Duration timeout)   // timeout == null selects the untimed wait below
 609         throws InterruptedException, TimeoutException
 610     {
 611         ensureOwner();
 612         lastJoinAttempted = forkRound;   // recorded before ensureOpen/awaitAll, so it is set even if they throw
 613         int s = ensureOpen();  // throws ISE if closed
 614         if (s == OPEN) {       // a scope that is already shut down does not wait
 615             // wait for all threads, wakeup, interrupt, or timeout
 616             if (timeout != null) {
 617                 flock.awaitAll(timeout);
 618             } else {
 619                 flock.awaitAll();
 620             }
 621         }
 622         lastJoinCompleted = forkRound;   // only reached when the wait returned without throwing
 623     }
 624 
 625     /**
 626      * Wait for all subtasks started in this task scope to finish or the task scope to
 627      * shut down.
 628      *
 629      * <p> This method waits for all subtasks by waiting for all threads {@linkplain
 630      * #fork(Callable) started} in this task scope to finish execution. It stops waiting
 631      * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, or
 632      * the current thread is {@linkplain Thread#interrupt() interrupted}.
 633      *
 634      * <p> This method may only be invoked by the task scope owner.
 635      *
 636      * @implSpec This method may be overridden for customization purposes or to return a
 637      * more specific return type. If overridden, the subclass must invoke {@code
 638      * super.join} to ensure that the method waits for threads in this task scope to
 639      * finish.
 640      *
 641      * @return this task scope
 642      * @throws IllegalStateException if this task scope is closed


 643      * @throws WrongThreadException if the current thread is not the task scope owner
 644      * @throws InterruptedException if interrupted while waiting




 645      */
 646     public StructuredTaskScope<T> join() throws InterruptedException {
 647         try {
 648             implJoin(null);
 649         } catch (TimeoutException e) {
 650             throw new InternalError();
 651         }
 652         return this;
 653     }
 654 
 655     /**
 656      * Wait for all subtasks started in this task scope to finish or the task scope to
 657      * shut down, up to the given deadline.





 658      *
 659      * <p> This method waits for all subtasks by waiting for all threads {@linkplain
 660      * #fork(Callable) started} in this task scope to finish execution. It stops waiting
 661      * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, the
 662      * deadline is reached, or the current thread is {@linkplain Thread#interrupt()
 663      * interrupted}.

 664      *
 665      * <p> This method may only be invoked by the task scope owner.
 666      *
 667      * @implSpec This method may be overridden for customization purposes or to return a
 668      * more specific return type. If overridden, the subclass must invoke {@code
 669      * super.joinUntil} to ensure that the method waits for threads in this task scope to
 670      * finish.
 671      *
 672      * @param deadline the deadline
 673      * @return this task scope
 674      * @throws IllegalStateException if this task scope is closed
 675      * @throws WrongThreadException if the current thread is not the task scope owner


 676      * @throws InterruptedException if interrupted while waiting
 677      * @throws TimeoutException if the deadline is reached while waiting
 678      */
 679     public StructuredTaskScope<T> joinUntil(Instant deadline)
 680         throws InterruptedException, TimeoutException
 681     {
 682         Duration timeout = Duration.between(Instant.now(), deadline);
 683         implJoin(timeout);
 684         return this;
 685     }
 686 
 687     /**
 688      * Interrupt all unfinished threads.
 689      */
 690     private void implInterruptAll() {
 691         flock.threads()
 692             .filter(t -> t != Thread.currentThread())
 693             .forEach(t -> {
 694                 try {
 695                     t.interrupt();
 696                 } catch (Throwable ignore) { }
 697             });
 698     }
 699 
 700     @SuppressWarnings("removal")
 701     private void interruptAll() {
 702         if (System.getSecurityManager() == null) {
 703             implInterruptAll();
 704         } else {
 705             PrivilegedAction<Void> pa = () -> {
 706                 implInterruptAll();
 707                 return null;
 708             };
 709             AccessController.doPrivileged(pa);
 710         }
 711     }
 712 
 713     /**
 714      * Shutdown the task scope if not already shutdown. Return true if this method
 715      * shutdowns the task scope, false if already shutdown.
 716      */
 717     private boolean implShutdown() {
 718         shutdownLock.lock();
 719         try {
 720             if (state < SHUTDOWN) {
 721                 // prevent new threads from starting
 722                 flock.shutdown();
 723 
 724                 // set status before interrupting tasks
 725                 state = SHUTDOWN;


 726 
 727                 // interrupt all unfinished threads
 728                 interruptAll();


 729 
 730                 return true;
 731             } else {
 732                 // already shutdown
 733                 return false;
 734             }
 735         } finally {
 736             shutdownLock.unlock();
 737         }
 738     }
 739 
 740     /**
 741      * Shut down this task scope without closing it. Shutting down a task scope prevents
 742      * new threads from starting, interrupts all unfinished threads, and causes the
 743      * {@link #join() join} method to wake up. Shutdown is useful for cases where the
 744      * results of unfinished subtasks are no longer needed. It will typically be called
 745      * by the {@link #handleComplete(Subtask)} implementation of a subclass that
 746      * implements a policy to discard unfinished tasks once some outcome is reached.
 747      *
 748      * <p> More specifically, this method:
 749      * <ul>
 750      * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the
 751      * task scope (except the current thread).
 752      * <li> Wakes up the task scope owner if it is waiting in {@link #join()} or {@link
 753      * #joinUntil(Instant)}. If the task scope owner is not waiting then its next call to
 754      * {@code join} or {@code joinUntil} will return immediately.
 755      * </ul>
 756      *
 757      * <p> The {@linkplain Subtask.State state} of unfinished subtasks that complete at
 758      * around the time that the task scope is shutdown is not defined. A subtask that
 759      * completes successfully with a result, or fails with an exception, at around
 760      * the time that the task scope is shutdown may or may not <i>transition</i> to a
 761      * terminal state.


 762      *
 763      * <p> This method may only be invoked by the task scope owner or threads contained
 764      * in the task scope.

 765      *
 766      * @implSpec This method may be overridden for customization purposes. If overridden,
 767      * the subclass must invoke {@code super.shutdown} to ensure that the method shuts
 768      * down the task scope.







 769      *
 770      * @apiNote
 771      * There may be threads that have not finished because they are executing code that
 772      * did not respond (or respond promptly) to thread interrupt. This method does not wait
 773      * for these threads. When the owner invokes the {@link #close() close} method
 774      * to close the task scope then it will wait for the remaining threads to finish.
 775      *
 776      * @throws IllegalStateException if this task scope is closed
 777      * @throws WrongThreadException if the current thread is not the task scope owner or
 778      * a thread contained in the task scope
 779      * @see #isShutdown()
 780      */
 781     public void shutdown() {
 782         ensureOwnerOrContainsThread();
 783         int s = ensureOpen();  // throws ISE if closed
 784         if (s < SHUTDOWN && implShutdown())
 785             flock.wakeup();
 786     }
 787 
 788     /**
 789      * {@return true if this task scope is shutdown, otherwise false}
 790      * @see #shutdown()













 791      */
 792     public final boolean isShutdown() {
 793         return state >= SHUTDOWN;
 794     }
 795 
 796     /**
 797      * Closes this task scope.
 798      *
 799      * <p> This method first shuts down the task scope (as if by invoking the {@link
 800      * #shutdown() shutdown} method). It then waits for the threads executing any
 801      * unfinished tasks to finish. If interrupted, this method will continue to wait for
 802      * the threads to finish before completing with the interrupt status set.

 803      *
 804      * <p> This method may only be invoked by the task scope owner. If the task scope
 805      * is already closed then the task scope owner invoking this method has no effect.
 806      *
 807      * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
 808      * manner</em>. If this method is called to close a task scope before nested task
 809      * scopes are closed then it closes the underlying construct of each nested task scope
 810      * (in the reverse order that they were created in), closes this task scope, and then
 811      * throws {@link StructureViolationException}.
 812      * Similarly, if this method is called to close a task scope while executing with
 813      * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
 814      * before the scoped values were bound, then {@code StructureViolationException} is
 815      * thrown after closing the task scope.
 816      * If a thread terminates without first closing task scopes that it owns then
 817      * termination will cause the underlying construct of each of its open task scopes to
 818      * be closed. Closing is performed in the reverse order that the task scopes were
 819      * created in. Thread termination may therefore be delayed when the task scope owner
 820      * has to wait for threads forked in these task scopes to finish.
 821      *
 822      * @implSpec This method may be overridden for customization purposes. If overridden,
 823      * the subclass must invoke {@code super.close} to close the task scope.
 824      *
 825      * @throws IllegalStateException thrown after closing the task scope if the task scope
 826      * owner did not attempt to join after forking
 827      * @throws WrongThreadException if the current thread is not the task scope owner
 828      * @throws StructureViolationException if a structure violation was detected
 829      */
 830     @Override
 831     public void close() {
 832         ensureOwner();
 833         int s = state;
 834         if (s == CLOSED)
 835             return;







 836 

 837         try {
 838             if (s < SHUTDOWN)
 839                 implShutdown();
 840             flock.close();
 841         } finally {
 842             state = CLOSED;
 843         }
 844 
 845         // throw ISE if the owner didn't attempt to join after forking
 846         if (forkRound > lastJoinAttempted) {
 847             lastJoinCompleted = forkRound;
 848             throw newIllegalStateExceptionNoJoin();
 849         }
 850     }
 851 




 852     @Override
 853     public String toString() {
 854         String name = flock.name();
 855         return switch (state) {
 856             case OPEN     -> name;
 857             case SHUTDOWN -> name + "/shutdown";
 858             case CLOSED   -> name + "/closed";
 859             default -> throw new InternalError();
 860         };
 861     }
 862 
 863     /**
 864      * Subtask implementation, runs the task specified to the fork method.
 865      */
 866     private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
 867         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
 868 
 869         private record AltResult(Subtask.State state, Throwable exception) {
 870             AltResult(Subtask.State state) {
 871                 this(state, null);
 872             }
 873         }
 874 
 875         private final StructuredTaskScope<? super T> scope;
 876         private final Callable<? extends T> task;
 877         private final int round;
 878         private volatile Object result;
 879 
 880         SubtaskImpl(StructuredTaskScope<? super T> scope,
 881                     Callable<? extends T> task,
 882                     int round) {
 883             this.scope = scope;
 884             this.task = task;
 885             this.round = round;
 886         }
 887 
 888         @Override
 889         public void run() {
 890             T result = null;
 891             Throwable ex = null;
 892             try {
 893                 result = task.call();
 894             } catch (Throwable e) {
 895                 ex = e;
 896             }
 897 
 898             // nothing to do if task scope is shutdown
 899             if (scope.isShutdown())
 900                 return;
 901 
 902             // capture result or exception, invoke handleComplete
 903             if (ex == null) {
 904                 this.result = (result != null) ? result : RESULT_NULL;
 905             } else {
 906                 this.result = new AltResult(State.FAILED, ex);
 907             }
 908             scope.handleComplete(this);
 909         }
 910 
 911         @Override
 912         public Callable<? extends T> task() {
 913             return task;
 914         }
 915 
 916         @Override
 917         public Subtask.State state() {
 918             Object result = this.result;
 919             if (result == null) {
 920                 return State.UNAVAILABLE;
 921             } else if (result instanceof AltResult alt) {
 922                 // null or failed
 923                 return alt.state();
 924             } else {
 925                 return State.SUCCESS;
 926             }
 927         }
 928 

 929         @Override
 930         public T get() {
 931             scope.ensureJoinedIfOwner(round);
 932             Object result = this.result;
 933             if (result instanceof AltResult) {
 934                 if (result == RESULT_NULL) return null;
 935             } else if (result != null) {
 936                 @SuppressWarnings("unchecked")
 937                 T r = (T) result;
 938                 return r;
 939             }
 940             throw new IllegalStateException(
 941                     "Result is unavailable or subtask did not complete successfully");
 942         }
 943 
 944         @Override
 945         public Throwable exception() {
 946             scope.ensureJoinedIfOwner(round);
 947             Object result = this.result;
 948             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
 949                 return alt.exception();
 950             }
 951             throw new IllegalStateException(
 952                     "Exception is unavailable or subtask did not complete with exception");
 953         }
 954 
 955         @Override
 956         public String toString() {
 957             String stateAsString = switch (state()) {
 958                 case UNAVAILABLE -> "[Unavailable]";
 959                 case SUCCESS     -> "[Completed successfully]";
 960                 case FAILED      -> {
 961                     Throwable ex = ((AltResult) result).exception();
 962                     yield "[Failed: " + ex + "]";
 963                 }
 964             };
 965             return Objects.toIdentityString(this) + stateAsString;
 966         }
 967     }
 968 
 969     /**
 970      * A {@code StructuredTaskScope} that captures the result of the first subtask to
 971      * complete {@linkplain Subtask.State#SUCCESS successfully}. Once captured, it
 972      * {@linkplain #shutdown() shuts down} the task scope to interrupt unfinished threads
 973      * and wake up the task scope owner. The policy implemented by this class is intended
 974      * for cases where the result of any subtask will do ("invoke any") and where the
 975      * results of other unfinished subtasks are no longer needed.
 976      *
 977      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 978      * in this class will cause a {@link NullPointerException} to be thrown.
 979      *
 980      * @apiNote This class implements a policy to shut down the task scope when a subtask
 981      * completes successfully. There shouldn't be any need to directly shut down the task
 982      * scope with the {@link #shutdown() shutdown} method.
 983      *
 984      * @param <T> the result type
 985      * @since 21
 986      */
 987     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 988     public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
 989         private static final Object RESULT_NULL = new Object();
 990         private static final VarHandle FIRST_RESULT;
 991         private static final VarHandle FIRST_EXCEPTION;
 992         static {
 993             try {
 994                 MethodHandles.Lookup l = MethodHandles.lookup();
 995                 FIRST_RESULT = l.findVarHandle(ShutdownOnSuccess.class, "firstResult", Object.class);
 996                 FIRST_EXCEPTION = l.findVarHandle(ShutdownOnSuccess.class, "firstException", Throwable.class);
 997             } catch (Exception e) {
 998                 throw new ExceptionInInitializerError(e);
 999             }
1000         }
1001         private volatile Object firstResult;

1002         private volatile Throwable firstException;
1003 
1004         /**
1005          * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
1006          * The task scope is optionally named for the purposes of monitoring and management.
1007          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1008          * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope
1009          * is owned by the current thread.
1010          *
1011          * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
1012          * value} bindings for inheritance by threads started in the task scope. The
1013          * <a href="#TreeStructure">Tree Structure</a> section in the class description
1014          * details how parent-child relations are established implicitly for the purpose
1015          * of inheritance of scoped value bindings.
1016          *
1017          * @param name the name of the task scope, can be null
1018          * @param factory the thread factory
1019          */
1020         public ShutdownOnSuccess(String name, ThreadFactory factory) {
1021             super(name, factory);
1022         }
1023 
1024         /**
1025          * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
1026          *
1027          * @implSpec This constructor is equivalent to invoking the 2-arg constructor with
1028          * a name of {@code null} and a thread factory that creates virtual threads.
1029          */
1030         public ShutdownOnSuccess() {
1031             this(null, Thread.ofVirtual().factory());
1032         }
1033 
1034         @Override
1035         protected void handleComplete(Subtask<? extends T> subtask) {
1036             if (firstResult != null) {
1037                 // already captured a result
1038                 return;


1039             }


1040 
1041             if (subtask.state() == Subtask.State.SUCCESS) {
1042                 // task succeeded
1043                 T result = subtask.get();
1044                 Object r = (result != null) ? result : RESULT_NULL;
1045                 if (FIRST_RESULT.compareAndSet(this, null, r)) {
1046                     super.shutdown();
1047                 }
1048             } else if (firstException == null) {
1049                 // capture the exception thrown by the first subtask that failed
1050                 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());




1051             }
1052         }


1053 
1054         /**
1055          * Wait for a subtask started in this task scope to complete {@linkplain
1056          * Subtask.State#SUCCESS successfully} or all subtasks to complete.
1057          *
1058          * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1059          * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1060          * when all threads finish, a subtask completes successfully, or the current
1061          * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting
1062          * if the {@link #shutdown() shutdown} method is invoked directly to shut down
1063          * this task scope.
1064          *
1065          * <p> This method may only be invoked by the task scope owner.
1066          *
1067          * @throws IllegalStateException {@inheritDoc}
1068          * @throws WrongThreadException {@inheritDoc}
1069          */
1070         @Override
1071         public ShutdownOnSuccess<T> join() throws InterruptedException {
1072             super.join();
1073             return this;








1074         }
1075 
1076         /**
1077          * Wait for a subtask started in this task scope to complete {@linkplain
1078          * Subtask.State#SUCCESS successfully} or all subtasks to complete, up to the
1079          * given deadline.
1080          *
1081          * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1082          * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1083          * when all threads finish, a subtask completes successfully, the deadline is
1084          * reached, or the current thread is {@linkplain Thread#interrupt() interrupted}.
1085          * It also stops waiting if the {@link #shutdown() shutdown} method is invoked
1086          * directly to shut down this task scope.
1087          *
1088          * <p> This method may only be invoked by the task scope owner.
1089          *
1090          * @throws IllegalStateException {@inheritDoc}
1091          * @throws WrongThreadException {@inheritDoc}
1092          */
1093         @Override
1094         public ShutdownOnSuccess<T> joinUntil(Instant deadline)
1095             throws InterruptedException, TimeoutException
1096         {
1097             super.joinUntil(deadline);
1098             return this;
1099         }
1100 
1101         /**
1102          * {@return the result of the first subtask that completed {@linkplain
1103          * Subtask.State#SUCCESS successfully}}
1104          *
1105          * <p> When no subtask completed successfully, but a subtask {@linkplain
1106          * Subtask.State#FAILED failed} then {@code ExecutionException} is thrown with
1107          * the subtask's exception as the {@linkplain Throwable#getCause() cause}.
1108          *
1109          * @throws ExecutionException if no subtasks completed successfully but at least
1110          * one subtask failed
1111          * @throws IllegalStateException if no subtasks completed or the task scope owner
1112          * did not join after forking
1113          * @throws WrongThreadException if the current thread is not the task scope owner
1114          */
1115         public T result() throws ExecutionException {
1116             return result(ExecutionException::new);
1117         }
1118 
1119         /**
1120          * Returns the result of the first subtask that completed {@linkplain
1121          * Subtask.State#SUCCESS successfully}, otherwise throws an exception produced
1122          * by the given exception supplying function.
1123          *
1124          * <p> When no subtask completed successfully, but a subtask {@linkplain
1125          * Subtask.State#FAILED failed}, then the exception supplying function is invoked
1126          * with subtask's exception.
1127          *
1128          * @param esf the exception supplying function
1129          * @param <X> type of the exception to be thrown
1130          * @return the result of the first subtask that completed with a result
1131          *
1132          * @throws X if no subtasks completed successfully but at least one subtask failed
1133          * @throws IllegalStateException if no subtasks completed or the task scope owner
1134          * did not join after forking
1135          * @throws WrongThreadException if the current thread is not the task scope owner
1136          */
1137         public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X {
1138             Objects.requireNonNull(esf);
1139             ensureOwnerAndJoined();
1140 
1141             Object result = firstResult;
1142             if (result == RESULT_NULL) {
1143                 return null;
1144             } else if (result != null) {
1145                 @SuppressWarnings("unchecked")
1146                 T r = (T) result;
1147                 return r;
1148             }
1149 
1150             Throwable exception = firstException;
1151             if (exception != null) {
1152                 X ex = esf.apply(exception);
1153                 Objects.requireNonNull(ex, "esf returned null");
1154                 throw ex;
1155             }
1156 
1157             throw new IllegalStateException("No completed subtasks");
1158         }
1159     }
1160 
1161     /**
1162      * A {@code StructuredTaskScope} that captures the exception of the first subtask to
1163      * {@linkplain Subtask.State#FAILED fail}. Once captured, it {@linkplain #shutdown()
1164      * shuts down} the task scope to interrupt unfinished threads and wake up the task
1165      * scope owner. The policy implemented by this class is intended for cases where the
1166      * results for all subtasks are required ("invoke all"); if any subtask fails then the
1167      * results of other unfinished subtasks are no longer needed.
1168      *
1169      * <p> Unless otherwise specified, passing a {@code null} argument to a method
1170      * in this class will cause a {@link NullPointerException} to be thrown.
1171      *
1172      * @apiNote This class implements a policy to shut down the task scope when a subtask
1173      * fails. There shouldn't be any need to directly shut down the task scope with the
1174      * {@link #shutdown() shutdown} method.
1175      *
1176      * @since 21
1177      */
1178     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
1179     public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
1180         private static final VarHandle FIRST_EXCEPTION;
1181         static {
1182             try {
1183                 MethodHandles.Lookup l = MethodHandles.lookup();
1184                 FIRST_EXCEPTION = l.findVarHandle(ShutdownOnFailure.class, "firstException", Throwable.class);
1185             } catch (Exception e) {
1186                 throw new ExceptionInInitializerError(e);
1187             }
1188         }
1189         private volatile Throwable firstException;
1190 
1191         /**
1192          * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
1193          * The task scope is optionally named for the purposes of monitoring and management.
1194          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1195          * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope
1196          * is owned by the current thread.
1197          *
1198          * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
1199          * value} bindings for inheritance by threads started in the task scope. The
1200          * <a href="#TreeStructure">Tree Structure</a> section in the class description
1201          * details how parent-child relations are established implicitly for the purpose
1202          * of inheritance of scoped value bindings.
1203          *
1204          * @param name the name of the task scope, can be null
1205          * @param factory the thread factory
1206          */
1207         public ShutdownOnFailure(String name, ThreadFactory factory) {
1208             super(name, factory);
1209         }
1210 
1211         /**
1212          * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
1213          *
1214          * @implSpec This constructor is equivalent to invoking the 2-arg constructor with
1215          * a name of {@code null} and a thread factory that creates virtual threads.
1216          */
1217         public ShutdownOnFailure() {
1218             this(null, Thread.ofVirtual().factory());













1219         }
1220 
1221         @Override
1222         protected void handleComplete(Subtask<?> subtask) {
1223             if (subtask.state() == Subtask.State.FAILED
1224                     && firstException == null
1225                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception())) {
1226                 super.shutdown();
1227             }
1228         }
1229 
1230         /**
1231          * Wait for all subtasks started in this task scope to complete or for a subtask
1232          * to {@linkplain Subtask.State#FAILED fail}.
1233          *
1234          * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1235          * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1236          * when all threads finish, a subtask fails, or the current thread is {@linkplain
1237          * Thread#interrupt() interrupted}. It also stops waiting if the {@link #shutdown()
1238          * shutdown} method is invoked directly to shut down this task scope.
1239          *
1240          * <p> This method may only be invoked by the task scope owner.
1241          *
1242          * @throws IllegalStateException {@inheritDoc}
1243          * @throws WrongThreadException {@inheritDoc}
1244          */
1245         @Override
1246         public ShutdownOnFailure join() throws InterruptedException {
1247             super.join();
1248             return this;
1249         }
1250 
1251         /**
1252          * Wait for all subtasks started in this task scope to complete or for a subtask
1253          * to {@linkplain Subtask.State#FAILED fail}, up to the given deadline.
1254          *
1255          * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1256          * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1257          * when all threads finish, a subtask fails, the deadline is reached, or the current
1258          * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting
1259          * if the {@link #shutdown() shutdown} method is invoked directly to shut down
1260          * this task scope.
1261          *
1262          * <p> This method may only be invoked by the task scope owner.
1263          *
1264          * @throws IllegalStateException {@inheritDoc}
1265          * @throws WrongThreadException {@inheritDoc}
1266          */
1267         @Override
1268         public ShutdownOnFailure joinUntil(Instant deadline)
1269             throws InterruptedException, TimeoutException
1270         {
1271             super.joinUntil(deadline);
1272             return this;
1273         }

1274 
1275         /**
1276          * Returns the exception of the first subtask that {@linkplain Subtask.State#FAILED
1277          * failed}. If no subtasks failed then an empty {@code Optional} is returned.
1278          *
1279          * @return the exception for the first subtask to fail or an empty optional if no
1280          * subtasks failed
1281          *
1282          * @throws WrongThreadException if the current thread is not the task scope owner
1283          * @throws IllegalStateException if the task scope owner did not join after forking
1284          */
1285         public Optional<Throwable> exception() {
                 // must be called by the owner, and only after it has joined
1286             ensureOwnerAndJoined();
                 // ofNullable: a null firstException (no failure recorded) becomes an empty Optional
1287             return Optional.ofNullable(firstException);
1288         }
1289 
1290         /**
1291          * Throws if a subtask {@linkplain Subtask.State#FAILED failed}.
1292          * If any subtask failed with an exception then {@code ExecutionException} is
1293          * thrown with the exception of the first subtask to fail as the {@linkplain
1294          * Throwable#getCause() cause}. This method does nothing if no subtasks failed.
1295          *
1296          * @throws ExecutionException if a subtask failed
1297          * @throws WrongThreadException if the current thread is not the task scope owner
1298          * @throws IllegalStateException if the task scope owner did not join after forking
1299          */
1300         public void throwIfFailed() throws ExecutionException {
                 // delegate to the general form; the first failure becomes the cause of
                 // the ExecutionException created by the constructor reference
1301             throwIfFailed(ExecutionException::new);
1302         }
1303 
1304         /**
1305          * Throws the exception produced by the given exception supplying function if a
1306          * subtask {@linkplain Subtask.State#FAILED failed}. If any subtask failed with
1307          * an exception then the function is invoked with the exception of the first
1308          * subtask to fail. The exception returned by the function is thrown. This method
1309          * does nothing if no subtasks failed.
1310          *
1311          * @param esf the exception supplying function
1312          * @param <X> type of the exception to be thrown
1313          *
1314          * @throws X produced by the exception supplying function
1315          * @throws WrongThreadException if the current thread is not the task scope owner
1316          * @throws IllegalStateException if the task scope owner did not join after forking
1317          */
1318         public <X extends Throwable>
1319         void throwIfFailed(Function<Throwable, ? extends X> esf) throws X {
                 // the owner/joined check runs first, so it takes precedence over the
                 // null check on esf
1320             ensureOwnerAndJoined();
1321             Objects.requireNonNull(esf);
                 // read the field once so the test and the use below see the same value
1322             Throwable exception = firstException;
1323             if (exception != null) {
1324                 X ex = esf.apply(exception);
                     // a function that returns null is reported as a NullPointerException
                     // with an explanatory message
1325                 Objects.requireNonNull(ex, "esf returned null");
1326                 throw ex;
1327             }






1328         }
1329     }
1330 }

   1 /*
   2  * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.util.ArrayList;
  33 import java.util.List;
  34 import java.util.NoSuchElementException;
  35 import java.util.Objects;


  36 import java.util.function.Function;
  37 import java.util.function.Predicate;
  38 import java.util.function.Supplier;
  39 import java.util.stream.Stream;
  40 import jdk.internal.javac.PreviewFeature;
  41 import jdk.internal.misc.InnocuousThread;
  42 import jdk.internal.misc.ThreadFlock;
  43 
  44 /**
  45  * An API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports cases
  46  * where a main task splits into several concurrent subtasks, and where the subtasks must
  47  * complete before the main task continues. A {@code StructuredTaskScope} can be used to
  48  * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
  49  * just like that of a sequential operation in structured programming.
  50  *
  51  * <p> {@code StructuredTaskScope} defines the static method {@link #open(Joiner) open}
  52  * to open a new {@code StructuredTaskScope} and the {@link #close() close} method to close
  53  * it. The API is designed to be used with the {@code try-with-resources} statement where
  54  * the {@code StructuredTaskScope} is opened as a resource and then closed automatically.
  55  * The code in the block uses the {@link #fork(Callable) fork} method to fork subtasks.
  56  * After forking, it uses the {@link #join() join} method to wait for all subtasks to
  57  * finish (or some other outcome) as a single operation. Forking a subtask starts a new
  58  * {@link Thread} to run the subtask. The thread executing the main task does not continue
  59  * beyond the {@code close} method until all threads started to execute subtasks have finished.
  60  * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may
  61  * only be invoked by the <em>owner thread</em> (the thread that opened the {@code
  62  * StructuredTaskScope}), and the {@code close} method throws an exception after closing
  63  * if the owner did not invoke the {@code join} method.
  64  *
  65  * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner} that handles subtask
  66  * completion and produces the outcome (the result or an exception) for the {@link #join()
  67  * join} method. The {@code Joiner} interface defines static methods to create a
  68  * {@code Joiner} for common cases.
  69  *
  70  * <p> A {@code Joiner} may <a id="CancelExecution"><em>cancel execution</em></a>
  71  * (sometimes called "short-circuiting") when some condition is reached that does not
  72  * require the result of subtasks that are still executing. Cancelling execution prevents
  73  * new threads from being started to execute further subtasks, {@linkplain Thread#interrupt()
  74  * interrupts} the threads executing subtasks that have not completed, and causes the
  75  * {@code join} method to wake up with a result (or exception). The {@link #close() close}
  76  * method always waits for threads executing subtasks to finish, even if execution is
  77  * cancelled, so it cannot continue beyond the {@code close} method until the interrupted
  78  * threads finish. Subtasks should be coded so that they finish as soon as possible when
  79  * interrupted. Subtasks that block on methods that are not interruptible may delay the
  80  * closing of a task scope.
  81  *
  82  * <p> Consider the example of a main task that splits into two subtasks to concurrently
  83  * fetch resources from two URL locations "left" and "right". Both subtasks may complete
  84  * successfully, one subtask may succeed and the other may fail, or both subtasks may
  85  * fail. The main task in this example is interested in the successful result from both
  86  * subtasks. It uses {@link Joiner#awaitAllSuccessfulOrThrow()
  87  * Joiner.awaitAllSuccessfulOrThrow()} to create a {@code Joiner} that waits for both
  88  * subtasks to complete successfully or for either subtask to fail.
  89  * {@snippet lang=java :
  90  *    try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
  91  *
  92  *        // @link substring="fork" target="#fork(Callable)" :
  93  *        Subtask<String> subtask1 = scope.fork(() -> query(left));
  94  *        Subtask<Integer> subtask2 = scope.fork(() -> query(right));
  95  *
  96  *        // throws if either subtask fails
  97  *        scope.join();  // @link substring="join" target="#join()"
  98  *
  99  *        // both subtasks completed successfully
 100  *        return new MyResult(subtask1.get(), subtask2.get()); // @link substring="get" target="Subtask#get()"
 101  *
 102  *    }
 103  * }






 104  *
 105  * <p> In this example, the main task forks the two subtasks. The {@code fork} method
 106  * returns a {@link Subtask Subtask} that is a handle to the forked subtask. The main task
 107  * waits in the {@code join} method for both subtasks to complete successfully or for either
 108  * subtask to fail. If both subtasks complete successfully then the {@code join} method
 109  * completes and the main task uses the {@link Subtask#get() Subtask.get()} method to get
 110  * the result of each subtask. If either subtask fails then the {@code Joiner} causes the
 111  * other subtask to be cancelled (this will interrupt the thread executing the subtask)
 112  * and the {@code join} method throws {@link ExecutionException} with the exception from
 113  * the failed subtask as the {@linkplain Throwable#getCause() cause}.
 114  *
 115  * <p> Now consider another example that also splits into two subtasks to concurrently
 116  * fetch resources. In this example, the code in the main task is only interested in the
 117  * result from the first subtask to complete successfully. The example uses {@link
 118  * Joiner#anySuccessfulResultOrThrow() Joiner.anySuccessfulResultOrThrow()} to
 119  * create a {@code Joiner} that makes available the result of the first subtask to
 120  * complete successfully. The type parameter in the example is "{@code String}" so that
 121  * only subtasks that return a {@code String} can be forked.
 122  * {@snippet lang=java :
 123  *    // @link substring="open" target="#open(Joiner)" :
 124  *    try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {
 125  *
 126  *        scope.fork(() -> query(left));  // @link substring="fork" target="#fork(Callable)"
 127  *        scope.fork(() -> query(right));
 128  *
 129  *        // throws if both subtasks fail
 130  *        String firstResult = scope.join();

 131  *
 132  *    // @link substring="close" target="#close()" :
 133  *    } // close
 134  * }
 135  *
 136  * <p> In the example, the main task forks the two subtasks, then waits in the {@code
 137  * join} method for either subtask to complete successfully or for both subtasks to fail.
 138  * If one of the subtasks completes successfully then the {@code Joiner} causes the other
 139  * subtask to be cancelled (this will interrupt the thread executing the subtask), and
 140  * the {@code join} method returns the result from the first subtask. Cancelling the other
 141  * subtask avoids the main task waiting for a result that it doesn't care about. If both
 142  * subtasks fail then the {@code join} method throws {@link ExecutionException} with the
 143  * exception from one of the subtasks as the {@linkplain Throwable#getCause() cause}.


 144  *
 145  * <p> Whether code uses the {@code Subtask} returned from {@code fork} will depend on
 146  * the {@code Joiner} and usage. Some {@code Joiner} implementations are suited to subtasks
 147  * that return results of the same type and where the {@code join} method returns a result
 148  * for the main task to use. Code that forks subtasks that return results of different
 149  * types, and uses a {@code Joiner} such as {@code Joiner.awaitAllSuccessfulOrThrow()} that
 150  * does not return a result, will use {@link Subtask#get() Subtask.get()} after joining.
 151  *
 152  * <h2>Exception handling</h2>
 153  *
 154  * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner Joiner} that
 155  * handles subtask completion and produces the outcome for the {@link #join() join} method.
 156  * In some cases, the outcome will be a result, in other cases it will be an exception.
 157  * If the outcome is an exception then the {@code join} method throws {@link
 158  * ExecutionException} with the exception as the {@linkplain Throwable#getCause()
 159  * cause}. For many {@code Joiner} implementations, the exception will be an exception
 160  * thrown by a subtask that failed. In the case of {@link Joiner#allSuccessfulOrThrow()
 161  * allSuccessfulOrThrow} and {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow}
 162  * for example, the exception is from the first subtask to fail.







 163  *
 164  * <p> Many of the details for how exceptions are handled will depend on usage. In some
 165  * cases, the {@code join} method will be called in a {@code try-catch} block to catch
 166  * {@code ExecutionException} and handle the cause. The exception handling may use
 167  * {@code instanceof} with pattern matching to handle specific causes. In some cases it
 168  * may not be useful to catch {@code ExecutionException} but instead leave it to propagate
 169  * to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler}
 170  * for logging purposes.




 171  *
 172  * <p> For cases where a specific exception triggers the use of a default result then it
 173  * may be more appropriate to handle this in the subtask itself rather than the subtask
 174  * failing and code in the main task handling the exception.
 175  *
 176  * <h2>Configuration</h2>
 177  *
 178  * A {@code StructuredTaskScope} is opened with {@linkplain Config configuration} that
 179  * consists of a {@link ThreadFactory} to create threads, an optional name for monitoring
 180  * and management purposes, and an optional timeout.
 181  *
 182  * <p> The 1-arg {@link #open(Joiner) open} method creates a {@code StructuredTaskScope}
 183  * with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default
 184  * configuration has a {@code ThreadFactory} that creates unnamed
 185  * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
 186  * is unnamed for monitoring and management purposes, and has no timeout.










 187  *
 188  * <p> The 2-arg {@link #open(Joiner, Function) open} method can be used to create a
 189  * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for
 190  * the purposes of monitoring and management, or has a timeout that cancels execution if
 191  * the timeout expires before or while waiting for subtasks to complete. The {@code open}
 192  * method is called with a {@linkplain Function function} that is applied to the default
 193  * configuration and returns a {@link Config Config} for the {@code StructuredTaskScope}
 194  * under construction.
 195  *
 196  * <p> The following example opens a new {@code StructuredTaskScope} with a {@code
 197  * ThreadFactory} that creates virtual threads {@linkplain Thread#setName(String) named}
 198  * "duke-0", "duke-1" ...
 199  * {@snippet lang = java:
 200  *    // @link substring="name" target="Thread.Builder#name(String, long)" :
 201  *    ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory();
 202  *
 203  *    // @link substring="withThreadFactory" target="Config#withThreadFactory(ThreadFactory)" :
 204  *    try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
 205  *
 206  *        scope.fork( .. );   // runs in a virtual thread with name "duke-0"
 207  *        scope.fork( .. );   // runs in a virtual thread with name "duke-1"
 208  *
 209  *        scope.join();



 210  *

 211  *     }
 212  *}
















 213  *
 214  * <p> A second example sets a timeout, represented by a {@link Duration}. The timeout
 215  * starts when the new task scope is opened. If the timeout expires before the {@code join}
 216  * method has completed then <a href="#CancelExecution">execution is cancelled</a>. This
 217  * interrupts the threads executing the two subtasks and causes the {@link #join() join}
 218  * method to throw {@link ExecutionException} with {@link TimeoutException} as the cause.
 219  * {@snippet lang=java :
 220  *    Duration timeout = Duration.ofSeconds(10);

 221  *
 222  *    // @link substring="allSuccessfulOrThrow" target="Joiner#allSuccessfulOrThrow()" :
 223  *    try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
 224  *    // @link substring="withTimeout" target="Config#withTimeout(Duration)" :
 225  *                                              cf -> cf.withTimeout(timeout))) {


 226  *
 227  *        scope.fork(() -> query(left));
 228  *        scope.fork(() -> query(right));



 229  *
 230  *        List<String> result = scope.join()
 231  *                                   .map(Subtask::get)
 232  *                                   .toList();
 233  *
 234  *   }

 235  * }



 236  *
 237  * <h2>Inheritance of scoped value bindings</h2>
 238  *
 239  * {@link ScopedValue} supports the execution of a method with a {@code ScopedValue} bound
 240  * to a value for the bounded period of execution of the method by the <em>current thread</em>.
 241  * It allows a value to be safely and efficiently shared to methods without using method
 242  * parameters.
 243  *
 244  * <p> When used in conjunction with a {@code StructuredTaskScope}, a {@code ScopedValue}
 245  * can also safely and efficiently share a value to methods executed by subtasks forked
 246  * in the task scope. When a {@code ScopedValue} object is bound to a value in the thread
 247  * executing the main task then that binding is inherited by the threads created to
 248  * execute the subtasks. The thread executing the main task does not continue beyond the
 249  * {@link #close() close} method until all threads executing the subtasks have finished.
 250  * This ensures that the {@code ScopedValue} is not reverted to being {@linkplain
 251  * ScopedValue#isBound() unbound} (or its previous value) while subtasks are executing.
 252  * In addition to providing a safe and efficient means to inherit a value into subtasks,
 253  * the inheritance allows sequential code using {@code ScopedValue} to be refactored to use
 254  * structured concurrency.
 255  *
 256  * <p> To ensure correctness, opening a new {@code StructuredTaskScope} captures the
 257  * current thread's scoped value bindings. These are the scoped value bindings that are
 258  * inherited by the threads created to execute subtasks in the task scope. Forking a
 259  * subtask checks that the bindings in effect at the time that the subtask is forked
 260  * match the bindings when the {@code StructuredTaskScope} was created. This check ensures
 261  * that a subtask does not inherit a binding that is reverted in the main task before the
 262  * subtask has completed.
 263  *
 264  * <p> A {@code ScopedValue} that is shared across threads requires that the value be an
 265  * immutable object or for all access to the value to be appropriately synchronized.




 266  *
 267  * <p> The following example demonstrates the inheritance of scoped value bindings. The
 268  * scoped value USERNAME is bound to the value "duke" for the bounded period of a lambda
 269  * expression by the thread executing it. The code in the block opens a {@code
 270  * StructuredTaskScope} and forks two subtasks, it then waits in the {@code join} method
 271  * and aggregates the results from both subtasks. If code executed by the threads
 272  * running subtask1 and subtask2 uses {@link ScopedValue#get()} to get the value of
 273  * USERNAME, then value "duke" will be returned.
 274  * {@snippet lang=java :
 275  *     // @link substring="newInstance" target="ScopedValue#newInstance()" :
 276  *     private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
 277  *
 278  *     // @link substring="callWhere" target="ScopedValue#callWhere" :
 279  *     MyResult result = ScopedValue.callWhere(USERNAME, "duke", () -> {

 280  *
 281  *         try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {



 282  *
 283  *             Subtask<String> subtask1 = scope.fork( .. );    // inherits binding
 284  *             Subtask<Integer> subtask2 = scope.fork( .. );   // inherits binding
 285  *
 286  *             scope.join();
 287  *             return new MyResult(subtask1.get(), subtask2.get());
 288  *         }
 289  *
 290  *     });
 291  * }
 292  *
 293  * <p> A scoped value inherited into a subtask may be
 294  * <a href="{@docRoot}/java.base/java/lang/ScopedValue.html#rebind">rebound</a> to a new
 295  * value in the subtask for the bounded execution of some method executed in the subtask.
 296  * When the method completes, the value of the {@code ScopedValue} reverts to its previous
 297  * value, the value inherited from the thread executing the main task.
 298  *
 299  * <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}.
 300  * A main task executing in thread T1 opens a {@code StructuredTaskScope} and forks a
 301  * subtask that runs in thread T2. The scoped value bindings captured when T1 opens the
 302  * task scope are inherited into T2. The subtask (in thread T2) executes code that opens a
 303  * new {@code StructuredTaskScope} and forks a subtask that runs in thread T3. The scoped
 304  * value bindings captured when T2 opens the task scope are inherited into T3. These
 305  * include (or may be the same as) the bindings that were inherited from T1. In effect,
 306  * scoped values are inherited into a tree of subtasks, not just one level of subtask.
 307  *
 308  * <h2>Memory consistency effects</h2>
 309  *
 310  * <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to
 311  * {@linkplain #fork forking} of a subtask
 312  * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
 313  * <i>happen-before</i></a> any actions taken by that subtask, which in turn
 314  * <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}.

 315  *
 316  * <h2>General exceptions</h2>
 317  *
 318  * <p> Unless otherwise specified, passing a {@code null} argument to a method in this
 319  * class will cause a {@link NullPointerException} to be thrown.
 320  *
 321  * @param <T> the result type of tasks executed in the task scope
 322  * @param <R> the type of the result returned by the join method
 323  *
 324  * @jls 17.4.5 Happens-before Order
 325  * @since 21
 326  */
 327 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 328 public class StructuredTaskScope<T, R> implements AutoCloseable {
         // VarHandle onto the boolean "cancelled" field; cancelExecution uses it for an
         // atomic compareAndSet on that field
 329     private static final VarHandle CANCELLED;
 330     static {
 331         try {
 332             MethodHandles.Lookup l = MethodHandles.lookup();
 333             CANCELLED = l.findVarHandle(StructuredTaskScope.class,"cancelled", boolean.class);
 334         } catch (Exception e) {
                 // a failed lookup is surfaced as a class initialization error
 335             throw new ExceptionInInitializerError(e);
 336         }
 337     }
 338 
 339     private final Joiner<? super T, ? extends R> joiner;
 340     private final ThreadFactory threadFactory;
 341     private final ThreadFlock flock;

 342 
 343     // fields that are only accessed by owner thread
 344     private boolean needToJoin;     // set by fork to indicate that owner must join
 345     private boolean joined;         // set to true when owner joins
 346     private boolean closed;
 347     private Future<?> timerTask;
 348 
 349     // set or read by any thread
 350     private volatile boolean cancelled;
 351 
 352     // set by the timer thread, read by the owner thread
 353     private volatile boolean timeoutExpired;
 354 
 355     /**
 356      * Throws IllegalStateException if the task scope is closed.
 357      */
 358     private void ensureOpen() {
             // "closed" is only accessed by the owner thread, so this helper is only
             // meaningful on the owner; the assert documents that expectation
 359         assert Thread.currentThread() == flock.owner();
 360         if (closed) {
 361             throw new IllegalStateException("Task scope is closed");
 362         }
 363     }
 364 
 365     /**
 366      * Throws WrongThreadException if the current thread is not the owner thread.
 367      */
 368     private void ensureOwner() {
             // the owner is the thread that the underlying ThreadFlock reports as its owner
 369         if (Thread.currentThread() != flock.owner()) {
 370             throw new WrongThreadException("Current thread not owner");
 371         }
 372     }
 373 
 374     /**
 375      * Throws IllegalStateException if invoked by the owner thread and the owner thread
 376      * has not joined.
 377      */
 378     private void ensureJoinedIfOwner() {
             // threads other than the owner pass through without any check
 379         if (Thread.currentThread() == flock.owner() && !joined) {
                 // needToJoin is set by fork: when set, the owner forked without joining;
                 // otherwise report that join did not complete
 380             String msg = needToJoin ? "Owner did not join" : "join did not complete";
 381             throw new IllegalStateException(msg);
 382         }
 383     }
 384 
 385     /**
 386      * Interrupts all threads in this task scope, except the current thread.
 387      */
 388     private void implInterruptAll() {
 389         flock.threads()
                     // never interrupt the thread that is performing the cancellation
 390                 .filter(t -> t != Thread.currentThread())
 391                 .forEach(t -> {
                         // best effort: a failure to interrupt one thread is swallowed so
                         // that the remaining threads are still interrupted
 392                     try {
 393                         t.interrupt();
 394                     } catch (Throwable ignore) { }
 395                 });
 396     }
 397 
         // Interrupts all threads in this task scope (except the current thread), running
         // the interrupts as a privileged action when a security manager is set.
         // "removal" is suppressed because the SecurityManager/AccessController APIs used
         // here are deprecated for removal.
 398     @SuppressWarnings("removal")
 399     private void interruptAll() {
 400         if (System.getSecurityManager() == null) {
 401             implInterruptAll();
 402         } else {
                 // NOTE(review): presumably privileged so that the permissions of the
                 // calling code do not prevent the interrupts -- confirm
 403             PrivilegedAction<Void> pa = () -> {
 404                 implInterruptAll();
 405                 return null;
 406             };
 407             AccessController.doPrivileged(pa);
 408         }
 409     }
 410 
 411     /**
 412      * Cancel execution if not already cancelled. Only the thread that wins the
          * compare-and-set on the {@code cancelled} field performs the shutdown,
          * interrupt and wakeup steps.
 413      */
 414     private void cancelExecution() {
             // plain volatile read first to skip the compareAndSet once already cancelled
 415         if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {
 416             // prevent new threads from starting
 417             flock.shutdown();
 418 
 419             // interrupt all unfinished threads
 420             interruptAll();
 421 
 422             // wakeup join
 423             flock.wakeup();
 424         }
 425     }
 426 
 427     /**
 428      * Schedules a task to cancel execution on timeout.
 429      */
 430     private void scheduleTimeout(Duration timeout) {
             // owner-only, and at most one timer task is scheduled per scope
 431         assert Thread.currentThread() == flock.owner() && timerTask == null;
 432         timerTask = TimerSupport.schedule(timeout, () -> {
                 // nothing to do if execution was already cancelled for another reason
 433             if (!cancelled) {
                     // the flag is set before cancelling; NOTE(review): presumably so that
                     // an owner woken by the cancellation observes the timeout -- confirm
 434                 timeoutExpired = true;
 435                 cancelExecution();
 436             }
 437         });
 438     }
 439 
 440     /**
 441      * Cancels the timer task if set.
 442      */
 443     private void cancelTimeout() {
 444         assert Thread.currentThread() == flock.owner();
             // timerTask is null when no timeout was scheduled
 445         if (timerTask != null) {
                 // false: do not interrupt the timer task if it is already running
 446             timerTask.cancel(false);
 447         }
 448     }
 449 
 450     /**
 451      * Invoked by the thread for a subtask when the subtask completes before execution
 452      * was cancelled.
 453      */
 454     private void onComplete(SubtaskImpl<? extends T> subtask) {
             // a completed subtask must have a result or an exception available
 455         assert subtask.state() != Subtask.State.UNAVAILABLE;
             // the joiner returns true to request that execution be cancelled
 456         if (joiner.onComplete(subtask)) {
 457             cancelExecution();
 458         }
 459     }
 460 
 461     /**
 462      * Initialize a new StructuredTaskScope.
 463      */
         // "this-escape" is suppressed because "this" is passed to
         // Objects.toIdentityString below, before the constructor has completed
 464     @SuppressWarnings("this-escape")
 465     private StructuredTaskScope(Joiner<? super T, ? extends R> joiner,
 466                                 ThreadFactory threadFactory,
 467                                 String name) {
 468         this.joiner = joiner;
 469         this.threadFactory = threadFactory;
 470 
             // an unnamed scope uses its identity string as the name of its ThreadFlock
 471         if (name == null)
 472             name = Objects.toIdentityString(this);
 473         this.flock = ThreadFlock.open(name);
 474     }


 475 
 476     /**
 477      * Represents a subtask forked with {@link #fork(Callable)} or {@link #fork(Runnable)}.
 478      *
 479      * <p> Code that forks subtasks can use the {@link #get() get()} method after {@linkplain
 480      * #join() joining} to obtain the result of a subtask that completed successfully. It
 481      * can use the {@link #exception()} method to obtain the exception thrown by a subtask
 482      * that failed.
 483      *
 484      * @param <T> the result type
 485      * @since 21
 486      */
 487     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 488     public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {








 489         /**
 490          * Represents the state of a subtask.
 491          * @see Subtask#state()
 492          * @since 21
 493          */
 494         @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 495         enum State {
 496             /**
 497              * The subtask result or exception is not available. This state indicates that
 498              * the subtask was forked but has not completed, it completed after execution
 499              * was cancelled, or it was forked after execution was cancelled (in which
 500              * case a thread was not created to execute the subtask).
 501              */
 502             UNAVAILABLE,
 503             /**
 504              * The subtask completed successfully. The {@link Subtask#get() Subtask.get()}
 505              * method can be used to get the result. This is a terminal state.

 506              */
 507             SUCCESS,
 508             /**
 509              * The subtask failed with an exception. The {@link Subtask#exception()
 510              * Subtask.exception()} method can be used to get the exception. This is a
 511              * terminal state.
 512              */
 513             FAILED,
 514         }
 515 
 516         /**
 517          * {@return the subtask state}
 518          */
 519         State state();
 520 
 521         /**
 522          * Returns the result of this subtask if it completed successfully. If
 523          * {@linkplain #fork(Callable) forked} to execute a value-returning task then the
 524          * result from the {@link Callable#call() call} method is returned. If
 525          * {@linkplain #fork(Runnable) forked} to execute a task that does not return a
 526          * result then {@code null} is returned.
 527          *
 528          * <p> Code executing in the scope owner thread can use this method to get the
 529          * result of a successful subtask only after it has {@linkplain #join() joined}.
 530          *
 531          * <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask)
 532          * onComplete} method should test that the {@linkplain #state() subtask state} is
 533          * {@link State#SUCCESS SUCCESS} before using this method to get the result.
 534          *
 535          * @return the possibly-null result
 536          * @throws IllegalStateException if the subtask has not completed, did not complete
 537          * successfully, or the current thread is the task scope owner and it has not joined

 538          * @see State#SUCCESS
 539          */
 540         T get();
 541 
 542         /**
 543          * {@return the exception thrown by this subtask if it failed} If
 544          * {@linkplain #fork(Callable) forked} to execute a value-returning task then
 545          * the exception thrown by the {@link Callable#call() call} method is returned.
 546          * If {@linkplain #fork(Runnable) forked} to execute a task that does not return
 547          * a result then the exception thrown by the {@link Runnable#run() run} method is
 548          * returned.
 549          *
 550          * <p> Code executing in the scope owner thread can use this method to get the
 551          * exception thrown by a failed subtask only after it has {@linkplain #join() joined}.
 552          *
 553          * <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask)
 554          * onComplete} method should test that the {@linkplain #state() subtask state} is
 555          * {@link State#FAILED FAILED} before using this method to get the exception.
 556          *
 557          * @throws IllegalStateException if the subtask has not completed, completed with
 558          * a result, or the current thread is the task scope owner and it has not joined

 559          * @see State#FAILED
 560          */
 561         Throwable exception();
 562     }
 563 
 564     /**
 565      * An object used with a {@link StructuredTaskScope} to handle subtask completion
 566      * and produce the result for a main task waiting in the {@link #join() join} method
 567      * for subtasks to complete.


 568      *
 569      * <p> Joiner defines static methods to create {@code Joiner} objects for common cases:
 570      * <ul>
 571      *   <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner}
 572      *   that yields a stream of the completed subtasks for {@code join} to return when
 573      *   all subtasks complete successfully. It cancels execution and causes {@code join}
 574      *   to throw if any subtask fails.
 575      *   <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a
 576      *   {@code Joiner} that yields the result of the first subtask to succeed, cancelling
 577      *   execution when it does. It causes {@code join} to throw if all subtasks fail.
 578      *   <li> {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a
 579      *   {@code Joiner} that waits for all subtasks to complete successfully. It cancels
 580      *   execution and causes {@code join} to throw if any subtask fails.
 581      *   <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all
 582      *   subtasks. It does not cancel execution.
 583      * </ul>
 584      *
 585      * <p> In addition to the methods to create {@code Joiner} objects for common cases,
 586      * the {@link #all(Predicate) all(Predicate)} method is defined to create a {@code
 587      * Joiner} that yields a stream of all subtasks. It is created with a {@link
 588      * Predicate Predicate} that determines if execution should continue or be cancelled.
 589      * This {@code Joiner} can be built upon to create custom policies that cancel
 590      * execution based on some condition.
 591      *
 592      * <p> More advanced policies can be developed by implementing the {@code Joiner}
 593      * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked.
 594      * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a
 595      * result or exception. These methods return a {@code boolean} to indicate if execution
 596      * should be cancelled. These methods can be used to collect subtasks, results, or
 597      * exceptions, and control when to cancel execution. The {@link #result()} method
 598      * must be implemented to produce the result (or exception) for the {@code join}
 599      * method.
 600      *
 601      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 602      * in this class will cause a {@link NullPointerException} to be thrown.
 603      *
 604      * @implSpec Implementations of this interface must be thread safe. The {@link
 605      * #onComplete(Subtask)} method defined by this interface may be invoked by several
 606      * threads concurrently.
 607      *
 608      * @apiNote It is very important that a new {@code Joiner} object is created for each
 609      * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with
 610      * different task scopes or re-used after a task scope is closed.
 611      *
 612      * <p> Designing a {@code Joiner} should take into account the code at the use-site
 613      * where the results from the {@link StructuredTaskScope#join() join} method are
 614      * processed. It should be clear what the {@code Joiner} does vs. the application
 615      * code at the use-site. In general, the {@code Joiner} implementation is not the
 616      * place to code "business logic". A {@code Joiner} should be designed to be as
 617      * general purpose as possible.
 618      *
 619      * @param <T> the result type of tasks executed in the task scope
 620      * @param <R> the type of results returned by the join method
 621      * @since 24
 622      * @see #open(Joiner)
 623      */
 624     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 625     @FunctionalInterface
 626     public interface Joiner<T, R> {
 627 
 628         /**
 629          * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable)
 630          * fork(Runnable)} when forking a subtask. The method is invoked from the task
 631          * owner thread. The method is invoked before a thread is created to run the
 632          * subtask.
 633          *
 634          * @implSpec The default implementation throws {@code NullPointerException} if the
 635          * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
 636          * subtask is not in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, it
 637          * otherwise returns {@code false}.
 638          *
 639          * @apiNote This method is invoked by the {@code fork} methods. It should not be
 640          * invoked directly.
 641          *
 642          * @param subtask the subtask
 643          * @return {@code true} to cancel execution
 644          */
 645         default boolean onFork(Subtask<? extends T> subtask) {
 646             if (subtask.state() != Subtask.State.UNAVAILABLE) {  // NPE here if subtask is null, as specified
 647                 throw new IllegalArgumentException("Subtask not in UNAVAILABLE state");
 648             }
 649             return false;  // the default never asks for execution to be cancelled
 650         }
 651 
 652         /**
 653          * Invoked by the thread started to execute a subtask after the subtask completes
 654          * successfully or fails with an exception. This method is not invoked if a
 655          * subtask completes after execution has been cancelled.
 656          *
 657          * @implSpec The default implementation throws {@code NullPointerException} if the
 658          * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
 659          * subtask is not in the {@link Subtask.State#SUCCESS SUCCESS} or {@link
 660          * Subtask.State#FAILED FAILED} state, it otherwise returns {@code false}.
 661          *
 662          * @apiNote This method is invoked by subtasks when they complete. It should not
 663          * be invoked directly.
 664          *
 665          * @param subtask the subtask
 666          * @return {@code true} to cancel execution
 667          */
 668         default boolean onComplete(Subtask<? extends T> subtask) {
 669             if (subtask.state() == Subtask.State.UNAVAILABLE) {  // NPE here if subtask is null, as specified
 670                 throw new IllegalArgumentException("Subtask not in SUCCESS or FAILED state");
 671             }
 672             return false;  // the default never asks for execution to be cancelled
 673         }
 674 
 675         /**
 676          * Invoked by {@link #join()} to produce the result (or exception) after waiting
 677          * for all subtasks to complete or execution to be cancelled. The result from this
 678          * method is returned by the {@code join} method. If this method throws, then
 679          * {@code join} throws {@link ExecutionException} with the exception thrown by
 680          * this method as the cause.
 681          *
 682          * <p> In normal usage, this method will be called at most once to produce the
 683          * result (or exception). If the {@code join} method is called more than once
 684          * then this method may be called more than once to produce the result. An
 685          * implementation should return an equal result (or throw the same exception) on
 686          * second or subsequent calls to produce the outcome.
 687          *
 688          * @apiNote This method is invoked by the {@code join} method. It should not be
 689          * invoked directly.
 690          *
 691          * @return the result
 692          * @throws Throwable the exception
 693          */
 694         R result() throws Throwable;  // the only abstract method: onFork/onComplete are defaults, the rest are static
 695 
 696         /**
 697          * {@return a new Joiner object that yields a stream of all subtasks when all
 698          * subtasks complete successfully, or throws if any subtask fails}
 699          * If any subtask fails then execution is cancelled.
 700          *
 701          * <p> If all subtasks complete successfully, the joiner's {@link Joiner#result()}
 702          * method returns a stream of all subtasks in the order that they were forked.
 703          * If any subtask failed then the {@code result} method throws the exception from
 704          * the first subtask to fail.
 705          *
 706          * @apiNote This joiner is intended for cases where the results for all subtasks
 707          * are required ("invoke all"); if any subtask fails then the results of other
 708          * unfinished subtasks are no longer needed. A typical usage will be when the
 709          * subtasks return results of the same type, the returned stream of forked
 710          * subtasks can be used to get the results.
 711          *
 712          * @param <T> the result type of subtasks
 713          */
 714         static <T> Joiner<T, Stream<Subtask<T>>> allSuccessfulOrThrow() {
 715             return new AllSuccessful<>();  // fresh instance per call: a Joiner must not be shared between task scopes
 716         }
 717 
 718         /**
 719          * {@return a new Joiner object that yields the result of a subtask that completed
 720          * successfully, or throws if all subtasks fail} If any subtask completes
 721          * successfully then execution is cancelled.
 722          *
 723          * <p> The joiner's {@link Joiner#result()} method returns the result of a subtask
 724          * that completed successfully. If all subtasks fail then the {@code result} method
 725          * throws the exception from one of the failed subtasks. The {@code result} method
 726          * throws {@code NoSuchElementException} if no subtasks were forked.
 727          *
 728          * @apiNote This joiner is intended for cases where the result of any subtask will
 729          * do ("invoke any") and where the results of other unfinished subtasks are no
 730          * longer needed.
 731          *
 732          * @param <T> the result type of subtasks
 733          */
 734         static <T> Joiner<T, T> anySuccessfulResultOrThrow() {
 735             return new AnySuccessful<>();  // fresh instance per call: a Joiner must not be shared between task scopes
 736         }
 737 
 738         /**
 739          * {@return a new Joiner object that waits for all subtasks to complete
 740          * successfully} It <a href="StructuredTaskScope.html#CancelExecution">cancels
 741          * execution</a> if any subtask fails.
 742          *
 743          * <p> The joiner's {@link Joiner#result() result} method returns {@code null}
 744          * if all subtasks complete successfully, or throws the exception from the first
 745          * subtask to fail.
 746          *
 747          * @apiNote This joiner is intended for cases where the results for all subtasks
 748          * are required ("invoke all"), and where the code {@linkplain #fork(Callable)
 749          * forking} subtasks keeps a reference to the {@linkplain Subtask Subtask} objects.
 750          * A typical usage will be when subtasks return results of different types.
 751          *
 752          * @param <T> the result type of subtasks
 753          */
 754         static <T> Joiner<T, Void> awaitAllSuccessfulOrThrow() {
 755             return new AwaitSuccessful<>();  // fresh instance per call: a Joiner must not be shared between task scopes
 756         }
 757 
 758         /**
 759          * {@return a new Joiner object that waits for all subtasks}
 760          *
 761          * <p> The joiner's {@link Joiner#result() result} method returns {@code null}.
 762          *
 763          * @apiNote This joiner is intended for cases where subtasks make use of
 764          * <em>side-effects</em> rather than return results or fail with exceptions.
 765          * The {@link #fork(Runnable) fork(Runnable)} method can be used to fork subtasks
 766          * that do not return a result.
 767          *
 768          * @param <T> the result type of subtasks
 769          */
 770         static <T> Joiner<T, Void> awaitAll() {
 771             // anonymous class rather than a lambda: a lambda need not yield a new object on each evaluation
 772             return new Joiner<T, Void>() {
 773                 @Override
 774                 public Void result() {
 775                     return null;  // nothing to yield; the inherited onFork/onComplete defaults never cancel
 776                 }
 777             };
 778         }
 779 
 780         /**
 781          * {@return a new Joiner object that yields a stream of all subtasks, cancelling
 782          * execution when evaluating a completed subtask with the given predicate returns
 783          * {@code true}}
 784          *
 785          * <p> The joiner's {@link Joiner#onComplete(Subtask)} method invokes the
 786          * predicate's {@link Predicate#test(Object) test} method with the subtask that
 787          * completed successfully or failed with an exception. If the {@code test} method
 788          * returns {@code true} then <a href="StructuredTaskScope.html#CancelExecution">
 789          * execution is cancelled</a>. The {@code test} method must be thread safe as it
 790          * may be invoked concurrently from several threads.
 791          *
 792          * <p> The joiner's {@link #result()} method returns the stream of all subtasks,
 793          * in fork order. The stream may contain subtasks that have completed
 794          * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED}
 795          * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state
 796          * if execution was cancelled before all subtasks were forked or completed.
 797          *
 798          * <p> The following example uses this method to create a {@code Joiner} that
 799          * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> when
 800          * two or more subtasks fail.
 801          * {@snippet lang=java :
 802          *    class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
 803          *         private final AtomicInteger failedCount = new AtomicInteger();
 804          *         @Override
 805          *         public boolean test(Subtask<? extends T> subtask) {
 806          *             return subtask.state() == Subtask.State.FAILED
 807          *                     && failedCount.incrementAndGet() >= 2;
 808          *         }
 809          *     }
 810          *
 811          *     var joinPolicy = Joiner.all(new CancelAfterTwoFailures<String>());
 812          * }
 813          *
 814          * @param isDone the predicate to evaluate completed subtasks
 815          * @param <T> the result type of subtasks
 816          */
 817         static <T> Joiner<T, Stream<Subtask<T>>> all(Predicate<Subtask<? extends T>> isDone) {
 818             return new AllSubtasks<>(isDone);  // no null check here; presumably AllSubtasks rejects a null predicate (confirm)
 819         }
 820     }
 821 
 822     /**
 823      * Represents the configuration for a {@code StructuredTaskScope}.


 824      *
 825      * <p> The configuration for a {@code StructuredTaskScope} consists of a {@link
 826      * ThreadFactory} to create threads, an optional name for the purposes of monitoring
 827      * and management, and an optional timeout.
 828      *
 829      * <p> Creating a {@code StructuredTaskScope} with its 1-arg {@link #open(Joiner) open}
 830      * method uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default
 831      * configuration</a>. The default configuration consists of a thread factory that
 832      * creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">
 833      * virtual threads</a>, no name for monitoring and management purposes, and no timeout.
 834      *
 835      * <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, Function)
 836      * open} method allows a different configuration to be used. The function specified
 837      * to the {@code open} method is applied to the default configuration and returns the
 838      * configuration for the {@code StructuredTaskScope} under construction. The function
 839      * can use the {@code with-} prefixed methods defined here to specify the components
 840      * of the configuration to use.
 841      *
 842      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 843      * in this class will cause a {@link NullPointerException} to be thrown.
 844      *
 845      * @since 24
 846      */
 847     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 848     public sealed interface Config permits ConfigImpl {  // sealed: ConfigImpl is the only permitted implementation
 849         /**
 850          * {@return a new {@code Config} object with the given thread factory}
 851          * The other components are the same as this object. The thread factory is used by
 852          * a task scope to create threads when {@linkplain #fork(Callable) forking} subtasks.
 853          *
 854          * @apiNote The thread factory will typically create
 855          * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
 856          * maybe with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler
 857          * uncaught exception handler}, or other properties configured.
 858          *
 859          * @param threadFactory the thread factory
 860          * @see #fork(Callable)
 861          */
 862         Config withThreadFactory(ThreadFactory threadFactory);
 863 
 864         /**
 865          * {@return a new {@code Config} object with the given name}
 866          * The other components are the same as this object. A task scope is optionally
 867          * named for the purposes of monitoring and management.
 868          * @param name the name
 869          * @see StructuredTaskScope#toString()
 870          */
 871         Config withName(String name);
 872 
 873         /**
 874          * {@return a new {@code Config} object with the given timeout}
 875          * The other components are the same as this object. The timeout starts when the
 876          * task scope is opened.
 877          *
 878          * @apiNote Applications using deadlines, expressed as an {@link java.time.Instant},
 879          * can use {@link Duration#between Duration.between(Instant.now(), deadline)} to
 880          * compute the timeout for this method.
 881          * @param timeout the timeout
 882          * @see #join()
 883          */
 884         Config withTimeout(Duration timeout);
 885     }
 886 
 887     /**
 888      * Opens a new structured task scope to use the given {@code Joiner} object and with
 889      * configuration that is the result of applying the given function to the
 890      * <a href="#DefaultConfiguration">default configuration</a>.
 891      *
 892      * <p> The {@code configFunction} is called with the default configuration and returns
 893      * the configuration for the new structured task scope. The function may, for example,
 894      * set the {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} or set
 895      * a {@linkplain Config#withTimeout(Duration) timeout}.
 896      *
 897      * <p> If a {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} is set
 898      * then the {@code ThreadFactory}'s {@link ThreadFactory#newThread(Runnable) newThread}
 899      * method will be used to create threads when forking subtasks in this task scope.
 900      *
 901      * <p> If a {@linkplain Config#withTimeout(Duration) timeout} is set then it starts
 902      * when the task scope is opened. If the timeout expires before the task scope has
 903      * {@linkplain #join() joined} then execution is cancelled and the {@code join} method
 904      * throws {@link ExecutionException} with {@link TimeoutException} as the cause.
 905      *
 906      * <p> The new task scope is owned by the current thread. Only code executing in this
 907      * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or
 908      * {@linkplain #close close} the task scope.
 909      *
 910      * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
 911      * value} bindings for inheritance by threads started in the task scope.
 912      *
 913      * @param joiner the joiner
 914      * @param configFunction a function to produce the configuration
 915      * @return a new task scope
 916      * @param <T> the result type of tasks executed in the task scope
 917      * @param <R> the type of the result returned by the join method
 918      * @since 24
 919      */
 920     public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
 921                                                         Function<Config, Config> configFunction) {
 922         Objects.requireNonNull(joiner);
 923         // a null configFunction, or a null Config returned by it, fails with NullPointerException on the next two lines
 924         var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());  // Config is sealed to ConfigImpl
 925         var scope = new StructuredTaskScope<T, R>(joiner, config.threadFactory(), config.name());
 926 
 927         // schedule timeout, if one was configured (null means no timeout)
 928         Duration timeout = config.timeout();
 929         if (timeout != null) {
 930             boolean done = false;  // becomes true only when scheduleTimeout returns normally
 931             try {
 932                 scope.scheduleTimeout(timeout);
 933                 done = true;
 934             } finally {
 935                 if (!done) {
 936                     scope.close();  // pop if scheduling timeout failed
 937                 }
 938             }
 939         }
 940 
 941         return scope;
 942     }
 943 
 944     /**
 945      * Opens a new structured task scope to use the given {@code Joiner} object. The
 946      * task scope is created with the <a href="#DefaultConfiguration">default configuration</a>.
 947      * The default configuration has a {@code ThreadFactory} that creates unnamed
 948      * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
 949      * is unnamed for monitoring and management purposes, and has no timeout.
 950      *
 951      * @implSpec
 952      * This factory method is equivalent to invoking the 2-arg open method with the given
 953      * joiner and the {@linkplain Function#identity() identity function}.


 954      *
 955      * @param joiner the joiner
 956      * @return a new task scope
 957      * @param <T> the result type of tasks executed in the task scope
 958      * @param <R> the type of the result returned by the join method
 959      * @since 24
 960      */
 961     public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner) {
 962         return open(joiner, Function.identity());  // identity: the default configuration is used unchanged
 963     }
 964 
 965     /**
 966      * Starts a new thread in this task scope to execute a value-returning task, thus
 967      * creating a <em>subtask</em>. The value-returning task is provided to this method
 968      * as a {@link Callable}, the thread executes the task's {@link Callable#call() call}
 969      * method.
 970      *
 971      * <p> This method first creates a {@link Subtask Subtask} to represent the <em>forked
 972      * subtask</em>. It invokes the joiner's {@link Joiner#onFork(Subtask) onFork} method
 973      * with the {@code Subtask} object. If the {@code onFork} method completes with an exception
 974      * or error then it is propagated by the {@code fork} method. If execution is
 975      * {@linkplain #isCancelled() cancelled}, or {@code onFork} returns {@code true} to
 976      * cancel execution, then this method returns the {@code Subtask} (in the {@link
 977      * Subtask.State#UNAVAILABLE UNAVAILABLE} state) without creating a thread to execute
 978      * the subtask. If execution is not cancelled then a thread is created with the
 979      * {@link ThreadFactory} configured when the task scope was created, and the thread is
 980      * started. Forking a subtask inherits the current thread's {@linkplain ScopedValue
 981      * scoped value} bindings. The bindings must match the bindings captured when the
 982      * task scope was opened. If the subtask completes (successfully or with an exception)
 983      * before execution is cancelled, then the thread invokes the joiner's
 984      * {@link Joiner#onComplete(Subtask) onComplete} method with the subtask in the
 985      * {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} state.
 986      *
 987      * <p> This method returns the {@link Subtask Subtask} object. In some usages, this
 988      * object may be used to get its result. In other cases it may be used for correlation
 989      * or just discarded. To ensure correct usage, the {@link Subtask#get() Subtask.get()}
 990      * method may only be called by the task scope owner to get the result after it has
 991      * waited for subtasks to complete with the {@link #join() join} method and the subtask
 992      * completed successfully. Similarly, the {@link Subtask#exception() Subtask.exception()}
 993      * method may only be called by the task scope owner after it has joined and the subtask
 994      * failed. If execution was cancelled before the subtask was forked, or before it
 995      * completes, then neither method can be used to obtain the outcome.
 996      *
 997      * <p> This method may only be invoked by the task scope owner.


 998      *
 999      * @param task the value-returning task for the thread to execute
1000      * @param <U> the result type
1001      * @return the subtask
1002      * @throws IllegalStateException if this task scope is closed or the owner has already
1003      * joined
1004      * @throws WrongThreadException if the current thread is not the task scope owner
1005      * @throws StructureViolationException if the current scoped value bindings are not
1006      * the same as when the task scope was created
1007      * @throws RejectedExecutionException if the thread factory rejected creating a
1008      * thread to run the subtask
1009      */
1010     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
1011         Objects.requireNonNull(task);
1012         ensureOwner();
1013         ensureOpen();
1014         if (joined) {
1015             throw new IllegalStateException("Already joined");






1016         }
1017 
1018         var subtask = new SubtaskImpl<U>(this, task);  // also the Runnable handed to the thread factory below
1019 
1020         // notify joiner, even if cancelled; an exception from onFork propagates before any thread is created
1021         if (joiner.onFork(subtask)) {
1022             cancelExecution();
1023         }
1024 
1025         if (!cancelled) {  // when cancelled, the subtask is returned without a thread being created
1026             // create thread to run task
1027             Thread thread = threadFactory.newThread(subtask);
1028             if (thread == null) {
1029                 throw new RejectedExecutionException("Rejected by thread factory");
1030             }
1031 
1032             // attempt to start the thread
1033             try {
1034                 flock.start(thread);
1035             } catch (IllegalStateException e) {  // deliberately ignored: the subtask is still returned
1036                 // shutdown by another thread, or underlying flock is shutdown due
1037                 // to unstructured use
1038             }
1039         }
1040 
1041         needToJoin = true;  // set on every normal return, even when no thread was started; join() clears it





1042         return subtask;
1043     }
1044 
1045     /**
1046      * Starts a new thread in this task scope to execute a task that does not return a
1047      * result, creating a <em>subtask</em>.



























1048      *
1049      * <p> This method works exactly the same as {@link #fork(Callable)} except that
1050      * the task is provided to this method as a {@link Runnable}, the thread executes
1051      * the task's {@link Runnable#run() run} method, and its result is {@code null}.

1052      *
1053      * @param task the task for the thread to execute
1054      * @return the subtask
1055      * @throws IllegalStateException if this task scope is closed or the owner has already
1056      * joined
1057      * @throws WrongThreadException if the current thread is not the task scope owner
1058      * @throws StructureViolationException if the current scoped value bindings are not
1059      * the same as when the task scope was created
1060      * @throws RejectedExecutionException if the thread factory rejected creating a
1061      * thread to run the subtask
1062      * @since 24
1063      */
1064     public Subtask<? extends T> fork(Runnable task) {
1065         Objects.requireNonNull(task);  // needed here: fork(Callable) only null-checks the wrapping lambda
1066         return fork(() -> { task.run(); return null; });  // adapt to a Callable whose result is null




1067     }
1068 
1069     /**
1070      * Waits for all subtasks started in this task scope to complete or execution to be
1071      * cancelled. If a {@linkplain Config#withTimeout(Duration) timeout} has been set
1072      * then execution will be cancelled if the timeout expires before or while waiting.
1073      * Once finished waiting, the {@code Joiner}'s {@link Joiner#result() result}
1074      * method is invoked to get the result or throw an exception. If the {@code result}
1075      * method throws then this method throws {@code ExecutionException} with the
1076      * exception thrown by the {@code result()} method as the cause.
1077      *
1078      * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1079      * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1080      * when all threads finish, the {@code Joiner}'s {@link Joiner#onFork(Subtask)
1081      * onFork} or {@link Joiner#onComplete(Subtask) onComplete} returns {@code true}
1082      * to cancel execution, the timeout (if set) expires, or the current thread is
1083      * {@linkplain Thread#interrupt() interrupted}.
1084      *
1085      * <p> This method may only be invoked by the task scope owner.
1086      *
1087      * @return the {@link Joiner#result() result}






1088      * @throws IllegalStateException if this task scope is closed
1089      * @throws WrongThreadException if the current thread is not the task scope owner
1090      * @throws ExecutionException if the joiner's {@code result} method throws, or with
1091      * cause {@link TimeoutException} if a timeout is set and the timeout expires
1092      * @throws InterruptedException if interrupted while waiting
1093      * @since 24
1094      */
1095     public R join() throws ExecutionException, InterruptedException {
1096         ensureOwner();
1097         ensureOpen();

















1098 
1099         if (!joined) {  // a repeated join() after success skips the wait and only re-asks the joiner
1100             // owner has attempted to join
1101             needToJoin = false;  // cleared before waiting, so it stays false even if the wait below throws









1102 
1103             // wait for all threads, execution to be cancelled, or interrupt
1104             flock.awaitAll();








1105 
1106             // throw if timeout expired while waiting
1107             if (timeoutExpired) {
1108                 throw new ExecutionException(new TimeoutException());  // joined stays false on this path
1109             }
1110 
1111             // join completed successfully
1112             cancelTimeout();
1113             joined = true;
1114         }
1115 
1116         // invoke joiner to get result
1117         try {
1118             return joiner.result();
1119         } catch (Throwable e) {  // everything result() throws, Errors included, becomes the cause
1120             throw new ExecutionException(e);


1121         }
1122     }
1123 
1124     /**
1125      * Cancels execution. This method allows the task scope owner to explicitly
1126      * <a href="#CancelExecution">cancel execution</a>. If not already cancelled, this
1127      * method {@linkplain Thread#interrupt() interrupts} the threads executing subtasks
1128      * that have not completed, and prevents new threads from being started in the task
1129      * scope.










1130      *
1131      * @apiNote This method is intended for cases where a task scope is created with a
1132      * {@link Joiner Joiner} that doesn't cancel execution or where the code in the main
1133      * task needs to cancel execution due to some exception or other condition in the main
1134      * task. The following example accepts network connections indefinitely, forking a
1135      * subtask to handle each connection. If the {@code accept()} method in the example
1136      * throws then the main task cancels execution before joining and closing the scope.
1137      * The {@link #close() close} method waits for the interrupted threads to finish.
1138      *
1139      * {@snippet lang=java :
1140      *    // @link substring="awaitAll" target="Joiner#awaitAll()" :
1141      *    try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
1142      *
1143      *        try {
1144      *            while (true) {
1145      *                Socket peer = listener.accept();
1146      *                // @link substring="fork" target="#fork(Runnable)" :
1147      *                scope.fork(() -> handle(peer));
1148      *            }
1149      *        } finally {
1150      *            scope.cancel();
1151      *            scope.join();    // completes immediately
1152      *        }
1153      *
1154      *   }
1155      * }



1156      *
1157      * @throws IllegalStateException if this task scope is closed
1158      * @throws WrongThreadException if the current thread is not the task scope owner
1159      * @since 24

1160      */
1161     public void cancel() {
1162         ensureOwner();
1163         ensureOpen();
1164         cancelExecution();

1165     }
1166 
1167     /**
1168      * {@return {@code true} if <a href="#CancelExecution">execution is cancelled</a>,
1169      * or in the process of being cancelled, otherwise {@code false}}
1170      *
1171      * <p> Cancelling execution prevents new threads from starting in the task scope and
1172      * {@linkplain Thread#interrupt() interrupts} threads executing unfinished subtasks.
1173      * It may take some time before the interrupted threads finish execution; this
1174      * method may return {@code true} before all threads have been interrupted or before
1175      * all threads have finished.
1176      *
1177      * @apiNote A main task with a lengthy "forking phase" (the code that executes before
1178      * the main task invokes {@link #join() join}) may use this method to avoid doing work
1179      * in cases where execution was cancelled by the completion of a previously forked
1180      * subtask or timeout.
1181      *
1182      * @since 24
1183      */
1184     public boolean isCancelled() {
1185         return cancelled;
1186     }
1187 
1188     /**
1189      * Closes this task scope.
1190      *
1191      * <p> This method first <a href="#CancelExecution">cancels execution</a>, if not
1192      * already cancelled. This interrupts the threads executing unfinished subtasks. This
1193      * method then waits for all threads to finish. If interrupted while waiting then it
1194      * will continue to wait until the threads finish, before completing with the interrupt
1195      * status set.
1196      *
1197      * <p> This method may only be invoked by the task scope owner. If the task scope
1198      * is already closed then the task scope owner invoking this method has no effect.
1199      *
1200      * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
1201      * manner</em>. If this method is called to close a task scope before nested task
1202      * scopes are closed then it closes the underlying construct of each nested task scope
1203      * (in the reverse order that they were created in), closes this task scope, and then
1204      * throws {@link StructureViolationException}.
1205      * Similarly, if this method is called to close a task scope while executing with
1206      * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
1207      * before the scoped values were bound, then {@code StructureViolationException} is
1208      * thrown after closing the task scope.
1209      * If a thread terminates without first closing task scopes that it owns then
1210      * termination will cause the underlying construct of each of its open task scopes to
1211      * be closed. Closing is performed in the reverse order that the task scopes were
1212      * created in. Thread termination may therefore be delayed when the task scope owner
1213      * has to wait for threads forked in these task scopes to finish.
1214      *



1215      * @throws IllegalStateException thrown after closing the task scope if the task scope
1216      * owner did not attempt to join after forking
1217      * @throws WrongThreadException if the current thread is not the task scope owner
1218      * @throws StructureViolationException if a structure violation was detected
1219      */
1220     @Override
1221     public void close() {
1222         ensureOwner();
1223         if (closed) {

1224             return;
1225         }
1226 
1227         // cancel execution if not already joined
1228         if (!joined) {
1229             cancelExecution();
1230             cancelTimeout();
1231         }
1232 
1233         // wait for stragglers
1234         try {


1235             flock.close();
1236         } finally {
1237             closed = true;
1238         }
1239 
1240         // throw ISE if the owner didn't join after forking
1241         if (needToJoin) {
1242             needToJoin = false;
1243             throw new IllegalStateException("Owner did not join");
1244         }
1245     }
1246 
1247     /**
1248      * {@inheritDoc}  If a {@link Config#withName(String) name} for monitoring and
1249      * management purposes has been set then the string representation includes the name.
1250      */
1251     @Override
1252     public String toString() {
1253         return flock.name();






1254     }
1255 
1256     /**
1257      * Subtask implementation, runs the task specified to the fork method.
1258      */
1259     private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
1260         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
1261 
1262         private record AltResult(Subtask.State state, Throwable exception) {
1263             AltResult(Subtask.State state) {
1264                 this(state, null);
1265             }
1266         }
1267 
1268         private final StructuredTaskScope<? super T, ?> scope;
1269         private final Callable<? extends T> task;

1270         private volatile Object result;
1271 
1272         SubtaskImpl(StructuredTaskScope<? super T, ?> scope, Callable<? extends T> task) {


1273             this.scope = scope;
1274             this.task = task;

1275         }
1276 
1277         @Override
1278         public void run() {
1279             T result = null;
1280             Throwable ex = null;
1281             try {
1282                 result = task.call();
1283             } catch (Throwable e) {
1284                 ex = e;
1285             }
1286 
1287             // nothing to do if task scope is cancelled
1288             if (scope.isCancelled())
1289                 return;
1290 
1291             // set result/exception and invoke onComplete
1292             if (ex == null) {
1293                 this.result = (result != null) ? result : RESULT_NULL;
1294             } else {
1295                 this.result = new AltResult(State.FAILED, ex);
1296             }
1297             scope.onComplete(this);





1298         }
1299 
1300         @Override
1301         public Subtask.State state() {
1302             Object result = this.result;
1303             if (result == null) {
1304                 return State.UNAVAILABLE;
1305             } else if (result instanceof AltResult alt) {
1306                 // null or failed
1307                 return alt.state();
1308             } else {
1309                 return State.SUCCESS;
1310             }
1311         }
1312 
1313 
1314         @Override
1315         public T get() {
1316             scope.ensureJoinedIfOwner();
1317             Object result = this.result;
1318             if (result instanceof AltResult) {
1319                 if (result == RESULT_NULL) return null;
1320             } else if (result != null) {
1321                 @SuppressWarnings("unchecked")
1322                 T r = (T) result;
1323                 return r;
1324             }
1325             throw new IllegalStateException(
1326                     "Result is unavailable or subtask did not complete successfully");
1327         }
1328 
1329         @Override
1330         public Throwable exception() {
1331             scope.ensureJoinedIfOwner();
1332             Object result = this.result;
1333             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
1334                 return alt.exception();
1335             }
1336             throw new IllegalStateException(
1337                     "Exception is unavailable or subtask did not complete with exception");
1338         }
1339 
1340         @Override
1341         public String toString() {
1342             String stateAsString = switch (state()) {
1343                 case UNAVAILABLE -> "[Unavailable]";
1344                 case SUCCESS     -> "[Completed successfully]";
1345                 case FAILED      -> {
1346                     Throwable ex = ((AltResult) result).exception();
1347                     yield "[Failed: " + ex + "]";
1348                 }
1349             };
1350             return Objects.toIdentityString(this) + stateAsString;
1351         }
1352     }
1353 
1354     /**
1355      * A joiner that returns a stream of all subtasks when all subtasks complete
1356      * successfully. If any subtask fails then execution is cancelled.














1357      */
1358     private static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {



1359         private static final VarHandle FIRST_EXCEPTION;
1360         static {
1361             try {
1362                 MethodHandles.Lookup l = MethodHandles.lookup();
1363                 FIRST_EXCEPTION = l.findVarHandle(AllSuccessful.class, "firstException", Throwable.class);

1364             } catch (Exception e) {
1365                 throw new ExceptionInInitializerError(e);
1366             }
1367         }
1368         // list of forked subtasks, only accessed by owner thread
1369         private final List<Subtask<T>> subtasks = new ArrayList<>();
1370         private volatile Throwable firstException;
1371 
1372         @Override
1373         public boolean onFork(Subtask<? extends T> subtask) {
1374             @SuppressWarnings("unchecked")
1375             var tmp = (Subtask<T>) subtask;
1376             subtasks.add(tmp);
1377             return false;












1378         }
1379 
1380         @Override
1381         public boolean onComplete(Subtask<? extends T> subtask) {
1382             return (subtask.state() == Subtask.State.FAILED)
1383                     && (firstException == null)
1384                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());



1385         }
1386 
1387         @Override
1388         public Stream<Subtask<T>> result() throws Throwable {
1389             Throwable ex = firstException;
1390             if (ex != null) {
1391                 throw ex;
1392             } else {
1393                 return subtasks.stream();
1394             }
1395         }
1396     }
1397 
1398     /**
1399      * A joiner that returns the result of the first subtask to complete successfully.
1400      * If any subtask completes successfully then execution is cancelled.
1401      */
1402     private static final class AnySuccessful<T> implements Joiner<T, T> {
1403         private static final VarHandle FIRST_SUCCESS;
1404         private static final VarHandle FIRST_EXCEPTION;
1405         static {
1406             try {
1407                 MethodHandles.Lookup l = MethodHandles.lookup();
1408                 FIRST_SUCCESS = l.findVarHandle(AnySuccessful.class, "firstSuccess", Subtask.class);
1409                 FIRST_EXCEPTION = l.findVarHandle(AnySuccessful.class, "firstException", Throwable.class);
1410             } catch (Exception e) {
1411                 throw new ExceptionInInitializerError(e);
1412             }
1413         }
1414         private volatile Subtask<T> firstSuccess;
1415         private volatile Throwable firstException;
1416 
















1417         @Override
1418         public boolean onComplete(Subtask<? extends T> subtask) {
1419             if (firstSuccess == null) {
1420                 if (subtask.state() == Subtask.State.SUCCESS) {
1421                     // capture the first subtask that completes successfully
1422                     return FIRST_SUCCESS.compareAndSet(this, null, subtask);
1423                 } else if (firstException == null) {
1424                     // capture the exception thrown by the first task to fail
1425                     FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1426                 }
1427             }
1428             return false;
1429         }
1430 

















1431         @Override
1432         public T result() throws Throwable {
1433             Subtask<T> firstSuccess = this.firstSuccess;
1434             if (firstSuccess != null) {
1435                 return firstSuccess.get();


















































1436             }
1437             Throwable firstException = this.firstException;
1438             if (firstException != null) {
1439                 throw firstException;
1440             } else {
1441                 throw new NoSuchElementException("No subtasks completed");

1442             }


1443         }
1444     }
1445 
1446     /**
1447      * A joiner that waits for all subtasks to complete successfully. If any subtask
1448      * fails then execution is cancelled.













1449      */
1450     private static final class AwaitSuccessful<T> implements Joiner<T, Void> {

1451         private static final VarHandle FIRST_EXCEPTION;
1452         static {
1453             try {
1454                 MethodHandles.Lookup l = MethodHandles.lookup();
1455                 FIRST_EXCEPTION = l.findVarHandle(AwaitSuccessful.class, "firstException", Throwable.class);
1456             } catch (Exception e) {
1457                 throw new ExceptionInInitializerError(e);
1458             }
1459         }
1460         private volatile Throwable firstException;
1461 
1462         @Override
1463         public boolean onComplete(Subtask<? extends T> subtask) {
1464             return (subtask.state() == Subtask.State.FAILED)
1465                     && (firstException == null)
1466                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());













1467         }
1468 
1469         @Override
1470         public Void result() throws Throwable {
1471             Throwable ex = firstException;
1472             if (ex != null) {
1473                 throw ex;
1474             } else {
1475                 return null;
1476             }
1477         }
1478     }
1479 
1480     /**
1481      * A joiner that returns a stream of all subtasks.
1482      */
1483     private static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
1484         private final Predicate<Subtask<? extends T>> isDone;
1485         // list of forked subtasks, only accessed by owner thread
1486         private final List<Subtask<T>> subtasks = new ArrayList<>();
1487 
1488         AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
1489             this.isDone = Objects.requireNonNull(isDone);
1490         }
1491 
1492         @Override
1493         public boolean onFork(Subtask<? extends T> subtask) {
1494             @SuppressWarnings("unchecked")
1495             var tmp = (Subtask<T>) subtask;
1496             subtasks.add(tmp);
1497             return false;

1498         }
1499 















1500         @Override
1501         public boolean onComplete(Subtask<? extends T> subtask) {
1502             return isDone.test(subtask);

1503         }
1504 
















1505         @Override
1506         public Stream<Subtask<T>> result() {
1507             return subtasks.stream();



1508         }
1509     }
1510 
1511     /**
1512      * Implementation of Config.
1513      */
1514     private record ConfigImpl(ThreadFactory threadFactory,
1515                               String name,
1516                               Duration timeout) implements Config {
1517         static Config defaultConfig() {
1518             return new ConfigImpl(Thread.ofVirtual().factory(), null, null);





1519         }
1520 
1521         @Override
1522         public Config withThreadFactory(ThreadFactory threadFactory) {
1523             return new ConfigImpl(Objects.requireNonNull(threadFactory), name, timeout);









1524         }
1525 
1526         @Override
1527         public Config withName(String name) {
1528             return new ConfigImpl(threadFactory, Objects.requireNonNull(name), timeout);
1529         }
1530 
1531         @Override
1532         public Config withTimeout(Duration timeout) {
1533             return new ConfigImpl(threadFactory, name, Objects.requireNonNull(timeout));
1534         }
1535     }
1536 
1537     /**
1538      * Used to schedule a task to cancel execution when a timeout expires.
1539      */
1540     private static class TimerSupport {
1541         private static final ScheduledExecutorService DELAYED_TASK_SCHEDULER;
1542         static {
1543             ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1544                 Executors.newScheduledThreadPool(1, task -> {
1545                     Thread t = InnocuousThread.newThread("StructuredTaskScope-Timer", task);
1546                     t.setDaemon(true);
1547                     return t;
1548                 });
1549             stpe.setRemoveOnCancelPolicy(true);
1550             DELAYED_TASK_SCHEDULER = stpe;
1551         }
1552 
1553         static Future<?> schedule(Duration timeout, Runnable task) {
1554             long nanos = TimeUnit.NANOSECONDS.convert(timeout);
1555             return DELAYED_TASK_SCHEDULER.schedule(task, nanos, TimeUnit.NANOSECONDS);
1556         }
1557     }
1558 }
< prev index next >