1 /*
   2  * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.util.ArrayList;
  33 import java.util.List;
  34 import java.util.NoSuchElementException;
  35 import java.util.Objects;


  36 import java.util.function.Function;
  37 import java.util.function.Predicate;
  38 import java.util.function.Supplier;
  39 import java.util.stream.Stream;
  40 import jdk.internal.javac.PreviewFeature;
  41 import jdk.internal.misc.InnocuousThread;
  42 import jdk.internal.misc.ThreadFlock;
  43 
  44 /**
  45  * An API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports cases
  46  * where a main task splits into several concurrent subtasks, and where the subtasks must
  47  * complete before the main task continues. A {@code StructuredTaskScope} can be used to
  48  * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
  49  * just like that of a sequential operation in structured programming.
  50  *
  51  * <p> {@code StructuredTaskScope} defines the static method {@link #open(Joiner) open}
  52  * to open a new {@code StructuredTaskScope} and the {@link #close() close} method to close
  53  * it. The API is designed to be used with the {@code try-with-resources} statement where
  54  * the {@code StructuredTaskScope} is opened as a resource and then closed automatically.
  55  * The code in the block uses the {@link #fork(Callable) fork} method to fork subtasks.
  56  * After forking, it uses the {@link #join() join} method to wait for all subtasks to
  57  * finish (or some other outcome) as a single operation. Forking a subtask starts a new
  58  * {@link Thread} to run the subtask. The thread executing the main task does not continue
  59  * beyond the {@code close} method until all threads started to execute subtasks have finished.
  60  * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may
  61  * only be invoked by the <em>owner thread</em> (the thread that opened the {@code
  62  * StructuredTaskScope}), and the {@code close} method throws an exception after closing
  63  * if the owner did not invoke the {@code join} method.
  64  *
  65  * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner} that handles subtask
  66  * completion and produces the outcome (the result or an exception) for the {@link #join()
  67  * join} method. The {@code Joiner} interface defines static methods to create a
  68  * {@code Joiner} for common cases.









  69  *
  70  * <p> A {@code Joiner} may <a id="CancelExecution"><em>cancel execution</em></a>
  71  * (sometimes called "short-circuiting") when some condition is reached that does not
  72  * require the result of subtasks that are still executing. Cancelling execution prevents
  73  * new threads from being started to execute further subtasks, {@linkplain Thread#interrupt()
  74  * interrupts} the threads executing subtasks that have not completed, and causes the
  75  * {@code join} method to wake up with a result (or exception). The {@link #close() close}
  76  * method always waits for threads executing subtasks to finish, even if execution is
  77  * cancelled, so it cannot continue beyond the {@code close} method until the interrupted
  78  * threads finish. Subtasks should be coded so that they finish as soon as possible when
  79  * interrupted. Subtasks that block on methods that are not interruptible may delay the
  80  * closing of a task scope.
  81  *
  82  * <p> Consider the example of a main task that splits into two subtasks to concurrently
  83  * fetch resources from two URL locations "left" and "right". Both subtasks may complete
  84  * successfully, one subtask may succeed and the other may fail, or both subtasks may
  85  * fail. In this example, the code in the main task is interested in the result from the
  86  * first subtask to complete successfully. The example uses {@link
  87  * Joiner#anySuccessfulResultOrThrow() Joiner.anySuccessfulResultOrThrow()} to
  88  * create a {@code Joiner} that makes available the result of the first subtask to
  89  * complete successfully. The type parameter in the example is "{@code String}" so that
  90  * only subtasks that return a {@code String} can be forked.
  91  * {@snippet lang=java :
  92  *    // @link substring="open" target="#open(Joiner)" :
  93  *    try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {
  94  *
  95  *        scope.fork(() -> query(left));  // @link substring="fork" target="#fork(Callable)"
  96  *        scope.fork(() -> query(right));
  97  *
  98  *        // throws if both subtasks fail
  99  *        String firstResult = scope.join();   // @link substring="join" target="#join()"
 100  *
 101  *    // @link substring="close" target="#close()" :
 102  *    } // close
 103  * }






 104  *
 105  * <p> In the example, the main task forks the two subtasks, then waits in the {@code
 106  * join} method for either subtask to complete successfully or for both subtasks to fail.
 107  * If one of the subtasks completes successfully then the other subtask is cancelled (by
 108  * way of interrupting the thread executing the subtask), and the {@code join} method
 109  * returns the result from the first subtask. Cancelling the other subtask avoids the
 110  * main task waiting for a result that it doesn't care about. If both subtasks fail then
 111  * the {@code join} method throws {@link ExecutionException} with the exception from one
 112  * of the subtasks as the {@linkplain Throwable#getCause() cause}.
 113  *
 114  * <p> Now consider another example that also splits into two subtasks to concurrently
 115  * fetch resources. One of the subtasks returns a {@code String} when it succeeds, the
 116  * other returns an {@code Integer}. The main task in this example is interested in the
 117  * successful result from both subtasks. It uses {@link Joiner#awaitAllSuccessfulOrThrow()
 118  * Joiner.awaitAllSuccessfulOrThrow()} to create a {@code Joiner} that cancels
 119  * execution and causes {@code join} to throw if any subtask fails.
 120  * {@snippet lang=java :
 121  *    try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
 122  *
 123  *        // @link substring="Subtask" target="Subtask" :
 124  *        Subtask<String> subtask1 = scope.fork(() -> query(left));
 125  *        Subtask<Integer> subtask2 = scope.fork(() -> query(right));
 126  *
 127  *        // throws if either subtask fails
 128  *        scope.join();

 129  *
 130  *        // both subtasks completed successfully
 131  *        return new MyResult(subtask1.get(), subtask2.get()); // @link substring="get" target="Subtask#get()"
 132  *
 133  *    }
 134  * }


 135  *
 136  * <p> In this example, the main task forks the two subtasks. The {@code fork} method
 137  * returns a {@link Subtask Subtask} that is a handle to the forked subtask. The main task
 138  * waits in the {@code join} method for both subtasks to complete successfully or for either
 139  * subtask to fail. If both subtasks complete successfully then the {@code join} method
 140  * completes and the main task uses the {@link Subtask#get() Subtask.get()} method to get
 141  * the result of each subtask. If either subtask fails then the other is cancelled (by way
 142  * of interrupting the thread executing the subtask) and the {@code join} throws {@link
 143  * ExecutionException} with the exception from the failed subtask as the {@linkplain
 144  * Throwable#getCause() cause}.
 145  *
 146  * <p> Whether code uses the {@code Subtask} returned from {@code fork} will depend on
 147  * the {@code Joiner} and usage. Some {@code Joiner} implementations are suited to subtasks
 148  * that return results of the same type and where the {@code join} method returns a result
 149  * for the main task to use. Code that forks subtasks that return results of different
 150  * types, and uses a {@code Joiner} such as {@code Joiner.awaitAllSuccessfulOrThrow()} that
 151  * does not return a result, will use {@link Subtask#get() Subtask.get()} after joining.
 152  *
 153  * <h2>Exception handling</h2>
 154  *
 155  * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner Joiner} that
 156  * handles subtask completion and produces the outcome for the {@link #join() join} method.
 157  * In some cases, the outcome will be a result, in other cases it will be an exception.
 158  * If the outcome is an exception then the {@code join} method throws {@link
 159  * ExecutionException} with the exception as the {@linkplain Throwable#getCause()
 160  * cause}. For many {@code Joiner} implementations, the exception will be an exception
 161  * thrown by a subtask that failed. In the case of {@link Joiner#allSuccessfulOrThrow()
 162  * allSuccessfulOrThrow} and {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow}
 163  * for example, the exception is from the first subtask to fail.







 164  *
 165  * <p> Many of the details for how exceptions are handled will depend on usage. In some
 166  * cases, the {@code join} method will be called in a {@code try-catch} block to catch
 167  * {@code ExecutionException} and handle the cause. The exception handling may use
 168  * {@code instanceof} with pattern matching to handle specific causes. In some cases it
 169  * may not be useful to catch {@code ExecutionException} but instead leave it to propagate
 170  * to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler}
 171  * for logging purposes.




 172  *
 173  * <p> For cases where a specific exception triggers the use of a default result then it
 174  * may be more appropriate to handle this in the subtask itself rather than the subtask
 175  * failing and code in the main task handling the exception.
 176  *
 177  * <h2>Configuration</h2>
 178  *
 179  * A {@code StructuredTaskScope} is opened with {@linkplain Config configuration} that
 180  * consists of a {@link ThreadFactory} to create threads, an optional name for monitoring
 181  * and management purposes, and an optional timeout.
 182  *
 183  * <p> The 1-arg {@link #open(Joiner) open} method creates a {@code StructuredTaskScope}
 184  * with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default
 185  * configuration has a {@code ThreadFactory} that creates unnamed
 186  * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
 187  * is unnamed for monitoring and management purposes, and has no timeout.










 188  *
 189  * <p> The 2-arg {@link #open(Joiner, Function) open} method can be used to create a
 190  * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for
 191  * the purposes of monitoring and management, or has a timeout that cancels execution if
 192  * the timeout expires before or while waiting for subtasks to complete. The {@code open}
 193  * method is called with a {@linkplain Function function} that is applied to the default
 194  * configuration and returns a {@link Config Config} for the {@code StructuredTaskScope}
 195  * under construction.
 196  *
 197  * <p> The following example opens a new {@code StructuredTaskScope} with a {@code
 198  * ThreadFactory} that creates virtual threads {@linkplain Thread#setName(String) named}
 199  * "duke-0", "duke-1" ...
 200  * {@snippet lang=java :
 201  *    // @link substring="name" target="Thread.Builder#name(String, long)" :
 202  *    ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory();
 203  *
 204  *    // @link substring="withThreadFactory" target="Config#withThreadFactory(ThreadFactory)" :
 205  *    try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
 206  *
 207  *        scope.fork( .. );   // runs in a virtual thread with name "duke-0"
 208  *        scope.fork( .. );   // runs in a virtual thread with name "duke-1"
 209  *
 210  *        scope.join();



 211  *

 212  *     }
 213  *}
















 214  *
 215  * <p> A second example sets a timeout, represented by a {@link Duration}. The timeout
 216  * starts when the new task scope is opened. If the timeout expires before the {@code join}
 217  * method has completed then <a href="#CancelExecution">execution is cancelled</a>. This
 218  * interrupts the threads executing the two subtasks and causes the {@link #join() join}
 219  * method to throw {@link ExecutionException} with {@link TimeoutException} as the cause.
 220  * {@snippet lang=java :
 221  *    Duration timeout = Duration.ofSeconds(10);

 222  *
 223  *    // @link substring="allSuccessfulOrThrow" target="Joiner#allSuccessfulOrThrow()" :
 224  *    try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
 225  *    // @link substring="withTimeout" target="Config#withTimeout(Duration)" :
 226  *                                              cf -> cf.withTimeout(timeout))) {


 227  *
 228  *        scope.fork(() -> query(left));
 229  *        scope.fork(() -> query(right));



 230  *
 231  *        List<String> result = scope.join()
 232  *                                   .map(Subtask::get)
 233  *                                   .toList();
 234  *
 235  *   }

 236  * }



 237  *
 238  * <h2>Inheritance of scoped value bindings</h2>
 239  *
 240  * {@link ScopedValue} supports the execution of a method with a {@code ScopedValue} bound
 241  * to a value for the bounded period of execution of the method by the <em>current thread</em>.
 242  * It allows a value to be safely and efficiently shared to methods without using method
 243  * parameters.
 244  *
 245  * <p> When used in conjunction with a {@code StructuredTaskScope}, a {@code ScopedValue}
 246  * can also safely and efficiently share a value to methods executed by subtasks forked
 247  * in the task scope. When a {@code ScopedValue} object is bound to a value in the thread
 248  * executing the main task then that binding is inherited by the threads created to
 249  * execute the subtasks. The thread executing the main task does not continue beyond the
 250  * {@link #close() close} method until all threads executing the subtasks have finished.
 251  * This ensures that the {@code ScopedValue} is not reverted to being {@linkplain
 252  * ScopedValue#isBound() unbound} (or its previous value) while subtasks are executing.
 253  * In addition to providing a safe and efficient means to inherit a value into subtasks,
 254  * the inheritance allows sequential code using {@code ScopedValue} to be refactored to use
 255  * structured concurrency.
 256  *
 257  * <p> To ensure correctness, opening a new {@code StructuredTaskScope} captures the
 258  * current thread's scoped value bindings. These are the scoped value bindings that are
 259  * inherited by the threads created to execute subtasks in the task scope. Forking a
 260  * subtask checks that the bindings in effect at the time that the subtask is forked
 261  * match the bindings when the {@code StructuredTaskScope} was created. This check ensures
 262  * that a subtask does not inherit a binding that is reverted in the main task before the
 263  * subtask has completed.
 264  *
 265  * <p> A {@code ScopedValue} that is shared across threads requires that the value be an
 266  * immutable object or for all access to the value to be appropriately synchronized.




 267  *
 268  * <p> The following example demonstrates the inheritance of scoped value bindings. The
 269  * scoped value USERNAME is bound to the value "duke" for the bounded period of a lambda
 270  * expression by the thread executing it. The code in the block opens a {@code
 271  * StructuredTaskScope} and forks two subtasks. It then waits in the {@code join} method
 272  * and aggregates the results from both subtasks. If code executed by the threads
 273  * running subtask1 and subtask2 uses {@link ScopedValue#get()} to get the value of
 274  * USERNAME, then the value "duke" will be returned.
 275  * {@snippet lang=java :
 276  *     // @link substring="newInstance" target="ScopedValue#newInstance()" :
 277  *     private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
 278  *
 279  *     // @link substring="callWhere" target="ScopedValue#callWhere" :
 280  *     MyResult result = ScopedValue.callWhere(USERNAME, "duke", () -> {

 281  *
 282  *         try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {



 283  *
 284  *             Subtask<String> subtask1 = scope.fork( .. );    // inherits binding
 285  *             Subtask<Integer> subtask2 = scope.fork( .. );   // inherits binding
 286  *
 287  *             scope.join();
 288  *             return new MyResult(subtask1.get(), subtask2.get());
 289  *         }
 290  *
 291  *     });
 292  * }
 293  *
 294  * <p> A scoped value inherited into a subtask may be
 295  * <a href="{@docRoot}/java.base/java/lang/ScopedValue.html#rebind">rebound</a> to a new
 296  * value in the subtask for the bounded execution of some method executed in the subtask.
 297  * When the method completes, the value of the {@code ScopedValue} reverts to its previous
 298  * value, the value inherited from the thread executing the main task.
 299  *
 300  * <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}.
 301  * A main task executing in thread T1 opens a {@code StructuredTaskScope} and forks a
 302  * subtask that runs in thread T2. The scoped value bindings captured when T1 opens the
 303  * task scope are inherited into T2. The subtask (in thread T2) executes code that opens a
 304  * new {@code StructuredTaskScope} and forks a subtask that runs in thread T3. The scoped
 305  * value bindings captured when T2 opens the task scope are inherited into T3. These
 306  * include (or may be the same as) the bindings that were inherited from T1. In effect,
 307  * scoped values are inherited into a tree of subtasks, not just one level of subtask.
 308  *
 309  * <h2>Memory consistency effects</h2>
 310  *
 311  * <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to
 312  * {@linkplain #fork forking} of a subtask
 313  * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
 314  * <i>happen-before</i></a> any actions taken by that subtask, which in turn
 315  * <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}.

 316  *
 317  * <h2>General exceptions</h2>
 318  *
 319  * <p> Unless otherwise specified, passing a {@code null} argument to a method in this
 320  * class will cause a {@link NullPointerException} to be thrown.
 321  *
 322  * @param <T> the result type of tasks executed in the task scope
 323  * @param <R> the type of the result returned by the join method
 324  *
 325  * @jls 17.4.5 Happens-before Order
 326  * @since 21
 327  */
 328 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 329 public class StructuredTaskScope<T, R> implements AutoCloseable {
 330     private static final VarHandle CANCELLED;
 331     static {
 332         try {
 333             MethodHandles.Lookup l = MethodHandles.lookup();
 334             CANCELLED = l.findVarHandle(StructuredTaskScope.class,"cancelled", boolean.class);
 335         } catch (Exception e) {
 336             throw new ExceptionInInitializerError(e);
 337         }
 338     }
 339 
 340     private final Joiner<? super T, ? extends R> joiner;
 341     private final ThreadFactory threadFactory;
 342     private final ThreadFlock flock;

 343 
 344     // fields that are only accessed by owner thread
 345     private boolean needToJoin;     // set by fork to indicate that owner must join
 346     private boolean joined;         // set to true when owner joins
 347     private boolean closed;
 348     private Future<?> timerTask;
 349 
 350     // set or read by any thread
 351     private volatile boolean cancelled;
 352 
 353     // set by the timer thread, read by the owner thread
 354     private volatile boolean timeoutExpired;
 355 
 356     /**
 357      * Throws IllegalStateException if the task scope is closed.
 358      */
 359     private void ensureOpen() {
 360         assert Thread.currentThread() == flock.owner();
 361         if (closed) {
 362             throw new IllegalStateException("Task scope is closed");
 363         }
 364     }
 365 
 366     /**
 367      * Throws WrongThreadException if the current thread is not the owner thread.
 368      */
 369     private void ensureOwner() {
 370         if (Thread.currentThread() != flock.owner()) {
 371             throw new WrongThreadException("Current thread not owner");
 372         }
 373     }
 374 
 375     /**
 376      * Throws IllegalStateException if invoked by the owner thread and the owner thread
 377      * has not joined.
 378      */
 379     private void ensureJoinedIfOwner() {
 380         if (Thread.currentThread() == flock.owner() && !joined) {
 381             String msg = needToJoin ? "Owner did not join" : "join did not complete";
 382             throw new IllegalStateException(msg);
 383         }
 384     }
 385 
 386     /**
 387      * Interrupts all threads in this task scope, except the current thread.
 388      */
 389     private void implInterruptAll() {
 390         flock.threads()
 391                 .filter(t -> t != Thread.currentThread())
 392                 .forEach(t -> {
 393                     try {
 394                         t.interrupt();
 395                     } catch (Throwable ignore) { }
 396                 });
 397     }
 398 
 399     @SuppressWarnings("removal")
 400     private void interruptAll() {
 401         if (System.getSecurityManager() == null) {
 402             implInterruptAll();
 403         } else {
 404             PrivilegedAction<Void> pa = () -> {
 405                 implInterruptAll();
 406                 return null;
 407             };
 408             AccessController.doPrivileged(pa);
 409         }
 410     }
 411 
 412     /**
 413      * Cancel exception if not already cancelled.
 414      */
 415     private void cancelExecution() {
 416         if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {
 417             // prevent new threads from starting
 418             flock.shutdown();
 419 
 420             // interrupt all unfinished threads
 421             interruptAll();
 422 
 423             // wakeup join
 424             flock.wakeup();
 425         }
 426     }
 427 
 428     /**
 429      * Schedules a task to cancel execution on timeout.
 430      */
 431     private void scheduleTimeout(Duration timeout) {
 432         assert Thread.currentThread() == flock.owner() && timerTask == null;
 433         timerTask = TimerSupport.schedule(timeout, () -> {
 434             if (!cancelled) {
 435                 timeoutExpired = true;
 436                 cancelExecution();
 437             }
 438         });
 439     }
 440 
 441     /**
 442      * Cancels the timer task if set.
 443      */
 444     private void cancelTimeout() {
 445         assert Thread.currentThread() == flock.owner();
 446         if (timerTask != null) {
 447             timerTask.cancel(false);
 448         }
 449     }
 450 
 451     /**
 452      * Invoked by the thread for a subtask when the subtask completes before execution
 453      * was cancelled.
 454      */
 455     private void onComplete(SubtaskImpl<? extends T> subtask) {
 456         assert subtask.state() != Subtask.State.UNAVAILABLE;
 457         if (joiner.onComplete(subtask)) {
 458             cancelExecution();
 459         }
 460     }
 461 
 462     /**
 463      * Initialize a new StructuredTaskScope.
 464      */
 465     @SuppressWarnings("this-escape")
 466     private StructuredTaskScope(Joiner<? super T, ? extends R> joiner,
 467                                 ThreadFactory threadFactory,
 468                                 String name) {
 469         this.joiner = joiner;
 470         this.threadFactory = threadFactory;
 471 
 472         if (name == null)
 473             name = Objects.toIdentityString(this);
 474         this.flock = ThreadFlock.open(name);
 475     }


 476 
 477     /**
 478      * Represents a subtask forked with {@link #fork(Callable)} or {@link #fork(Runnable)}.
 479      * @param <T> the result type
 480      * @since 21
 481      */
 482     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 483     public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {








 484         /**
 485          * Represents the state of a subtask.
 486          * @see Subtask#state()
 487          * @since 21
 488          */
 489         @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 490         enum State {
 491             /**
 492              * The subtask result or exception is not available. This state indicates that
 493              * the subtask was forked but has not completed, it completed after execution
 494              * was cancelled, or it was forked after execution was cancelled (in which
 495              * case a thread was not created to execute the subtask).
 496              */
 497             UNAVAILABLE,
 498             /**
 499              * The subtask completed successfully. The {@link Subtask#get() Subtask.get()}
 500              * method can be used to get the result. This is a terminal state.

 501              */
 502             SUCCESS,
 503             /**
 504              * The subtask failed with an exception. The {@link Subtask#exception()
 505              * Subtask.exception()} method can be used to get the exception. This is a
 506              * terminal state.
 507              */
 508             FAILED,
 509         }
 510 
 511         /**
 512          * {@return the subtask state}
 513          */
 514         State state();
 515 
 516         /**
 517          * Returns the result of this subtask if it completed successfully. If
 518          * {@linkplain #fork(Callable) forked} to execute a value-returning task then the
 519          * result from the {@link Callable#call() call} method is returned. If
 520          * {@linkplain #fork(Runnable) forked} to execute a task that does not return a
 521          * result then {@code null} is returned.
 522          *
 523          * <p> Code executing in the scope owner thread can use this method to get the
 524          * result of a successful subtask only after it has {@linkplain #join() joined}.
 525          *
 526          * <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask)
 527          * onComplete} method should test that the {@linkplain #state() subtask state} is
 528          * {@link State#SUCCESS SUCCESS} before using this method to get the result.
 529          *
 530          * @return the possibly-null result
 531          * @throws IllegalStateException if the subtask has not completed, did not complete
 532          * successfully, or the current thread is the task scope owner and it has not joined

 533          * @see State#SUCCESS
 534          */
 535         T get();
 536 
 537         /**
 538          * {@return the exception thrown by this subtask if it failed} If
 539          * {@linkplain #fork(Callable) forked} to execute a value-returning task then
 540          * the exception thrown by the {@link Callable#call() call} method is returned.
 541          * If {@linkplain #fork(Runnable) forked} to execute a task that does not return
 542          * a result then the exception thrown by the {@link Runnable#run() run} method is
 543          * returned.
 544          *
 545          * <p> Code executing in the scope owner thread can use this method to get the
 546          * exception thrown by a failed subtask only after it has {@linkplain #join() joined}.
 547          *
 548          * <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask)
 549          * onComplete} method should test that the {@linkplain #state() subtask state} is
 550          * {@link State#FAILED FAILED} before using this method to get the exception.
 551          *
 552          * @throws IllegalStateException if the subtask has not completed, completed with
 553          * a result, or the current thread is the task scope owner and it has not joined

 554          * @see State#FAILED
 555          */
 556         Throwable exception();
 557     }
 558 
 559     /**
 560      * An object used with a {@link StructuredTaskScope} to handle subtask completion
 561      * and produce the result for a main task waiting in the {@link #join() join} method
 562      * for subtasks to complete.


 563      *
 564      * <p> Joiner defines static methods to create {@code Joiner} objects for common cases:
 565      * <ul>
 566      *   <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner}
 567      *   that yields a stream of the completed subtasks for {@code join} to return when
 568      *   all subtasks complete successfully. It cancels execution and causes {@code join}
 569      *   to throw if any subtask fails.
 570      *   <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a
 571      *   {@code Joiner} that yields the result of the first subtask to succeed. It cancels
 572      *   execution and causes {@code join} to throw if all subtasks fail.
 573      *   <li> {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a
 574      *   {@code Joiner} that waits for all successful subtasks. It cancels execution and
 575      *   causes {@code join} to throw if any subtask fails.
 576      *   <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all
 577      *   subtasks. It does not cancel execution.
 578      * </ul>
 579      *
 580      * <p> In addition to the methods to create {@code Joiner} objects for common cases,
 581      * the {@link #all(Predicate) all(Predicate)} method is defined to create a {@code
 582      * Joiner} that yields a stream of all subtasks. It is created with a {@link
 583      * Predicate Predicate} that determines if execution should continue or be cancelled.
 584      * This {@code Joiner} can be built upon to create custom policies that cancel
 585      * execution based on some condition.
 586      *
 587      * <p> More advanced policies can be developed by implementing the {@code Joiner}
 588      * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked.
 589      * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a
 590      * result or exception. These methods return a {@code boolean} to indicate if execution
 591      * should be cancelled. These methods can be used to collect subtasks, results, or
 592      * exceptions, and control when to cancel execution. The {@link #result()} method
 593      * must be implemented to produce the result (or exception) for the {@code join}
 594      * method.
 595      *
 596      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 597      * in this class will cause a {@link NullPointerException} to be thrown.
 598      *
 599      * @implSpec Implementations of this interface must be thread safe. The {@link
 600      * #onComplete(Subtask)} method defined by this interface may be invoked by several
 601      * threads concurrently.
 602      *
 603      * @apiNote It is very important that a new {@code Joiner} object is created for each
 604      * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with
 605      * different task scopes or re-used after a task is closed.
 606      *
 607      * <p> Designing a {@code Joiner} should take into account the code at the use-site
 608      * where the results from the {@link StructuredTaskScope#join() join} method are
 609      * processed. It should be clear what the {@code Joiner} does vs. the application
 610      * code at the use-site. In general, the {@code Joiner} implementation is not the
 611      * place to code "business logic". A {@code Joiner} should be designed to be as
 612      * general purpose as possible.
 613      *
 614      * @param <T> the result type of tasks executed in the task scope
 615      * @param <R> the type of results returned by the join method
 616      * @since 24
 617      * @see #open(Joiner)
 618      */
 619     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 620     @FunctionalInterface
 621     public interface Joiner<T, R> {
 622 
 623         /**
 624          * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable)
 625          * fork(Runnable)} when forking a subtask. The method is invoked from the task
 626          * owner thread. The method is invoked before a thread is created to run the
 627          * subtask.
 628          *
 629          * @implSpec The default implementation throws {@code NullPointerException} if the
 630          * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
 631          * subtask is not in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, it
 632          * otherwise returns {@code false}.
 633          *
 634          * @apiNote This method is invoked by the {@code fork} methods. It should not be
 635          * invoked directly.
 636          *
 637          * @param subtask the subtask
 638          * @return {@code true} to cancel execution
 639          */
 640         default boolean onFork(Subtask<? extends T> subtask) {
 641             if (subtask.state() != Subtask.State.UNAVAILABLE) {
 642                 throw new IllegalArgumentException();
 643             }
 644             return false;
 645         }
 646 
 647         /**
 648          * Invoked by the thread started to execute a subtask after the subtask completes
 649          * successfully or fails with an exception. This method is not invoked if a
 650          * subtask completes after execution has been cancelled.
 651          *
 652          * @implSpec The default implementation throws {@code NullPointerException} if the
 653          * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
 654          * subtask is not in the {@link Subtask.State#SUCCESS SUCCESS} or {@link
 655          * Subtask.State#FAILED FAILED} state, it otherwise returns {@code false}.
 656          *
 657          * @apiNote This method is invoked by subtasks when they complete. It should not
 658          * be invoked directly.
 659          *
 660          * @param subtask the subtask
 661          * @return {@code true} to cancel execution
 662          */
 663         default boolean onComplete(Subtask<? extends T> subtask) {
 664             if (subtask.state() == Subtask.State.UNAVAILABLE) {
 665                 throw new IllegalArgumentException();
 666             }
 667             return false;
 668         }
 669 
 670         /**
 671          * Invoked by {@link #join()} to produce the result (or exception) after waiting
 672          * for all subtasks to complete or execution to be cancelled. The result from this
 673          * method is returned by the {@code join} method. If this method throws, then
 674          * {@code join} throws {@link ExecutionException} with the exception thrown by
 675          * this method as the cause.
 676          *
 677          * <p> In normal usage, this method will be called at most once to produce the
 678          * result (or exception). If the {@code join} method is called more than once
 679          * then this method may be called more than once to produce the result. An
 680          * implementation should return an equal result (or throw the same exception) on
 681          * second or subsequent calls to produce the outcome.
 682          *
 683          * @apiNote This method is invoked by the {@code join} method. It should not be
 684          * invoked directly.
 685          *
 686          * @return the result
 687          * @throws Throwable the exception
 688          */
 689         R result() throws Throwable;
 690 
 691         /**
 692          * {@return a new Joiner object that yields a stream of all subtasks when all
 693          * subtasks complete successfully, or throws if any subtask fails}
 694          * If any subtask fails then execution is cancelled.
 695          *
 696          * <p> If all subtasks complete successfully, the joiner's {@link Joiner#result()}
 697          * method returns a stream of all subtasks in the order that they were forked.
 698          * If any subtask failed then the {@code result} method throws the exception from
 699          * the first subtask to fail.
 700          *
 701          * @apiNote This joiner is intended for cases where the results for all subtasks
 702          * are required ("invoke all"); if any subtask fails then the results of other
 703          * unfinished subtasks are no longer needed. A typical usage will be when the
 704          * subtasks return results of the same type, the returned stream of forked
 705          * subtasks can be used to get the results.
 706          *
 707          * @param <T> the result type of subtasks
 708          */
 709         static <T> Joiner<T, Stream<Subtask<T>>> allSuccessfulOrThrow() {
 710             return new AllSuccessful<>();
 711         }
 712 
 713         /**
 714          * {@return a new Joiner object that yields the result of a subtask that completed
 715          * successfully, or throws if all subtasks fail} If any subtask completes
 716          * successfully then execution is cancelled.
 717          *
 718          * <p> The joiner's {@link Joiner#result()} method returns the result of a subtask
 719          * that completed successfully. If all subtasks fail then the {@code result} method
 720          * throws the exception from one of the failed subtasks. The {@code result} method
 721          * throws {@code NoSuchElementException} if no subtasks were forked.
 722          *
 723          * @apiNote This joiner is intended for cases where the result of any subtask will
 724          * do ("invoke any") and where the results of other unfinished subtasks are no
 725          * longer needed.
 726          *
 727          * @param <T> the result type of subtasks
 728          */
 729         static <T> Joiner<T, T> anySuccessfulResultOrThrow() {
 730             return new AnySuccessful<>();
 731         }
 732 
 733         /**
 734          * {@return a new Joiner object that waits for all successful subtasks. It
 735          * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> if
 736          * any subtask fails}
 737          *
 738          * <p> The joiner's {@link Joiner#result() result} method returns {@code null}
 739          * if all subtasks complete successfully, or throws the exception from the first
 740          * subtask to fail.
 741          *
 742          * @apiNote This joiner is intended for cases where the results for all subtasks
 743          * are required ("invoke all"), and where the code {@linkplain #fork(Callable)
 744          * forking} subtasks keeps a reference to the {@linkplain Subtask Subtask} objects.
 745          * A typical usage will be when subtasks return results of different types.
 746          *
 747          * @param <T> the result type of subtasks
 748          */
 749         static <T> Joiner<T, Void> awaitAllSuccessfulOrThrow() {
 750             return new AwaitSuccessful<>();
 751         }
 752 
 753         /**
 754          * {@return a new Joiner object that waits for all subtasks}
 755          *
 756          * <p> The joiner's {@link Joiner#result() result} method returns {@code null}.
 757          *
 758          * @apiNote This joiner is intended for cases where subtasks make use of
 759          * <em>side-effects</em> rather than return results or fail with exceptions.
 760          * The {@link #fork(Runnable) fork(Runnable)} method can be used to fork subtasks
 761          * that do not return a result.
 762          *
 763          * @param <T> the result type of subtasks
 764          */
 765         static <T> Joiner<T, Void> awaitAll() {
 766             // ensure that new Joiner object is returned
 767             return new Joiner<T, Void>() {
 768                 @Override
 769                 public Void result() {
 770                     return null;
 771                 }
 772             };
 773         }
 774 
 775         /**
 776          * {@return a new Joiner object that yields a stream of all subtasks, cancelling
 777          * execution when evaluating a completed subtask with the given predicate returns
 778          * {@code true}}
 779          *
 780          * <p> The joiner's {@link Joiner#onComplete(Subtask)} method invokes the
 781          * predicate's {@link Predicate#test(Object) test} method with the subtask that
 782          * completed successfully or failed with an exception. If the {@code test} method
 783          * returns {@code true} then <a href="StructuredTaskScope.html#CancelExecution">
 784          * execution is cancelled</a>. The {@code test} method must be thread safe as it
 785          * may be invoked concurrently from several threads.
 786          *
 787          * <p> The joiner's {@link #result()} method returns the stream of all subtasks,
 788          * in fork order. The stream may contain subtasks that have completed
 789          * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED}
 790          * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state
 791          * if execution was cancelled before all subtasks were forked or completed.
 792          *
 793          * <p> The following example uses this method to create a {@code Joiner} that
 794          * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> when
 795          * two or more subtasks fail.
 796          * {@snippet lang=java :
 797          *    class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
 798          *         private final AtomicInteger failedCount = new AtomicInteger();
 799          *         @Override
 800          *         public boolean test(Subtask<? extends T> subtask) {
 801          *             return subtask.state() == Subtask.State.FAILED
 802          *                     && failedCount.incrementAndGet() >= 2;
 803          *         }
 804          *     }
 805          *
 806          *     var joinPolicy = Joiner.all(new CancelAfterTwoFailures<String>());
 807          * }
 808          *
 809          * @param isDone the predicate to evaluate completed subtasks
 810          * @param <T> the result type of subtasks
 811          */
 812         static <T> Joiner<T, Stream<Subtask<T>>> all(Predicate<Subtask<? extends T>> isDone) {
 813             return new AllSubtasks<>(isDone);
 814         }
 815     }
 816 
    /**
     * Represents the configuration for a {@code StructuredTaskScope}.
     *
     * <p> The configuration for a {@code StructuredTaskScope} consists of a {@link
     * ThreadFactory} to create threads, an optional name for the purposes of monitoring
     * and management, and an optional timeout.
     *
     * <p> Creating a {@code StructuredTaskScope} with its 1-arg {@link #open(Joiner) open}
     * method uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default
     * configuration</a>. The default configuration consists of a thread factory that
     * creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">
     * virtual threads</a>, no name for monitoring and management purposes, and no timeout.
     *
     * <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, Function)
     * open} method allows a different configuration to be used. The function specified
     * to the {@code open} method is applied to the default configuration and returns the
     * configuration for the {@code StructuredTaskScope} under construction. The function
     * can use the {@code with-} prefixed methods defined here to specify the components
     * of the configuration to use.
     *
     * <p> Unless otherwise specified, passing a {@code null} argument to a method
     * in this class will cause a {@link NullPointerException} to be thrown.
     *
     * @since 24
     */
    @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
    public sealed interface Config permits ConfigImpl {
        /**
         * {@return a new {@code Config} object with the given thread factory}
         * The other components are the same as this object. The thread factory is used by
         * a task scope to create threads when {@linkplain #fork(Callable) forking} subtasks.
         *
         * @apiNote The thread factory will typically create
         * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
         * maybe with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler
         * uncaught exception handler}, or other properties configured.
         *
         * @param threadFactory the thread factory
         * @see #fork(Callable)
         */
        Config withThreadFactory(ThreadFactory threadFactory);

        /**
         * {@return a new {@code Config} object with the given name}
         * The other components are the same as this object. A task scope is optionally
         * named for the purposes of monitoring and management.
         *
         * @param name the name
         * @see StructuredTaskScope#toString()
         */
        Config withName(String name);

        /**
         * {@return a new {@code Config} object with the given timeout}
         * The other components are the same as this object.
         *
         * @apiNote Applications using deadlines, expressed as an {@link java.time.Instant},
         * can use {@link Duration#between Duration.between(Instant.now(), deadline)} to
         * compute the timeout for this method.
         *
         * @param timeout the timeout
         * @see #join()
         */
        Config withTimeout(Duration timeout);
    }
 881 
 882     /**
 883      * Opens a new structured task scope to use the given {@code Joiner} object and with
 884      * configuration that is the result of applying the given function to the
 885      * <a href="#DefaultConfiguration">default configuration</a>.
 886      *
 887      * <p> The {@code configFunction} is called with the default configuration and returns
 888      * the configuration for the new structured task scope. The function may, for example,
 889      * set the {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} or set
 890      * a {@linkplain Config#withTimeout(Duration) timeout}.
 891      *
 892      * <p> If a {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} is set
 893      * then the {@code ThreadFactory}'s {@link ThreadFactory#newThread(Runnable) newThread}
 894      * method will be used to create threads when forking subtasks in this task scope.
 895      *
 896      * <p> If a {@linkplain Config#withTimeout(Duration) timeout} is set then it starts
 897      * when the task scope is opened. If the timeout expires before the task scope has
 898      * {@linkplain #join() joined} then execution is cancelled and the {@code join} method
 899      * throws {@link ExecutionException} with {@link TimeoutException} as the cause.
 900      *
 901      * <p> The new task scope is owned by the current thread. Only code executing in this
 902      * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or
 903      * {@linkplain #close close} the task scope.
 904      *
 905      * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
 906      * value} bindings for inheritance by threads started in the task scope.
 907      *
 908      * @param joiner the joiner
 909      * @param configFunction a function to produce the configuration
 910      * @return a new task scope
 911      * @param <T> the result type of tasks executed in the task scope
 912      * @param <R> the type of the result returned by the join method
 913      * @since 24
 914      */
 915     public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
 916                                                         Function<Config, Config> configFunction) {
 917         Objects.requireNonNull(joiner);
 918 
 919         var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
 920         var scope = new StructuredTaskScope<T, R>(joiner, config.threadFactory(), config.name());
 921 
 922         // schedule timeout
 923         Duration timeout = config.timeout();
 924         if (timeout != null) {
 925             boolean done = false;
 926             try {
 927                 scope.scheduleTimeout(timeout);
 928                 done = true;
 929             } finally {
 930                 if (!done) {
 931                     scope.close();  // pop if scheduling timeout failed
 932                 }
 933             }
 934         }
 935 
 936         return scope;
 937     }
 938 
 939     /**
 940      * Opens a new structured task scope to use the given {@code Joiner} object. The
 941      * task scope is created with the <a href="#DefaultConfiguration">default configuration</a>.
 942      * The default configuration has a {@code ThreadFactory} that creates unnamed
 943      * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
 944      * is unnamed for monitoring and management purposes, and has no timeout.
 945      *
 946      * @implSpec
 947      * This factory method is equivalent to invoking the 2-arg open method with the given
 948      * joiner and the {@linkplain Function#identity() identity function}.


 949      *
 950      * @param joiner the joiner
 951      * @return a new task scope
 952      * @param <T> the result type of tasks executed in the task scope
 953      * @param <R> the type of the result returned by the join method
 954      * @since 24
 955      */
 956     public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner) {
 957         return open(joiner, Function.identity());
 958     }
 959 
 960     /**
 961      * Starts a new thread in this task scope to execute a value-returning task, thus
 962      * creating a <em>subtask</em>. The value-returning task is provided to this method
 963      * as a {@link Callable}, the thread executes the task's {@link Callable#call() call}
 964      * method.
 965      *
 966      * <p> This method first creates a {@link Subtask Subtask} to represent the <em>forked
 967      * subtask</em>. It invokes the joiner's {@link Joiner#onFork(Subtask) onFork} method
 968      * with the {@code Subtask} object. If the {@code onFork} completes with an exception
 969      * or error then it is propagated by the {@code fork} method. If execution is
 970      * {@linkplain #isCancelled() cancelled}, or {@code onFork} returns {@code true} to
 971      * cancel execution, then this method returns the {@code Subtask} (in the {@link
 972      * Subtask.State#UNAVAILABLE UNAVAILABLE} state) without creating a thread to execute
 973      * the subtask. If execution is not cancelled then a thread is created with the
 974      * {@link ThreadFactory} configured when the task scope was created, and the thread is
 975      * started. Forking a subtask inherits the current thread's {@linkplain ScopedValue
 976      * scoped value} bindings. The bindings must match the bindings captured when the
 977      * task scope was opened. If the subtask completes (successfully or with an exception)
 978      * before execution is cancelled, then the thread invokes the joiner's
 979      * {@link Joiner#onComplete(Subtask) onComplete} method with subtask in the
 980      * {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} state.
 981      *
 982      * <p> This method returns the {@link Subtask Subtask} object. In some usages, this
 983      * object may be used to get its result. In other cases it may be used for correlation
 984      * or just discarded. To ensure correct usage, the {@link Subtask#get() Subtask.get()}
 985      * method may only be called by the task scope owner to get the result after it has
 986      * waited for subtasks to complete with the {@link #join() join} method and the subtask
 987      * completed successfully. Similarly, the {@link Subtask#exception() Subtask.exception()}
 988      * method may only be called by the task scope owner after it has joined and the subtask
 989      * failed. If execution was cancelled before the subtask was forked, or before it
 990      * completes, then neither method can be used to obtain the outcome.
 991      *
 992      * <p> This method may only be invoked by the task scope owner.


 993      *
 994      * @param task the value-returning task for the thread to execute
 995      * @param <U> the result type
 996      * @return the subtask
 997      * @throws IllegalStateException if this task scope is closed or the owner has already
 998      * joined
 999      * @throws WrongThreadException if the current thread is not the task scope owner
1000      * @throws StructureViolationException if the current scoped value bindings are not
1001      * the same as when the task scope was created
1002      * @throws RejectedExecutionException if the thread factory rejected creating a
1003      * thread to run the subtask
1004      */
1005     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
1006         Objects.requireNonNull(task);
1007         ensureOwner();
1008         ensureOpen();
1009         if (joined) {
1010             throw new IllegalStateException("Already joined");
1011         }
1012 
1013         var subtask = new SubtaskImpl<U>(this, task);
1014 
1015         // notify joiner, even if cancelled
1016         if (joiner.onFork(subtask)) {
1017             cancelExecution();
1018         }
1019 
1020         if (!cancelled) {

1021             // create thread to run task
1022             Thread thread = threadFactory.newThread(subtask);
1023             if (thread == null) {
1024                 throw new RejectedExecutionException("Rejected by thread factory");
1025             }
1026 
1027             // attempt to start the thread
1028             try {
1029                 flock.start(thread);
1030             } catch (IllegalStateException e) {
1031                 // shutdown by another thread, or underlying flock is shutdown due
1032                 // to unstructured use
1033             }
1034         }
1035 
1036         needToJoin = true;





1037         return subtask;
1038     }
1039 
1040     /**
1041      * Starts a new thread in this task scope to execute a task that does not return a
1042      * result, creating a <em>subtask</em>.

























1043      *
1044      * <p> This method works exactly the same as {@link #fork(Callable)} except that
1045      * the task is provided to this method as a {@link Runnable}, the thread executes
1046      * the task's {@link Runnable#run() run} method, and its result is {@code null}.



1047      *
1048      * @param task the task for the thread to execute
1049      * @return the subtask
1050      * @throws IllegalStateException if this task scope is closed or the owner has already
1051      * joined
1052      * @throws WrongThreadException if the current thread is not the task scope owner
1053      * @throws StructureViolationException if the current scoped value bindings are not
1054      * the same as when the task scope was created
1055      * @throws RejectedExecutionException if the thread factory rejected creating a
1056      * thread to run the subtask
1057      * @since 24
1058      */
1059     public Subtask<? extends T> fork(Runnable task) {
1060         Objects.requireNonNull(task);
1061         return fork(() -> { task.run(); return null; });




1062     }
1063 
1064     /**
1065      * Waits for all subtasks started in this task scope to complete or execution to be
1066      * cancelled. If a {@linkplain  Config#withTimeout(Duration) timeout} has been set
1067      * then execution will be cancelled if the timeout expires before or while waiting.
1068      * Once finished waiting, the {@code Joiner}'s {@link Joiner#result() result}
1069      * method is invoked to get the result or throw an exception. If the {@code result}
1070      * method throws then this method throws {@code ExecutionException} with the
1071      * exception thrown by the {@code result()} method as the cause.
1072      *
1073      * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1074      * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1075      * when all threads finish, the {@code Joiner}'s {@link Joiner#onFork(Subtask)
1076      * onFork} or {@link Joiner#onComplete(Subtask) onComplete} returns {@code true}
1077      * to cancel execution, the timeout (if set) expires, or the current thread is
1078      * {@linkplain Thread#interrupt() interrupted}.
1079      *
1080      * <p> This method may only be invoked by the task scope owner.
1081      *
1082      * @return the {@link Joiner#result() result}






1083      * @throws IllegalStateException if this task scope is closed
1084      * @throws WrongThreadException if the current thread is not the task scope owner
1085      * @throws ExecutionException if the joiner's {@code result} method throws, or with
1086      * cause {@link TimeoutException} if a timeout is set and the timeout expires
1087      * @throws InterruptedException if interrupted while waiting
1088      * @since 24











1089      */
1090     public R join() throws ExecutionException, InterruptedException {
1091         ensureOwner();
1092         ensureOpen();






1093 
1094         if (!joined) {
1095             // owner has attempted to join
1096             needToJoin = false;









1097 
1098             // wait for all threads, execution to be cancelled, or interrupt
1099             flock.awaitAll();








1100 
1101             // throw if timeout expired while waiting
1102             if (timeoutExpired) {
1103                 throw new ExecutionException(new TimeoutException());
1104             }
1105 
1106             // join completed successfully
1107             cancelTimeout();
1108             joined = true;
1109         }
1110 
1111         // invoke joiner to get result
1112         try {
1113             return joiner.result();
1114         } catch (Throwable e) {
1115             throw new ExecutionException(e);


1116         }
1117     }
1118 
1119     /**
1120      * {@return {@code true} if <a href="#CancelExecution">execution is cancelled</a>,
1121      * or in the process of being cancelled, otherwise {@code false}}






















1122      *
1123      * <p> Cancelling execution prevents new threads from starting in the task scope and
1124      * {@linkplain Thread#interrupt() interrupts} threads executing unfinished subtasks.
1125      * It may take some time before the interrupted threads finish execution; this
1126      * method may return {@code true} before all threads have been interrupted or before
1127      * all threads have finished.
1128      *
1129      * @apiNote A main task with a lengthy "forking phase" (the code that executes before
1130      * the main task invokes {@link #join() join}) may use this method to avoid doing work
1131      * in cases where execution was cancelled by the completion of a previously forked
1132      * subtask or timeout.

1133      *
1134      * @since 24














1135      */
1136     public boolean isCancelled() {
1137         return cancelled;
1138     }
1139 
1140     /**
1141      * Closes this task scope.
1142      *
1143      * <p> This method first <a href="#CancelExecution">cancels execution</a>, if not
1144      * already cancelled. This interrupts the threads executing unfinished subtasks. This
1145      * method then waits for all threads to finish. If interrupted while waiting then it
1146      * will continue to wait until the threads finish, before completing with the interrupt
1147      * status set.
1148      *
1149      * <p> This method may only be invoked by the task scope owner. If the task scope
1150      * is already closed then the task scope owner invoking this method has no effect.
1151      *
1152      * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
1153      * manner</em>. If this method is called to close a task scope before nested task
1154      * scopes are closed then it closes the underlying construct of each nested task scope
1155      * (in the reverse order that they were created in), closes this task scope, and then
1156      * throws {@link StructureViolationException}.
1157      * Similarly, if this method is called to close a task scope while executing with
1158      * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
1159      * before the scoped values were bound, then {@code StructureViolationException} is
1160      * thrown after closing the task scope.
1161      * If a thread terminates without first closing task scopes that it owns then
1162      * termination will cause the underlying construct of each of its open tasks scopes to
1163      * be closed. Closing is performed in the reverse order that the task scopes were
1164      * created in. Thread termination may therefore be delayed when the task scope owner
1165      * has to wait for threads forked in these task scopes to finish.
1166      *



1167      * @throws IllegalStateException thrown after closing the task scope if the task scope
1168      * owner did not attempt to join after forking
1169      * @throws WrongThreadException if the current thread is not the task scope owner
1170      * @throws StructureViolationException if a structure violation was detected
1171      */
1172     @Override
1173     public void close() {
1174         ensureOwner();
1175         if (closed) {

1176             return;
1177         }
1178 
1179         // cancel execution if not already joined
1180         if (!joined) {
1181             cancelExecution();
1182             cancelTimeout();
1183         }
1184 
1185         // wait for stragglers
1186         try {


1187             flock.close();
1188         } finally {
1189             closed = true;
1190         }
1191 
1192         // throw ISE if the owner didn't join after forking
1193         if (needToJoin) {
1194             needToJoin = false;
1195             throw new IllegalStateException("Owner did not join");
1196         }
1197     }
1198 
    /**
     * {@inheritDoc}  If a {@link Config#withName(String) name} for monitoring and
     * management purposes has been set then the string representation includes the name.
     */
    @Override
    public String toString() {
        // the string representation is whatever the underlying flock reports as its name
        return flock.name();
    }
1207 
1208     /**
1209      * Subtask implementation, runs the task specified to the fork method.
1210      */
1211     private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
1212         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
1213 
1214         private record AltResult(Subtask.State state, Throwable exception) {
1215             AltResult(Subtask.State state) {
1216                 this(state, null);
1217             }
1218         }
1219 
1220         private final StructuredTaskScope<? super T, ?> scope;
1221         private final Callable<? extends T> task;

1222         private volatile Object result;
1223 
1224         SubtaskImpl(StructuredTaskScope<? super T, ?> scope, Callable<? extends T> task) {


1225             this.scope = scope;
1226             this.task = task;

1227         }
1228 
1229         @Override
1230         public void run() {
1231             T result = null;
1232             Throwable ex = null;
1233             try {
1234                 result = task.call();
1235             } catch (Throwable e) {
1236                 ex = e;
1237             }
1238 
1239             // nothing to do if task scope is cancelled
1240             if (scope.isCancelled())
1241                 return;
1242 
1243             // set result/exception and invoke onComplete
1244             if (ex == null) {
1245                 this.result = (result != null) ? result : RESULT_NULL;
1246             } else {
1247                 this.result = new AltResult(State.FAILED, ex);
1248             }
1249             scope.onComplete(this);





1250         }
1251 
1252         @Override
1253         public Subtask.State state() {
1254             Object result = this.result;
1255             if (result == null) {
1256                 return State.UNAVAILABLE;
1257             } else if (result instanceof AltResult alt) {
1258                 // null or failed
1259                 return alt.state();
1260             } else {
1261                 return State.SUCCESS;
1262             }
1263         }
1264 
1265 
1266         @Override
1267         public T get() {
1268             scope.ensureJoinedIfOwner();
1269             Object result = this.result;
1270             if (result instanceof AltResult) {
1271                 if (result == RESULT_NULL) return null;
1272             } else if (result != null) {
1273                 @SuppressWarnings("unchecked")
1274                 T r = (T) result;
1275                 return r;
1276             }
1277             throw new IllegalStateException(
1278                     "Result is unavailable or subtask did not complete successfully");
1279         }
1280 
1281         @Override
1282         public Throwable exception() {
1283             scope.ensureJoinedIfOwner();
1284             Object result = this.result;
1285             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
1286                 return alt.exception();
1287             }
1288             throw new IllegalStateException(
1289                     "Exception is unavailable or subtask did not complete with exception");
1290         }
1291 
1292         @Override
1293         public String toString() {
1294             String stateAsString = switch (state()) {
1295                 case UNAVAILABLE -> "[Unavailable]";
1296                 case SUCCESS     -> "[Completed successfully]";
1297                 case FAILED      -> {
1298                     Throwable ex = ((AltResult) result).exception();
1299                     yield "[Failed: " + ex + "]";
1300                 }
1301             };
1302             return Objects.toIdentityString(this) + stateAsString;
1303         }
1304     }
1305 
1306     /**
1307      * A joiner that returns a stream of all subtasks when all subtasks complete
1308      * successfully. If any subtask fails then execution is cancelled.














1309      */
1310     private static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {



1311         private static final VarHandle FIRST_EXCEPTION;
1312         static {
1313             try {
1314                 MethodHandles.Lookup l = MethodHandles.lookup();
1315                 FIRST_EXCEPTION = l.findVarHandle(AllSuccessful.class, "firstException", Throwable.class);

1316             } catch (Exception e) {
1317                 throw new ExceptionInInitializerError(e);
1318             }
1319         }
1320         // list of forked subtasks, only accessed by owner thread
1321         private final List<Subtask<T>> subtasks = new ArrayList<>();
1322         private volatile Throwable firstException;
1323 
1324         @Override
1325         public boolean onFork(Subtask<? extends T> subtask) {
1326             @SuppressWarnings("unchecked")
1327             var tmp = (Subtask<T>) subtask;
1328             subtasks.add(tmp);
1329             return false;












1330         }
1331 
1332         @Override
1333         public boolean onComplete(Subtask<? extends T> subtask) {
1334             return (subtask.state() == Subtask.State.FAILED)
1335                     && (firstException == null)
1336                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());



1337         }
1338 
1339         @Override
1340         public Stream<Subtask<T>> result() throws Throwable {
1341             Throwable ex = firstException;
1342             if (ex != null) {
1343                 throw ex;
1344             } else {
1345                 return subtasks.stream();
1346             }
1347         }
1348     }
1349 
1350     /**
1351      * A joiner that returns the result of the first subtask to complete successfully.
1352      * If any subtask completes successfully then execution is cancelled.
1353      */
1354     private static final class AnySuccessful<T> implements Joiner<T, T> {
1355         private static final VarHandle FIRST_SUCCESS;
1356         private static final VarHandle FIRST_EXCEPTION;
1357         static {
1358             try {
1359                 MethodHandles.Lookup l = MethodHandles.lookup();
1360                 FIRST_SUCCESS = l.findVarHandle(AnySuccessful.class, "firstSuccess", Subtask.class);
1361                 FIRST_EXCEPTION = l.findVarHandle(AnySuccessful.class, "firstException", Throwable.class);
1362             } catch (Exception e) {
1363                 throw new ExceptionInInitializerError(e);
1364             }
1365         }
1366         private volatile Subtask<T> firstSuccess;
1367         private volatile Throwable firstException;
1368 
















1369         @Override
1370         public boolean onComplete(Subtask<? extends T> subtask) {
1371             if (firstSuccess == null) {
1372                 if (subtask.state() == Subtask.State.SUCCESS) {
1373                     // capture the first subtask that completes successfully
1374                     return FIRST_SUCCESS.compareAndSet(this, null, subtask);
1375                 } else if (firstException == null) {
1376                     // capture the exception thrown by the first task to fail
1377                     FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1378                 }
1379             }
1380             return false;
1381         }
1382 

















1383         @Override
1384         public T result() throws Throwable {
1385             Subtask<T> firstSuccess = this.firstSuccess;
1386             if (firstSuccess != null) {
1387                 return firstSuccess.get();


















































1388             }
1389             Throwable firstException = this.firstException;
1390             if (firstException != null) {
1391                 throw firstException;
1392             } else {
1393                 throw new NoSuchElementException("No subtasks completed");

1394             }


1395         }
1396     }
1397 
1398     /**
1399      * A joiner that that waits for all successful subtasks. If any subtask fails the
1400      * execution is cancelled.













1401      */
1402     private static final class AwaitSuccessful<T> implements Joiner<T, Void> {

1403         private static final VarHandle FIRST_EXCEPTION;
1404         static {
1405             try {
1406                 MethodHandles.Lookup l = MethodHandles.lookup();
1407                 FIRST_EXCEPTION = l.findVarHandle(AwaitSuccessful.class, "firstException", Throwable.class);
1408             } catch (Exception e) {
1409                 throw new ExceptionInInitializerError(e);
1410             }
1411         }
1412         private volatile Throwable firstException;
1413 
1414         @Override
1415         public boolean onComplete(Subtask<? extends T> subtask) {
1416             return (subtask.state() == Subtask.State.FAILED)
1417                     && (firstException == null)
1418                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());













1419         }
1420 
1421         @Override
1422         public Void result() throws Throwable {
1423             Throwable ex = firstException;
1424             if (ex != null) {
1425                 throw ex;
1426             } else {
1427                 return null;
1428             }
1429         }
1430     }
1431 
1432     /**
1433      * A joiner that returns a stream of all subtasks.
1434      */
1435     private static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
1436         private final Predicate<Subtask<? extends T>> isDone;
1437         // list of forked subtasks, only accessed by owner thread
1438         private final List<Subtask<T>> subtasks = new ArrayList<>();
1439 
1440         AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
1441             this.isDone = Objects.requireNonNull(isDone);
1442         }
1443 
1444         @Override
1445         public boolean onFork(Subtask<? extends T> subtask) {
1446             @SuppressWarnings("unchecked")
1447             var tmp = (Subtask<T>) subtask;
1448             subtasks.add(tmp);
1449             return false;

1450         }
1451 















1452         @Override
1453         public boolean onComplete(Subtask<? extends T> subtask) {
1454             return isDone.test(subtask);

1455         }
1456 
















1457         @Override
1458         public Stream<Subtask<T>> result() {
1459             return subtasks.stream();



1460         }
1461     }
1462 
1463     /**
1464      * Implementation of Config.
1465      */
1466     private record ConfigImpl(ThreadFactory threadFactory,
1467                               String name,
1468                               Duration timeout) implements Config {
1469         static Config defaultConfig() {
1470             return new ConfigImpl(Thread.ofVirtual().factory(), null, null);





1471         }
1472 
1473         @Override
1474         public Config withThreadFactory(ThreadFactory threadFactory) {
1475             return new ConfigImpl(Objects.requireNonNull(threadFactory), name, timeout);









1476         }
1477 
1478         @Override
1479         public Config withName(String name) {
1480             return new ConfigImpl(threadFactory, Objects.requireNonNull(name), timeout);
1481         }
1482 
1483         @Override
1484         public Config withTimeout(Duration timeout) {
1485             return new ConfigImpl(threadFactory, name, Objects.requireNonNull(timeout));
1486         }
1487     }
1488 
1489     /**
1490      * Used to schedule a task to cancel exception when a timeout expires.
1491      */
1492     private static class TimerSupport {
1493         private static final ScheduledExecutorService DELAYED_TASK_SCHEDULER;
1494         static {
1495             ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1496                 Executors.newScheduledThreadPool(1, task -> {
1497                     Thread t = InnocuousThread.newThread("StructuredTaskScope-Timer", task);
1498                     t.setDaemon(true);
1499                     return t;
1500                 });
1501             stpe.setRemoveOnCancelPolicy(true);
1502             DELAYED_TASK_SCHEDULER = stpe;
1503         }
1504 
1505         static Future<?> schedule(Duration timeout, Runnable task) {
1506             long nanos = TimeUnit.NANOSECONDS.convert(timeout);
1507             return DELAYED_TASK_SCHEDULER.schedule(task, nanos, TimeUnit.NANOSECONDS);
1508         }
1509     }
1510 }
--- EOF ---