1 /*
   2  * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.util.ArrayList;
  33 import java.util.List;
  34 import java.util.NoSuchElementException;
  35 import java.util.Objects;
  36 import java.util.function.Function;
  37 import java.util.function.Predicate;
  38 import java.util.function.Supplier;
  39 import java.util.stream.Stream;
  40 import jdk.internal.javac.PreviewFeature;
  41 import jdk.internal.misc.InnocuousThread;
  42 import jdk.internal.misc.ThreadFlock;
  43 
  44 /**
  45  * An API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports cases
  46  * where a main task splits into several concurrent subtasks, and where the subtasks must
  47  * complete before the main task continues. A {@code StructuredTaskScope} can be used to
  48  * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
  49  * just like that of a sequential operation in structured programming.
  50  *
  51  * <p> {@code StructuredTaskScope} defines the static method {@link #open(Policy) open} to
  52  * open a new {@code StructuredTaskScope} and the {@link #close() close} method to close it.
  53  * The API is designed to be used with the {@code try-with-resources} statement where
  54  * the {@code StructuredTaskScope} is opened as a resource and then closed automatically.
  55  * The code in the block uses the {@link #fork(Callable) fork} method to fork subtasks.
  56  * After forking, it uses the {@link #join() join} method to wait for all subtasks to
  57  * finish (or some other outcome) as a single operation. Forking a subtask starts a new
  58  * {@link Thread} to run the subtask. The thread executing the main task does not continue
  59  * beyond the {@code close} method until all threads started to execute subtasks have finished.
  60  * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may
  61  * only be invoked by the <em>owner thread</em> (the thread that opened the {@code
  62  * StructuredTaskScope}), and the {@code close} method throws an exception after closing
  63  * if the owner did not invoke the {@code join} method.
  64  *
  65  * <p> A {@code StructuredTaskScope} is opened with a {@link Policy} that handles subtask
  66  * completion and produces the result returned by the {@link #join() join} method. The
  67  * {@code Policy} interface defines static methods to create a {@code Policy} for common
  68  * cases.
  69  *
  70  * <p> A {@code Policy} may <a id="CancelExecution"><em>cancel execution</em></a>
  71  * (sometimes called "short-circuiting") when some condition is reached that does not
  72  * require the result of subtasks that are still executing. Cancelling execution prevents
  73  * new threads from being started to execute further subtasks, {@linkplain Thread#interrupt()
  74  * interrupts} the threads executing subtasks that have not completed, and causes the
  75  * {@code join} method to wakeup with a result (or exception). The {@link #close() close}
  76  * method always waits for threads executing subtasks to finish, even if execution is
  77  * cancelled, so it cannot continue beyond the {@code close} method until the interrupted
  78  * threads finish. Subtasks should be coded so that they finish as soon as possible when
  79  * interrupted. Subtasks that block on methods that are not interruptible may delay the
  80  * closing of a task scope.
  81  *
  82  * <p> Consider the example of a main task that splits into two subtasks to concurrently
  83  * fetch resources from two URL locations "left" and "right". Both subtasks may complete
  84  * successfully, one subtask may succeed and the other may fail, or both subtasks may
  85  * fail. In this example, the code in the main task is interested in the result from the
  86  * first subtask to complete successfully. The example uses {@link
  87  * Policy#anySuccessfulResultOrThrow() Policy.anySuccessfulResultOrThrow()} to create a
  88  * {@code Policy} that makes available the result of the first subtask to complete
  89  * successfully. The type parameter in the example is "{@code String}" so that only subtasks
  90  * that return a {@code String} can be forked.
  91  * {@snippet lang=java :
  92  *    // @link substring="open" target="#open(Policy)" :
  93  *    try (var scope = StructuredTaskScope.open(Policy.<String>anySuccessfulResultOrThrow())) {
  94  *
  95  *        scope.fork(() -> query(left));  // @link substring="fork" target="#fork(Callable)"
  96  *        scope.fork(() -> query(right));
  97  *
  98  *        // throws if both subtasks fail
  99  *        String firstResult = scope.join();   // @link substring="join" target="#join()"
 100  *
 101       // @link substring="close" target="#close()" :
 102  *    } // close
 103  * }
 104  *
 105  * <p> In the example, the main task forks the two subtasks, then waits in the {@code
 106  * join} method for either subtask to complete successfully or for both subtasks to fail.
 107  * If one of the subtasks completes successfully then the other subtask is cancelled (by
 108  * way of interrupting the thread executing the subtask), and the {@code join} method
 109  * returns the result from the first subtask. Cancelling the other subtask avoids the
 110  * main task waiting for a result that it doesn't care about. If both subtasks fail then
 111  * the {@code join} method throws {@link ExecutionException} with the exception from one
 112  * of the subtasks as the {@linkplain Throwable#getCause() cause}.
 113  *
 114  * <p> Now consider another example that also splits into two subtasks to concurrently
 115  * fetch resources. One of the subtasks returns a {@code String} when it succeeds, the
 116  * other returns an {@code Integer}. The main task in this example is interested in the
 117  * successful result from both subtasks. It uses {@link Policy#ignoreSuccessfulOrThrow()
 118  * Policy.ignoreSuccessfulOrThrow()} to create a {@code Policy} that cancels execution and
 119  * causes {@code join} to throw if any subtask fails.
 120  * {@snippet lang=java :
 121  *    try (var scope = StructuredTaskScope.open(Policy.ignoreSuccessfulOrThrow())) {
 122  *
 123  *        // @link substring="Subtask" target="Subtask" :
 124  *        Subtask<String> subtask1 = scope.fork(() -> query(left));
 125  *        Subtask<Integer> subtask2 = scope.fork(() -> query(right));
 126  *
 127  *        // throws if either subtask fails
 128  *        scope.join();
 129  *
 130  *        // both subtasks completed successfully
 131  *        return new MyResult(subtask1.get(), subtask2.get()); // @link substring="get" target="Subtask#get()"
 132  *
 133  *    }
 134  * }
 135  *
 136  * <p> In this example, the main task forks the two subtasks. The {@code fork} method
 137  * returns a {@link Subtask Subtask} that is a handle to the forked subtask. The main task
 138  * waits in the {@code join} method for both subtasks to complete successfully or for either
 139  * subtask to fail. If both subtasks complete successfully then the {@code join} method
 140  * completes and the main task uses the {@link Subtask#get() Subtask.get()} method to get
 141  * the result of each subtask. If either subtask fails then the other is cancelled (by way
 142  * of interrupting the thread executing the subtask) and the {@code join} throws {@link
 143  * ExecutionException} with the exception from the failed subtask as the {@linkplain
 144  * Throwable#getCause() cause}.
 145  *
 146  * <p> Whether code uses the {@code Subtask} returned from {@code fork} will depend on
 147  * the {@code Policy} and usage. Some {@code Policy} implementations are suited to subtasks
 148  * that return results of the same type and where the {@code join} method returns a result
 149  * for the main task to use. Code that forks subtasks that return results of different
 150  * types, and uses a {@code Policy} such as {@code Policy.ignoreSuccessfulOrThrow()} that
 151  * does not return a result, will use {@link Subtask#get() Subtask.get()} after joining.
 152  *
 153  * <h2>Configuration</h2>
 154  *
 155  * A {@code StructuredTaskScope} is opened with {@linkplain Config configuration} that
 156  * consists of a {@link ThreadFactory} to create threads, an optional name for monitoring
 157  * and management purposes, and an optional timeout.
 158  *
 159  * <p> The 1-arg {@link #open(Policy) open} method creates a {@code StructuredTaskScope}
 160  * with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default
 161  * configuration has a {@code ThreadFactory} that creates unnamed
 162  * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
 163  * is unnamed for monitoring and management purposes, and has no timeout.
 164  *
 165  * <p> The 2-arg {@link #open(Policy, Function) open} method can be used to create a
 166  * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for
 167  * the purposes of monitoring and management, or has a timeout that cancels execution if
 168  * the timeout expires before or while waiting for subtasks to finish. The {@code open}
 169  * method is called with a {@linkplain Function function} that is applied to the default
 170  * configuration and returns a {@link Config Config} for the {@code StructuredTaskScope}
 171  * under construction.
 172  *
 173  * <p> The following example opens a new {@code StructuredTaskScope} with a {@code
 174  * ThreadFactory} that creates virtual threads {@linkplain Thread#setName(String) named}
 175  * "duke-0", "duke-1" ...
 176  * {@snippet lang=java :
 177  *    // @link substring="name" target="Thread.Builder#name(String, long)" :
 178  *    ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory();
 179  *
 180  *    // @link substring="withThreadFactory" target="Config#withThreadFactory(ThreadFactory)" :
 181  *    try (var scope = StructuredTaskScope.open(policy, cf -> cf.withThreadFactory(factory))) {
 182  *
 183  *        scope.fork( .. );   // runs in a virtual thread with name "duke-0"
 184  *        scope.fork( .. );   // runs in a virtual thread with name "duke-1"
 185  *
 186  *        scope.join();
 187  *
 188  *     }
 189  * }
 190  *
 191  * <p> A second example sets a timeout, represented by a {@link Duration}. The timeout
 192  * starts when the new task scope is opened. If the timeout expires before the {@code join}
 193  * method has completed then <a href="#CancelExecution">execution is cancelled</a>. This
 194  * interrupts the threads executing the two subtasks and causes the {@link #join() join}
 195  * method to throw {@link ExecutionException} with {@link TimeoutException} as the cause.
 196  * {@snippet lang=java :
 197  *    Duration timeout = Duration.ofSeconds(10);
 198  *
 199  *    // @link substring="allSuccessfulOrThrow" target="Policy#allSuccessfulOrThrow()" :
 200  *    try (var scope = StructuredTaskScope.open(Policy.<String>allSuccessfulOrThrow(),
 201       // @link substring="withTimeout" target="Config#withTimeout(Duration)" :
 202  *                                              cf -> cf.withTimeout(timeout))) {
 203  *
 204  *        scope.fork(() -> query(left));
 205  *        scope.fork(() -> query(right));
 206  *
 207  *        List<String> result = scope.join()
 208  *                                   .map(Subtask::get)
 209  *                                   .toList();
 210  *
 211  *   }
 212  * }
 213  *
 214  * <h2>Inheritance of scoped value bindings</h2>
 215  *
 216  * {@link ScopedValue} supports the execution of a method with a {@code ScopedValue} bound
 217  * to a value for the bounded period of execution of the method by the <em>current thread</em>.
 218  * It allows a value to be safely and efficiently shared to methods without using method
 219  * parameters.
 220  *
 221  * <p> When used in conjunction with a {@code StructuredTaskScope}, a {@code ScopedValue}
 222  * can also safely and efficiently share a value to methods executed by subtasks forked
 223  * in the task scope. When a {@code ScopedValue} object is bound to a value in the thread
 224  * executing the main task then that binding is inherited by the threads created to
 225  * execute the subtasks. The thread executing the main task does not continue beyond the
 226  * {@link #close() close} method until all threads executing the subtasks have finished.
 227  * This ensures that the {@code ScopedValue} is not reverted to being {@linkplain
 228  * ScopedValue#isBound() unbound} (or its previous value) while subtasks are executing.
 229  * In addition to providing a safe and efficient means to inherit a value into subtasks,
 230  * the inheritance allows sequential code using {@code ScopedValue} be refactored to use
 231  * structured concurrency.
 232  *
 233  * <p> To ensure correctness, opening a new {@code StructuredTaskScope} captures the
 234  * current thread's scoped value bindings. These are the scoped values bindings that are
 235  * inherited by the threads created to execute subtasks in the task scope. Forking a
 236  * subtask checks that the bindings in effect at the time that the subtask is forked
 237  * match the bindings when the {@code StructuredTaskScope} was created. This check ensures
 238  * that a subtask does not inherit a binding that is reverted in the main task before the
 239  * subtask has completed.
 240  *
 241  * <p> A {@code ScopedValue} that is shared across threads requires that the value be an
 242  * immutable object or for all access to the value to be appropriately synchronized.
 243  *
 244  * <p> The following example demonstrates the inheritance of scoped value bindings. The
 245  * scoped value USERNAME is bound to the value "duke" for the bounded period of a lambda
 246  * expression by the thread executing it. The code in the block opens a {@code
 247  * StructuredTaskScope} and forks two subtasks, it then waits in the {@code join} method
 248  * and aggregates the results from both subtasks. If code executed by the threads
 249  * running subtask1 and subtask2 uses {@link ScopedValue#get()}, to get the value of
 250  * USERNAME, then value "duke" will be returned.
 251  * {@snippet lang=java :
 252  *     // @link substring="newInstance" target="ScopedValue#newInstance()" :
 253  *     private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
 254  *
 255  *     // @link substring="callWhere" target="ScopedValue#callWhere" :
 256  *     Result result = ScopedValue.callWhere(USERNAME, "duke", () -> {
 257  *
 258  *         try (var scope = StructuredTaskScope.open(Policy.ignoreSuccessfulOrThrow())) {
 259  *
 260  *             Subtask<String> subtask1 = scope.fork( .. );    // inherits binding
 261  *             Subtask<Integer> subtask2 = scope.fork( .. );   // inherits binding
 262  *
 263  *             scope.join();
 264  *             return new MyResult(subtask1.get(), subtask2.get());
 265  *         }
 266  *
 267  *     });
 268  * }
 269  *
 270  * <p> A scoped value inherited into a subtask may be
 271  * <a href="{@docRoot}/java.base/java/lang/ScopedValues.html#rebind">rebound</a> to a new
 272  * value in the subtask for the bounded execution of some method executed in the subtask.
 273  * When the method completes, the value of the {@code ScopedValue} reverts to its previous
 274  * value, the value inherited from the thread executing the main task.
 275  *
 276  * <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}.
 277  * A main task executing in thread T1 opens a {@code StructuredTaskScope} and forks a
 278  * subtask that runs in thread T2. The scoped value bindings captured when T1 opens the
 279  * task scope are inherited into T2. The subtask (in thread T2) executes code that opens a
 280  * new {@code StructuredTaskScope} and forks a subtask that runs in thread T3. The scoped
 281  * value bindings captured when T2 opens the task scope are inherited into T3. These
 282  * include (or may be the same) as the bindings that were inherited from T1. In effect,
 283  * scoped values are inherited into a tree of subtasks, not just one level of subtask.
 284  *
 285  * <h2>Memory consistency effects</h2>
 286  *
 287  * <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to
 288  * {@linkplain #fork forking} of a subtask
 289  * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
 290  * <i>happen-before</i></a> any actions taken by that subtask, which in turn
 291  * <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}.
 292  *
 293  * <h2>General exceptions</h2>
 294  *
 295  * <p> Unless otherwise specified, passing a {@code null} argument to a method in this
 296  * class will cause a {@link NullPointerException} to be thrown.
 297  *
 298  * @param <T> the result type of tasks executed in the task scope
 299  * @param <R> the type of the result returned by the join method
 300  *
 301  * @jls 17.4.5 Happens-before Order
 302  * @since 21
 303  */
 304 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 305 public class StructuredTaskScope<T, R> implements AutoCloseable {
 306     private static final VarHandle CANCELLED;
 307     static {
 308         try {
 309             MethodHandles.Lookup l = MethodHandles.lookup();
 310             CANCELLED = l.findVarHandle(StructuredTaskScope.class,"cancelled", boolean.class);
 311         } catch (Exception e) {
 312             throw new ExceptionInInitializerError(e);
 313         }
 314     }
 315 
 316     private final Policy<? super T, ? extends R> policy;
 317     private final ThreadFactory threadFactory;
 318     private final ThreadFlock flock;
 319 
 320     // fields that are only accessed by owner thread
 321     private boolean needToJoin;     // join attempted
 322     private boolean joined;         // join completed
 323     private boolean closed;
 324     private Future<?> timerTask;
 325 
 326     // set or read by any thread
 327     private volatile boolean cancelled;
 328 
 329     // set by the timer thread, read by the owner thread
 330     private volatile boolean timeoutExpired;
 331 
 332     /**
 333      * Represents a subtask forked with {@link #fork(Callable)} or {@link #fork(Runnable)}.
 334      * @param <T> the result type
 335      * @since 21
 336      */
 337     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 338     public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
 339         /**
 340          * Represents the state of a subtask.
 341          * @see Subtask#state()
 342          * @since 21
 343          */
 344         @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 345         enum State {
 346             /**
 347              * The subtask result or exception is not available. This state indicates that
 348              * the subtask was forked but has not completed, it completed after execution
 349              * was cancelled, or it was forked after execution was cancelled (in which
 350              * case a thread was not created to execute the subtask).
 351              */
 352             UNAVAILABLE,
 353             /**
 354              * The subtask completed successfully. The {@link Subtask#get() Subtask.get()}
 355              * method can be used to get the result. This is a terminal state.
 356              */
 357             SUCCESS,
 358             /**
 359              * The subtask failed with an exception. The {@link Subtask#exception()
 360              * Subtask.exception()} method can be used to get the exception. This is a
 361              * terminal state.
 362              */
 363             FAILED,
 364         }
 365 
 366         /**
 367          * {@return the subtask state}
 368          */
 369         State state();
 370 
 371         /**
 372          * Returns the result of this subtask if it completed successfully. If
 373          * {@linkplain #fork(Callable) forked} to execute a value-returning task then the
 374          * result from the {@link Callable#call() call} method is returned. If
 375          * {@linkplain #fork(Runnable) forked} to execute a task that does not return a
 376          * result then {@code null} is returned.
 377          *
 378          * <p> Code executing in the scope owner thread can use this method to get the
 379          * result of a successful subtask only after it has {@linkplain #join() joined}.
 380          *
 381          * <p> Code executing in the {@code Policy} {@link Policy#onComplete(Subtask)
 382          * onComplete} method should test that the {@linkplain #state() subtask state} is
 383          * {@link State#SUCCESS SUCCESS} before using this method to get the result.
 384          *
 385          * @return the possibly-null result
 386          * @throws IllegalStateException if the subtask has not completed, did not complete
 387          * successfully, or the current thread is the task scope owner and it has not joined
 388          * @see State#SUCCESS
 389          */
 390         T get();
 391 
 392         /**
 393          * {@return the exception thrown by this subtask if it failed} If
 394          * {@linkplain #fork(Callable) forked} to execute a value-returning task then
 395          * the exception thrown by the {@link Callable#call() call} method is returned.
 396          * If {@linkplain #fork(Runnable) forked} to execute a task that does not return
 397          * a result then the exception thrown by the {@link Runnable#run() run} method is
 398          * returned.
 399          *
 400          * <p> Code executing in the scope owner thread can use this method to get the
 401          * exception thrown by a failed subtask only after it has {@linkplain #join() joined}.
 402          *
 403          * <p> Code executing in a {@code Policy} {@link Policy#onComplete(Subtask)
 404          * onComplete} method should test that the {@linkplain #state() subtask state} is
 405          * {@link State#FAILED FAILED} before using this method to get the exception.
 406          *
 407          * @throws IllegalStateException if the subtask has not completed, completed with
 408          * a result, or the current thread is the task scope owner and it has not joined
 409          * @see State#FAILED
 410          */
 411         Throwable exception();
 412     }
 413 
 414     /**
 415      * An object used with a {@link StructuredTaskScope} to handle subtask completion
 416      * and produce the result for a main task waiting in the {@link #join() join} method
 417      * for subtasks to complete.
 418      *
 419      * <p> Policy defines static methods to create {@code Policy} objects for common cases:
 420      * <ul>
 421      *   <li> {@link #ignoreSuccessfulOrThrow() ignoreSuccessfulOrThrow()} creates a {@code
 422      *   Policy} that ignores all successful subtasks. It cancels execution and causes
 423      *   {@code join} to throw if any subtask fails.
 424      *   <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Policy}
 425      *   that yields a stream of the completed subtasks for {@code join} to return when
 426      *   all subtasks complete successfully. It cancels execution and causes {@code join}
 427      *   to throw if any subtask fails.
 428      *   <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a
 429      *   {@code Policy} that yields the result of the first subtask to succeed. It cancels
 430      *   execution and causes {@code join} to throw if all subtasks fail.
 431      *   <li> {@link #ignoreAll() ignoreAll()} creates a {@code Policy} that ignores all
 432      *   completed subtasks, even subtasks that fail. The {@code join} method returns null
 433      *   when all subtasks finish.
 434      * </ul>
 435      *
 436      * <p> In addition to the methods to create {@code Policy} objects for common cases,
 437      * the {@link #all(Predicate) all(Predicate)} method is defined to create
 438      * a {@code Policy} that yields a stream of all forked subtasks. It is created with a
 439      * {@link Predicate Predicate} that determines if execution should continue or be
 440      * cancelled. This policy can be built upon to create custom policies that cancel
 441      * execution based on some condition.
 442      *
 443      * <p> More advanced policies can be developed by implementing the {@code Policy}
 444      * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked.
 445      * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a
 446      * result or exception. These methods return a {@code boolean} to indicate if execution
 447      * should be cancelled. These methods can be used to collect subtasks, results, or
 448      * exceptions, and control when to cancel execution. The {@link #result()} method
 449      * must be implemented to produce the result (or exception) for the {@code join}
 450      * method.
 451      *
 452      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 453      * in this class will cause a {@link NullPointerException} to be thrown.
 454      *
 455      * @implSpec Implementations of this interface must be thread safe. The {@link
 456      * #onComplete(Subtask)} method defined by this interface may be invoked by several
 457      * threads concurrently.
 458      *
 459      * @apiNote It is very important that a new {@code Policy} object is created for each
 460      * {@code StructuredTaskScope}. {@code Policy} objects should never be shared with
 461      * different task scopes or re-used after a task is closed.
 462      *
 463      * <p> Designing a {@code Policy} should take into account the code at the use-site
 464      * where the results from the {@link StructuredTaskScope#join() join} method are
 465      * processed. It should be clear what the {@code Policy} does vs. the application
 466      * code at the use-site. In general, the {@code Policy} implementation is not the
 467      * place to code "business logic". A {@code Policy} should be designed to be as
 468      * general purpose as possible.
 469      *
 470      * @param <T> the result type of tasks executed in the task scope
 471      * @param <R> the type of results returned by the join method
 472      * @since 24
 473      * @see #open(Policy)
 474      */
 475     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 476     @FunctionalInterface
 477     public interface Policy<T, R> {
 478 
 479         /**
 480          * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable)
 481          * fork(Runnable)} when forking a subtask. The method is invoked from the task
 482          * owner thread. The method is invoked before a thread is created to run the
 483          * subtask.
 484          *
 485          * @implSpec The default implementation throws {@code NullPointerException} if the
 486          * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
 487          * subtask is not in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, it
 488          * otherwise returns {@code false}.
 489          * 
 490          * @param subtask the subtask
 491          * @return {@code true} to cancel execution
 492          */
 493         default boolean onFork(Subtask<? extends T> subtask) {
 494             if (subtask.state() != Subtask.State.UNAVAILABLE) {
 495                 throw new IllegalArgumentException();
 496             }
 497             return false;
 498         }
 499 
 500         /**
 501          * Invoked by the thread started to execute a subtask after the subtask completes
 502          * successfully or fails with an exception. This method is not invoked if a
 503          * subtask completes after execution has been cancelled.
 504          *
 505          * @implSpec The default implementation throws {@code NullPointerException} if the
 506          * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
 507          * subtask is not in the {@link Subtask.State#SUCCESS SUCCESS} or {@link
 508          * Subtask.State#FAILED FAILED} state, it otherwise returns {@code false}.
 509          * 
 510          * @param subtask the subtask
 511          * @return {@code true} to cancel execution
 512          */
 513         default boolean onComplete(Subtask<? extends T> subtask) {
 514             if (subtask.state() == Subtask.State.UNAVAILABLE) {
 515                 throw new IllegalArgumentException();
 516             }
 517             return false;
 518         }
 519 
 520         /**
 521          * Invoked by {@link #join()} to produce the result (or exception) after waiting
 522          * for all subtasks to complete or execution to be cancelled. The result from this
 523          * method is returned by the {@code join} method. If this method throws, then
 524          * {@code join} throws {@link ExecutionException} with the exception thrown by
 525          * this method as the cause.
 526          *
 527          * <p> In normal usage, this method will be called at most once to produce the
 528          * result (or exception). If the {@code join} method is called more than once
 529          * then this method may be called more than once to produce the result. An
 530          * implementation should return an equal result (or throw the same exception) on
 531          * second or subsequent calls to produce the outcome.
 532          *
 533          * @return the result
 534          * @throws Throwable the exception
 535          */
 536         R result() throws Throwable;
 537 
 538         /**
 539          * {@return a new policy object that ignores all successful subtasks. It
 540          * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> if
 541          * any subtask fails}
 542          *
 543          * The policy's {@link Policy#result() result} method returns {@code null} if
 544          * all subtasks complete successfully, or throws the exception from the first
 545          * subtask to fail.
 546          *
 547          * @apiNote This policy is intended for cases where the results for all subtasks
 548          * are required ("invoke all"), and where the code {@linkplain #fork(Callable) forking}
 549          * subtasks keeps a reference to the {@linkplain Subtask Subtask} objects. A
 550          * typical usage will be when subtasks return results of different types.
 551          *
 552          * @param <T> the result type of subtasks
 553          */
 554         static <T> Policy<T, Void> ignoreSuccessfulOrThrow() {
 555             return new ThrowIfFailed<>();
 556         }
 557 
 558         /**
 559          * {@return a new policy object that yields a stream of all forked subtasks
 560          * when all subtasks complete successfully, or throws if any subtask fails}
 561          * If any subtask fails then execution is cancelled.
 562          *
 563          * <p> If all subtasks complete successfully, the policy's {@link Policy#result()}
 564          * method returns a stream of all forked subtasks in the order that they were forked.
 565          * If any subtask failed then the {@code result} method throws the exception from
 566          * the first subtask to fail.
 567          *
 568          * @apiNote This policy is intended for cases where the results for all subtasks
 569          * are required ("invoke all"); if any subtask fails then the results of other
 570          * unfinished subtasks are no longer needed. A typical usage will be when the
 571          * subtasks return results of the same type, the returned stream of forked
 572          * subtasks can be used to get the results.
 573          *
 574          * @param <T> the result type of subtasks
 575          */
 576         static <T> Policy<T, Stream<Subtask<T>>> allSuccessfulOrThrow() {
 577             return new AllSuccessful<>();
 578         }
 579 
 580         /**
 581          * {@return a new policy object that yields the result of a subtask that completed
 582          * successfully, or throws if all subtasks fail} If any subtask completes
 583          * successfully then execution is cancelled.
 584          *
 585          * <p> The policy's {@link Policy#result()} method returns the result of a subtask
 586          * that completed successfully. If all subtasks fail then the {@code result} method
 587          * throws the exception from one of the failed subtasks. The {@code result} method
 588          * throws {@code NoSuchElementException} if no subtasks were forked.
 589          *
 590          * @apiNote This policy is intended for cases where the result of any subtask will
 591          * do ("invoke any") and where the results of other unfinished subtasks are no
 592          * longer needed.
 593          *
 594          * @param <T> the result type of subtasks
 595          */
 596         static <T> Policy<T, T> anySuccessfulResultOrThrow() {
 597             return new AnySuccessful<>();
 598         }
 599 
 600         /**
 601          * {@return a new policy object that ignores all completed subtasks, even subtasks
 602          * that fail} The policy's {@link Policy#result() result} method returns {@code null}.
 603          *
 604          * @apiNote This policy is intended for cases where subtasks make use of
 605          * <em>side-effects</em> rather than return results or fail with exceptions.
 606          * The {@link #fork(Runnable) fork(Runnable)} method can be used to fork subtasks
 607          * that do not return a result.
 608          *
 609          * @param <T> the result type of subtasks
 610          */
 611         static <T> Policy<T, Void> ignoreAll() {
 612             // ensure that new Policy object is returned
 613             return new Policy<T, Void>() {
 614                 @Override
 615                 public Void result() {
 616                     return null;
 617                 }
 618             };
 619         }
 620 
 621         /**
 622          * {@return a new policy object that yields a stream of all forked subtasks,
 623          * cancelling execution when evaluating a completed subtask with the given
 624          * predicate returns true}
 625          *
 626          * <p> The policy's {@link Policy#onComplete(Subtask)} method invokes the
 627          * predicate's {@link Predicate#test(Object) test} method with the subtask that
 628          * completed successfully or failed with an exception. If the {@code test} method
 629          * returns {@code true} then <a href="StructuredTaskScope.html#CancelExecution">
 630          * execution is cancelled</a>. The {@code test} method must be thread safe as it
 631          * may be invoked concurrently from several threads.
 632          *
 633          * <p> The policy's {@link #result()} method returns the stream of all forked
 634          * subtasks, in fork order. The stream may contain subtasks that have completed
 635          * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED}
 636          * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state
 637          * if execution was cancelled before all subtasks were forked or completed.
 638          *
 639          * <p> The following example uses {@code all} to create a {@code Policy} that
 640          * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> when
 641          * two or more subtasks fail.
 642          * {@snippet lang=java :
 643          *    class AtMostTwoFailures<T> implements Predicate<Subtask<? extends T>> {
 644          *         private final AtomicInteger failedCount = new AtomicInteger();
 645          *         @Override
 646          *         public boolean test(Subtask<? extends T> subtask) {
 647          *             return subtask.state() == Subtask.State.FAILED
 648          *                     && failedCount.incrementAndGet() >= 2;
 649          *         }
 650          *     }
 651          *
 652          *     var policy = Policy.all(new AtMostTwoFailures<String>());
 653          * }
 654          *
 655          * @param isDone the predicate to evaluate completed subtasks
 656          * @param <T> the result type of subtasks
 657          */
 658         static <T> Policy<T, Stream<Subtask<T>>> all(Predicate<Subtask<? extends T>> isDone) {
 659             return new AllForked<>(isDone);
 660         }
 661     }
 662 
 663     /**
 664      * Represents the configuration for a {@code StructuredTaskScope}.
 665      *
 666      * <p> The configuration for a {@code StructuredTaskScope} consists of a {@link
 667      * ThreadFactory} to create threads, an optional name for the purposes of monitoring
 668      * and management, and an optional timeout.
 669      *
 670      * <p> Creating a {@code StructuredTaskScope} with its 1-arg {@link #open(Policy) open}
 671      * method uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default
 672      * configuration</a>. The default configuration consists of a thread factory that
 673      * creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">
 674      * virtual threads</a>, no name for monitoring and management purposes, and no timeout.
 675      *
 676      * <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Policy, Function)
 677      * open} method allows a different configuration to be used. The function specified
 678      * to the {@code open} method is applied to the default configuration and returns the
 679      * configuration for the {@code StructuredTaskScope} under construction. The function
 680      * can use the {@code with-} prefixed methods defined here to specify the components
 681      * of the configuration to use.
 682      *
 683      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 684      * in this class will cause a {@link NullPointerException} to be thrown.
 685      *
 686      * @since 24
 687      */
 688     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 689     public sealed interface Config permits ConfigImpl {
 690         /**
 691          * {@return a new {@code Config} object with the given thread factory}
 692          * The other components are the same as this object. The thread factory is used by
 693          * a task scope to create threads when {@linkplain #fork(Callable) forking} subtasks.
 694          * @param threadFactory the thread factory
 695          *
 696          * @apiNote The thread factory will typically create
 697          * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
 698          * maybe with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler
 699          * uncaught exception handler}, or other properties configured.
 700          *
 701          * @see #fork(Callable)
 702          */
 703         Config withThreadFactory(ThreadFactory threadFactory);
 704 
 705         /**
 706          * {@return a new {@code Config} object with the given name}
 707          * The other components are the same as this object. A task scope is optionally
 708          * named for the purposes of monitoring and management.
 709          * @param name the name
 710          * @see StructuredTaskScope#toString()
 711          */
 712         Config withName(String name);
 713 
 714         /**
 715          * {@return a new {@code Config} object with the given timeout}
 716          * The other components are the same as this object.
 717          * @param timeout the timeout
 718          *
 719          * @apiNote Applications using deadlines, expressed as an {@link java.time.Instant},
 720          * can use {@link Duration#between Duration.between(Instant.now(), deadline)} to
 721          * compute the timeout for this method.
 722          *
 723          * @see #join()
 724          */
 725         Config withTimeout(Duration timeout);
 726     }
 727 
 728     /**
 729      * Initialize a new StructuredTaskScope.
 730      */
 731     @SuppressWarnings("this-escape")
 732     private StructuredTaskScope(Policy<? super T, ? extends R> policy,
 733                                 ThreadFactory threadFactory,
 734                                 String name) {
 735         this.policy = policy;
 736         this.threadFactory = threadFactory;
 737 
 738         if (name == null)
 739             name = Objects.toIdentityString(this);
 740         this.flock = ThreadFlock.open(name);
 741     }
 742 
 743     /**
 744      * Opens a new structured task scope to use the given policy object plus configuration
 745      * that is the result of applying the given function to the
 746      * <a href="#DefaultConfiguration">default configuration</a>.
 747      *
 748      * <p> The {@code configFunction} is called with the default configuration and returns
 749      * the configuration for the new structured task scope. The function may, for example,
 750      * set the {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} or set
 751      * a {@linkplain Config#withTimeout(Duration) timeout}.
 752      *
 753      * <p> If a {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} is set
 754      * then the {@code ThreadFactory}'s {@link ThreadFactory#newThread(Runnable) newThread}
 755      * method will be used to create threads when forking subtasks in this task scope.
 756      *
 757      * <p> If a {@linkplain Config#withTimeout(Duration) timeout} is set then it starts
 758      * when the task scope is opened. If the timeout expires before the task scope has
 759      * {@linkplain #join() joined} then execution is cancelled and the {@code join} method
 760      * throws {@link ExecutionException} with {@link TimeoutException} as the cause.
 761      *
 762      * <p> The new task scope is owned by the current thread. Only code executing in this
 763      * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or
 764      * {@linkplain #close close} the task scope.
 765      *
 766      * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
 767      * value} bindings for inheritance by threads started in the task scope.
 768      *
 769      * @param policy the policy
 770      * @param configFunction a function to produce the configuration
 771      * @return a new task scope
 772      * @param <T> the result type of tasks executed in the task scope
 773      * @param <R> the type of the result returned by the join method
 774      * @since 24
 775      */
 776     public static <T, R> StructuredTaskScope<T, R> open(Policy<? super T, ? extends R> policy,
 777                                                         Function<Config, Config> configFunction) {
 778         Objects.requireNonNull(policy);
 779 
 780         var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
 781         var scope = new StructuredTaskScope<T, R>(policy, config.threadFactory(), config.name());
 782 
 783         // schedule timeout
 784         Duration timeout = config.timeout();
 785         if (timeout != null) {
 786             boolean done = false;
 787             try {
 788                 scope.scheduleTimeout(timeout);
 789                 done = true;
 790             } finally {
 791                 if (!done) {
 792                     scope.close();  // pop if scheduling timeout failed
 793                 }
 794             }
 795         }
 796 
 797         return scope;
 798     }
 799 
 800     /**
 801      * Opens a new structured task scope to use the given policy. The task scope is
 802      * created with the <a href="#DefaultConfiguration">default configuration</a>.
 803      * The default configuration has a {@code ThreadFactory} that creates unnamed
 804      * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
 805      * is unnamed for monitoring and management purposes, and has no timeout.
 806      *
 807      * @implSpec
 808      * This factory method is equivalent to invoking the 2-arg open method with the given
 809      * policy and the {@linkplain Function#identity() identity function}.
 810      *
 811      * @param policy the policy
 812      * @return a new task scope
 813      * @param <T> the result type of tasks executed in the task scope
 814      * @param <R> the type of the result returned by the join method
 815      * @since 24
 816      */
 817     public static <T, R> StructuredTaskScope<T, R> open(Policy<? super T, ? extends R> policy) {
 818         return open(policy, Function.identity());
 819     }
 820 
 821     private void ensureOwner() {
 822         if (Thread.currentThread() != flock.owner()) {
 823             throw new WrongThreadException("Current thread not owner");
 824         }
 825     }
 826 
 827     private void ensureOpen() {
 828         if (closed) {
 829             throw new IllegalStateException("Task scope is closed");
 830         }
 831     }
 832 
 833     private void ensureJoinedIfOwner() {
 834         if (Thread.currentThread() == flock.owner() && !joined) {
 835             String msg = needToJoin ? "Owner did not join" : "join did not complete";
 836             throw new IllegalStateException(msg);
 837         }
 838     }
 839 
 840     /**
 841      * Interrupts all threads in this task scope, except the current thread.
 842      */
 843     private void implInterruptAll() {
 844         flock.threads()
 845                 .filter(t -> t != Thread.currentThread())
 846                 .forEach(t -> {
 847                     try {
 848                         t.interrupt();
 849                     } catch (Throwable ignore) { }
 850                 });
 851     }
 852 
 853     @SuppressWarnings("removal")
 854     private void interruptAll() {
 855         if (System.getSecurityManager() == null) {
 856             implInterruptAll();
 857         } else {
 858             PrivilegedAction<Void> pa = () -> {
 859                 implInterruptAll();
 860                 return null;
 861             };
 862             AccessController.doPrivileged(pa);
 863         }
 864     }
 865 
 866     /**
 867      * Cancel exception.
 868      */
 869     private void cancelExecution() {
 870         if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {
 871             // prevent new threads from starting
 872             flock.shutdown();
 873 
 874             // interrupt all unfinished threads
 875             interruptAll();
 876 
 877             // wakeup join
 878             flock.wakeup();
 879         }
 880     }
 881 
 882     /**
 883      * Schedules a task to cancel execution on timeout.
 884      */
 885     private void scheduleTimeout(Duration timeout) {
 886         assert Thread.currentThread() == flock.owner() && timerTask == null;
 887         timerTask = TimerSupport.schedule(timeout, () -> {
 888             if (!cancelled) {
 889                 timeoutExpired = true;
 890                 cancelExecution();
 891             }
 892         });
 893     }
 894 
 895     /**
 896      * Cancels the timer task if set.
 897      */
 898     private void cancelTimeout() {
 899         assert Thread.currentThread() == flock.owner();
 900         if (timerTask != null) {
 901             timerTask.cancel(false);
 902         }
 903     }
 904 
 905     /**
 906      * Invoked when the given subtask completes before execution was cancelled.
 907      */
 908     private void onComplete(SubtaskImpl<? extends T> subtask) {
 909         assert subtask.state() != Subtask.State.UNAVAILABLE;
 910         if (policy.onComplete(subtask)) {
 911             cancelExecution();
 912         }
 913     }
 914 
 915     /**
 916      * Starts a new thread in this task scope to execute a value-returning task, thus
 917      * creating a <em>subtask</em>. The value-returning task is provided to this method
 918      * as a {@link Callable}, the thread executes the task's {@link Callable#call() call}
 919      * method.
 920      *
 921      * <p> This method first creates a {@link Subtask Subtask} to represent the <em>forked
 922      * subtask</em>. It invokes the policy's {@link Policy#onFork(Subtask) onFork} method
 923      * with the {@code Subtask} object. If the {@code onFork} completes with an exception
 924      * or error then it is propagated by the {@code fork} method. If execution is
 925      * {@linkplain #isCancelled() cancelled}, or {@code onFork} returns {@code true} to
 926      * cancel execution, then this method returns the {@code Subtask} (in the {@link
 927      * Subtask.State#UNAVAILABLE UNAVAILABLE} state) without creating a thread to execute
 928      * the subtask. If execution is not cancelled then a thread is created with the
 929      * {@link ThreadFactory} configured when the task scope was created, and the thread is
 930      * started. Forking a subtask inherits the current thread's {@linkplain ScopedValue
 931      * scoped value} bindings. The bindings must match the bindings captured when the
 932      * task scope was opened. If the subtask completes (successfully or with an exception)
 933      * before execution is cancelled, then the thread invokes the policy's
 934      * {@link Policy#onComplete(Subtask) onComplete} method with subtask in the
 935      * {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} state.
 936      *
 937      * <p> This method returns the {@link Subtask Subtask} object. In some usages, this
 938      * object may be used to get its result. In other cases it may be used for correlation
 939      * or just discarded. To ensure correct usage, the {@link Subtask#get() Subtask.get()}
 940      * method may only be called by the task scope owner to get the result after it has
 941      * waited for all threads to finish with the {@link #join() join} method and the subtask
 942      * completed successfully. Similarly, the {@link Subtask#exception() Subtask.exception()}
 943      * method may only be called by the task scope owner after it has joined and the subtask
 944      * failed. If execution was cancelled before the subtask was forked, or before it
 945      * completes, then neither method can be used to obtain the outcome.
 946      *
 947      * <p> This method may only be invoked by the task scope owner.
 948      *
 949      * @param task the value-returning task for the thread to execute
 950      * @param <U> the result type
 951      * @return the subtask
 952      * @throws IllegalStateException if this task scope is closed or the owner has already
 953      * joined
 954      * @throws WrongThreadException if the current thread is not the task scope owner
 955      * @throws StructureViolationException if the current scoped value bindings are not
 956      * the same as when the task scope was created
 957      * @throws RejectedExecutionException if the thread factory rejected creating a
 958      * thread to run the subtask
 959      */
 960     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
 961         Objects.requireNonNull(task);
 962         ensureOwner();
 963         ensureOpen();
 964         if (joined) {
 965             throw new IllegalStateException("Already joined");
 966         }
 967 
 968         var subtask = new SubtaskImpl<U>(this, task);
 969 
 970         // notify policy, even if cancelled
 971         if (policy.onFork(subtask)) {
 972             cancelExecution();
 973         }
 974 
 975         if (!cancelled) {
 976             // create thread to run task
 977             Thread thread = threadFactory.newThread(subtask);
 978             if (thread == null) {
 979                 throw new RejectedExecutionException("Rejected by thread factory");
 980             }
 981 
 982             // attempt to start the thread
 983             try {
 984                 flock.start(thread);
 985             } catch (IllegalStateException e) {
 986                 // shutdown by another thread, or underlying flock is shutdown due
 987                 // to unstructured use
 988             }
 989         }
 990 
 991         needToJoin = true;
 992         return subtask;
 993     }
 994 
 995     /**
 996      * Starts a new thread in this task scope to execute a task that does not return a
 997      * result, creating a <em>subtask</em>.
 998      *
 999      * <p> This method works exactly the same as {@link #fork(Callable)} except that
1000      * the task is provided to this method as a {@link Runnable}, the thread executes
1001      * the task's {@link Runnable#run() run} method, and its result is {@code null}.
1002      *
1003      * @param task the task for the thread to execute
1004      * @return the subtask
1005      * @throws IllegalStateException if this task scope is closed or the owner has already
1006      * joined
1007      * @throws WrongThreadException if the current thread is not the task scope owner
1008      * @throws StructureViolationException if the current scoped value bindings are not
1009      * the same as when the task scope was created
1010      * @throws RejectedExecutionException if the thread factory rejected creating a
1011      * thread to run the subtask
1012      * @since 24
1013      */
1014     public Subtask<? extends T> fork(Runnable task) {
1015         Objects.requireNonNull(task);
1016         return fork(() -> { task.run(); return null; });
1017     }
1018 
1019     /**
1020      * Waits for all subtasks started in this task scope to finish or execution to be
1021      * cancelled. If a {@linkplain  Config#withTimeout(Duration) timeout} has been set
1022      * then execution will be cancelled if the timeout expires before or while waiting.
1023      * Once finished waiting, the {@code Policy}'s {@link Policy#result() result}
1024      * method is invoked to get the result or throw an exception. If the {@code result}
1025      * method throws then this method throws {@code ExecutionException} with the policy's
1026      * exception as the cause.
1027      *
1028      * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1029      * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1030      * when all threads finish, the {@code Policy}'s {@link Policy#onFork(Subtask) onFork}
1031      * or {@link Policy#onComplete(Subtask) onComplete} returns {@code true} to cancel
1032      * execution, the timeout (if set) expires, or the current thread is {@linkplain
1033      * Thread#interrupt() interrupted}.
1034      *
1035      * <p> This method may only be invoked by the task scope owner.
1036      *
1037      * @return the {@link Policy#result() result}
1038      * @throws IllegalStateException if this task scope is closed
1039      * @throws WrongThreadException if the current thread is not the task scope owner
1040      * @throws ExecutionException if the policy's {@code result} method throws, or with
1041      * cause {@link TimeoutException} if a timeout is set and the timeout expires
1042      * @throws InterruptedException if interrupted while waiting
1043      * @since 24
1044      */
1045     public R join() throws ExecutionException, InterruptedException {
1046         ensureOwner();
1047         ensureOpen();
1048 
1049         if (!joined) {
1050             // owner has attempted to join
1051             needToJoin = false;
1052 
1053             // wait for all threads, execution to be cancelled, or interrupt
1054             flock.awaitAll();
1055 
1056             // throw if timeout expired while waiting
1057             if (timeoutExpired) {
1058                 throw new ExecutionException(new TimeoutException());
1059             }
1060 
1061             // join completed successfully
1062             cancelTimeout();
1063             joined = true;
1064         }
1065 
1066         // invoke policy to get result
1067         try {
1068             return policy.result();
1069         } catch (Throwable e) {
1070             throw new ExecutionException(e);
1071         }
1072     }
1073 
1074     /**
1075      * {@return {@code true} if <a href="#CancelExecution">execution is cancelled</a>,
1076      * or in the process of being cancelled, otherwise {@code false}}
1077      *
1078      * <p> Cancelling execution prevents new threads from starting in the task scope and
1079      * {@linkplain Thread#interrupt() interrupts} threads executing unfinished subtasks.
1080      * It may take some time before the interrupted threads finish execution; this
1081      * method may return {@code true} before all threads have been interrupted or before
1082      * all threads have finished.
1083      *
1084      * @apiNote A main task with a lengthy "forking phase" (the code that executes before
1085      * the main task invokes {@link #join() join}) may use this method to avoid doing work
1086      * in cases where execution was cancelled by the completion of a previously forked
1087      * subtask or timeout.
1088      *
1089      * @since 24
1090      */
1091     public boolean isCancelled() {
1092         return cancelled;
1093     }
1094 
1095     /**
1096      * Closes this task scope.
1097      *
1098      * <p> This method first <a href="#CancelExecution">cancels execution</a>, if not
1099      * already cancelled. This interrupts the threads executing unfinished subtasks. This
1100      * method then waits for all threads to finish. If interrupted while waiting then it
1101      * will continue to wait until the threads finish, before completing with the interrupt
1102      * status set.
1103      *
1104      * <p> This method may only be invoked by the task scope owner. If the task scope
1105      * is already closed then the task scope owner invoking this method has no effect.
1106      *
1107      * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
1108      * manner</em>. If this method is called to close a task scope before nested task
1109      * scopes are closed then it closes the underlying construct of each nested task scope
1110      * (in the reverse order that they were created in), closes this task scope, and then
1111      * throws {@link StructureViolationException}.
1112      * Similarly, if this method is called to close a task scope while executing with
1113      * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
1114      * before the scoped values were bound, then {@code StructureViolationException} is
1115      * thrown after closing the task scope.
1116      * If a thread terminates without first closing task scopes that it owns then
1117      * termination will cause the underlying construct of each of its open tasks scopes to
1118      * be closed. Closing is performed in the reverse order that the task scopes were
1119      * created in. Thread termination may therefore be delayed when the task scope owner
1120      * has to wait for threads forked in these task scopes to finish.
1121      *
1122      * @throws IllegalStateException thrown after closing the task scope if the task scope
1123      * owner did not attempt to join after forking
1124      * @throws WrongThreadException if the current thread is not the task scope owner
1125      * @throws StructureViolationException if a structure violation was detected
1126      */
1127     @Override
1128     public void close() {
1129         ensureOwner();
1130         if (closed) {
1131             return;
1132         }
1133 
1134         // cancel execution if not already joined
1135         if (!joined) {
1136             cancelExecution();
1137             cancelTimeout();
1138         }
1139 
1140         // wait for stragglers to finish
1141         try {
1142             flock.close();
1143         } finally {
1144             closed = true;
1145         }
1146 
1147         // throw ISE if the owner didn't attempt to join after forking
1148         if (needToJoin) {
1149             needToJoin = false;
1150             throw new IllegalStateException("Owner did not join");
1151         }
1152     }
1153 
1154     /**
1155      * {@inheritDoc}  If a {@link Config#withName(String) name} for monitoring and
1156      * monitoring purposes has been set then the string representation includes the name.
1157      */
1158     @Override
1159     public String toString() {
1160         return flock.name();
1161     }
1162 
1163     /**
1164      * Subtask implementation, runs the task specified to the fork method.
1165      */
1166     private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
1167         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
1168 
1169         private record AltResult(Subtask.State state, Throwable exception) {
1170             AltResult(Subtask.State state) {
1171                 this(state, null);
1172             }
1173         }
1174 
1175         private final StructuredTaskScope<? super T, ?> scope;
1176         private final Callable<? extends T> task;
1177         private volatile Object result;
1178 
1179         SubtaskImpl(StructuredTaskScope<? super T, ?> scope, Callable<? extends T> task) {
1180             this.scope = scope;
1181             this.task = task;
1182         }
1183 
1184         @Override
1185         public void run() {
1186             T result = null;
1187             Throwable ex = null;
1188             try {
1189                 result = task.call();
1190             } catch (Throwable e) {
1191                 ex = e;
1192             }
1193 
1194             // nothing to do if task scope is cancelled
1195             if (scope.isCancelled())
1196                 return;
1197 
1198             // set result/exception and invoke onComplete
1199             if (ex == null) {
1200                 this.result = (result != null) ? result : RESULT_NULL;
1201             } else {
1202                 this.result = new AltResult(State.FAILED, ex);
1203             }
1204             scope.onComplete(this);
1205         }
1206 
1207         @Override
1208         public Subtask.State state() {
1209             Object result = this.result;
1210             if (result == null) {
1211                 return State.UNAVAILABLE;
1212             } else if (result instanceof AltResult alt) {
1213                 // null or failed
1214                 return alt.state();
1215             } else {
1216                 return State.SUCCESS;
1217             }
1218         }
1219 
1220 
1221         @Override
1222         public T get() {
1223             scope.ensureJoinedIfOwner();
1224             Object result = this.result;
1225             if (result instanceof AltResult) {
1226                 if (result == RESULT_NULL) return null;
1227             } else if (result != null) {
1228                 @SuppressWarnings("unchecked")
1229                 T r = (T) result;
1230                 return r;
1231             }
1232             throw new IllegalStateException(
1233                     "Result is unavailable or subtask did not complete successfully");
1234         }
1235 
1236         @Override
1237         public Throwable exception() {
1238             scope.ensureJoinedIfOwner();
1239             Object result = this.result;
1240             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
1241                 return alt.exception();
1242             }
1243             throw new IllegalStateException(
1244                     "Exception is unavailable or subtask did not complete with exception");
1245         }
1246 
1247         @Override
1248         public String toString() {
1249             String stateAsString = switch (state()) {
1250                 case UNAVAILABLE -> "[Unavailable]";
1251                 case SUCCESS     -> "[Completed successfully]";
1252                 case FAILED      -> {
1253                     Throwable ex = ((AltResult) result).exception();
1254                     yield "[Failed: " + ex + "]";
1255                 }
1256             };
1257             return Objects.toIdentityString(this) + stateAsString;
1258         }
1259     }
1260 
1261     /**
1262      * A policy that cancels execution if a subtask fails.
1263      */
1264     private static final class ThrowIfFailed<T> implements Policy<T, Void> {
1265         private static final VarHandle FIRST_EXCEPTION;
1266         static {
1267             try {
1268                 MethodHandles.Lookup l = MethodHandles.lookup();
1269                 FIRST_EXCEPTION = l.findVarHandle(ThrowIfFailed.class, "firstException", Throwable.class);
1270             } catch (Exception e) {
1271                 throw new ExceptionInInitializerError(e);
1272             }
1273         }
1274         private volatile Throwable firstException;
1275 
1276         @Override
1277         public boolean onComplete(Subtask<? extends T> subtask) {
1278             return (subtask.state() == Subtask.State.FAILED)
1279                     && (firstException == null)
1280                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1281         }
1282 
1283         @Override
1284         public Void result() throws Throwable {
1285             Throwable ex = firstException;
1286             if (ex != null) {
1287                 throw ex;
1288             } else {
1289                 return null;
1290             }
1291         }
1292     }
1293 
1294     /**
1295      * A policy that returns a stream of all forked subtasks when all subtasks
1296      * complete successfully. If any subtask fails then execution is cancelled.
1297      */
1298     private static final class AllSuccessful<T> implements Policy<T, Stream<Subtask<T>>> {
1299         private static final VarHandle FIRST_EXCEPTION;
1300         static {
1301             try {
1302                 MethodHandles.Lookup l = MethodHandles.lookup();
1303                 FIRST_EXCEPTION = l.findVarHandle(AllSuccessful.class, "firstException", Throwable.class);
1304             } catch (Exception e) {
1305                 throw new ExceptionInInitializerError(e);
1306             }
1307         }
1308         // list of forked subtasks, only accessed by owner thread
1309         private final List<Subtask<T>> subtasks = new ArrayList<>();
1310         private volatile Throwable firstException;
1311         
1312         @Override
1313         public boolean onFork(Subtask<? extends T> subtask) {
1314             @SuppressWarnings("unchecked")
1315             var tmp = (Subtask<T>) subtask;
1316             subtasks.add(tmp);
1317             return false;
1318         }
1319 
1320         @Override
1321         public boolean onComplete(Subtask<? extends T> subtask) {
1322             return (subtask.state() == Subtask.State.FAILED)
1323                     && (firstException == null)
1324                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1325         }
1326 
1327         @Override
1328         public Stream<Subtask<T>> result() throws Throwable {
1329             Throwable ex = firstException;
1330             if (ex != null) {
1331                 throw ex;
1332             } else {
1333                 return subtasks.stream();
1334             }
1335         }
1336     }
1337 
1338     /**
1339      * A policy that returns the result of the first subtask to complete successfully.
1340      * If any subtask completes successfully then execution is cancelled.
1341      */
1342     private static final class AnySuccessful<T> implements Policy<T, T> {
1343         private static final VarHandle FIRST_SUCCESS;
1344         private static final VarHandle FIRST_EXCEPTION;
1345         static {
1346             try {
1347                 MethodHandles.Lookup l = MethodHandles.lookup();
1348                 FIRST_SUCCESS = l.findVarHandle(AnySuccessful.class, "firstSuccess", Subtask.class);
1349                 FIRST_EXCEPTION = l.findVarHandle(AnySuccessful.class, "firstException", Throwable.class);
1350             } catch (Exception e) {
1351                 throw new ExceptionInInitializerError(e);
1352             }
1353         }
1354         private volatile Subtask<T> firstSuccess;
1355         private volatile Throwable firstException;
1356 
1357         @Override
1358         public boolean onComplete(Subtask<? extends T> subtask) {
1359             if (firstSuccess != null) {
1360                 // already captured a successful subtask
1361                 return false;
1362             }
1363 
1364             if (subtask.state() == Subtask.State.SUCCESS) {
1365                 // task completed with a result
1366                 return FIRST_SUCCESS.compareAndSet(this, null, subtask);
1367             } else if (firstException == null) {
1368                 // capture the exception thrown by the first task that failed
1369                 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1370             }
1371 
1372             return false;
1373         }
1374 
1375         @Override
1376         public T result() throws Throwable {
1377             Subtask<T> firstSuccess = this.firstSuccess;
1378             if (firstSuccess != null) {
1379                 return firstSuccess.get();
1380             }
1381             Throwable firstException = this.firstException;
1382             if (firstException != null) {
1383                 throw firstException;
1384             } else {
1385                 throw new NoSuchElementException("No subtasks completed");
1386             }
1387         }
1388     }
1389 
1390     /**
1391      * A policy that returns a stream of all forked subtasks.
1392      */
1393     private static class AllForked<T> implements Policy<T, Stream<Subtask<T>>> {
1394         private final Predicate<Subtask<? extends T>> isDone;
1395         // list of forked subtasks, only accessed by owner thread
1396         private final List<Subtask<T>> subtasks = new ArrayList<>();
1397 
1398         AllForked(Predicate<Subtask<? extends T>> isDone) {
1399             this.isDone = Objects.requireNonNull(isDone);
1400         }
1401 
1402         @Override
1403         public boolean onFork(Subtask<? extends T> subtask) {
1404             @SuppressWarnings("unchecked")
1405             var tmp = (Subtask<T>) subtask;
1406             subtasks.add(tmp);
1407             return false;
1408         }
1409 
1410         @Override
1411         public boolean onComplete(Subtask<? extends T> subtask) {
1412             return isDone.test(subtask);
1413         }
1414 
1415         @Override
1416         public Stream<Subtask<T>> result() {
1417             return subtasks.stream();
1418         }
1419     }
1420 
1421     /**
1422      * Implementation of Config.
1423      */
1424     private record ConfigImpl(ThreadFactory threadFactory,
1425                               String name,
1426                               Duration timeout) implements Config {
1427         static Config defaultConfig() {
1428             return new ConfigImpl(Thread.ofVirtual().factory(),null,  null);
1429         }
1430 
1431         @Override
1432         public Config withThreadFactory(ThreadFactory threadFactory) {
1433             return new ConfigImpl(Objects.requireNonNull(threadFactory), name, timeout);
1434         }
1435 
1436         @Override
1437         public Config withName(String name) {
1438             return new ConfigImpl(threadFactory, Objects.requireNonNull(name), timeout);
1439         }
1440 
1441         @Override
1442         public Config withTimeout(Duration timeout) {
1443             return new ConfigImpl(threadFactory, name, Objects.requireNonNull(timeout));
1444         }
1445     }
1446 
1447     /**
1448      * Used to schedule a task to cancel exception when a timeout expires.
1449      */
1450     private static class TimerSupport {
1451         private static final ScheduledExecutorService DELAYED_TASK_SCHEDULER;
1452         static {
1453             ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1454                 Executors.newScheduledThreadPool(1, task -> {
1455                     Thread t = InnocuousThread.newThread("StructuredTaskScope-Timer", task);
1456                     t.setDaemon(true);
1457                     return t;
1458                 });
1459             stpe.setRemoveOnCancelPolicy(true);
1460             DELAYED_TASK_SCHEDULER = stpe;
1461         }
1462 
1463         static Future<?> schedule(Duration timeout, Runnable task) {
1464             long nanos = TimeUnit.NANOSECONDS.convert(timeout);
1465             return DELAYED_TASK_SCHEDULER.schedule(task, nanos, TimeUnit.NANOSECONDS);
1466         }
1467     }
1468 }