1 /* 2 * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.concurrent; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.security.AccessController; 30 import java.security.PrivilegedAction; 31 import java.time.Duration; 32 import java.util.ArrayList; 33 import java.util.List; 34 import java.util.NoSuchElementException; 35 import java.util.Objects; 36 import java.util.function.Function; 37 import java.util.function.Predicate; 38 import java.util.function.Supplier; 39 import java.util.stream.Stream; 40 import jdk.internal.javac.PreviewFeature; 41 import jdk.internal.misc.InnocuousThread; 42 import jdk.internal.misc.ThreadFlock; 43 44 /** 45 * An API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports cases 46 * where a main task splits into several concurrent subtasks, and where the subtasks must 47 * complete before the main task continues. A {@code StructuredTaskScope} can be used to 48 * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>, 49 * just like that of a sequential operation in structured programming. 50 * 51 * <p> {@code StructuredTaskScope} defines the static method {@link #open() open} to open 52 * a new {@code StructuredTaskScope} and the {@link #close() close} method to close it. 53 * The API is designed to be used with the {@code try-with-resources} statement where 54 * the {@code StructuredTaskScope} is opened as a resource and then closed automatically. 55 * The code in the block uses the {@link #fork(Callable) fork} method to fork subtasks. 56 * After forking, it uses the {@link #join() join} method to wait for all subtasks to 57 * finish (or some other outcome) as a single operation. Forking a subtask starts a new 58 * {@link Thread} to run the subtask. The thread executing the main task does not continue 59 * beyond the {@code close} method until all threads started to execute subtasks have finished. 60 * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may 61 * only be invoked by the <em>owner thread</em> (the thread that opened the {@code 62 * StructuredTaskScope}), the {@code fork} method may not be called after {@code join}, 63 * the {@code join} method may only be invoked once, and the {@code close} method throws 64 * an exception after closing if the owner did not invoke the {@code join} method after 65 * forking subtasks. 66 * 67 * <p> As a first example, consider a main task that splits into two subtasks to concurrently 68 * fetch resources from two URL locations "left" and "right". Both subtasks may complete 69 * successfully, one subtask may succeed and the other may fail, or both subtasks may 70 * fail. The main task in this example is interested in the successful result from both 71 * subtasks. It waits in the {@link #join() join} method for both subtasks to complete 72 * successfully or for either subtask to fail. 73 * {@snippet lang=java : 74 * // @link substring="open" target="#open()" : 75 * try (var scope = StructuredTaskScope.open()) { 76 * 77 * // @link substring="fork" target="#fork(Callable)" : 78 * Subtask<String> subtask1 = scope.fork(() -> query(left)); 79 * Subtask<Integer> subtask2 = scope.fork(() -> query(right)); 80 * 81 * // throws if either subtask fails 82 * scope.join(); // @link substring="join" target="#join()" 83 * 84 * // both subtasks completed successfully 85 * // @link substring="get" target="Subtask#get()" : 86 * return new MyResult(subtask1.get(), subtask2.get()); 87 * 88 * // @link substring="close" target="#close()" : 89 * } // close 90 * } 91 * 92 * <p> If both subtasks complete successfully then the {@code join} method completes 93 * normally and the main task uses the {@link Subtask#get() Subtask.get()} method to get 94 * the result of each subtask. If one of the subtasks fails then the other subtask is 95 * cancelled (this will interrupt the thread executing the other subtask) and the {@code 96 * join} method throws {@link FailedException} with the exception from the failed subtask 97 * as the {@linkplain Throwable#getCause() cause}. 98 * 99 * <p> In the example, the subtasks produce results of different types ({@code String} and 100 * {@code Integer}). In other cases the subtasks may all produce results of the same type. 101 * If the example had used {@code StructuredTaskScope.<String>open()} then it could 102 * only be used to fork subtasks that return a {@code String} result. 103 * 104 * <h2>Joiners</h2> 105 * 106 * <p> In the example above, the main task fails if any subtask fails. If all subtasks 107 * succeed then the {@code join} method completes normally. Other policy and outcome is 108 * supported by creating a {@code StructuredTaskScope} with a {@link Joiner} that 109 * implements the desired policy. A {@code Joiner} handles subtask completion and produces 110 * the outcome for the {@link #join() join} method. In the example above, {@code join} 111 * returns {@code null}. Depending on the {@code Joiner}, {@code join} may return a 112 * result, a stream of elements, or some other object. The {@code Joiner} interface defines 113 * factory methods to create {@code Joiner}s for some common cases. 114 * 115 * <p> A {@code Joiner} may <a id="CancelExecution"><em>cancel execution</em></a> (sometimes 116 * called "short-circuiting") when some condition is reached that does not require the 117 * result of subtasks that are still executing. Cancelling execution prevents new threads 118 * from being started to execute further subtasks, {@linkplain Thread#interrupt() interrupts} 119 * the threads executing subtasks that have not completed, and causes the {@code join} 120 * method to wakeup with the outcome (result or exception). In the above example, the 121 * outcome is that {@code join} completes with a result of {@code null} when all subtasks 122 * succeed. It cancels execution if any of the subtasks fail, throwing the exception from 123 * the first subtask that fails. Other {@code Joiner} implementations may return an object 124 * instead of {@code null} and may cancel execution or throw based on some other policy. 125 * 126 * <p> To allow for cancelling execution, subtasks must be coded so that they 127 * finish as soon as possible when interrupted. Subtasks that do not respond to interrupt, 128 * e.g. block on methods that are not interruptible, may delay the closing of a task scope 129 * indefinitely. The {@link #close() close} method always waits for threads executing 130 * subtasks to finish, even if execution is cancelled, so execution cannot continue beyond 131 * the {@code close} method until the interrupted threads finish. 132 * 133 * <p> Now consider another example that also splits into two subtasks to concurrently 134 * fetch resources. In this example, the code in the main task is only interested in the 135 * result from the first subtask to complete successfully. The example uses {@link 136 * Joiner#anySuccessfulResultOrThrow() Joiner.anySuccessfulResultOrThrow()} to 137 * create a {@code Joiner} that makes available the result of the first subtask to 138 * complete successfully. The type parameter in the example is "{@code String}" so that 139 * only subtasks that return a {@code String} can be forked. 140 * {@snippet lang=java : 141 * // @link substring="open" target="#open(Policy)" : 142 * try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) { 143 * 144 * scope.fork(() -> query(left)); // @link substring="fork" target="#fork(Callable)" 145 * scope.fork(() -> query(right)); 146 * 147 * // throws if both subtasks fail 148 * String firstResult = scope.join(); 149 * 150 * } 151 * } 152 * 153 * <p> In the example, the main task forks the two subtasks, then waits in the {@code 154 * join} method for either subtask to complete successfully or for both subtasks to fail. 155 * If one of the subtasks completes successfully then the {@code Joiner} causes the other 156 * subtask to be cancelled (this will interrupt the thread executing the subtask), and 157 * the {@code join} method returns the result from the first subtask. Cancelling the other 158 * subtask avoids the main task waiting for a result that it doesn't care about. If both 159 * subtasks fail then the {@code join} method throws {@link FailedException} with the 160 * exception from one of the subtasks as the {@linkplain Throwable#getCause() cause}. 161 * 162 * <p> Whether code uses the {@code Subtask} returned from {@code fork} will depend on 163 * the {@code Joiner} and usage. Some {@code Joiner} implementations are suited to subtasks 164 * that return results of the same type and where the {@code join} method returns a result 165 * for the main task to use. Code that forks subtasks that return results of different 166 * types, and uses a {@code Joiner} such as {@code Joiner.awaitAllSuccessfulOrThrow()} that 167 * does not return a result, will use {@link Subtask#get() Subtask.get()} after joining. 168 * 169 * <h2>Exception handling</h2> 170 * 171 * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner Joiner} that 172 * handles subtask completion and produces the outcome for the {@link #join() join} method. 173 * In some cases, the outcome will be a result, in other cases it will be an exception. 174 * If the outcome is an exception then the {@code join} method throws {@link 175 * FailedException} with the exception as the {@linkplain Throwable#getCause() 176 * cause}. For many {@code Joiner} implementations, the exception will be an exception 177 * thrown by a subtask that failed. In the case of {@link Joiner#allSuccessfulOrThrow() 178 * allSuccessfulOrThrow} and {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow} 179 * for example, the exception is from the first subtask to fail. 180 * 181 * <p> Many of the details for how exceptions are handled will depend on usage. In some 182 * cases it may be useful to add a {@code catch} block to catch {@code FailedException}. 183 * The exception handling may use {@code instanceof} with pattern matching to handle 184 * specific causes. 185 * {@snippet lang=java : 186 * try (var scope = StructuredTaskScope.open()) { 187 * 188 * .. 189 * 190 * } catch (StructuredTaskScope.FailedException e) { 191 * 192 * Throwable cause = e.getCause(); 193 * switch (cause) { 194 * case IOException ioe -> .. 195 * default -> .. 196 * } 197 * 198 * } 199 * } 200 * In some cases it may not be useful to catch {@code FailedException} but instead leave 201 * it to propagate to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught 202 * exception handler} for logging purposes. 203 * 204 * <p> For cases where a specific exception triggers the use of a default result then it 205 * may be more appropriate to handle this in the subtask itself rather than the subtask 206 * failing and code in the main task handling the exception. 207 * 208 * <h2>Configuration</h2> 209 * 210 * A {@code StructuredTaskScope} is opened with {@linkplain Config configuration} that 211 * consists of a {@link ThreadFactory} to create threads, an optional name for monitoring 212 * and management purposes, and an optional timeout. 213 * 214 * <p> The {@link #open()} and {@link #open(Joiner)} methods create a {@code StructuredTaskScope} 215 * with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default 216 * configuration has a {@code ThreadFactory} that creates unnamed 217 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>, 218 * is unnamed for monitoring and management purposes, and has no timeout. 219 * 220 * <p> The 2-arg {@link #open(Joiner, Function) open} method can be used to create a 221 * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for 222 * the purposes of monitoring and management, or has a timeout that cancels execution if 223 * the timeout expires before or while waiting for subtasks to complete. The {@code open} 224 * method is called with a {@linkplain Function function} that is applied to the default 225 * configuration and returns a {@link Config Config} for the {@code StructuredTaskScope} 226 * under construction. 227 * 228 * <p> The following example opens a new {@code StructuredTaskScope} with a {@code 229 * ThreadFactory} that creates virtual threads {@linkplain Thread#setName(String) named} 230 * "duke-0", "duke-1" ... 231 * {@snippet lang = java: 232 * // @link substring="name" target="Thread.Builder#name(String, long)" : 233 * ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory(); 234 * 235 * // @link substring="withThreadFactory" target="Config#withThreadFactory(ThreadFactory)" : 236 * try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) { 237 * 238 * scope.fork( .. ); // runs in a virtual thread with name "duke-0" 239 * scope.fork( .. ); // runs in a virtual thread with name "duke-1" 240 * 241 * scope.join(); 242 * 243 * } 244 *} 245 * 246 * <p> A second example sets a timeout, represented by a {@link Duration}. The timeout 247 * starts when the new task scope is opened. If the timeout expires before the {@code join} 248 * method has completed then <a href="#CancelExecution">execution is cancelled</a>. This 249 * interrupts the threads executing the two subtasks and causes the {@link #join() join} 250 * method to throw {@link FailedException} with {@link TimeoutException} as the cause. 251 * {@snippet lang=java : 252 * Duration timeout = Duration.ofSeconds(10); 253 * 254 * // @link substring="allSuccessfulOrThrow" target="Joiner#allSuccessfulOrThrow()" : 255 * try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 256 * // @link substring="withTimeout" target="Config#withTimeout(Duration)" : 257 * cf -> cf.withTimeout(timeout))) { 258 * 259 * scope.fork(() -> query(left)); 260 * scope.fork(() -> query(right)); 261 * 262 * List<String> result = scope.join() 263 * .map(Subtask::get) 264 * .toList(); 265 * 266 * } 267 * } 268 * 269 * <h2>Inheritance of scoped value bindings</h2> 270 * 271 * {@link ScopedValue} supports the execution of a method with a {@code ScopedValue} bound 272 * to a value for the bounded period of execution of the method by the <em>current thread</em>. 273 * It allows a value to be safely and efficiently shared to methods without using method 274 * parameters. 275 * 276 * <p> When used in conjunction with a {@code StructuredTaskScope}, a {@code ScopedValue} 277 * can also safely and efficiently share a value to methods executed by subtasks forked 278 * in the task scope. When a {@code ScopedValue} object is bound to a value in the thread 279 * executing the main task then that binding is inherited by the threads created to 280 * execute the subtasks. The thread executing the main task does not continue beyond the 281 * {@link #close() close} method until all threads executing the subtasks have finished. 282 * This ensures that the {@code ScopedValue} is not reverted to being {@linkplain 283 * ScopedValue#isBound() unbound} (or its previous value) while subtasks are executing. 284 * In addition to providing a safe and efficient means to inherit a value into subtasks, 285 * the inheritance allows sequential code using {@code ScopedValue} be refactored to use 286 * structured concurrency. 287 * 288 * <p> To ensure correctness, opening a new {@code StructuredTaskScope} captures the 289 * current thread's scoped value bindings. These are the scoped values bindings that are 290 * inherited by the threads created to execute subtasks in the task scope. Forking a 291 * subtask checks that the bindings in effect at the time that the subtask is forked 292 * match the bindings when the {@code StructuredTaskScope} was created. This check ensures 293 * that a subtask does not inherit a binding that is reverted in the main task before the 294 * subtask has completed. 295 * 296 * <p> A {@code ScopedValue} that is shared across threads requires that the value be an 297 * immutable object or for all access to the value to be appropriately synchronized. 298 * 299 * <p> The following example demonstrates the inheritance of scoped value bindings. The 300 * scoped value USERNAME is bound to the value "duke" for the bounded period of a lambda 301 * expression by the thread executing it. The code in the block opens a {@code 302 * StructuredTaskScope} and forks two subtasks, it then waits in the {@code join} method 303 * and aggregates the results from both subtasks. If code executed by the threads 304 * running subtask1 and subtask2 uses {@link ScopedValue#get()}, to get the value of 305 * USERNAME, then value "duke" will be returned. 306 * {@snippet lang=java : 307 * // @link substring="newInstance" target="ScopedValue#newInstance()" : 308 * private static final ScopedValue<String> USERNAME = ScopedValue.newInstance(); 309 * 310 * // @link substring="callWhere" target="ScopedValue#where" : 311 * Result result = ScopedValue.where(USERNAME, "duke").call(() -> { 312 * 313 * try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) { 314 * 315 * Subtask<String> subtask1 = scope.fork( .. ); // inherits binding 316 * Subtask<Integer> subtask2 = scope.fork( .. ); // inherits binding 317 * 318 * scope.join(); 319 * return new MyResult(subtask1.get(), subtask2.get()); 320 * } 321 * 322 * }); 323 * } 324 * 325 * <p> A scoped value inherited into a subtask may be 326 * <a href="{@docRoot}/java.base/java/lang/ScopedValues.html#rebind">rebound</a> to a new 327 * value in the subtask for the bounded execution of some method executed in the subtask. 328 * When the method completes, the value of the {@code ScopedValue} reverts to its previous 329 * value, the value inherited from the thread executing the main task. 330 * 331 * <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}. 332 * A main task executing in thread T1 opens a {@code StructuredTaskScope} and forks a 333 * subtask that runs in thread T2. The scoped value bindings captured when T1 opens the 334 * task scope are inherited into T2. The subtask (in thread T2) executes code that opens a 335 * new {@code StructuredTaskScope} and forks a subtask that runs in thread T3. The scoped 336 * value bindings captured when T2 opens the task scope are inherited into T3. These 337 * include (or may be the same) as the bindings that were inherited from T1. In effect, 338 * scoped values are inherited into a tree of subtasks, not just one level of subtask. 339 * 340 * <h2>Memory consistency effects</h2> 341 * 342 * <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to 343 * {@linkplain #fork forking} of a subtask 344 * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility"> 345 * <i>happen-before</i></a> any actions taken by that subtask, which in turn 346 * <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}. 347 * 348 * <h2>General exceptions</h2> 349 * 350 * <p> Unless otherwise specified, passing a {@code null} argument to a method in this 351 * class will cause a {@link NullPointerException} to be thrown. 352 * 353 * @param <T> the result type of tasks executed in the task scope 354 * @param <R> the type of the result returned by the join method 355 * 356 * @jls 17.4.5 Happens-before Order 357 * @since 21 358 */ 359 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 360 public class StructuredTaskScope<T, R> implements AutoCloseable { 361 private static final VarHandle CANCELLED; 362 static { 363 try { 364 MethodHandles.Lookup l = MethodHandles.lookup(); 365 CANCELLED = l.findVarHandle(StructuredTaskScope.class,"cancelled", boolean.class); 366 } catch (Exception e) { 367 throw new ExceptionInInitializerError(e); 368 } 369 } 370 371 private final Joiner<? super T, ? extends R> joiner; 372 private final ThreadFactory threadFactory; 373 private final ThreadFlock flock; 374 375 // state, only accessed by owner thread 376 private static final int ST_NEW = 0; 377 private static final int ST_FORKED = 1; // subtasks forked, need to join 378 private static final int ST_JOIN_STARTED = 2; // join started, can no longer fork 379 private static final int ST_JOIN_COMPLETED = 3; // join completed 380 private static final int ST_CLOSED = 4; // closed 381 private int state; 382 383 // timer task, only accessed by owner thread 384 private Future<?> timerTask; 385 386 // set or read by any thread 387 private volatile boolean cancelled; 388 389 // set by the timer thread, read by the owner thread 390 private volatile boolean timeoutExpired; 391 392 /** 393 * Throws WrongThreadException if the current thread is not the owner thread. 394 */ 395 private void ensureOwner() { 396 if (Thread.currentThread() != flock.owner()) { 397 throw new WrongThreadException("Current thread not owner"); 398 } 399 } 400 401 /** 402 * Throws IllegalStateException if the already joined or task scope is closed. 403 */ 404 private void ensureNotJoined() { 405 assert Thread.currentThread() == flock.owner(); 406 if (state > ST_FORKED) { 407 throw new IllegalStateException("Already joined or task scope is closed"); 408 } 409 } 410 411 /** 412 * Throws IllegalStateException if invoked by the owner thread and the owner thread 413 * has not joined. 414 */ 415 private void ensureJoinedIfOwner() { 416 if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) { 417 throw new IllegalStateException("join not called"); 418 } 419 } 420 421 /** 422 * Interrupts all threads in this task scope, except the current thread. 423 */ 424 private void implInterruptAll() { 425 flock.threads() 426 .filter(t -> t != Thread.currentThread()) 427 .forEach(t -> { 428 try { 429 t.interrupt(); 430 } catch (Throwable ignore) { } 431 }); 432 } 433 434 @SuppressWarnings("removal") 435 private void interruptAll() { 436 if (System.getSecurityManager() == null) { 437 implInterruptAll(); 438 } else { 439 PrivilegedAction<Void> pa = () -> { 440 implInterruptAll(); 441 return null; 442 }; 443 AccessController.doPrivileged(pa); 444 } 445 } 446 447 /** 448 * Cancel exception if not already cancelled. 449 */ 450 private void cancelExecution() { 451 if (!cancelled && CANCELLED.compareAndSet(this, false, true)) { 452 // prevent new threads from starting 453 flock.shutdown(); 454 455 // interrupt all unfinished threads 456 interruptAll(); 457 458 // wakeup join 459 flock.wakeup(); 460 } 461 } 462 463 /** 464 * Schedules a task to cancel execution on timeout. 465 */ 466 private void scheduleTimeout(Duration timeout) { 467 assert Thread.currentThread() == flock.owner() && timerTask == null; 468 timerTask = TimerSupport.schedule(timeout, () -> { 469 if (!cancelled) { 470 timeoutExpired = true; 471 cancelExecution(); 472 } 473 }); 474 } 475 476 /** 477 * Cancels the timer task if set. 478 */ 479 private void cancelTimeout() { 480 assert Thread.currentThread() == flock.owner(); 481 if (timerTask != null) { 482 timerTask.cancel(false); 483 } 484 } 485 486 /** 487 * Invoked by the thread for a subtask when the subtask completes before execution 488 * was cancelled. 489 */ 490 private void onComplete(SubtaskImpl<? extends T> subtask) { 491 assert subtask.state() != Subtask.State.UNAVAILABLE; 492 if (joiner.onComplete(subtask)) { 493 cancelExecution(); 494 } 495 } 496 497 /** 498 * Initialize a new StructuredTaskScope. 499 */ 500 @SuppressWarnings("this-escape") 501 private StructuredTaskScope(Joiner<? super T, ? extends R> joiner, 502 ThreadFactory threadFactory, 503 String name) { 504 this.joiner = joiner; 505 this.threadFactory = threadFactory; 506 507 if (name == null) 508 name = Objects.toIdentityString(this); 509 this.flock = ThreadFlock.open(name); 510 } 511 512 /** 513 * Represents a subtask forked with {@link #fork(Callable)} or {@link #fork(Runnable)}. 514 * 515 * <p> Code that forks subtasks can use the {@link #get() get()} method after {@linkplain 516 * #join() joining} to obtain the result of a subtask that completed successfully. It 517 * can use the {@link #exception()} method to obtain the exception thrown by a subtask 518 * that failed. 519 * 520 * @param <T> the result type 521 * @since 21 522 */ 523 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 524 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl { 525 /** 526 * Represents the state of a subtask. 527 * @see Subtask#state() 528 * @since 21 529 */ 530 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 531 enum State { 532 /** 533 * The subtask result or exception is not available. This state indicates that 534 * the subtask was forked but has not completed, it completed after execution 535 * was cancelled, or it was forked after execution was cancelled (in which 536 * case a thread was not created to execute the subtask). 537 */ 538 UNAVAILABLE, 539 /** 540 * The subtask completed successfully. The {@link Subtask#get() Subtask.get()} 541 * method can be used to get the result. This is a terminal state. 542 */ 543 SUCCESS, 544 /** 545 * The subtask failed with an exception. The {@link Subtask#exception() 546 * Subtask.exception()} method can be used to get the exception. This is a 547 * terminal state. 548 */ 549 FAILED, 550 } 551 552 /** 553 * {@return the subtask state} 554 */ 555 State state(); 556 557 /** 558 * Returns the result of this subtask if it completed successfully. If 559 * {@linkplain #fork(Callable) forked} to execute a value-returning task then the 560 * result from the {@link Callable#call() call} method is returned. If 561 * {@linkplain #fork(Runnable) forked} to execute a task that does not return a 562 * result then {@code null} is returned. 563 * 564 * <p> Code executing in the scope owner thread can use this method to get the 565 * result of a successful subtask only after it has {@linkplain #join() joined}. 566 * 567 * <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask) 568 * onComplete} method should test that the {@linkplain #state() subtask state} is 569 * {@link State#SUCCESS SUCCESS} before using this method to get the result. 570 * 571 * @return the possibly-null result 572 * @throws IllegalStateException if the subtask has not completed, did not complete 573 * successfully, or the current thread is the task scope owner invoking this 574 * method before {@linkplain #join() joining} 575 * @see State#SUCCESS 576 */ 577 T get(); 578 579 /** 580 * {@return the exception thrown by this subtask if it failed} If 581 * {@linkplain #fork(Callable) forked} to execute a value-returning task then 582 * the exception thrown by the {@link Callable#call() call} method is returned. 583 * If {@linkplain #fork(Runnable) forked} to execute a task that does not return 584 * a result then the exception thrown by the {@link Runnable#run() run} method is 585 * returned. 586 * 587 * <p> Code executing in the scope owner thread can use this method to get the 588 * exception thrown by a failed subtask only after it has {@linkplain #join() joined}. 589 * 590 * <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask) 591 * onComplete} method should test that the {@linkplain #state() subtask state} is 592 * {@link State#FAILED FAILED} before using this method to get the exception. 593 * 594 * @throws IllegalStateException if the subtask has not completed, completed with 595 * a result, or the current thread is the task scope owner invoking this method 596 * before {@linkplain #join() joining} 597 * @see State#FAILED 598 */ 599 Throwable exception(); 600 } 601 602 /** 603 * An object used with a {@link StructuredTaskScope} to handle subtask completion 604 * and produce the result for a main task waiting in the {@link #join() join} method 605 * for subtasks to complete. 606 * 607 * <p> Joiner defines static methods to create {@code Joiner} objects for common cases: 608 * <ul> 609 * <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner} 610 * that yields a stream of the completed subtasks for {@code join} to return when 611 * all subtasks complete successfully. It cancels execution and causes {@code join} 612 * to throw if any subtask fails. 613 * <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a 614 * {@code Joiner} that yields the result of the first subtask to succeed. It cancels 615 * execution and causes {@code join} to throw if all subtasks fail. 616 * <li> {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a 617 * {@code Joiner} that waits for all successful subtasks. It cancels execution and 618 * causes {@code join} to throw if any subtask fails. 619 * <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all 620 * subtasks. It does not cancel execution or cause {@code join} to throw. 621 * </ul> 622 * 623 * <p> In addition to the methods to create {@code Joiner} objects for common cases, 624 * the {@link #allUntil(Predicate) allUntil(Predicate)} method is defined to create a 625 * {@code Joiner} that yields a stream of all subtasks. It is created with a {@link 626 * Predicate Predicate} that determines if execution should continue or be cancelled. 627 * This {@code Joiner} can be built upon to create custom policies that cancel 628 * execution based on some condition. 629 * 630 * <p> More advanced policies can be developed by implementing the {@code Joiner} 631 * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked. 632 * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a 633 * result or exception. These methods return a {@code boolean} to indicate if execution 634 * should be cancelled. These methods can be used to collect subtasks, results, or 635 * exceptions, and control when to cancel execution. The {@link #result()} method 636 * must be implemented to produce the result (or exception) for the {@code join} 637 * method. 638 * 639 * <p> Unless otherwise specified, passing a {@code null} argument to a method 640 * in this class will cause a {@link NullPointerException} to be thrown. 641 * 642 * @implSpec Implementations of this interface must be thread safe. The {@link 643 * #onComplete(Subtask)} method defined by this interface may be invoked by several 644 * threads concurrently. 645 * 646 * @apiNote It is very important that a new {@code Joiner} object is created for each 647 * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with 648 * different task scopes or re-used after a task is closed. 649 * 650 * <p> Designing a {@code Joiner} should take into account the code at the use-site 651 * where the results from the {@link StructuredTaskScope#join() join} method are 652 * processed. It should be clear what the {@code Joiner} does vs. the application 653 * code at the use-site. In general, the {@code Joiner} implementation is not the 654 * place to code "business logic". A {@code Joiner} should be designed to be as 655 * general purpose as possible. 656 * 657 * @param <T> the result type of tasks executed in the task scope 658 * @param <R> the type of results returned by the join method 659 * @since 24 660 * @see #open(Joiner) 661 */ 662 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 663 @FunctionalInterface 664 public interface Joiner<T, R> { 665 666 /** 667 * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable) 668 * fork(Runnable)} when forking a subtask. The method is invoked from the task 669 * owner thread. The method is invoked before a thread is created to run the 670 * subtask. 671 * 672 * @implSpec The default implementation throws {@code NullPointerException} if the 673 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the 674 * subtask is not in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, it 675 * otherwise returns {@code false}. 676 * 677 * @apiNote This method is invoked by the {@code fork} methods. It should not be 678 * invoked directly. 679 * 680 * @param subtask the subtask 681 * @return {@code true} to cancel execution 682 */ 683 default boolean onFork(Subtask<? extends T> subtask) { 684 if (subtask.state() != Subtask.State.UNAVAILABLE) { 685 throw new IllegalArgumentException(); 686 } 687 return false; 688 } 689 690 /** 691 * Invoked by the thread started to execute a subtask after the subtask completes 692 * successfully or fails with an exception. This method is not invoked if a 693 * subtask completes after execution has been cancelled. 694 * 695 * @implSpec The default implementation throws {@code NullPointerException} if the 696 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the 697 * subtask is not in the {@link Subtask.State#SUCCESS SUCCESS} or {@link 698 * Subtask.State#FAILED FAILED} state, it otherwise returns {@code false}. 699 * 700 * @apiNote This method is invoked by subtasks when they complete. It should not 701 * be invoked directly. 702 * 703 * @param subtask the subtask 704 * @return {@code true} to cancel execution 705 */ 706 default boolean onComplete(Subtask<? extends T> subtask) { 707 if (subtask.state() == Subtask.State.UNAVAILABLE) { 708 throw new IllegalArgumentException(); 709 } 710 return false; 711 } 712 713 /** 714 * Invoked by {@link #join()} to produce the result (or exception) after waiting 715 * for all subtasks to complete or execution to be cancelled. The result from this 716 * method is returned by the {@code join} method. If this method throws, then 717 * {@code join} throws {@link FailedException} with the exception thrown by 718 * this method as the cause. 719 * 720 * <p> In normal usage, this method will be called at most once by the {@code join} 721 * method to produce the result (or exception). The behavior of this method when 722 * invoked directly, and invoked more than once, is not specified. Where possible, 723 * an implementation should return an equal result (or throw the same exception) 724 * on second or subsequent calls to produce the outcome. 725 * 726 * @apiNote This method is invoked by the {@code join} method. It should not be 727 * invoked directly. 728 * 729 * @return the result 730 * @throws Throwable the exception 731 */ 732 R result() throws Throwable; 733 734 /** 735 * {@return a new Joiner object that yields a stream of all subtasks when all 736 * subtasks complete successfully} 737 * This method throws, and <a href="StructuredTaskScope.html#CancelExecution"> 738 * execution is cancelled</a>, if any subtask fails. 739 * 740 * <p> If all subtasks complete successfully, the joiner's {@link Joiner#result()} 741 * method returns a stream of all subtasks in the order that they were forked. 742 * If any subtask failed then the {@code result} method throws the exception from 743 * the first subtask to fail. 744 * 745 * @apiNote Joiners returned by this method are suited to cases where all subtasks 746 * return a result of the same type. Joiners returned by {@link 747 * #awaitAllSuccessfulOrThrow()} are suited to cases where the subtasks return 748 * results of different types. 749 * 750 * @param <T> the result type of subtasks 751 */ 752 static <T> Joiner<T, Stream<Subtask<T>>> allSuccessfulOrThrow() { 753 return new AllSuccessful<>(); 754 } 755 756 /** 757 * {@return a new Joiner object that yields the result of any subtask that 758 * completed successfully} 759 * This method throws, and <a href="StructuredTaskScope.html#CancelExecution"> 760 * execution is cancelled</a>, if all subtasks fail. 761 * 762 * <p> The joiner's {@link Joiner#result()} method returns the result of a subtask 763 * that completed successfully. If all subtasks fail then the {@code result} method 764 * throws the exception from one of the failed subtasks. The {@code result} method 765 * throws {@code NoSuchElementException} if no subtasks were forked. 766 * 767 * @param <T> the result type of subtasks 768 */ 769 static <T> Joiner<T, T> anySuccessfulResultOrThrow() { 770 return new AnySuccessful<>(); 771 } 772 773 /** 774 * {@return a new Joiner object that waits for subtasks to complete successfully} 775 * This method throws, and <a href="StructuredTaskScope.html#CancelExecution"> 776 * execution is cancelled</a>, if any subtask fails. 777 * 778 * <p> The joiner's {@link Joiner#result() result} method returns {@code null} 779 * if all subtasks complete successfully, or throws the exception from the first 780 * subtask to fail. 781 * 782 * @apiNote Joiners returned by this method are suited to cases where subtasks 783 * return results of different types. Joiners returned by {@link #allSuccessfulOrThrow()} 784 * are suited to cases where the subtasks return a result of the same type. 785 * 786 * @param <T> the result type of subtasks 787 */ 788 static <T> Joiner<T, Void> awaitAllSuccessfulOrThrow() { 789 return new AwaitSuccessful<>(); 790 } 791 792 /** 793 * {@return a new Joiner object that waits for all subtasks to complete} 794 * This method does not cancel execution if a subtask fails. 795 * 796 * <p> The joiner's {@link Joiner#result() result} method returns {@code null}. 797 * 798 * @apiNote This Joiner can be useful for cases where subtasks make use of 799 * <em>side-effects</em> rather than return results or fail with exceptions. 800 * The {@link #fork(Runnable) fork(Runnable)} method can be used to fork subtasks 801 * that do not return a result. 802 * 803 * @param <T> the result type of subtasks 804 */ 805 static <T> Joiner<T, Void> awaitAll() { 806 // ensure that new Joiner object is returned 807 return new Joiner<T, Void>() { 808 @Override 809 public Void result() { 810 return null; 811 } 812 }; 813 } 814 815 /** 816 * {@return a new Joiner object that yields a stream of all subtasks when all 817 * subtasks complete or <a href="StructuredTaskScope.html#CancelExecution"> 818 * execution is cancelled</a> by a predicate} 819 * 820 * <p> The joiner's {@link Joiner#onComplete(Subtask)} method invokes the 821 * predicate's {@link Predicate#test(Object) test} method with the subtask that 822 * completed successfully or failed with an exception. If the {@code test} method 823 * returns {@code true} then <a href="StructuredTaskScope.html#CancelExecution"> 824 * execution is cancelled</a>. The {@code test} method must be thread safe as it 825 * may be invoked concurrently from several threads. 826 * 827 * <p> The joiner's {@link #result()} method returns the stream of all subtasks, 828 * in fork order. The stream may contain subtasks that have completed 829 * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} 830 * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state 831 * if execution was cancelled before all subtasks were forked or completed. 832 * 833 * <p> The following example uses this method to create a {@code Joiner} that 834 * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> when 835 * two or more subtasks fail. 836 * {@snippet lang=java : 837 * class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> { 838 * private final AtomicInteger failedCount = new AtomicInteger(); 839 * @Override 840 * public boolean test(Subtask<? extends T> subtask) { 841 * return subtask.state() == Subtask.State.FAILED 842 * && failedCount.incrementAndGet() >= 2; 843 * } 844 * } 845 * 846 * var joiner = Joiner.all(new CancelAfterTwoFailures<String>()); 847 * } 848 * 849 * @param isDone the predicate to evaluate completed subtasks 850 * @param <T> the result type of subtasks 851 */ 852 static <T> Joiner<T, Stream<Subtask<T>>> allUntil(Predicate<Subtask<? extends T>> isDone) { 853 return new AllSubtasks<>(isDone); 854 } 855 } 856 857 /** 858 * Represents the configuration for a {@code StructuredTaskScope}. 859 * 860 * <p> The configuration for a {@code StructuredTaskScope} consists of a {@link 861 * ThreadFactory} to create threads, an optional name for the purposes of monitoring 862 * and management, and an optional timeout. 863 * 864 * <p> Creating a {@code StructuredTaskScope} with {@link #open()} or {@link #open(Joiner)} 865 * uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default 866 * configuration</a>. The default configuration consists of a thread factory that 867 * creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads"> 868 * virtual threads</a>, no name for monitoring and management purposes, and no timeout. 869 * 870 * <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, Function) 871 * open} method allows a different configuration to be used. The function specified 872 * to the {@code open} method is applied to the default configuration and returns the 873 * configuration for the {@code StructuredTaskScope} under construction. The function 874 * can use the {@code with-} prefixed methods defined here to specify the components 875 * of the configuration to use. 876 * 877 * <p> Unless otherwise specified, passing a {@code null} argument to a method 878 * in this class will cause a {@link NullPointerException} to be thrown. 879 * 880 * @since 24 881 */ 882 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 883 public sealed interface Config permits ConfigImpl { 884 /** 885 * {@return a new {@code Config} object with the given thread factory} 886 * The other components are the same as this object. The thread factory is used by 887 * a task scope to create threads when {@linkplain #fork(Callable) forking} subtasks. 888 * @param threadFactory the thread factory 889 * 890 * @apiNote The thread factory will typically create 891 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>, 892 * maybe with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler 893 * uncaught exception handler}, or other properties configured. 894 * 895 * @see #fork(Callable) 896 */ 897 Config withThreadFactory(ThreadFactory threadFactory); 898 899 /** 900 * {@return a new {@code Config} object with the given name} 901 * The other components are the same as this object. A task scope is optionally 902 * named for the purposes of monitoring and management. 903 * @param name the name 904 * @see StructuredTaskScope#toString() 905 */ 906 Config withName(String name); 907 908 /** 909 * {@return a new {@code Config} object with the given timeout} 910 * The other components are the same as this object. 911 * @param timeout the timeout 912 * 913 * @apiNote Applications using deadlines, expressed as an {@link java.time.Instant}, 914 * can use {@link Duration#between Duration.between(Instant.now(), deadline)} to 915 * compute the timeout for this method. 916 * 917 * @see #join() 918 */ 919 Config withTimeout(Duration timeout); 920 } 921 922 /** 923 * Exception thrown by {@link #join()} when the outcome is an exception rather than a 924 * result. 925 * 926 * @since 24 927 */ 928 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 929 public static class FailedException extends RuntimeException { 930 @java.io.Serial 931 static final long serialVersionUID = -1533055100078459923L; 932 933 /** 934 * Constructs a {@code FailedException} with the specified cause. 935 * 936 * @param cause the cause, can be {@code null} 937 */ 938 public FailedException(Throwable cause) { 939 super(cause); 940 } 941 } 942 943 /** 944 * Exception thrown by {@link #join()} if the task scope was created a timeout and 945 * the timeout expired before or while waiting in {@code join}. 946 * 947 * @since 24 948 * @see Config#withTimeout(Duration) 949 */ 950 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 951 public static class TimeoutException extends RuntimeException { 952 @java.io.Serial 953 static final long serialVersionUID = 705788143955048766L; 954 955 /** 956 * Constructs a {@code TimeoutException} with no detail message. 957 */ 958 public TimeoutException() { } 959 } 960 961 /** 962 * Opens a new structured task scope to use the given {@code Joiner} object and with 963 * configuration that is the result of applying the given function to the 964 * <a href="#DefaultConfiguration">default configuration</a>. 965 * 966 * <p> The {@code configFunction} is called with the default configuration and returns 967 * the configuration for the new structured task scope. The function may, for example, 968 * set the {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} or set 969 * a {@linkplain Config#withTimeout(Duration) timeout}. 970 * 971 * <p> If a {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} is set 972 * then the {@code ThreadFactory}'s {@link ThreadFactory#newThread(Runnable) newThread} 973 * method will be used to create threads when forking subtasks in this task scope. 974 * 975 * <p> If a {@linkplain Config#withTimeout(Duration) timeout} is set then it starts 976 * when the task scope is opened. If the timeout expires before the task scope has 977 * {@linkplain #join() joined} then execution is cancelled and the {@code join} method 978 * throws {@link FailedException} with {@link TimeoutException} as the cause. 979 * 980 * <p> The new task scope is owned by the current thread. Only code executing in this 981 * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or 982 * {@linkplain #close close} the task scope. 983 * 984 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped 985 * value} bindings for inheritance by threads started in the task scope. 986 * 987 * @param joiner the joiner 988 * @param configFunction a function to produce the configuration 989 * @return a new task scope 990 * @param <T> the result type of tasks executed in the task scope 991 * @param <R> the type of the result returned by the join method 992 * @since 24 993 */ 994 public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner, 995 Function<Config, Config> configFunction) { 996 Objects.requireNonNull(joiner); 997 998 var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig()); 999 var scope = new StructuredTaskScope<T, R>(joiner, config.threadFactory(), config.name()); 1000 1001 // schedule timeout 1002 Duration timeout = config.timeout(); 1003 if (timeout != null) { 1004 boolean done = false; 1005 try { 1006 scope.scheduleTimeout(timeout); 1007 done = true; 1008 } finally { 1009 if (!done) { 1010 scope.close(); // pop if scheduling timeout failed 1011 } 1012 } 1013 } 1014 1015 return scope; 1016 } 1017 1018 /** 1019 * Opens a new structured task scope to use the given {@code Joiner} object. The 1020 * task scope is created with the <a href="#DefaultConfiguration">default configuration</a>. 1021 * The default configuration has a {@code ThreadFactory} that creates unnamed 1022 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>, 1023 * is unnamed for monitoring and management purposes, and has no timeout. 1024 * 1025 * @implSpec 1026 * This factory method is equivalent to invoking the 2-arg open method with the given 1027 * joiner and the {@linkplain Function#identity() identity function}. 1028 * 1029 * @param joiner the joiner 1030 * @return a new task scope 1031 * @param <T> the result type of tasks executed in the task scope 1032 * @param <R> the type of the result returned by the join method 1033 * @since 24 1034 */ 1035 public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner) { 1036 return open(joiner, Function.identity()); 1037 } 1038 1039 /** 1040 * Opens a new structured task scope that can be used to fork subtasks that return 1041 * results of any type. The {@link #join()} method waits for all subtasks to succeed 1042 * or any subtask to fail. 1043 * 1044 * <p> The {@code join} method returns {@code null} if all subtasks complete successfully. 1045 * It throws {@link FailedException} if any subtask fails, with the exception from 1046 * the first subtask to fail as the cause. 1047 * 1048 * <p> The task scope is created with the <a href="#DefaultConfiguration">default 1049 * configuration</a>. The default configuration has a {@code ThreadFactory} that creates 1050 * unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual 1051 * threads</a>, is unnamed for monitoring and management purposes, and has no timeout. 1052 * 1053 * @implSpec 1054 * This factory method is equivalent to invoking the 2-arg open method with a joiner 1055 * created with {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} 1056 * and the {@linkplain Function#identity() identity function}. 1057 * 1058 * @param <T> the result type of subtasks 1059 * @return a new task scope 1060 * @since 24 1061 */ 1062 public static <T> StructuredTaskScope<T, Void> open() { 1063 return open(Joiner.awaitAllSuccessfulOrThrow(), Function.identity()); 1064 } 1065 1066 /** 1067 * Starts a new thread in this task scope to execute a value-returning task, thus 1068 * creating a <em>subtask</em>. The value-returning task is provided to this method 1069 * as a {@link Callable}, the thread executes the task's {@link Callable#call() call} 1070 * method. 1071 * 1072 * <p> This method first creates a {@link Subtask Subtask} to represent the <em>forked 1073 * subtask</em>. It invokes the joiner's {@link Joiner#onFork(Subtask) onFork} method 1074 * with the {@code Subtask} object. If the {@code onFork} completes with an exception 1075 * or error then it is propagated by the {@code fork} method. If execution is 1076 * {@linkplain #isCancelled() cancelled}, or {@code onFork} returns {@code true} to 1077 * cancel execution, then this method returns the {@code Subtask} (in the {@link 1078 * Subtask.State#UNAVAILABLE UNAVAILABLE} state) without creating a thread to execute 1079 * the subtask. If execution is not cancelled then a thread is created with the 1080 * {@link ThreadFactory} configured when the task scope was created, and the thread is 1081 * started. Forking a subtask inherits the current thread's {@linkplain ScopedValue 1082 * scoped value} bindings. The bindings must match the bindings captured when the 1083 * task scope was opened. If the subtask completes (successfully or with an exception) 1084 * before execution is cancelled, then the thread invokes the joiner's 1085 * {@link Joiner#onComplete(Subtask) onComplete} method with subtask in the 1086 * {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} state. 1087 * 1088 * <p> This method returns the {@link Subtask Subtask} object. In some usages, this 1089 * object may be used to get its result. In other cases it may be used for correlation 1090 * or just discarded. To ensure correct usage, the {@link Subtask#get() Subtask.get()} 1091 * method may only be called by the task scope owner to get the result after it has 1092 * waited for subtasks to complete with the {@link #join() join} method and the subtask 1093 * completed successfully. Similarly, the {@link Subtask#exception() Subtask.exception()} 1094 * method may only be called by the task scope owner after it has joined and the subtask 1095 * failed. If execution was cancelled before the subtask was forked, or before it 1096 * completes, then neither method can be used to obtain the outcome. 1097 * 1098 * <p> This method may only be invoked by the task scope owner. 1099 * 1100 * @param task the value-returning task for the thread to execute 1101 * @param <U> the result type 1102 * @return the subtask 1103 * @throws WrongThreadException if the current thread is not the task scope owner 1104 * @throws IllegalStateException if the owner has already {@linkplain #join() joined} 1105 * or the task scope is closed 1106 * @throws StructureViolationException if the current scoped value bindings are not 1107 * the same as when the task scope was created 1108 * @throws RejectedExecutionException if the thread factory rejected creating a 1109 * thread to run the subtask 1110 */ 1111 public <U extends T> Subtask<U> fork(Callable<? extends U> task) { 1112 Objects.requireNonNull(task); 1113 ensureOwner(); 1114 ensureNotJoined(); 1115 1116 var subtask = new SubtaskImpl<U>(this, task); 1117 1118 // notify joiner, even if cancelled 1119 if (joiner.onFork(subtask)) { 1120 cancelExecution(); 1121 } 1122 1123 if (!cancelled) { 1124 // create thread to run task 1125 Thread thread = threadFactory.newThread(subtask); 1126 if (thread == null) { 1127 throw new RejectedExecutionException("Rejected by thread factory"); 1128 } 1129 1130 // attempt to start the thread 1131 try { 1132 flock.start(thread); 1133 } catch (IllegalStateException e) { 1134 // shutdown by another thread, or underlying flock is shutdown due 1135 // to unstructured use 1136 } 1137 } 1138 1139 // force owner to join 1140 state = ST_FORKED; 1141 return subtask; 1142 } 1143 1144 /** 1145 * Starts a new thread in this task scope to execute a task that does not return a 1146 * result, creating a <em>subtask</em>. 1147 * 1148 * <p> This method works exactly the same as {@link #fork(Callable)} except that 1149 * the task is provided to this method as a {@link Runnable}, the thread executes 1150 * the task's {@link Runnable#run() run} method, and its result is {@code null}. 1151 * 1152 * @param task the task for the thread to execute 1153 * @return the subtask 1154 * @throws WrongThreadException if the current thread is not the task scope owner 1155 * @throws IllegalStateException if the owner has already {@linkplain #join() joined} 1156 * or the task scope is closed 1157 * @throws StructureViolationException if the current scoped value bindings are not 1158 * the same as when the task scope was created 1159 * @throws RejectedExecutionException if the thread factory rejected creating a 1160 * thread to run the subtask 1161 * @since 24 1162 */ 1163 public Subtask<? extends T> fork(Runnable task) { 1164 Objects.requireNonNull(task); 1165 return fork(() -> { task.run(); return null; }); 1166 } 1167 1168 /** 1169 * Waits for all subtasks started in this task scope to complete or execution to be 1170 * cancelled. If a {@linkplain Config#withTimeout(Duration) timeout} has been set 1171 * then execution will be cancelled if the timeout expires before or while waiting. 1172 * Once finished waiting, the {@code Joiner}'s {@link Joiner#result() result} method 1173 * is invoked to get the result or throw an exception. If the {@code result} method 1174 * throws then this method throws {@code FailedException} with the exception thrown 1175 * by the {@code result()} method as the cause. 1176 * 1177 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 1178 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 1179 * when all threads finish, the {@code Joiner}'s {@link Joiner#onFork(Subtask) 1180 * onFork} or {@link Joiner#onComplete(Subtask) onComplete} returns {@code true} 1181 * to cancel execution, the timeout (if set) expires, or the current thread is 1182 * {@linkplain Thread#interrupt() interrupted}. 1183 * 1184 * <p> This method may only be invoked by the task scope owner, and only once. 1185 * 1186 * @return the {@link Joiner#result() result} 1187 * @throws WrongThreadException if the current thread is not the task scope owner 1188 * @throws IllegalStateException if already joined or this task scope is closed 1189 * @throws FailedException if the <i>outcome</i> is an exception, thrown with the 1190 * exception from {@link Joiner#result()} as the cause 1191 * @throws TimeoutException if a timeout is set and the timeout expires before or 1192 * while waiting 1193 * @throws InterruptedException if interrupted while waiting 1194 */ 1195 public R join() throws InterruptedException { 1196 ensureOwner(); 1197 ensureNotJoined(); 1198 1199 // join started 1200 state = ST_JOIN_STARTED; 1201 1202 // wait for all subtasks, execution to be cancelled, or interrupt 1203 flock.awaitAll(); 1204 1205 // throw if timeout expired 1206 if (timeoutExpired) { 1207 throw new TimeoutException(); 1208 } 1209 cancelTimeout(); 1210 1211 // all subtasks completed or cancelled 1212 state = ST_JOIN_COMPLETED; 1213 1214 // invoke joiner to get result 1215 try { 1216 return joiner.result(); 1217 } catch (Throwable e) { 1218 throw new FailedException(e); 1219 } 1220 } 1221 1222 /** 1223 * {@return {@code true} if <a href="#CancelExecution">execution is cancelled</a>, 1224 * or in the process of being cancelled, otherwise {@code false}} 1225 * 1226 * <p> Cancelling execution prevents new threads from starting in the task scope and 1227 * {@linkplain Thread#interrupt() interrupts} threads executing unfinished subtasks. 1228 * It may take some time before the interrupted threads finish execution; this 1229 * method may return {@code true} before all threads have been interrupted or before 1230 * all threads have finished. 1231 * 1232 * @apiNote A main task with a lengthy "forking phase" (the code that executes before 1233 * the main task invokes {@link #join() join}) may use this method to avoid doing work 1234 * in cases where execution was cancelled by the completion of a previously forked 1235 * subtask or timeout. 1236 * 1237 * @since 24 1238 */ 1239 public boolean isCancelled() { 1240 return cancelled; 1241 } 1242 1243 /** 1244 * Closes this task scope. 1245 * 1246 * <p> This method first <a href="#CancelExecution">cancels execution</a>, if not 1247 * already cancelled. This interrupts the threads executing unfinished subtasks. This 1248 * method then waits for all threads to finish. If interrupted while waiting then it 1249 * will continue to wait until the threads finish, before completing with the interrupt 1250 * status set. 1251 * 1252 * <p> This method may only be invoked by the task scope owner. If the task scope 1253 * is already closed then the task scope owner invoking this method has no effect. 1254 * 1255 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured 1256 * manner</em>. If this method is called to close a task scope before nested task 1257 * scopes are closed then it closes the underlying construct of each nested task scope 1258 * (in the reverse order that they were created in), closes this task scope, and then 1259 * throws {@link StructureViolationException}. 1260 * Similarly, if this method is called to close a task scope while executing with 1261 * {@linkplain ScopedValue scoped value} bindings, and the task scope was created 1262 * before the scoped values were bound, then {@code StructureViolationException} is 1263 * thrown after closing the task scope. 1264 * If a thread terminates without first closing task scopes that it owns then 1265 * termination will cause the underlying construct of each of its open tasks scopes to 1266 * be closed. Closing is performed in the reverse order that the task scopes were 1267 * created in. Thread termination may therefore be delayed when the task scope owner 1268 * has to wait for threads forked in these task scopes to finish. 1269 * 1270 * @throws IllegalStateException thrown after closing the task scope if the task scope 1271 * owner did not attempt to join after forking 1272 * @throws WrongThreadException if the current thread is not the task scope owner 1273 * @throws StructureViolationException if a structure violation was detected 1274 */ 1275 @Override 1276 public void close() { 1277 ensureOwner(); 1278 int s = state; 1279 if (s == ST_CLOSED) { 1280 return; 1281 } 1282 1283 // cancel execution if join did not complete 1284 if (s < ST_JOIN_COMPLETED) { 1285 cancelExecution(); 1286 cancelTimeout(); 1287 } 1288 1289 // wait for stragglers 1290 try { 1291 flock.close(); 1292 } finally { 1293 state = ST_CLOSED; 1294 } 1295 1296 // throw ISE if the owner didn't join after forking 1297 if (s == ST_FORKED) { 1298 throw new IllegalStateException("Owner did not join after forking"); 1299 } 1300 } 1301 1302 /** 1303 * {@inheritDoc} If a {@link Config#withName(String) name} for monitoring and 1304 * monitoring purposes has been set then the string representation includes the name. 1305 */ 1306 @Override 1307 public String toString() { 1308 return flock.name(); 1309 } 1310 1311 /** 1312 * Subtask implementation, runs the task specified to the fork method. 1313 */ 1314 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable { 1315 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS); 1316 1317 private record AltResult(Subtask.State state, Throwable exception) { 1318 AltResult(Subtask.State state) { 1319 this(state, null); 1320 } 1321 } 1322 1323 private final StructuredTaskScope<? super T, ?> scope; 1324 private final Callable<? extends T> task; 1325 private volatile Object result; 1326 1327 SubtaskImpl(StructuredTaskScope<? super T, ?> scope, Callable<? extends T> task) { 1328 this.scope = scope; 1329 this.task = task; 1330 } 1331 1332 @Override 1333 public void run() { 1334 T result = null; 1335 Throwable ex = null; 1336 try { 1337 result = task.call(); 1338 } catch (Throwable e) { 1339 ex = e; 1340 } 1341 1342 // nothing to do if task scope is cancelled 1343 if (scope.isCancelled()) 1344 return; 1345 1346 // set result/exception and invoke onComplete 1347 if (ex == null) { 1348 this.result = (result != null) ? result : RESULT_NULL; 1349 } else { 1350 this.result = new AltResult(State.FAILED, ex); 1351 } 1352 scope.onComplete(this); 1353 } 1354 1355 @Override 1356 public Subtask.State state() { 1357 Object result = this.result; 1358 if (result == null) { 1359 return State.UNAVAILABLE; 1360 } else if (result instanceof AltResult alt) { 1361 // null or failed 1362 return alt.state(); 1363 } else { 1364 return State.SUCCESS; 1365 } 1366 } 1367 1368 1369 @Override 1370 public T get() { 1371 scope.ensureJoinedIfOwner(); 1372 Object result = this.result; 1373 if (result instanceof AltResult) { 1374 if (result == RESULT_NULL) return null; 1375 } else if (result != null) { 1376 @SuppressWarnings("unchecked") 1377 T r = (T) result; 1378 return r; 1379 } 1380 throw new IllegalStateException( 1381 "Result is unavailable or subtask did not complete successfully"); 1382 } 1383 1384 @Override 1385 public Throwable exception() { 1386 scope.ensureJoinedIfOwner(); 1387 Object result = this.result; 1388 if (result instanceof AltResult alt && alt.state() == State.FAILED) { 1389 return alt.exception(); 1390 } 1391 throw new IllegalStateException( 1392 "Exception is unavailable or subtask did not complete with exception"); 1393 } 1394 1395 @Override 1396 public String toString() { 1397 String stateAsString = switch (state()) { 1398 case UNAVAILABLE -> "[Unavailable]"; 1399 case SUCCESS -> "[Completed successfully]"; 1400 case FAILED -> { 1401 Throwable ex = ((AltResult) result).exception(); 1402 yield "[Failed: " + ex + "]"; 1403 } 1404 }; 1405 return Objects.toIdentityString(this) + stateAsString; 1406 } 1407 } 1408 1409 /** 1410 * A joiner that returns a stream of all subtasks when all subtasks complete 1411 * successfully. If any subtask fails then execution is cancelled. 1412 */ 1413 private static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> { 1414 private static final VarHandle FIRST_EXCEPTION; 1415 static { 1416 try { 1417 MethodHandles.Lookup l = MethodHandles.lookup(); 1418 FIRST_EXCEPTION = l.findVarHandle(AllSuccessful.class, "firstException", Throwable.class); 1419 } catch (Exception e) { 1420 throw new ExceptionInInitializerError(e); 1421 } 1422 } 1423 // list of forked subtasks, only accessed by owner thread 1424 private final List<Subtask<T>> subtasks = new ArrayList<>(); 1425 private volatile Throwable firstException; 1426 1427 @Override 1428 public boolean onFork(Subtask<? extends T> subtask) { 1429 @SuppressWarnings("unchecked") 1430 var tmp = (Subtask<T>) Objects.requireNonNull(subtask); 1431 subtasks.add(tmp); 1432 return false; 1433 } 1434 1435 @Override 1436 public boolean onComplete(Subtask<? extends T> subtask) { 1437 return (subtask.state() == Subtask.State.FAILED) 1438 && (firstException == null) 1439 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 1440 } 1441 1442 @Override 1443 public Stream<Subtask<T>> result() throws Throwable { 1444 Throwable ex = firstException; 1445 if (ex != null) { 1446 throw ex; 1447 } else { 1448 return subtasks.stream(); 1449 } 1450 } 1451 } 1452 1453 /** 1454 * A joiner that returns the result of the first subtask to complete successfully. 1455 * If any subtask completes successfully then execution is cancelled. 1456 */ 1457 private static final class AnySuccessful<T> implements Joiner<T, T> { 1458 private static final VarHandle FIRST_SUCCESS; 1459 private static final VarHandle FIRST_EXCEPTION; 1460 static { 1461 try { 1462 MethodHandles.Lookup l = MethodHandles.lookup(); 1463 FIRST_SUCCESS = l.findVarHandle(AnySuccessful.class, "firstSuccess", Subtask.class); 1464 FIRST_EXCEPTION = l.findVarHandle(AnySuccessful.class, "firstException", Throwable.class); 1465 } catch (Exception e) { 1466 throw new ExceptionInInitializerError(e); 1467 } 1468 } 1469 private volatile Subtask<T> firstSuccess; 1470 private volatile Throwable firstException; 1471 1472 @Override 1473 public boolean onComplete(Subtask<? extends T> subtask) { 1474 Objects.requireNonNull(subtask); 1475 if (firstSuccess == null) { 1476 if (subtask.state() == Subtask.State.SUCCESS) { 1477 // capture the first subtask that completes successfully 1478 return FIRST_SUCCESS.compareAndSet(this, null, subtask); 1479 } else if (firstException == null) { 1480 // capture the exception thrown by the first task to fail 1481 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 1482 } 1483 } 1484 return false; 1485 } 1486 1487 @Override 1488 public T result() throws Throwable { 1489 Subtask<T> firstSuccess = this.firstSuccess; 1490 if (firstSuccess != null) { 1491 return firstSuccess.get(); 1492 } 1493 Throwable firstException = this.firstException; 1494 if (firstException != null) { 1495 throw firstException; 1496 } else { 1497 throw new NoSuchElementException("No subtasks completed"); 1498 } 1499 } 1500 } 1501 1502 /** 1503 * A joiner that that waits for all successful subtasks. If any subtask fails the 1504 * execution is cancelled. 1505 */ 1506 private static final class AwaitSuccessful<T> implements Joiner<T, Void> { 1507 private static final VarHandle FIRST_EXCEPTION; 1508 static { 1509 try { 1510 MethodHandles.Lookup l = MethodHandles.lookup(); 1511 FIRST_EXCEPTION = l.findVarHandle(AwaitSuccessful.class, "firstException", Throwable.class); 1512 } catch (Exception e) { 1513 throw new ExceptionInInitializerError(e); 1514 } 1515 } 1516 private volatile Throwable firstException; 1517 1518 @Override 1519 public boolean onComplete(Subtask<? extends T> subtask) { 1520 return (subtask.state() == Subtask.State.FAILED) 1521 && (firstException == null) 1522 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 1523 } 1524 1525 @Override 1526 public Void result() throws Throwable { 1527 Throwable ex = firstException; 1528 if (ex != null) { 1529 throw ex; 1530 } else { 1531 return null; 1532 } 1533 } 1534 } 1535 1536 /** 1537 * A joiner that returns a stream of all subtasks. 1538 */ 1539 private static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> { 1540 private final Predicate<Subtask<? extends T>> isDone; 1541 // list of forked subtasks, only accessed by owner thread 1542 private final List<Subtask<T>> subtasks = new ArrayList<>(); 1543 1544 AllSubtasks(Predicate<Subtask<? extends T>> isDone) { 1545 this.isDone = Objects.requireNonNull(isDone); 1546 } 1547 1548 @Override 1549 public boolean onFork(Subtask<? extends T> subtask) { 1550 @SuppressWarnings("unchecked") 1551 var tmp = (Subtask<T>) Objects.requireNonNull(subtask); 1552 subtasks.add(tmp); 1553 return false; 1554 } 1555 1556 @Override 1557 public boolean onComplete(Subtask<? extends T> subtask) { 1558 return isDone.test(Objects.requireNonNull(subtask)); 1559 } 1560 1561 @Override 1562 public Stream<Subtask<T>> result() { 1563 return subtasks.stream(); 1564 } 1565 } 1566 1567 /** 1568 * Implementation of Config. 1569 */ 1570 private record ConfigImpl(ThreadFactory threadFactory, 1571 String name, 1572 Duration timeout) implements Config { 1573 static Config defaultConfig() { 1574 return new ConfigImpl(Thread.ofVirtual().factory(), null, null); 1575 } 1576 1577 @Override 1578 public Config withThreadFactory(ThreadFactory threadFactory) { 1579 return new ConfigImpl(Objects.requireNonNull(threadFactory), name, timeout); 1580 } 1581 1582 @Override 1583 public Config withName(String name) { 1584 return new ConfigImpl(threadFactory, Objects.requireNonNull(name), timeout); 1585 } 1586 1587 @Override 1588 public Config withTimeout(Duration timeout) { 1589 return new ConfigImpl(threadFactory, name, Objects.requireNonNull(timeout)); 1590 } 1591 } 1592 1593 /** 1594 * Used to schedule a task to cancel execution when a timeout expires. 1595 */ 1596 private static class TimerSupport { 1597 private static final ScheduledExecutorService DELAYED_TASK_SCHEDULER; 1598 static { 1599 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1600 Executors.newScheduledThreadPool(1, task -> { 1601 Thread t = InnocuousThread.newThread("StructuredTaskScope-Timer", task); 1602 t.setDaemon(true); 1603 return t; 1604 }); 1605 stpe.setRemoveOnCancelPolicy(true); 1606 DELAYED_TASK_SCHEDULER = stpe; 1607 } 1608 1609 static Future<?> schedule(Duration timeout, Runnable task) { 1610 long nanos = TimeUnit.NANOSECONDS.convert(timeout); 1611 return DELAYED_TASK_SCHEDULER.schedule(task, nanos, TimeUnit.NANOSECONDS); 1612 } 1613 } 1614 }