1 /*
   2  * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.util.ArrayList;
  33 import java.util.List;
  34 import java.util.NoSuchElementException;
  35 import java.util.Objects;
  36 import java.util.function.Function;
  37 import java.util.function.Predicate;
  38 import java.util.function.Supplier;
  39 import java.util.stream.Stream;
  40 import jdk.internal.javac.PreviewFeature;
  41 import jdk.internal.misc.InnocuousThread;
  42 import jdk.internal.misc.ThreadFlock;
  43 
  44 /**
  45  * An API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports cases
  46  * where a main task splits into several concurrent subtasks, and where the subtasks must
  47  * complete before the main task continues. A {@code StructuredTaskScope} can be used to
  48  * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
  49  * just like that of a sequential operation in structured programming.
  50  *
  51  * <p> {@code StructuredTaskScope} defines the static method {@link #open() open} to open
  52  * a new {@code StructuredTaskScope} and the {@link #close() close} method to close it.
  53  * The API is designed to be used with the {@code try-with-resources} statement where
  54  * the {@code StructuredTaskScope} is opened as a resource and then closed automatically.
  55  * The code in the block uses the {@link #fork(Callable) fork} method to fork subtasks.
  56  * After forking, it uses the {@link #join() join} method to wait for all subtasks to
  57  * finish (or some other outcome) as a single operation. Forking a subtask starts a new
  58  * {@link Thread} to run the subtask. The thread executing the main task does not continue
  59  * beyond the {@code close} method until all threads started to execute subtasks have finished.
  60  * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may
  61  * only be invoked by the <em>owner thread</em> (the thread that opened the {@code
  62  * StructuredTaskScope}), the {@code fork} method may not be called after {@code join},
  63  * the {@code join} method may only be invoked once, and the {@code close} method throws
  64  * an exception after closing if the owner did not invoke the {@code join} method after
  65  * forking subtasks.
  66  *
  67  * <p> As a first example, consider a main task that splits into two subtasks to concurrently
  68  * fetch resources from two URL locations "left" and "right". Both subtasks may complete
  69  * successfully, one subtask may succeed and the other may fail, or both subtasks may
  70  * fail. The main task in this example is interested in the successful result from both
  71  * subtasks. It waits in the {@link #join() join} method for both subtasks to complete
  72  * successfully or for either subtask to fail.
  73  * {@snippet lang=java :
  74  *    // @link substring="open" target="#open()" :
  75  *    try (var scope = StructuredTaskScope.open()) {
  76  *
  77  *        // @link substring="fork" target="#fork(Callable)" :
  78  *        Subtask<String> subtask1 = scope.fork(() -> query(left));
  79  *        Subtask<Integer> subtask2 = scope.fork(() -> query(right));
  80  *
  81  *        // throws if either subtask fails
  82  *        scope.join();  // @link substring="join" target="#join()"
  83  *
  84  *        // both subtasks completed successfully
  85  *        // @link substring="get" target="Subtask#get()" :
  86  *        return new MyResult(subtask1.get(), subtask2.get());
  87  *
  88  *    // @link substring="close" target="#close()" :
  89  *    } // close
  90  * }
  91  *
  92  * <p> If both subtasks complete successfully then the {@code join} method completes
  93  * normally and the main task uses the {@link Subtask#get() Subtask.get()} method to get
  94  * the result of each subtask. If one of the subtasks fails then the other subtask is
  95  * cancelled (this will interrupt the thread executing the other subtask) and the {@code
  96  * join} method throws {@link FailedException} with the exception from the failed subtask
  97  * as the {@linkplain Throwable#getCause() cause}.
  98  *
  99  * <p> In the example, the subtasks produce results of different types ({@code String} and
 100  * {@code Integer}). In other cases the subtasks may all produce results of the same type.
 101  * If the example had used {@code StructuredTaskScope.<String>open()} then it could
 102  * only be used to fork subtasks that return a {@code String} result.
 103  *
 104  * <h2>Joiners</h2>
 105  *
 106  * <p> In the example above, the main task fails if any subtask fails. If all subtasks
 107  * succeed then the {@code join} method completes normally. Other policy and outcome is
 108  * supported by creating a {@code StructuredTaskScope} with a {@link Joiner} that
 109  * implements the desired policy. A {@code Joiner} handles subtask completion and produces
 110  * the outcome for the {@link #join() join} method. In the example above, {@code join}
 111  * returns {@code null}. Depending on the {@code Joiner}, {@code join} may return a
 112  * result, a stream of elements, or some other object. The {@code Joiner} interface defines
 113  * factory methods to create {@code Joiner}s for some common cases.
 114  *
 115  * <p> A {@code Joiner} may <a id="CancelExecution"><em>cancel execution</em></a> (sometimes
 116  * called "short-circuiting") when some condition is reached that does not require the
 117  * result of subtasks that are still executing. Cancelling execution prevents new threads
 118  * from being started to execute further subtasks, {@linkplain Thread#interrupt() interrupts}
 119  * the threads executing subtasks that have not completed, and causes the {@code join}
 120  * method to wakeup with the outcome (result or exception). In the above example, the
 121  * outcome is that {@code join} completes with a result of {@code null} when all subtasks
 122  * succeed. It cancels execution if any of the subtasks fail, throwing the exception from
 123  * the first subtask that fails. Other {@code Joiner} implementations may return an object
 124  * instead of {@code null} and may cancel execution or throw based on some other policy.
 125  *
 126  * <p> To allow for cancelling execution, subtasks must be coded so that they
 127  * finish as soon as possible when interrupted. Subtasks that do not respond to interrupt,
 128  * e.g. block on methods that are not interruptible, may delay the closing of a task scope
 129  * indefinitely. The {@link #close() close} method always waits for threads executing
 130  * subtasks to finish, even if execution is cancelled, so execution cannot continue beyond
 131  * the {@code close} method until the interrupted threads finish.
 132  *
 133  * <p> Now consider another example that also splits into two subtasks to concurrently
 134  * fetch resources. In this example, the code in the main task is only interested in the
 135  * result from the first subtask to complete successfully. The example uses {@link
 136  * Joiner#anySuccessfulResultOrThrow() Joiner.anySuccessfulResultOrThrow()} to
 137  * create a {@code Joiner} that makes available the result of the first subtask to
 138  * complete successfully. The type parameter in the example is "{@code String}" so that
 139  * only subtasks that return a {@code String} can be forked.
 140  * {@snippet lang=java :
 141  *    // @link substring="open" target="#open(Policy)" :
 142  *    try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {
 143  *
 144  *        scope.fork(() -> query(left));  // @link substring="fork" target="#fork(Callable)"
 145  *        scope.fork(() -> query(right));
 146  *
 147  *        // throws if both subtasks fail
 148  *        String firstResult = scope.join();
 149  *
 150  *    }
 151  * }
 152  *
 153  * <p> In the example, the main task forks the two subtasks, then waits in the {@code
 154  * join} method for either subtask to complete successfully or for both subtasks to fail.
 155  * If one of the subtasks completes successfully then the {@code Joiner} causes the other
 156  * subtask to be cancelled (this will interrupt the thread executing the subtask), and
 157  * the {@code join} method returns the result from the first subtask. Cancelling the other
 158  * subtask avoids the main task waiting for a result that it doesn't care about. If both
 159  * subtasks fail then the {@code join} method throws {@link FailedException} with the
 160  * exception from one of the subtasks as the {@linkplain Throwable#getCause() cause}.
 161  *
 162  * <p> Whether code uses the {@code Subtask} returned from {@code fork} will depend on
 163  * the {@code Joiner} and usage. Some {@code Joiner} implementations are suited to subtasks
 164  * that return results of the same type and where the {@code join} method returns a result
 165  * for the main task to use. Code that forks subtasks that return results of different
 166  * types, and uses a {@code Joiner} such as {@code Joiner.awaitAllSuccessfulOrThrow()} that
 167  * does not return a result, will use {@link Subtask#get() Subtask.get()} after joining.
 168  *
 169  * <h2>Exception handling</h2>
 170  *
 171  * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner Joiner} that
 172  * handles subtask completion and produces the outcome for the {@link #join() join} method.
 173  * In some cases, the outcome will be a result, in other cases it will be an exception.
 174  * If the outcome is an exception then the {@code join} method throws {@link
 175  * FailedException} with the exception as the {@linkplain Throwable#getCause()
 176  * cause}. For many {@code Joiner} implementations, the exception will be an exception
 177  * thrown by a subtask that failed. In the case of {@link Joiner#allSuccessfulOrThrow()
 178  * allSuccessfulOrThrow} and {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow}
 179  * for example, the exception is from the first subtask to fail.
 180  *
 181  * <p> Many of the details for how exceptions are handled will depend on usage. In some
 182  * cases it may be useful to add a {@code catch} block to catch {@code FailedException}.
 183  * The exception handling may use {@code instanceof} with pattern matching to handle
 184  * specific causes.
 185  * {@snippet lang=java :
 186  *    try (var scope = StructuredTaskScope.open()) {
 187  *
 188  *        ..
 189  *
 190  *    } catch (StructuredTaskScope.FailedException e) {
 191  *
 192  *        Throwable cause = e.getCause();
 193  *        switch (cause) {
 194  *            case IOException ioe -> ..
 195  *            default -> ..
 196  *        }
 197  *
 198  *    }
 199  * }
 200  * In some cases it may not be useful to catch {@code FailedException} but instead leave
 201  * it to propagate to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught
 202  * exception handler} for logging purposes.
 203  *
 204  * <p> For cases where a specific exception triggers the use of a default result then it
 205  * may be more appropriate to handle this in the subtask itself rather than the subtask
 206  * failing and code in the main task handling the exception.
 207  *
 208  * <h2>Configuration</h2>
 209  *
 210  * A {@code StructuredTaskScope} is opened with {@linkplain Config configuration} that
 211  * consists of a {@link ThreadFactory} to create threads, an optional name for monitoring
 212  * and management purposes, and an optional timeout.
 213  *
 214  * <p> The {@link #open()} and {@link #open(Joiner)} methods create a {@code StructuredTaskScope}
 215  * with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default
 216  * configuration has a {@code ThreadFactory} that creates unnamed
 217  * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
 218  * is unnamed for monitoring and management purposes, and has no timeout.
 219  *
 220  * <p> The 2-arg {@link #open(Joiner, Function) open} method can be used to create a
 221  * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for
 222  * the purposes of monitoring and management, or has a timeout that cancels execution if
 223  * the timeout expires before or while waiting for subtasks to complete. The {@code open}
 224  * method is called with a {@linkplain Function function} that is applied to the default
 225  * configuration and returns a {@link Config Config} for the {@code StructuredTaskScope}
 226  * under construction.
 227  *
 228  * <p> The following example opens a new {@code StructuredTaskScope} with a {@code
 229  * ThreadFactory} that creates virtual threads {@linkplain Thread#setName(String) named}
 230  * "duke-0", "duke-1" ...
 231  * {@snippet lang = java:
 232  *    // @link substring="name" target="Thread.Builder#name(String, long)" :
 233  *    ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory();
 234  *
 235  *    // @link substring="withThreadFactory" target="Config#withThreadFactory(ThreadFactory)" :
 236  *    try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
 237  *
 238  *        scope.fork( .. );   // runs in a virtual thread with name "duke-0"
 239  *        scope.fork( .. );   // runs in a virtual thread with name "duke-1"
 240  *
 241  *        scope.join();
 242  *
 243  *     }
 244  *}
 245  *
 246  * <p> A second example sets a timeout, represented by a {@link Duration}. The timeout
 247  * starts when the new task scope is opened. If the timeout expires before the {@code join}
 248  * method has completed then <a href="#CancelExecution">execution is cancelled</a>. This
 249  * interrupts the threads executing the two subtasks and causes the {@link #join() join}
 250  * method to throw {@link FailedException} with {@link TimeoutException} as the cause.
 251  * {@snippet lang=java :
 252  *    Duration timeout = Duration.ofSeconds(10);
 253  *
 254  *    // @link substring="allSuccessfulOrThrow" target="Joiner#allSuccessfulOrThrow()" :
 255  *    try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
 256  *    // @link substring="withTimeout" target="Config#withTimeout(Duration)" :
 257  *                                              cf -> cf.withTimeout(timeout))) {
 258  *
 259  *        scope.fork(() -> query(left));
 260  *        scope.fork(() -> query(right));
 261  *
 262  *        List<String> result = scope.join()
 263  *                                   .map(Subtask::get)
 264  *                                   .toList();
 265  *
 266  *   }
 267  * }
 268  *
 269  * <h2>Inheritance of scoped value bindings</h2>
 270  *
 271  * {@link ScopedValue} supports the execution of a method with a {@code ScopedValue} bound
 272  * to a value for the bounded period of execution of the method by the <em>current thread</em>.
 273  * It allows a value to be safely and efficiently shared to methods without using method
 274  * parameters.
 275  *
 276  * <p> When used in conjunction with a {@code StructuredTaskScope}, a {@code ScopedValue}
 277  * can also safely and efficiently share a value to methods executed by subtasks forked
 278  * in the task scope. When a {@code ScopedValue} object is bound to a value in the thread
 279  * executing the main task then that binding is inherited by the threads created to
 280  * execute the subtasks. The thread executing the main task does not continue beyond the
 281  * {@link #close() close} method until all threads executing the subtasks have finished.
 282  * This ensures that the {@code ScopedValue} is not reverted to being {@linkplain
 283  * ScopedValue#isBound() unbound} (or its previous value) while subtasks are executing.
 284  * In addition to providing a safe and efficient means to inherit a value into subtasks,
 285  * the inheritance allows sequential code using {@code ScopedValue} be refactored to use
 286  * structured concurrency.
 287  *
 288  * <p> To ensure correctness, opening a new {@code StructuredTaskScope} captures the
 289  * current thread's scoped value bindings. These are the scoped values bindings that are
 290  * inherited by the threads created to execute subtasks in the task scope. Forking a
 291  * subtask checks that the bindings in effect at the time that the subtask is forked
 292  * match the bindings when the {@code StructuredTaskScope} was created. This check ensures
 293  * that a subtask does not inherit a binding that is reverted in the main task before the
 294  * subtask has completed.
 295  *
 296  * <p> A {@code ScopedValue} that is shared across threads requires that the value be an
 297  * immutable object or for all access to the value to be appropriately synchronized.
 298  *
 299  * <p> The following example demonstrates the inheritance of scoped value bindings. The
 300  * scoped value USERNAME is bound to the value "duke" for the bounded period of a lambda
 301  * expression by the thread executing it. The code in the block opens a {@code
 302  * StructuredTaskScope} and forks two subtasks, it then waits in the {@code join} method
 303  * and aggregates the results from both subtasks. If code executed by the threads
 304  * running subtask1 and subtask2 uses {@link ScopedValue#get()}, to get the value of
 305  * USERNAME, then value "duke" will be returned.
 306  * {@snippet lang=java :
 307  *     // @link substring="newInstance" target="ScopedValue#newInstance()" :
 308  *     private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
 309  *
 310  *     // @link substring="callWhere" target="ScopedValue#where" :
 311  *     Result result = ScopedValue.where(USERNAME, "duke").call(() -> {
 312  *
 313  *         try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
 314  *
 315  *             Subtask<String> subtask1 = scope.fork( .. );    // inherits binding
 316  *             Subtask<Integer> subtask2 = scope.fork( .. );   // inherits binding
 317  *
 318  *             scope.join();
 319  *             return new MyResult(subtask1.get(), subtask2.get());
 320  *         }
 321  *
 322  *     });
 323  * }
 324  *
 325  * <p> A scoped value inherited into a subtask may be
 326  * <a href="{@docRoot}/java.base/java/lang/ScopedValues.html#rebind">rebound</a> to a new
 327  * value in the subtask for the bounded execution of some method executed in the subtask.
 328  * When the method completes, the value of the {@code ScopedValue} reverts to its previous
 329  * value, the value inherited from the thread executing the main task.
 330  *
 331  * <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}.
 332  * A main task executing in thread T1 opens a {@code StructuredTaskScope} and forks a
 333  * subtask that runs in thread T2. The scoped value bindings captured when T1 opens the
 334  * task scope are inherited into T2. The subtask (in thread T2) executes code that opens a
 335  * new {@code StructuredTaskScope} and forks a subtask that runs in thread T3. The scoped
 336  * value bindings captured when T2 opens the task scope are inherited into T3. These
 337  * include (or may be the same) as the bindings that were inherited from T1. In effect,
 338  * scoped values are inherited into a tree of subtasks, not just one level of subtask.
 339  *
 340  * <h2>Memory consistency effects</h2>
 341  *
 342  * <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to
 343  * {@linkplain #fork forking} of a subtask
 344  * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
 345  * <i>happen-before</i></a> any actions taken by that subtask, which in turn
 346  * <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}.
 347  *
 348  * <h2>General exceptions</h2>
 349  *
 350  * <p> Unless otherwise specified, passing a {@code null} argument to a method in this
 351  * class will cause a {@link NullPointerException} to be thrown.
 352  *
 353  * @param <T> the result type of tasks executed in the task scope
 354  * @param <R> the type of the result returned by the join method
 355  *
 356  * @jls 17.4.5 Happens-before Order
 357  * @since 21
 358  */
 359 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 360 public class StructuredTaskScope<T, R> implements AutoCloseable {
 361     private static final VarHandle CANCELLED;
 362     static {
 363         try {
 364             MethodHandles.Lookup l = MethodHandles.lookup();
 365             CANCELLED = l.findVarHandle(StructuredTaskScope.class,"cancelled", boolean.class);
 366         } catch (Exception e) {
 367             throw new ExceptionInInitializerError(e);
 368         }
 369     }
 370 
 371     private final Joiner<? super T, ? extends R> joiner;
 372     private final ThreadFactory threadFactory;
 373     private final ThreadFlock flock;
 374 
 375     // state, only accessed by owner thread
 376     private static final int ST_NEW            = 0;
 377     private static final int ST_FORKED         = 1;   // subtasks forked, need to join
 378     private static final int ST_JOIN_STARTED   = 2;   // join started, can no longer fork
 379     private static final int ST_JOIN_COMPLETED = 3;   // join completed
 380     private static final int ST_CLOSED         = 4;   // closed
 381     private int state;
 382 
 383     // timer task, only accessed by owner thread
 384     private Future<?> timerTask;
 385 
 386     // set or read by any thread
 387     private volatile boolean cancelled;
 388 
 389     // set by the timer thread, read by the owner thread
 390     private volatile boolean timeoutExpired;
 391 
 392     /**
 393      * Throws WrongThreadException if the current thread is not the owner thread.
 394      */
 395     private void ensureOwner() {
 396         if (Thread.currentThread() != flock.owner()) {
 397             throw new WrongThreadException("Current thread not owner");
 398         }
 399     }
 400 
 401     /**
 402      * Throws IllegalStateException if the already joined or task scope is closed.
 403      */
 404     private void ensureNotJoined() {
 405         assert Thread.currentThread() == flock.owner();
 406         if (state > ST_FORKED) {
 407             throw new IllegalStateException("Already joined or task scope is closed");
 408         }
 409     }
 410 
 411     /**
 412      * Throws IllegalStateException if invoked by the owner thread and the owner thread
 413      * has not joined.
 414      */
 415     private void ensureJoinedIfOwner() {
 416         if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) {
 417             throw new IllegalStateException("join not called");
 418         }
 419     }
 420 
 421     /**
 422      * Interrupts all threads in this task scope, except the current thread.
 423      */
 424     private void implInterruptAll() {
 425         flock.threads()
 426                 .filter(t -> t != Thread.currentThread())
 427                 .forEach(t -> {
 428                     try {
 429                         t.interrupt();
 430                     } catch (Throwable ignore) { }
 431                 });
 432     }
 433 
 434     @SuppressWarnings("removal")
 435     private void interruptAll() {
 436         if (System.getSecurityManager() == null) {
 437             implInterruptAll();
 438         } else {
 439             PrivilegedAction<Void> pa = () -> {
 440                 implInterruptAll();
 441                 return null;
 442             };
 443             AccessController.doPrivileged(pa);
 444         }
 445     }
 446 
 447     /**
 448      * Cancel exception if not already cancelled.
 449      */
 450     private void cancelExecution() {
 451         if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {
 452             // prevent new threads from starting
 453             flock.shutdown();
 454 
 455             // interrupt all unfinished threads
 456             interruptAll();
 457 
 458             // wakeup join
 459             flock.wakeup();
 460         }
 461     }
 462 
 463     /**
 464      * Schedules a task to cancel execution on timeout.
 465      */
 466     private void scheduleTimeout(Duration timeout) {
 467         assert Thread.currentThread() == flock.owner() && timerTask == null;
 468         timerTask = TimerSupport.schedule(timeout, () -> {
 469             if (!cancelled) {
 470                 timeoutExpired = true;
 471                 cancelExecution();
 472             }
 473         });
 474     }
 475 
 476     /**
 477      * Cancels the timer task if set.
 478      */
 479     private void cancelTimeout() {
 480         assert Thread.currentThread() == flock.owner();
 481         if (timerTask != null) {
 482             timerTask.cancel(false);
 483         }
 484     }
 485 
 486     /**
 487      * Invoked by the thread for a subtask when the subtask completes before execution
 488      * was cancelled.
 489      */
 490     private void onComplete(SubtaskImpl<? extends T> subtask) {
 491         assert subtask.state() != Subtask.State.UNAVAILABLE;
 492         if (joiner.onComplete(subtask)) {
 493             cancelExecution();
 494         }
 495     }
 496 
 497     /**
 498      * Initialize a new StructuredTaskScope.
 499      */
 500     @SuppressWarnings("this-escape")
 501     private StructuredTaskScope(Joiner<? super T, ? extends R> joiner,
 502                                 ThreadFactory threadFactory,
 503                                 String name) {
 504         this.joiner = joiner;
 505         this.threadFactory = threadFactory;
 506 
 507         if (name == null)
 508             name = Objects.toIdentityString(this);
 509         this.flock = ThreadFlock.open(name);
 510     }
 511 
 512     /**
 513      * Represents a subtask forked with {@link #fork(Callable)} or {@link #fork(Runnable)}.
 514      *
 515      * <p> Code that forks subtasks can use the {@link #get() get()} method after {@linkplain
 516      * #join() joining} to obtain the result of a subtask that completed successfully. It
 517      * can use the {@link #exception()} method to obtain the exception thrown by a subtask
 518      * that failed.
 519      *
 520      * @param <T> the result type
 521      * @since 21
 522      */
 523     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 524     public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
 525         /**
 526          * Represents the state of a subtask.
 527          * @see Subtask#state()
 528          * @since 21
 529          */
 530         @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 531         enum State {
 532             /**
 533              * The subtask result or exception is not available. This state indicates that
 534              * the subtask was forked but has not completed, it completed after execution
 535              * was cancelled, or it was forked after execution was cancelled (in which
 536              * case a thread was not created to execute the subtask).
 537              */
 538             UNAVAILABLE,
 539             /**
 540              * The subtask completed successfully. The {@link Subtask#get() Subtask.get()}
 541              * method can be used to get the result. This is a terminal state.
 542              */
 543             SUCCESS,
 544             /**
 545              * The subtask failed with an exception. The {@link Subtask#exception()
 546              * Subtask.exception()} method can be used to get the exception. This is a
 547              * terminal state.
 548              */
 549             FAILED,
 550         }
 551 
 552         /**
 553          * {@return the subtask state}
 554          */
 555         State state();
 556 
 557         /**
 558          * Returns the result of this subtask if it completed successfully. If
 559          * {@linkplain #fork(Callable) forked} to execute a value-returning task then the
 560          * result from the {@link Callable#call() call} method is returned. If
 561          * {@linkplain #fork(Runnable) forked} to execute a task that does not return a
 562          * result then {@code null} is returned.
 563          *
 564          * <p> Code executing in the scope owner thread can use this method to get the
 565          * result of a successful subtask only after it has {@linkplain #join() joined}.
 566          *
 567          * <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask)
 568          * onComplete} method should test that the {@linkplain #state() subtask state} is
 569          * {@link State#SUCCESS SUCCESS} before using this method to get the result.
 570          *
 571          * @return the possibly-null result
 572          * @throws IllegalStateException if the subtask has not completed, did not complete
 573          * successfully, or the current thread is the task scope owner invoking this
 574          * method before {@linkplain #join() joining}
 575          * @see State#SUCCESS
 576          */
 577         T get();
 578 
 579         /**
 580          * {@return the exception thrown by this subtask if it failed} If
 581          * {@linkplain #fork(Callable) forked} to execute a value-returning task then
 582          * the exception thrown by the {@link Callable#call() call} method is returned.
 583          * If {@linkplain #fork(Runnable) forked} to execute a task that does not return
 584          * a result then the exception thrown by the {@link Runnable#run() run} method is
 585          * returned.
 586          *
 587          * <p> Code executing in the scope owner thread can use this method to get the
 588          * exception thrown by a failed subtask only after it has {@linkplain #join() joined}.
 589          *
 590          * <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask)
 591          * onComplete} method should test that the {@linkplain #state() subtask state} is
 592          * {@link State#FAILED FAILED} before using this method to get the exception.
 593          *
 594          * @throws IllegalStateException if the subtask has not completed, completed with
 595          * a result, or the current thread is the task scope owner invoking this method
 596          * before {@linkplain #join() joining}
 597          * @see State#FAILED
 598          */
 599         Throwable exception();
 600     }
 601 
 602     /**
 603      * An object used with a {@link StructuredTaskScope} to handle subtask completion
 604      * and produce the result for a main task waiting in the {@link #join() join} method
 605      * for subtasks to complete.
 606      *
 607      * <p> Joiner defines static methods to create {@code Joiner} objects for common cases:
 608      * <ul>
 609      *   <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner}
 610      *   that yields a stream of the completed subtasks for {@code join} to return when
 611      *   all subtasks complete successfully. It cancels execution and causes {@code join}
 612      *   to throw if any subtask fails.
 613      *   <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a
 614      *   {@code Joiner} that yields the result of the first subtask to succeed. It cancels
 615      *   execution and causes {@code join} to throw if all subtasks fail.
 616      *   <li> {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a
 617      *   {@code Joiner} that waits for all successful subtasks. It cancels execution and
 618      *   causes {@code join} to throw if any subtask fails.
 619      *   <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all
 620      *   subtasks. It does not cancel execution or cause {@code join} to throw.
 621      * </ul>
 622      *
 623      * <p> In addition to the methods to create {@code Joiner} objects for common cases,
 624      * the {@link #allUntil(Predicate) allUntil(Predicate)} method is defined to create a
 625      * {@code Joiner} that yields a stream of all subtasks. It is created with a {@link
 626      * Predicate Predicate} that determines if execution should continue or be cancelled.
 627      * This {@code Joiner} can be built upon to create custom policies that cancel
 628      * execution based on some condition.
 629      *
 630      * <p> More advanced policies can be developed by implementing the {@code Joiner}
 631      * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked.
 632      * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a
 633      * result or exception. These methods return a {@code boolean} to indicate if execution
 634      * should be cancelled. These methods can be used to collect subtasks, results, or
 635      * exceptions, and control when to cancel execution. The {@link #result()} method
 636      * must be implemented to produce the result (or exception) for the {@code join}
 637      * method.
 638      *
 639      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 640      * in this class will cause a {@link NullPointerException} to be thrown.
 641      *
 642      * @implSpec Implementations of this interface must be thread safe. The {@link
 643      * #onComplete(Subtask)} method defined by this interface may be invoked by several
 644      * threads concurrently.
 645      *
 646      * @apiNote It is very important that a new {@code Joiner} object is created for each
 647      * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with
 648      * different task scopes or re-used after a task is closed.
 649      *
 650      * <p> Designing a {@code Joiner} should take into account the code at the use-site
 651      * where the results from the {@link StructuredTaskScope#join() join} method are
 652      * processed. It should be clear what the {@code Joiner} does vs. the application
 653      * code at the use-site. In general, the {@code Joiner} implementation is not the
 654      * place to code "business logic". A {@code Joiner} should be designed to be as
 655      * general purpose as possible.
 656      *
 657      * @param <T> the result type of tasks executed in the task scope
 658      * @param <R> the type of results returned by the join method
 659      * @since 24
 660      * @see #open(Joiner)
 661      */
 662     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 663     @FunctionalInterface
 664     public interface Joiner<T, R> {
 665 
 666         /**
 667          * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable)
 668          * fork(Runnable)} when forking a subtask. The method is invoked from the task
 669          * owner thread. The method is invoked before a thread is created to run the
 670          * subtask.
 671          *
 672          * @implSpec The default implementation throws {@code NullPointerException} if the
 673          * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
 674          * subtask is not in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, it
 675          * otherwise returns {@code false}.
 676          *
 677          * @apiNote This method is invoked by the {@code fork} methods. It should not be
 678          * invoked directly.
 679          *
 680          * @param subtask the subtask
 681          * @return {@code true} to cancel execution
 682          */
 683         default boolean onFork(Subtask<? extends T> subtask) {
 684             if (subtask.state() != Subtask.State.UNAVAILABLE) {
 685                 throw new IllegalArgumentException();
 686             }
 687             return false;
 688         }
 689 
 690         /**
 691          * Invoked by the thread started to execute a subtask after the subtask completes
 692          * successfully or fails with an exception. This method is not invoked if a
 693          * subtask completes after execution has been cancelled.
 694          *
 695          * @implSpec The default implementation throws {@code NullPointerException} if the
 696          * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
 697          * subtask is not in the {@link Subtask.State#SUCCESS SUCCESS} or {@link
 698          * Subtask.State#FAILED FAILED} state, it otherwise returns {@code false}.
 699          *
 700          * @apiNote This method is invoked by subtasks when they complete. It should not
 701          * be invoked directly.
 702          *
 703          * @param subtask the subtask
 704          * @return {@code true} to cancel execution
 705          */
 706         default boolean onComplete(Subtask<? extends T> subtask) {
 707             if (subtask.state() == Subtask.State.UNAVAILABLE) {
 708                 throw new IllegalArgumentException();
 709             }
 710             return false;
 711         }
 712 
 713         /**
 714          * Invoked by {@link #join()} to produce the result (or exception) after waiting
 715          * for all subtasks to complete or execution to be cancelled. The result from this
 716          * method is returned by the {@code join} method. If this method throws, then
 717          * {@code join} throws {@link FailedException} with the exception thrown by
 718          * this method as the cause.
 719          *
 720          * <p> In normal usage, this method will be called at most once by the {@code join}
 721          * method to produce the result (or exception). The behavior of this method when
 722          * invoked directly, and invoked more than once, is not specified. Where possible,
 723          * an implementation should return an equal result (or throw the same exception)
 724          * on second or subsequent calls to produce the outcome.
 725          *
 726          * @apiNote This method is invoked by the {@code join} method. It should not be
 727          * invoked directly.
 728          *
 729          * @return the result
 730          * @throws Throwable the exception
 731          */
 732         R result() throws Throwable;
 733 
 734         /**
 735          * {@return a new Joiner object that yields a stream of all subtasks when all
 736          * subtasks complete successfully}
 737          * This method throws, and <a href="StructuredTaskScope.html#CancelExecution">
 738          * execution is cancelled</a>, if any subtask fails.
 739          *
 740          * <p> If all subtasks complete successfully, the joiner's {@link Joiner#result()}
 741          * method returns a stream of all subtasks in the order that they were forked.
 742          * If any subtask failed then the {@code result} method throws the exception from
 743          * the first subtask to fail.
 744          *
 745          * @apiNote Joiners returned by this method are suited to cases where all subtasks
 746          * return a result of the same type. Joiners returned by {@link
 747          * #awaitAllSuccessfulOrThrow()} are suited to cases where the subtasks return
 748          * results of different types.
 749          *
 750          * @param <T> the result type of subtasks
 751          */
 752         static <T> Joiner<T, Stream<Subtask<T>>> allSuccessfulOrThrow() {
 753             return new AllSuccessful<>();
 754         }
 755 
 756         /**
 757          * {@return a new Joiner object that yields the result of any subtask that
 758          * completed successfully}
 759          * This method throws, and <a href="StructuredTaskScope.html#CancelExecution">
 760          * execution is cancelled</a>, if all subtasks fail.
 761          *
 762          * <p> The joiner's {@link Joiner#result()} method returns the result of a subtask
 763          * that completed successfully. If all subtasks fail then the {@code result} method
 764          * throws the exception from one of the failed subtasks. The {@code result} method
 765          * throws {@code NoSuchElementException} if no subtasks were forked.
 766          *
 767          * @param <T> the result type of subtasks
 768          */
 769         static <T> Joiner<T, T> anySuccessfulResultOrThrow() {
 770             return new AnySuccessful<>();
 771         }
 772 
 773         /**
 774          * {@return a new Joiner object that waits for subtasks to complete successfully}
 775          * This method throws, and <a href="StructuredTaskScope.html#CancelExecution">
 776          * execution is cancelled</a>, if any subtask fails.
 777          *
 778          * <p> The joiner's {@link Joiner#result() result} method returns {@code null}
 779          * if all subtasks complete successfully, or throws the exception from the first
 780          * subtask to fail.
 781          *
 782          * @apiNote Joiners returned by this method are suited to cases where subtasks
 783          * return results of different types. Joiners returned by {@link #allSuccessfulOrThrow()}
 784          * are suited to cases where the subtasks return a result of the same type.
 785          *
 786          * @param <T> the result type of subtasks
 787          */
 788         static <T> Joiner<T, Void> awaitAllSuccessfulOrThrow() {
 789             return new AwaitSuccessful<>();
 790         }
 791 
 792         /**
 793          * {@return a new Joiner object that waits for all subtasks to complete}
 794          * This method does not cancel execution if a subtask fails.
 795          *
 796          * <p> The joiner's {@link Joiner#result() result} method returns {@code null}.
 797          *
 798          * @apiNote This Joiner can be useful for cases where subtasks make use of
 799          * <em>side-effects</em> rather than return results or fail with exceptions.
 800          * The {@link #fork(Runnable) fork(Runnable)} method can be used to fork subtasks
 801          * that do not return a result.
 802          *
 803          * @param <T> the result type of subtasks
 804          */
 805         static <T> Joiner<T, Void> awaitAll() {
 806             // ensure that new Joiner object is returned
 807             return new Joiner<T, Void>() {
 808                 @Override
 809                 public Void result() {
 810                     return null;
 811                 }
 812             };
 813         }
 814 
 815         /**
 816          * {@return a new Joiner object that yields a stream of all subtasks when all
 817          * subtasks complete or <a href="StructuredTaskScope.html#CancelExecution">
 818          * execution is cancelled</a> by a predicate}
 819          *
 820          * <p> The joiner's {@link Joiner#onComplete(Subtask)} method invokes the
 821          * predicate's {@link Predicate#test(Object) test} method with the subtask that
 822          * completed successfully or failed with an exception. If the {@code test} method
 823          * returns {@code true} then <a href="StructuredTaskScope.html#CancelExecution">
 824          * execution is cancelled</a>. The {@code test} method must be thread safe as it
 825          * may be invoked concurrently from several threads.
 826          *
 827          * <p> The joiner's {@link #result()} method returns the stream of all subtasks,
 828          * in fork order. The stream may contain subtasks that have completed
 829          * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED}
 830          * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state
 831          * if execution was cancelled before all subtasks were forked or completed.
 832          *
 833          * <p> The following example uses this method to create a {@code Joiner} that
 834          * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> when
 835          * two or more subtasks fail.
 836          * {@snippet lang=java :
 837          *    class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
 838          *         private final AtomicInteger failedCount = new AtomicInteger();
 839          *         @Override
 840          *         public boolean test(Subtask<? extends T> subtask) {
 841          *             return subtask.state() == Subtask.State.FAILED
 842          *                     && failedCount.incrementAndGet() >= 2;
 843          *         }
 844          *     }
 845          *
 846          *     var joiner = Joiner.all(new CancelAfterTwoFailures<String>());
 847          * }
 848          *
 849          * @param isDone the predicate to evaluate completed subtasks
 850          * @param <T> the result type of subtasks
 851          */
 852         static <T> Joiner<T, Stream<Subtask<T>>> allUntil(Predicate<Subtask<? extends T>> isDone) {
 853             return new AllSubtasks<>(isDone);
 854         }
 855     }
 856 
 857     /**
 858      * Represents the configuration for a {@code StructuredTaskScope}.
 859      *
 860      * <p> The configuration for a {@code StructuredTaskScope} consists of a {@link
 861      * ThreadFactory} to create threads, an optional name for the purposes of monitoring
 862      * and management, and an optional timeout.
 863      *
 864      * <p> Creating a {@code StructuredTaskScope} with {@link #open()} or {@link #open(Joiner)}
 865      * uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default
 866      * configuration</a>. The default configuration consists of a thread factory that
 867      * creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">
 868      * virtual threads</a>, no name for monitoring and management purposes, and no timeout.
 869      *
 870      * <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, Function)
 871      * open} method allows a different configuration to be used. The function specified
 872      * to the {@code open} method is applied to the default configuration and returns the
 873      * configuration for the {@code StructuredTaskScope} under construction. The function
 874      * can use the {@code with-} prefixed methods defined here to specify the components
 875      * of the configuration to use.
 876      *
 877      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 878      * in this class will cause a {@link NullPointerException} to be thrown.
 879      *
 880      * @since 24
 881      */
 882     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 883     public sealed interface Config permits ConfigImpl {
 884         /**
 885          * {@return a new {@code Config} object with the given thread factory}
 886          * The other components are the same as this object. The thread factory is used by
 887          * a task scope to create threads when {@linkplain #fork(Callable) forking} subtasks.
 888          * @param threadFactory the thread factory
 889          *
 890          * @apiNote The thread factory will typically create
 891          * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
 892          * maybe with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler
 893          * uncaught exception handler}, or other properties configured.
 894          *
 895          * @see #fork(Callable)
 896          */
 897         Config withThreadFactory(ThreadFactory threadFactory);
 898 
 899         /**
 900          * {@return a new {@code Config} object with the given name}
 901          * The other components are the same as this object. A task scope is optionally
 902          * named for the purposes of monitoring and management.
 903          * @param name the name
 904          * @see StructuredTaskScope#toString()
 905          */
 906         Config withName(String name);
 907 
 908         /**
 909          * {@return a new {@code Config} object with the given timeout}
 910          * The other components are the same as this object.
 911          * @param timeout the timeout
 912          *
 913          * @apiNote Applications using deadlines, expressed as an {@link java.time.Instant},
 914          * can use {@link Duration#between Duration.between(Instant.now(), deadline)} to
 915          * compute the timeout for this method.
 916          *
 917          * @see #join()
 918          */
 919         Config withTimeout(Duration timeout);
 920     }
 921 
 922     /**
 923      * Exception thrown by {@link #join()} when the outcome is an exception rather than a
 924      * result.
 925      *
 926      * @since 24
 927      */
 928     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 929     public static class FailedException extends RuntimeException {
 930         @java.io.Serial
 931         static final long serialVersionUID = -1533055100078459923L;
 932 
 933         /**
 934          * Constructs a {@code FailedException} with the specified cause.
 935          *
 936          * @param  cause the cause, can be {@code null}
 937          */
 938         public FailedException(Throwable cause) {
 939             super(cause);
 940         }
 941     }
 942 
 943     /**
 944      * Exception thrown by {@link #join()} if the task scope was created a timeout and
 945      * the timeout expired before or while waiting in {@code join}.
 946      *
 947      * @since 24
 948      * @see Config#withTimeout(Duration)
 949      */
 950     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 951     public static class TimeoutException extends RuntimeException {
 952         @java.io.Serial
 953         static final long serialVersionUID = 705788143955048766L;
 954 
 955         /**
 956          * Constructs a {@code TimeoutException} with no detail message.
 957          */
 958         public TimeoutException() { }
 959     }
 960 
 961     /**
 962      * Opens a new structured task scope to use the given {@code Joiner} object and with
 963      * configuration that is the result of applying the given function to the
 964      * <a href="#DefaultConfiguration">default configuration</a>.
 965      *
 966      * <p> The {@code configFunction} is called with the default configuration and returns
 967      * the configuration for the new structured task scope. The function may, for example,
 968      * set the {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} or set
 969      * a {@linkplain Config#withTimeout(Duration) timeout}.
 970      *
 971      * <p> If a {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} is set
 972      * then the {@code ThreadFactory}'s {@link ThreadFactory#newThread(Runnable) newThread}
 973      * method will be used to create threads when forking subtasks in this task scope.
 974      *
 975      * <p> If a {@linkplain Config#withTimeout(Duration) timeout} is set then it starts
 976      * when the task scope is opened. If the timeout expires before the task scope has
 977      * {@linkplain #join() joined} then execution is cancelled and the {@code join} method
 978      * throws {@link FailedException} with {@link TimeoutException} as the cause.
 979      *
 980      * <p> The new task scope is owned by the current thread. Only code executing in this
 981      * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or
 982      * {@linkplain #close close} the task scope.
 983      *
 984      * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
 985      * value} bindings for inheritance by threads started in the task scope.
 986      *
 987      * @param joiner the joiner
 988      * @param configFunction a function to produce the configuration
 989      * @return a new task scope
 990      * @param <T> the result type of tasks executed in the task scope
 991      * @param <R> the type of the result returned by the join method
 992      * @since 24
 993      */
 994     public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
 995                                                         Function<Config, Config> configFunction) {
 996         Objects.requireNonNull(joiner);
 997 
 998         var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
 999         var scope = new StructuredTaskScope<T, R>(joiner, config.threadFactory(), config.name());
1000 
1001         // schedule timeout
1002         Duration timeout = config.timeout();
1003         if (timeout != null) {
1004             boolean done = false;
1005             try {
1006                 scope.scheduleTimeout(timeout);
1007                 done = true;
1008             } finally {
1009                 if (!done) {
1010                     scope.close();  // pop if scheduling timeout failed
1011                 }
1012             }
1013         }
1014 
1015         return scope;
1016     }
1017 
1018     /**
1019      * Opens a new structured task scope to use the given {@code Joiner} object. The
1020      * task scope is created with the <a href="#DefaultConfiguration">default configuration</a>.
1021      * The default configuration has a {@code ThreadFactory} that creates unnamed
1022      * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
1023      * is unnamed for monitoring and management purposes, and has no timeout.
1024      *
1025      * @implSpec
1026      * This factory method is equivalent to invoking the 2-arg open method with the given
1027      * joiner and the {@linkplain Function#identity() identity function}.
1028      *
1029      * @param joiner the joiner
1030      * @return a new task scope
1031      * @param <T> the result type of tasks executed in the task scope
1032      * @param <R> the type of the result returned by the join method
1033      * @since 24
1034      */
1035     public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner) {
1036         return open(joiner, Function.identity());
1037     }
1038 
1039     /**
1040      * Opens a new structured task scope that can be used to fork subtasks that return
1041      * results of any type. The {@link #join()} method waits for all subtasks to succeed
1042      * or any subtask to fail.
1043      *
1044      * <p> The {@code join} method returns {@code null} if all subtasks complete successfully.
1045      * It throws {@link FailedException} if any subtask fails, with the exception from
1046      * the first subtask to fail as the cause.
1047      *
1048      * <p> The task scope is created with the <a href="#DefaultConfiguration">default
1049      * configuration</a>. The default configuration has a {@code ThreadFactory} that creates
1050      * unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual
1051      * threads</a>, is unnamed for monitoring and management purposes, and has no timeout.
1052      *
1053      * @implSpec
1054      * This factory method is equivalent to invoking the 2-arg open method with a joiner
1055      * created with {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()}
1056      * and the {@linkplain Function#identity() identity function}.
1057      *
1058      * @param <T> the result type of subtasks
1059      * @return a new task scope
1060      * @since 24
1061      */
1062     public static <T> StructuredTaskScope<T, Void> open() {
1063         return open(Joiner.awaitAllSuccessfulOrThrow(), Function.identity());
1064     }
1065 
1066     /**
1067      * Starts a new thread in this task scope to execute a value-returning task, thus
1068      * creating a <em>subtask</em>. The value-returning task is provided to this method
1069      * as a {@link Callable}, the thread executes the task's {@link Callable#call() call}
1070      * method.
1071      *
1072      * <p> This method first creates a {@link Subtask Subtask} to represent the <em>forked
1073      * subtask</em>. It invokes the joiner's {@link Joiner#onFork(Subtask) onFork} method
1074      * with the {@code Subtask} object. If the {@code onFork} completes with an exception
1075      * or error then it is propagated by the {@code fork} method. If execution is
1076      * {@linkplain #isCancelled() cancelled}, or {@code onFork} returns {@code true} to
1077      * cancel execution, then this method returns the {@code Subtask} (in the {@link
1078      * Subtask.State#UNAVAILABLE UNAVAILABLE} state) without creating a thread to execute
1079      * the subtask. If execution is not cancelled then a thread is created with the
1080      * {@link ThreadFactory} configured when the task scope was created, and the thread is
1081      * started. Forking a subtask inherits the current thread's {@linkplain ScopedValue
1082      * scoped value} bindings. The bindings must match the bindings captured when the
1083      * task scope was opened. If the subtask completes (successfully or with an exception)
1084      * before execution is cancelled, then the thread invokes the joiner's
1085      * {@link Joiner#onComplete(Subtask) onComplete} method with subtask in the
1086      * {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} state.
1087      *
1088      * <p> This method returns the {@link Subtask Subtask} object. In some usages, this
1089      * object may be used to get its result. In other cases it may be used for correlation
1090      * or just discarded. To ensure correct usage, the {@link Subtask#get() Subtask.get()}
1091      * method may only be called by the task scope owner to get the result after it has
1092      * waited for subtasks to complete with the {@link #join() join} method and the subtask
1093      * completed successfully. Similarly, the {@link Subtask#exception() Subtask.exception()}
1094      * method may only be called by the task scope owner after it has joined and the subtask
1095      * failed. If execution was cancelled before the subtask was forked, or before it
1096      * completes, then neither method can be used to obtain the outcome.
1097      *
1098      * <p> This method may only be invoked by the task scope owner.
1099      *
1100      * @param task the value-returning task for the thread to execute
1101      * @param <U> the result type
1102      * @return the subtask
1103      * @throws WrongThreadException if the current thread is not the task scope owner
1104      * @throws IllegalStateException if the owner has already {@linkplain #join() joined}
1105      * or the task scope is closed
1106      * @throws StructureViolationException if the current scoped value bindings are not
1107      * the same as when the task scope was created
1108      * @throws RejectedExecutionException if the thread factory rejected creating a
1109      * thread to run the subtask
1110      */
1111     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
1112         Objects.requireNonNull(task);
1113         ensureOwner();
1114         ensureNotJoined();
1115 
1116         var subtask = new SubtaskImpl<U>(this, task);
1117 
1118         // notify joiner, even if cancelled
1119         if (joiner.onFork(subtask)) {
1120             cancelExecution();
1121         }
1122 
1123         if (!cancelled) {
1124             // create thread to run task
1125             Thread thread = threadFactory.newThread(subtask);
1126             if (thread == null) {
1127                 throw new RejectedExecutionException("Rejected by thread factory");
1128             }
1129 
1130             // attempt to start the thread
1131             try {
1132                 flock.start(thread);
1133             } catch (IllegalStateException e) {
1134                 // shutdown by another thread, or underlying flock is shutdown due
1135                 // to unstructured use
1136             }
1137         }
1138 
1139         // force owner to join
1140         state = ST_FORKED;
1141         return subtask;
1142     }
1143 
1144     /**
1145      * Starts a new thread in this task scope to execute a task that does not return a
1146      * result, creating a <em>subtask</em>.
1147      *
1148      * <p> This method works exactly the same as {@link #fork(Callable)} except that
1149      * the task is provided to this method as a {@link Runnable}, the thread executes
1150      * the task's {@link Runnable#run() run} method, and its result is {@code null}.
1151      *
1152      * @param task the task for the thread to execute
1153      * @return the subtask
1154      * @throws WrongThreadException if the current thread is not the task scope owner
1155      * @throws IllegalStateException if the owner has already {@linkplain #join() joined}
1156      * or the task scope is closed
1157      * @throws StructureViolationException if the current scoped value bindings are not
1158      * the same as when the task scope was created
1159      * @throws RejectedExecutionException if the thread factory rejected creating a
1160      * thread to run the subtask
1161      * @since 24
1162      */
1163     public Subtask<? extends T> fork(Runnable task) {
1164         Objects.requireNonNull(task);
1165         return fork(() -> { task.run(); return null; });
1166     }
1167 
1168     /**
1169      * Waits for all subtasks started in this task scope to complete or execution to be
1170      * cancelled. If a {@linkplain  Config#withTimeout(Duration) timeout} has been set
1171      * then execution will be cancelled if the timeout expires before or while waiting.
1172      * Once finished waiting, the {@code Joiner}'s {@link Joiner#result() result} method
1173      * is invoked to get the result or throw an exception. If the {@code result} method
1174      * throws then this method throws {@code FailedException} with the exception thrown
1175      * by the {@code result()} method as the cause.
1176      *
1177      * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1178      * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1179      * when all threads finish, the {@code Joiner}'s {@link Joiner#onFork(Subtask)
1180      * onFork} or {@link Joiner#onComplete(Subtask) onComplete} returns {@code true}
1181      * to cancel execution, the timeout (if set) expires, or the current thread is
1182      * {@linkplain Thread#interrupt() interrupted}.
1183      *
1184      * <p> This method may only be invoked by the task scope owner, and only once.
1185      *
1186      * @return the {@link Joiner#result() result}
1187      * @throws WrongThreadException if the current thread is not the task scope owner
1188      * @throws IllegalStateException if already joined or this task scope is closed
1189      * @throws FailedException if the <i>outcome</i> is an exception, thrown with the
1190      * exception from {@link Joiner#result()} as the cause
1191      * @throws TimeoutException if a timeout is set and the timeout expires before or
1192      * while waiting
1193      * @throws InterruptedException if interrupted while waiting
1194      */
1195     public R join() throws InterruptedException {
1196         ensureOwner();
1197         ensureNotJoined();
1198 
1199         // join started
1200         state = ST_JOIN_STARTED;
1201 
1202         // wait for all subtasks, execution to be cancelled, or interrupt
1203         flock.awaitAll();
1204 
1205         // throw if timeout expired
1206         if (timeoutExpired) {
1207             throw new TimeoutException();
1208         }
1209         cancelTimeout();
1210 
1211         // all subtasks completed or cancelled
1212         state = ST_JOIN_COMPLETED;
1213 
1214         // invoke joiner to get result
1215         try {
1216             return joiner.result();
1217         } catch (Throwable e) {
1218             throw new FailedException(e);
1219         }
1220     }
1221 
1222     /**
1223      * {@return {@code true} if <a href="#CancelExecution">execution is cancelled</a>,
1224      * or in the process of being cancelled, otherwise {@code false}}
1225      *
1226      * <p> Cancelling execution prevents new threads from starting in the task scope and
1227      * {@linkplain Thread#interrupt() interrupts} threads executing unfinished subtasks.
1228      * It may take some time before the interrupted threads finish execution; this
1229      * method may return {@code true} before all threads have been interrupted or before
1230      * all threads have finished.
1231      *
1232      * @apiNote A main task with a lengthy "forking phase" (the code that executes before
1233      * the main task invokes {@link #join() join}) may use this method to avoid doing work
1234      * in cases where execution was cancelled by the completion of a previously forked
1235      * subtask or timeout.
1236      *
1237      * @since 24
1238      */
1239     public boolean isCancelled() {
1240         return cancelled;
1241     }
1242 
1243     /**
1244      * Closes this task scope.
1245      *
1246      * <p> This method first <a href="#CancelExecution">cancels execution</a>, if not
1247      * already cancelled. This interrupts the threads executing unfinished subtasks. This
1248      * method then waits for all threads to finish. If interrupted while waiting then it
1249      * will continue to wait until the threads finish, before completing with the interrupt
1250      * status set.
1251      *
1252      * <p> This method may only be invoked by the task scope owner. If the task scope
1253      * is already closed then the task scope owner invoking this method has no effect.
1254      *
1255      * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
1256      * manner</em>. If this method is called to close a task scope before nested task
1257      * scopes are closed then it closes the underlying construct of each nested task scope
1258      * (in the reverse order that they were created in), closes this task scope, and then
1259      * throws {@link StructureViolationException}.
1260      * Similarly, if this method is called to close a task scope while executing with
1261      * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
1262      * before the scoped values were bound, then {@code StructureViolationException} is
1263      * thrown after closing the task scope.
1264      * If a thread terminates without first closing task scopes that it owns then
1265      * termination will cause the underlying construct of each of its open tasks scopes to
1266      * be closed. Closing is performed in the reverse order that the task scopes were
1267      * created in. Thread termination may therefore be delayed when the task scope owner
1268      * has to wait for threads forked in these task scopes to finish.
1269      *
1270      * @throws IllegalStateException thrown after closing the task scope if the task scope
1271      * owner did not attempt to join after forking
1272      * @throws WrongThreadException if the current thread is not the task scope owner
1273      * @throws StructureViolationException if a structure violation was detected
1274      */
1275     @Override
1276     public void close() {
1277         ensureOwner();
1278         int s = state;
1279         if (s == ST_CLOSED) {
1280             return;
1281         }
1282 
1283         // cancel execution if join did not complete
1284         if (s < ST_JOIN_COMPLETED) {
1285             cancelExecution();
1286             cancelTimeout();
1287         }
1288 
1289         // wait for stragglers
1290         try {
1291             flock.close();
1292         } finally {
1293             state = ST_CLOSED;
1294         }
1295 
1296         // throw ISE if the owner didn't join after forking
1297         if (s == ST_FORKED) {
1298             throw new IllegalStateException("Owner did not join after forking");
1299         }
1300     }
1301 
1302     /**
1303      * {@inheritDoc}  If a {@link Config#withName(String) name} for monitoring and
1304      * monitoring purposes has been set then the string representation includes the name.
1305      */
1306     @Override
1307     public String toString() {
1308         return flock.name();
1309     }
1310 
1311     /**
1312      * Subtask implementation, runs the task specified to the fork method.
1313      */
1314     private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
1315         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
1316 
1317         private record AltResult(Subtask.State state, Throwable exception) {
1318             AltResult(Subtask.State state) {
1319                 this(state, null);
1320             }
1321         }
1322 
1323         private final StructuredTaskScope<? super T, ?> scope;
1324         private final Callable<? extends T> task;
1325         private volatile Object result;
1326 
1327         SubtaskImpl(StructuredTaskScope<? super T, ?> scope, Callable<? extends T> task) {
1328             this.scope = scope;
1329             this.task = task;
1330         }
1331 
1332         @Override
1333         public void run() {
1334             T result = null;
1335             Throwable ex = null;
1336             try {
1337                 result = task.call();
1338             } catch (Throwable e) {
1339                 ex = e;
1340             }
1341 
1342             // nothing to do if task scope is cancelled
1343             if (scope.isCancelled())
1344                 return;
1345 
1346             // set result/exception and invoke onComplete
1347             if (ex == null) {
1348                 this.result = (result != null) ? result : RESULT_NULL;
1349             } else {
1350                 this.result = new AltResult(State.FAILED, ex);
1351             }
1352             scope.onComplete(this);
1353         }
1354 
1355         @Override
1356         public Subtask.State state() {
1357             Object result = this.result;
1358             if (result == null) {
1359                 return State.UNAVAILABLE;
1360             } else if (result instanceof AltResult alt) {
1361                 // null or failed
1362                 return alt.state();
1363             } else {
1364                 return State.SUCCESS;
1365             }
1366         }
1367 
1368 
1369         @Override
1370         public T get() {
1371             scope.ensureJoinedIfOwner();
1372             Object result = this.result;
1373             if (result instanceof AltResult) {
1374                 if (result == RESULT_NULL) return null;
1375             } else if (result != null) {
1376                 @SuppressWarnings("unchecked")
1377                 T r = (T) result;
1378                 return r;
1379             }
1380             throw new IllegalStateException(
1381                     "Result is unavailable or subtask did not complete successfully");
1382         }
1383 
1384         @Override
1385         public Throwable exception() {
1386             scope.ensureJoinedIfOwner();
1387             Object result = this.result;
1388             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
1389                 return alt.exception();
1390             }
1391             throw new IllegalStateException(
1392                     "Exception is unavailable or subtask did not complete with exception");
1393         }
1394 
1395         @Override
1396         public String toString() {
1397             String stateAsString = switch (state()) {
1398                 case UNAVAILABLE -> "[Unavailable]";
1399                 case SUCCESS     -> "[Completed successfully]";
1400                 case FAILED      -> {
1401                     Throwable ex = ((AltResult) result).exception();
1402                     yield "[Failed: " + ex + "]";
1403                 }
1404             };
1405             return Objects.toIdentityString(this) + stateAsString;
1406         }
1407     }
1408 
1409     /**
1410      * A joiner that returns a stream of all subtasks when all subtasks complete
1411      * successfully. If any subtask fails then execution is cancelled.
1412      */
1413     private static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {
1414         private static final VarHandle FIRST_EXCEPTION;
1415         static {
1416             try {
1417                 MethodHandles.Lookup l = MethodHandles.lookup();
1418                 FIRST_EXCEPTION = l.findVarHandle(AllSuccessful.class, "firstException", Throwable.class);
1419             } catch (Exception e) {
1420                 throw new ExceptionInInitializerError(e);
1421             }
1422         }
1423         // list of forked subtasks, only accessed by owner thread
1424         private final List<Subtask<T>> subtasks = new ArrayList<>();
1425         private volatile Throwable firstException;
1426 
1427         @Override
1428         public boolean onFork(Subtask<? extends T> subtask) {
1429             @SuppressWarnings("unchecked")
1430             var tmp = (Subtask<T>) Objects.requireNonNull(subtask);
1431             subtasks.add(tmp);
1432             return false;
1433         }
1434 
1435         @Override
1436         public boolean onComplete(Subtask<? extends T> subtask) {
1437             return (subtask.state() == Subtask.State.FAILED)
1438                     && (firstException == null)
1439                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1440         }
1441 
1442         @Override
1443         public Stream<Subtask<T>> result() throws Throwable {
1444             Throwable ex = firstException;
1445             if (ex != null) {
1446                 throw ex;
1447             } else {
1448                 return subtasks.stream();
1449             }
1450         }
1451     }
1452 
1453     /**
1454      * A joiner that returns the result of the first subtask to complete successfully.
1455      * If any subtask completes successfully then execution is cancelled.
1456      */
1457     private static final class AnySuccessful<T> implements Joiner<T, T> {
1458         private static final VarHandle FIRST_SUCCESS;
1459         private static final VarHandle FIRST_EXCEPTION;
1460         static {
1461             try {
1462                 MethodHandles.Lookup l = MethodHandles.lookup();
1463                 FIRST_SUCCESS = l.findVarHandle(AnySuccessful.class, "firstSuccess", Subtask.class);
1464                 FIRST_EXCEPTION = l.findVarHandle(AnySuccessful.class, "firstException", Throwable.class);
1465             } catch (Exception e) {
1466                 throw new ExceptionInInitializerError(e);
1467             }
1468         }
1469         private volatile Subtask<T> firstSuccess;
1470         private volatile Throwable firstException;
1471 
1472         @Override
1473         public boolean onComplete(Subtask<? extends T> subtask) {
1474             Objects.requireNonNull(subtask);
1475             if (firstSuccess == null) {
1476                 if (subtask.state() == Subtask.State.SUCCESS) {
1477                     // capture the first subtask that completes successfully
1478                     return FIRST_SUCCESS.compareAndSet(this, null, subtask);
1479                 } else if (firstException == null) {
1480                     // capture the exception thrown by the first task to fail
1481                     FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1482                 }
1483             }
1484             return false;
1485         }
1486 
1487         @Override
1488         public T result() throws Throwable {
1489             Subtask<T> firstSuccess = this.firstSuccess;
1490             if (firstSuccess != null) {
1491                 return firstSuccess.get();
1492             }
1493             Throwable firstException = this.firstException;
1494             if (firstException != null) {
1495                 throw firstException;
1496             } else {
1497                 throw new NoSuchElementException("No subtasks completed");
1498             }
1499         }
1500     }
1501 
1502     /**
1503      * A joiner that that waits for all successful subtasks. If any subtask fails the
1504      * execution is cancelled.
1505      */
1506     private static final class AwaitSuccessful<T> implements Joiner<T, Void> {
1507         private static final VarHandle FIRST_EXCEPTION;
1508         static {
1509             try {
1510                 MethodHandles.Lookup l = MethodHandles.lookup();
1511                 FIRST_EXCEPTION = l.findVarHandle(AwaitSuccessful.class, "firstException", Throwable.class);
1512             } catch (Exception e) {
1513                 throw new ExceptionInInitializerError(e);
1514             }
1515         }
1516         private volatile Throwable firstException;
1517 
1518         @Override
1519         public boolean onComplete(Subtask<? extends T> subtask) {
1520             return (subtask.state() == Subtask.State.FAILED)
1521                     && (firstException == null)
1522                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1523         }
1524 
1525         @Override
1526         public Void result() throws Throwable {
1527             Throwable ex = firstException;
1528             if (ex != null) {
1529                 throw ex;
1530             } else {
1531                 return null;
1532             }
1533         }
1534     }
1535 
1536     /**
1537      * A joiner that returns a stream of all subtasks.
1538      */
1539     private static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
1540         private final Predicate<Subtask<? extends T>> isDone;
1541         // list of forked subtasks, only accessed by owner thread
1542         private final List<Subtask<T>> subtasks = new ArrayList<>();
1543 
1544         AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
1545             this.isDone = Objects.requireNonNull(isDone);
1546         }
1547 
1548         @Override
1549         public boolean onFork(Subtask<? extends T> subtask) {
1550             @SuppressWarnings("unchecked")
1551             var tmp = (Subtask<T>) Objects.requireNonNull(subtask);
1552             subtasks.add(tmp);
1553             return false;
1554         }
1555 
1556         @Override
1557         public boolean onComplete(Subtask<? extends T> subtask) {
1558             return isDone.test(Objects.requireNonNull(subtask));
1559         }
1560 
1561         @Override
1562         public Stream<Subtask<T>> result() {
1563             return subtasks.stream();
1564         }
1565     }
1566 
1567     /**
1568      * Implementation of Config.
1569      */
1570     private record ConfigImpl(ThreadFactory threadFactory,
1571                               String name,
1572                               Duration timeout) implements Config {
1573         static Config defaultConfig() {
1574             return new ConfigImpl(Thread.ofVirtual().factory(), null, null);
1575         }
1576 
1577         @Override
1578         public Config withThreadFactory(ThreadFactory threadFactory) {
1579             return new ConfigImpl(Objects.requireNonNull(threadFactory), name, timeout);
1580         }
1581 
1582         @Override
1583         public Config withName(String name) {
1584             return new ConfigImpl(threadFactory, Objects.requireNonNull(name), timeout);
1585         }
1586 
1587         @Override
1588         public Config withTimeout(Duration timeout) {
1589             return new ConfigImpl(threadFactory, name, Objects.requireNonNull(timeout));
1590         }
1591     }
1592 
1593     /**
1594      * Used to schedule a task to cancel execution when a timeout expires.
1595      */
1596     private static class TimerSupport {
1597         private static final ScheduledExecutorService DELAYED_TASK_SCHEDULER;
1598         static {
1599             ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1600                 Executors.newScheduledThreadPool(1, task -> {
1601                     Thread t = InnocuousThread.newThread("StructuredTaskScope-Timer", task);
1602                     t.setDaemon(true);
1603                     return t;
1604                 });
1605             stpe.setRemoveOnCancelPolicy(true);
1606             DELAYED_TASK_SCHEDULER = stpe;
1607         }
1608 
1609         static Future<?> schedule(Duration timeout, Runnable task) {
1610             long nanos = TimeUnit.NANOSECONDS.convert(timeout);
1611             return DELAYED_TASK_SCHEDULER.schedule(task, nanos, TimeUnit.NANOSECONDS);
1612         }
1613     }
1614 }