1 /* 2 * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.concurrent; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.security.AccessController; 30 import java.security.PrivilegedAction; 31 import java.time.Duration; 32 import java.util.ArrayList; 33 import java.util.List; 34 import java.util.NoSuchElementException; 35 import java.util.Objects; 36 import java.util.function.Function; 37 import java.util.function.Predicate; 38 import java.util.function.Supplier; 39 import java.util.stream.Stream; 40 import jdk.internal.javac.PreviewFeature; 41 import jdk.internal.misc.InnocuousThread; 42 import jdk.internal.misc.ThreadFlock; 43 44 /** 45 * An API for <em>structured concurrency</em>. 
{@code StructuredTaskScope} supports cases 46 * where a main task splits into several concurrent subtasks, and where the subtasks must 47 * complete before the main task continues. A {@code StructuredTaskScope} can be used to 48 * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>, 49 * just like that of a sequential operation in structured programming. 50 * 51 * <p> {@code StructuredTaskScope} defines the static method {@link #open(Joiner) open} 52 * to open a new {@code StructuredTaskScope} and the {@link #close() close} method to close 53 * it. The API is designed to be used with the {@code try-with-resources} statement where 54 * the {@code StructuredTaskScope} is opened as a resource and then closed automatically. 55 * The code in the block uses the {@link #fork(Callable) fork} method to fork subtasks. 56 * After forking, it uses the {@link #join() join} method to wait for all subtasks to 57 * finish (or some other outcome) as a single operation. Forking a subtask starts a new 58 * {@link Thread} to run the subtask. The thread executing the main task does not continue 59 * beyond the {@code close} method until all threads started to execute subtasks have finished. 60 * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may 61 * only be invoked by the <em>owner thread</em> (the thread that opened the {@code 62 * StructuredTaskScope}), and the {@code close} method throws an exception after closing 63 * if the owner did not invoke the {@code join} method. 64 * 65 * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner} that handles subtask 66 * completion and produces the outcome (the result or an exception) for the {@link #join() 67 * join} method. The {@code Joiner} interface defines static methods to create a 68 * {@code Joiner} for common cases. 
69 * 70 * <p> A {@code Joiner} may <a id="CancelExecution"><em>cancel execution</em></a> 71 * (sometimes called "short-circuiting") when some condition is reached that does not 72 * require the result of subtasks that are still executing. Cancelling execution prevents 73 * new threads from being started to execute further subtasks, {@linkplain Thread#interrupt() 74 * interrupts} the threads executing subtasks that have not completed, and causes the 75 * {@code join} method to wakeup with a result (or exception). The {@link #close() close} 76 * method always waits for threads executing subtasks to finish, even if execution is 77 * cancelled, so it cannot continue beyond the {@code close} method until the interrupted 78 * threads finish. Subtasks should be coded so that they finish as soon as possible when 79 * interrupted. Subtasks that block on methods that are not interruptible may delay the 80 * closing of a task scope. 81 * 82 * <p> Consider the example of a main task that splits into two subtasks to concurrently 83 * fetch resources from two URL locations "left" and "right". Both subtasks may complete 84 * successfully, one subtask may succeed and the other may fail, or both subtasks may 85 * fail. The main task in this example is interested in the successful result from both 86 * subtasks. It uses {@link Joiner#awaitAllSuccessfulOrThrow() 87 * Joiner.awaitAllSuccessfulOrThrow()} to create a {@code Joiner} that waits for both 88 * subtasks to complete successfully or for either subtask to fail. 
89 * {@snippet lang=java : 90 * try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) { 91 * 92 * // @link substring="fork" target="#fork(Callable)" : 93 * Subtask<String> subtask1 = scope.fork(() -> query(left)); 94 * Subtask<Integer> subtask2 = scope.fork(() -> query(right)); 95 * 96 * // throws if either subtask fails 97 * scope.join(); // @link substring="join" target="#join()" 98 * 99 * // both subtasks completed successfully 100 * return new MyResult(subtask1.get(), subtask2.get()); // @link substring="get" target="Subtask#get()" 101 * 102 * } 103 * } 104 * 105 * <p> In this example, the main task forks the two subtasks. The {@code fork} method 106 * returns a {@link Subtask Subtask} that is a handle to the forked subtask. The main task 107 * waits in the {@code join} method for both subtasks to complete successfully or for either 108 * subtask to fail. If both subtasks complete successfully then the {@code join} method 109 * completes and the main task uses the {@link Subtask#get() Subtask.get()} method to get 110 * the result of each subtask. If either subtask fails then the {@code Joiner} causes the 111 * other subtask to be cancelled (this will interrupt the thread executing the subtask) 112 * and the {@code join} method throws {@link ExecutionException} with the exception from 113 * the failed subtask as the {@linkplain Throwable#getCause() cause}. 114 * 115 * <p> Now consider another example that also splits into two subtasks to concurrently 116 * fetch resources. In this example, the code in the main task is only interested in the 117 * result from the first subtask to complete successfully. The example uses {@link 118 * Joiner#anySuccessfulResultOrThrow() Joiner.anySuccessfulResultOrThrow()} to 119 * create a {@code Joiner} that makes available the result of the first subtask to 120 * complete successfully. 
 * The type parameter in the example is "{@code String}" so that
 * only subtasks that return a {@code String} can be forked.
 * {@snippet lang=java :
 *    // @link substring="open" target="#open(Joiner)" :
 *    try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {
 *
 *        scope.fork(() -> query(left));  // @link substring="fork" target="#fork(Callable)"
 *        scope.fork(() -> query(right));
 *
 *        // throws if both subtasks fail
 *        String firstResult = scope.join();
 *
 *    // @link substring="close" target="#close()" :
 *    } // close
 * }
 *
 * <p> In the example, the main task forks the two subtasks, then waits in the {@code
 * join} method for either subtask to complete successfully or for both subtasks to fail.
 * If one of the subtasks completes successfully then the {@code Joiner} causes the other
 * subtask to be cancelled (this will interrupt the thread executing the subtask), and
 * the {@code join} method returns the result from the first subtask. Cancelling the other
 * subtask avoids the main task waiting for a result that it doesn't care about. If both
 * subtasks fail then the {@code join} method throws {@link ExecutionException} with the
 * exception from one of the subtasks as the {@linkplain Throwable#getCause() cause}.
 *
 * <p> Whether code uses the {@code Subtask} returned from {@code fork} will depend on
 * the {@code Joiner} and usage. Some {@code Joiner} implementations are suited to subtasks
 * that return results of the same type and where the {@code join} method returns a result
 * for the main task to use. Code that forks subtasks that return results of different
 * types, and uses a {@code Joiner} such as {@code Joiner.awaitAllSuccessfulOrThrow()} that
 * does not return a result, will use {@link Subtask#get() Subtask.get()} after joining.
151 * 152 * <h2>Exception handling</h2> 153 * 154 * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner Joiner} that 155 * handles subtask completion and produces the outcome for the {@link #join() join} method. 156 * In some cases, the outcome will be a result, in other cases it will be an exception. 157 * If the outcome is an exception then the {@code join} method throws {@link 158 * ExecutionException} with the exception as the {@linkplain Throwable#getCause() 159 * cause}. For many {@code Joiner} implementations, the exception will be an exception 160 * thrown by a subtask that failed. In the case of {@link Joiner#allSuccessfulOrThrow() 161 * allSuccessfulOrThrow} and {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow} 162 * for example, the exception is from the first subtask to fail. 163 * 164 * <p> Many of the details for how exceptions are handled will depend on usage. In some 165 * cases, the {@code join} method will be called in a {@code try-catch} block to catch 166 * {@code ExecutionException} and handle the cause. The exception handling may use 167 * {@code instanceof} with pattern matching to handle specific causes. In some cases it 168 * may not be useful to catch {@code ExecutionException} but instead leave it to propagate 169 * to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler} 170 * for logging purposes. 171 * 172 * <p> For cases where a specific exception triggers the use of a default result then it 173 * may be more appropriate to handle this in the subtask itself rather than the subtask 174 * failing and code in the main task handling the exception. 175 * 176 * <h2>Configuration</h2> 177 * 178 * A {@code StructuredTaskScope} is opened with {@linkplain Config configuration} that 179 * consists of a {@link ThreadFactory} to create threads, an optional name for monitoring 180 * and management purposes, and an optional timeout. 
181 * 182 * <p> The 1-arg {@link #open(Joiner) open} method creates a {@code StructuredTaskScope} 183 * with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default 184 * configuration has a {@code ThreadFactory} that creates unnamed 185 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>, 186 * is unnamed for monitoring and management purposes, and has no timeout. 187 * 188 * <p> The 2-arg {@link #open(Joiner, Function) open} method can be used to create a 189 * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for 190 * the purposes of monitoring and management, or has a timeout that cancels execution if 191 * the timeout expires before or while waiting for subtasks to complete. The {@code open} 192 * method is called with a {@linkplain Function function} that is applied to the default 193 * configuration and returns a {@link Config Config} for the {@code StructuredTaskScope} 194 * under construction. 195 * 196 * <p> The following example opens a new {@code StructuredTaskScope} with a {@code 197 * ThreadFactory} that creates virtual threads {@linkplain Thread#setName(String) named} 198 * "duke-0", "duke-1" ... 199 * {@snippet lang = java: 200 * // @link substring="name" target="Thread.Builder#name(String, long)" : 201 * ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory(); 202 * 203 * // @link substring="withThreadFactory" target="Config#withThreadFactory(ThreadFactory)" : 204 * try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) { 205 * 206 * scope.fork( .. ); // runs in a virtual thread with name "duke-0" 207 * scope.fork( .. ); // runs in a virtual thread with name "duke-1" 208 * 209 * scope.join(); 210 * 211 * } 212 *} 213 * 214 * <p> A second example sets a timeout, represented by a {@link Duration}. The timeout 215 * starts when the new task scope is opened. 
If the timeout expires before the {@code join} 216 * method has completed then <a href="#CancelExecution">execution is cancelled</a>. This 217 * interrupts the threads executing the two subtasks and causes the {@link #join() join} 218 * method to throw {@link ExecutionException} with {@link TimeoutException} as the cause. 219 * {@snippet lang=java : 220 * Duration timeout = Duration.ofSeconds(10); 221 * 222 * // @link substring="allSuccessfulOrThrow" target="Joiner#allSuccessfulOrThrow()" : 223 * try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(), 224 * // @link substring="withTimeout" target="Config#withTimeout(Duration)" : 225 * cf -> cf.withTimeout(timeout))) { 226 * 227 * scope.fork(() -> query(left)); 228 * scope.fork(() -> query(right)); 229 * 230 * List<String> result = scope.join() 231 * .map(Subtask::get) 232 * .toList(); 233 * 234 * } 235 * } 236 * 237 * <h2>Inheritance of scoped value bindings</h2> 238 * 239 * {@link ScopedValue} supports the execution of a method with a {@code ScopedValue} bound 240 * to a value for the bounded period of execution of the method by the <em>current thread</em>. 241 * It allows a value to be safely and efficiently shared to methods without using method 242 * parameters. 243 * 244 * <p> When used in conjunction with a {@code StructuredTaskScope}, a {@code ScopedValue} 245 * can also safely and efficiently share a value to methods executed by subtasks forked 246 * in the task scope. When a {@code ScopedValue} object is bound to a value in the thread 247 * executing the main task then that binding is inherited by the threads created to 248 * execute the subtasks. The thread executing the main task does not continue beyond the 249 * {@link #close() close} method until all threads executing the subtasks have finished. 250 * This ensures that the {@code ScopedValue} is not reverted to being {@linkplain 251 * ScopedValue#isBound() unbound} (or its previous value) while subtasks are executing. 
 * In addition to providing a safe and efficient means to inherit a value into subtasks,
 * the inheritance allows sequential code using {@code ScopedValue} to be refactored to
 * use structured concurrency.
 *
 * <p> To ensure correctness, opening a new {@code StructuredTaskScope} captures the
 * current thread's scoped value bindings. These are the scoped value bindings that are
 * inherited by the threads created to execute subtasks in the task scope. Forking a
 * subtask checks that the bindings in effect at the time that the subtask is forked
 * match the bindings when the {@code StructuredTaskScope} was created. This check ensures
 * that a subtask does not inherit a binding that is reverted in the main task before the
 * subtask has completed.
 *
 * <p> A {@code ScopedValue} that is shared across threads requires that the value be an
 * immutable object or for all access to the value to be appropriately synchronized.
 *
 * <p> The following example demonstrates the inheritance of scoped value bindings. The
 * scoped value USERNAME is bound to the value "duke" for the bounded period of a lambda
 * expression by the thread executing it. The code in the block opens a {@code
 * StructuredTaskScope} and forks two subtasks, it then waits in the {@code join} method
 * and aggregates the results from both subtasks. If code executed by the threads
 * running subtask1 and subtask2 uses {@link ScopedValue#get()} to get the value of
 * USERNAME, then the value "duke" will be returned.
 * {@snippet lang=java :
 *     // @link substring="newInstance" target="ScopedValue#newInstance()" :
 *     private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
 *
 *     // @link substring="callWhere" target="ScopedValue#callWhere" :
 *     MyResult result = ScopedValue.callWhere(USERNAME, "duke", () -> {
 *
 *         try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
 *
 *             Subtask<String> subtask1 = scope.fork( .. );    // inherits binding
 *             Subtask<Integer> subtask2 = scope.fork( .. );   // inherits binding
 *
 *             scope.join();
 *             return new MyResult(subtask1.get(), subtask2.get());
 *         }
 *
 *     });
 * }
 *
 * <p> A scoped value inherited into a subtask may be
 * <a href="{@docRoot}/java.base/java/lang/ScopedValue.html#rebind">rebound</a> to a new
 * value in the subtask for the bounded execution of some method executed in the subtask.
 * When the method completes, the value of the {@code ScopedValue} reverts to its previous
 * value, the value inherited from the thread executing the main task.
 *
 * <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}.
 * A main task executing in thread T1 opens a {@code StructuredTaskScope} and forks a
 * subtask that runs in thread T2. The scoped value bindings captured when T1 opens the
 * task scope are inherited into T2. The subtask (in thread T2) executes code that opens a
 * new {@code StructuredTaskScope} and forks a subtask that runs in thread T3. The scoped
 * value bindings captured when T2 opens the task scope are inherited into T3. These
 * include (or may be the same as) the bindings that were inherited from T1. In effect,
 * scoped values are inherited into a tree of subtasks, not just one level of subtask.
307 * 308 * <h2>Memory consistency effects</h2> 309 * 310 * <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to 311 * {@linkplain #fork forking} of a subtask 312 * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility"> 313 * <i>happen-before</i></a> any actions taken by that subtask, which in turn 314 * <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}. 315 * 316 * <h2>General exceptions</h2> 317 * 318 * <p> Unless otherwise specified, passing a {@code null} argument to a method in this 319 * class will cause a {@link NullPointerException} to be thrown. 320 * 321 * @param <T> the result type of tasks executed in the task scope 322 * @param <R> the type of the result returned by the join method 323 * 324 * @jls 17.4.5 Happens-before Order 325 * @since 21 326 */ 327 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 328 public class StructuredTaskScope<T, R> implements AutoCloseable { 329 private static final VarHandle CANCELLED; 330 static { 331 try { 332 MethodHandles.Lookup l = MethodHandles.lookup(); 333 CANCELLED = l.findVarHandle(StructuredTaskScope.class,"cancelled", boolean.class); 334 } catch (Exception e) { 335 throw new ExceptionInInitializerError(e); 336 } 337 } 338 339 private final Joiner<? super T, ? extends R> joiner; 340 private final ThreadFactory threadFactory; 341 private final ThreadFlock flock; 342 343 // fields that are only accessed by owner thread 344 private boolean needToJoin; // set by fork to indicate that owner must join 345 private boolean joined; // set to true when owner joins 346 private boolean closed; 347 private Future<?> timerTask; 348 349 // set or read by any thread 350 private volatile boolean cancelled; 351 352 // set by the timer thread, read by the owner thread 353 private volatile boolean timeoutExpired; 354 355 /** 356 * Throws IllegalStateException if the task scope is closed. 
357 */ 358 private void ensureOpen() { 359 assert Thread.currentThread() == flock.owner(); 360 if (closed) { 361 throw new IllegalStateException("Task scope is closed"); 362 } 363 } 364 365 /** 366 * Throws WrongThreadException if the current thread is not the owner thread. 367 */ 368 private void ensureOwner() { 369 if (Thread.currentThread() != flock.owner()) { 370 throw new WrongThreadException("Current thread not owner"); 371 } 372 } 373 374 /** 375 * Throws IllegalStateException if invoked by the owner thread and the owner thread 376 * has not joined. 377 */ 378 private void ensureJoinedIfOwner() { 379 if (Thread.currentThread() == flock.owner() && !joined) { 380 String msg = needToJoin ? "Owner did not join" : "join did not complete"; 381 throw new IllegalStateException(msg); 382 } 383 } 384 385 /** 386 * Interrupts all threads in this task scope, except the current thread. 387 */ 388 private void implInterruptAll() { 389 flock.threads() 390 .filter(t -> t != Thread.currentThread()) 391 .forEach(t -> { 392 try { 393 t.interrupt(); 394 } catch (Throwable ignore) { } 395 }); 396 } 397 398 @SuppressWarnings("removal") 399 private void interruptAll() { 400 if (System.getSecurityManager() == null) { 401 implInterruptAll(); 402 } else { 403 PrivilegedAction<Void> pa = () -> { 404 implInterruptAll(); 405 return null; 406 }; 407 AccessController.doPrivileged(pa); 408 } 409 } 410 411 /** 412 * Cancel exception if not already cancelled. 413 */ 414 private void cancelExecution() { 415 if (!cancelled && CANCELLED.compareAndSet(this, false, true)) { 416 // prevent new threads from starting 417 flock.shutdown(); 418 419 // interrupt all unfinished threads 420 interruptAll(); 421 422 // wakeup join 423 flock.wakeup(); 424 } 425 } 426 427 /** 428 * Schedules a task to cancel execution on timeout. 
429 */ 430 private void scheduleTimeout(Duration timeout) { 431 assert Thread.currentThread() == flock.owner() && timerTask == null; 432 timerTask = TimerSupport.schedule(timeout, () -> { 433 if (!cancelled) { 434 timeoutExpired = true; 435 cancelExecution(); 436 } 437 }); 438 } 439 440 /** 441 * Cancels the timer task if set. 442 */ 443 private void cancelTimeout() { 444 assert Thread.currentThread() == flock.owner(); 445 if (timerTask != null) { 446 timerTask.cancel(false); 447 } 448 } 449 450 /** 451 * Invoked by the thread for a subtask when the subtask completes before execution 452 * was cancelled. 453 */ 454 private void onComplete(SubtaskImpl<? extends T> subtask) { 455 assert subtask.state() != Subtask.State.UNAVAILABLE; 456 if (joiner.onComplete(subtask)) { 457 cancelExecution(); 458 } 459 } 460 461 /** 462 * Initialize a new StructuredTaskScope. 463 */ 464 @SuppressWarnings("this-escape") 465 private StructuredTaskScope(Joiner<? super T, ? extends R> joiner, 466 ThreadFactory threadFactory, 467 String name) { 468 this.joiner = joiner; 469 this.threadFactory = threadFactory; 470 471 if (name == null) 472 name = Objects.toIdentityString(this); 473 this.flock = ThreadFlock.open(name); 474 } 475 476 /** 477 * Represents a subtask forked with {@link #fork(Callable)} or {@link #fork(Runnable)}. 478 * 479 * <p> Code that forks subtasks can use the {@link #get() get()} method after {@linkplain 480 * #join() joining} to obtain the result of a subtask that completed successfully. It 481 * can use the {@link #exception()} method to obtain the exception thrown by a subtask 482 * that failed. 483 * 484 * @param <T> the result type 485 * @since 21 486 */ 487 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 488 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl { 489 /** 490 * Represents the state of a subtask. 
491 * @see Subtask#state() 492 * @since 21 493 */ 494 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 495 enum State { 496 /** 497 * The subtask result or exception is not available. This state indicates that 498 * the subtask was forked but has not completed, it completed after execution 499 * was cancelled, or it was forked after execution was cancelled (in which 500 * case a thread was not created to execute the subtask). 501 */ 502 UNAVAILABLE, 503 /** 504 * The subtask completed successfully. The {@link Subtask#get() Subtask.get()} 505 * method can be used to get the result. This is a terminal state. 506 */ 507 SUCCESS, 508 /** 509 * The subtask failed with an exception. The {@link Subtask#exception() 510 * Subtask.exception()} method can be used to get the exception. This is a 511 * terminal state. 512 */ 513 FAILED, 514 } 515 516 /** 517 * {@return the subtask state} 518 */ 519 State state(); 520 521 /** 522 * Returns the result of this subtask if it completed successfully. If 523 * {@linkplain #fork(Callable) forked} to execute a value-returning task then the 524 * result from the {@link Callable#call() call} method is returned. If 525 * {@linkplain #fork(Runnable) forked} to execute a task that does not return a 526 * result then {@code null} is returned. 527 * 528 * <p> Code executing in the scope owner thread can use this method to get the 529 * result of a successful subtask only after it has {@linkplain #join() joined}. 530 * 531 * <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask) 532 * onComplete} method should test that the {@linkplain #state() subtask state} is 533 * {@link State#SUCCESS SUCCESS} before using this method to get the result. 
534 * 535 * @return the possibly-null result 536 * @throws IllegalStateException if the subtask has not completed, did not complete 537 * successfully, or the current thread is the task scope owner and it has not joined 538 * @see State#SUCCESS 539 */ 540 T get(); 541 542 /** 543 * {@return the exception thrown by this subtask if it failed} If 544 * {@linkplain #fork(Callable) forked} to execute a value-returning task then 545 * the exception thrown by the {@link Callable#call() call} method is returned. 546 * If {@linkplain #fork(Runnable) forked} to execute a task that does not return 547 * a result then the exception thrown by the {@link Runnable#run() run} method is 548 * returned. 549 * 550 * <p> Code executing in the scope owner thread can use this method to get the 551 * exception thrown by a failed subtask only after it has {@linkplain #join() joined}. 552 * 553 * <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask) 554 * onComplete} method should test that the {@linkplain #state() subtask state} is 555 * {@link State#FAILED FAILED} before using this method to get the exception. 556 * 557 * @throws IllegalStateException if the subtask has not completed, completed with 558 * a result, or the current thread is the task scope owner and it has not joined 559 * @see State#FAILED 560 */ 561 Throwable exception(); 562 } 563 564 /** 565 * An object used with a {@link StructuredTaskScope} to handle subtask completion 566 * and produce the result for a main task waiting in the {@link #join() join} method 567 * for subtasks to complete. 568 * 569 * <p> Joiner defines static methods to create {@code Joiner} objects for common cases: 570 * <ul> 571 * <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner} 572 * that yields a stream of the completed subtasks for {@code join} to return when 573 * all subtasks complete successfully. It cancels execution and causes {@code join} 574 * to throw if any subtask fails. 
     *   <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a
     *   {@code Joiner} that yields the result of the first subtask to succeed. It cancels
     *   execution when a subtask succeeds, and causes {@code join} to throw if all
     *   subtasks fail.
     *   <li> {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a
     *   {@code Joiner} that waits for all successful subtasks. It cancels execution and
     *   causes {@code join} to throw if any subtask fails.
     *   <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all
     *   subtasks. It does not cancel execution.
     * </ul>
     *
     * <p> In addition to the methods to create {@code Joiner} objects for common cases,
     * the {@link #all(Predicate) all(Predicate)} method is defined to create a {@code
     * Joiner} that yields a stream of all subtasks. It is created with a {@link
     * Predicate Predicate} that determines if execution should continue or be cancelled.
     * This {@code Joiner} can be built upon to create custom policies that cancel
     * execution based on some condition.
     *
     * <p> More advanced policies can be developed by implementing the {@code Joiner}
     * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked.
     * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a
     * result or exception. These methods return a {@code boolean} to indicate if execution
     * should be cancelled. These methods can be used to collect subtasks, results, or
     * exceptions, and control when to cancel execution. The {@link #result()} method
     * must be implemented to produce the result (or exception) for the {@code join}
     * method.
     *
     * <p> Unless otherwise specified, passing a {@code null} argument to a method
     * in this interface will cause a {@link NullPointerException} to be thrown.
     *
     * @implSpec Implementations of this interface must be thread safe.
The {@link 605 * #onComplete(Subtask)} method defined by this interface may be invoked by several 606 * threads concurrently. 607 * 608 * @apiNote It is very important that a new {@code Joiner} object is created for each 609 * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with 610 * different task scopes or re-used after a task is closed. 611 * 612 * <p> Designing a {@code Joiner} should take into account the code at the use-site 613 * where the results from the {@link StructuredTaskScope#join() join} method are 614 * processed. It should be clear what the {@code Joiner} does vs. the application 615 * code at the use-site. In general, the {@code Joiner} implementation is not the 616 * place to code "business logic". A {@code Joiner} should be designed to be as 617 * general purpose as possible. 618 * 619 * @param <T> the result type of tasks executed in the task scope 620 * @param <R> the type of results returned by the join method 621 * @since 24 622 * @see #open(Joiner) 623 */ 624 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 625 @FunctionalInterface 626 public interface Joiner<T, R> { 627 628 /** 629 * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable) 630 * fork(Runnable)} when forking a subtask. The method is invoked from the task 631 * owner thread. The method is invoked before a thread is created to run the 632 * subtask. 633 * 634 * @implSpec The default implementation throws {@code NullPointerException} if the 635 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the 636 * subtask is not in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, it 637 * otherwise returns {@code false}. 638 * 639 * @apiNote This method is invoked by the {@code fork} methods. It should not be 640 * invoked directly. 641 * 642 * @param subtask the subtask 643 * @return {@code true} to cancel execution 644 */ 645 default boolean onFork(Subtask<? 
extends T> subtask) { 646 if (subtask.state() != Subtask.State.UNAVAILABLE) { 647 throw new IllegalArgumentException(); 648 } 649 return false; 650 } 651 652 /** 653 * Invoked by the thread started to execute a subtask after the subtask completes 654 * successfully or fails with an exception. This method is not invoked if a 655 * subtask completes after execution has been cancelled. 656 * 657 * @implSpec The default implementation throws {@code NullPointerException} if the 658 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the 659 * subtask is not in the {@link Subtask.State#SUCCESS SUCCESS} or {@link 660 * Subtask.State#FAILED FAILED} state, it otherwise returns {@code false}. 661 * 662 * @apiNote This method is invoked by subtasks when they complete. It should not 663 * be invoked directly. 664 * 665 * @param subtask the subtask 666 * @return {@code true} to cancel execution 667 */ 668 default boolean onComplete(Subtask<? extends T> subtask) { 669 if (subtask.state() == Subtask.State.UNAVAILABLE) { 670 throw new IllegalArgumentException(); 671 } 672 return false; 673 } 674 675 /** 676 * Invoked by {@link #join()} to produce the result (or exception) after waiting 677 * for all subtasks to complete or execution to be cancelled. The result from this 678 * method is returned by the {@code join} method. If this method throws, then 679 * {@code join} throws {@link ExecutionException} with the exception thrown by 680 * this method as the cause. 681 * 682 * <p> In normal usage, this method will be called at most once to produce the 683 * result (or exception). If the {@code join} method is called more than once 684 * then this method may be called more than once to produce the result. An 685 * implementation should return an equal result (or throw the same exception) on 686 * second or subsequent calls to produce the outcome. 687 * 688 * @apiNote This method is invoked by the {@code join} method. It should not be 689 * invoked directly. 
690 * 691 * @return the result 692 * @throws Throwable the exception 693 */ 694 R result() throws Throwable; 695 696 /** 697 * {@return a new Joiner object that yields a stream of all subtasks when all 698 * subtasks complete successfully, or throws if any subtask fails} 699 * If any subtask fails then execution is cancelled. 700 * 701 * <p> If all subtasks complete successfully, the joiner's {@link Joiner#result()} 702 * method returns a stream of all subtasks in the order that they were forked. 703 * If any subtask failed then the {@code result} method throws the exception from 704 * the first subtask to fail. 705 * 706 * @apiNote This joiner is intended for cases where the results for all subtasks 707 * are required ("invoke all"); if any subtask fails then the results of other 708 * unfinished subtasks are no longer needed. A typical usage will be when the 709 * subtasks return results of the same type, the returned stream of forked 710 * subtasks can be used to get the results. 711 * 712 * @param <T> the result type of subtasks 713 */ 714 static <T> Joiner<T, Stream<Subtask<T>>> allSuccessfulOrThrow() { 715 return new AllSuccessful<>(); 716 } 717 718 /** 719 * {@return a new Joiner object that yields the result of a subtask that completed 720 * successfully, or throws if all subtasks fail} If any subtask completes 721 * successfully then execution is cancelled. 722 * 723 * <p> The joiner's {@link Joiner#result()} method returns the result of a subtask 724 * that completed successfully. If all subtasks fail then the {@code result} method 725 * throws the exception from one of the failed subtasks. The {@code result} method 726 * throws {@code NoSuchElementException} if no subtasks were forked. 727 * 728 * @apiNote This joiner is intended for cases where the result of any subtask will 729 * do ("invoke any") and where the results of other unfinished subtasks are no 730 * longer needed. 
731 * 732 * @param <T> the result type of subtasks 733 */ 734 static <T> Joiner<T, T> anySuccessfulResultOrThrow() { 735 return new AnySuccessful<>(); 736 } 737 738 /** 739 * {@return a new Joiner object that waits for all successful subtasks. It 740 * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> if 741 * any subtask fails} 742 * 743 * <p> The joiner's {@link Joiner#result() result} method returns {@code null} 744 * if all subtasks complete successfully, or throws the exception from the first 745 * subtask to fail. 746 * 747 * @apiNote This joiner is intended for cases where the results for all subtasks 748 * are required ("invoke all"), and where the code {@linkplain #fork(Callable) 749 * forking} subtasks keeps a reference to the {@linkplain Subtask Subtask} objects. 750 * A typical usage will be when subtasks return results of different types. 751 * 752 * @param <T> the result type of subtasks 753 */ 754 static <T> Joiner<T, Void> awaitAllSuccessfulOrThrow() { 755 return new AwaitSuccessful<>(); 756 } 757 758 /** 759 * {@return a new Joiner object that waits for all subtasks} 760 * 761 * <p> The joiner's {@link Joiner#result() result} method returns {@code null}. 762 * 763 * @apiNote This joiner is intended for cases where subtasks make use of 764 * <em>side-effects</em> rather than return results or fail with exceptions. 765 * The {@link #fork(Runnable) fork(Runnable)} method can be used to fork subtasks 766 * that do not return a result. 
767 * 768 * @param <T> the result type of subtasks 769 */ 770 static <T> Joiner<T, Void> awaitAll() { 771 // ensure that new Joiner object is returned 772 return new Joiner<T, Void>() { 773 @Override 774 public Void result() { 775 return null; 776 } 777 }; 778 } 779 780 /** 781 * {@return a new Joiner object that yields a stream of all subtasks, cancelling 782 * execution when evaluating a completed subtask with the given predicate returns 783 * {@code true}} 784 * 785 * <p> The joiner's {@link Joiner#onComplete(Subtask)} method invokes the 786 * predicate's {@link Predicate#test(Object) test} method with the subtask that 787 * completed successfully or failed with an exception. If the {@code test} method 788 * returns {@code true} then <a href="StructuredTaskScope.html#CancelExecution"> 789 * execution is cancelled</a>. The {@code test} method must be thread safe as it 790 * may be invoked concurrently from several threads. 791 * 792 * <p> The joiner's {@link #result()} method returns the stream of all subtasks, 793 * in fork order. The stream may contain subtasks that have completed 794 * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} 795 * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state 796 * if execution was cancelled before all subtasks were forked or completed. 797 * 798 * <p> The following example uses this method to create a {@code Joiner} that 799 * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> when 800 * two or more subtasks fail. 801 * {@snippet lang=java : 802 * class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> { 803 * private final AtomicInteger failedCount = new AtomicInteger(); 804 * @Override 805 * public boolean test(Subtask<? 
extends T> subtask) { 806 * return subtask.state() == Subtask.State.FAILED 807 * && failedCount.incrementAndGet() >= 2; 808 * } 809 * } 810 * 811 * var joinPolicy = Joiner.all(new CancelAfterTwoFailures<String>()); 812 * } 813 * 814 * @param isDone the predicate to evaluate completed subtasks 815 * @param <T> the result type of subtasks 816 */ 817 static <T> Joiner<T, Stream<Subtask<T>>> all(Predicate<Subtask<? extends T>> isDone) { 818 return new AllSubtasks<>(isDone); 819 } 820 } 821 822 /** 823 * Represents the configuration for a {@code StructuredTaskScope}. 824 * 825 * <p> The configuration for a {@code StructuredTaskScope} consists of a {@link 826 * ThreadFactory} to create threads, an optional name for the purposes of monitoring 827 * and management, and an optional timeout. 828 * 829 * <p> Creating a {@code StructuredTaskScope} with its 1-arg {@link #open(Joiner) open} 830 * method uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default 831 * configuration</a>. The default configuration consists of a thread factory that 832 * creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads"> 833 * virtual threads</a>, no name for monitoring and management purposes, and no timeout. 834 * 835 * <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, Function) 836 * open} method allows a different configuration to be used. The function specified 837 * to the {@code open} method is applied to the default configuration and returns the 838 * configuration for the {@code StructuredTaskScope} under construction. The function 839 * can use the {@code with-} prefixed methods defined here to specify the components 840 * of the configuration to use. 841 * 842 * <p> Unless otherwise specified, passing a {@code null} argument to a method 843 * in this class will cause a {@link NullPointerException} to be thrown. 
844 * 845 * @since 24 846 */ 847 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 848 public sealed interface Config permits ConfigImpl { 849 /** 850 * {@return a new {@code Config} object with the given thread factory} 851 * The other components are the same as this object. The thread factory is used by 852 * a task scope to create threads when {@linkplain #fork(Callable) forking} subtasks. 853 * @param threadFactory the thread factory 854 * 855 * @apiNote The thread factory will typically create 856 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>, 857 * maybe with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler 858 * uncaught exception handler}, or other properties configured. 859 * 860 * @see #fork(Callable) 861 */ 862 Config withThreadFactory(ThreadFactory threadFactory); 863 864 /** 865 * {@return a new {@code Config} object with the given name} 866 * The other components are the same as this object. A task scope is optionally 867 * named for the purposes of monitoring and management. 868 * @param name the name 869 * @see StructuredTaskScope#toString() 870 */ 871 Config withName(String name); 872 873 /** 874 * {@return a new {@code Config} object with the given timeout} 875 * The other components are the same as this object. 876 * @param timeout the timeout 877 * 878 * @apiNote Applications using deadlines, expressed as an {@link java.time.Instant}, 879 * can use {@link Duration#between Duration.between(Instant.now(), deadline)} to 880 * compute the timeout for this method. 881 * 882 * @see #join() 883 */ 884 Config withTimeout(Duration timeout); 885 } 886 887 /** 888 * Opens a new structured task scope to use the given {@code Joiner} object and with 889 * configuration that is the result of applying the given function to the 890 * <a href="#DefaultConfiguration">default configuration</a>. 
891 * 892 * <p> The {@code configFunction} is called with the default configuration and returns 893 * the configuration for the new structured task scope. The function may, for example, 894 * set the {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} or set 895 * a {@linkplain Config#withTimeout(Duration) timeout}. 896 * 897 * <p> If a {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} is set 898 * then the {@code ThreadFactory}'s {@link ThreadFactory#newThread(Runnable) newThread} 899 * method will be used to create threads when forking subtasks in this task scope. 900 * 901 * <p> If a {@linkplain Config#withTimeout(Duration) timeout} is set then it starts 902 * when the task scope is opened. If the timeout expires before the task scope has 903 * {@linkplain #join() joined} then execution is cancelled and the {@code join} method 904 * throws {@link ExecutionException} with {@link TimeoutException} as the cause. 905 * 906 * <p> The new task scope is owned by the current thread. Only code executing in this 907 * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or 908 * {@linkplain #close close} the task scope. 909 * 910 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped 911 * value} bindings for inheritance by threads started in the task scope. 912 * 913 * @param joiner the joiner 914 * @param configFunction a function to produce the configuration 915 * @return a new task scope 916 * @param <T> the result type of tasks executed in the task scope 917 * @param <R> the type of the result returned by the join method 918 * @since 24 919 */ 920 public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? 
extends R> joiner, 921 Function<Config, Config> configFunction) { 922 Objects.requireNonNull(joiner); 923 924 var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig()); 925 var scope = new StructuredTaskScope<T, R>(joiner, config.threadFactory(), config.name()); 926 927 // schedule timeout 928 Duration timeout = config.timeout(); 929 if (timeout != null) { 930 boolean done = false; 931 try { 932 scope.scheduleTimeout(timeout); 933 done = true; 934 } finally { 935 if (!done) { 936 scope.close(); // pop if scheduling timeout failed 937 } 938 } 939 } 940 941 return scope; 942 } 943 944 /** 945 * Opens a new structured task scope to use the given {@code Joiner} object. The 946 * task scope is created with the <a href="#DefaultConfiguration">default configuration</a>. 947 * The default configuration has a {@code ThreadFactory} that creates unnamed 948 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>, 949 * is unnamed for monitoring and management purposes, and has no timeout. 950 * 951 * @implSpec 952 * This factory method is equivalent to invoking the 2-arg open method with the given 953 * joiner and the {@linkplain Function#identity() identity function}. 954 * 955 * @param joiner the joiner 956 * @return a new task scope 957 * @param <T> the result type of tasks executed in the task scope 958 * @param <R> the type of the result returned by the join method 959 * @since 24 960 */ 961 public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner) { 962 return open(joiner, Function.identity()); 963 } 964 965 /** 966 * Starts a new thread in this task scope to execute a value-returning task, thus 967 * creating a <em>subtask</em>. The value-returning task is provided to this method 968 * as a {@link Callable}, the thread executes the task's {@link Callable#call() call} 969 * method. 
970 * 971 * <p> This method first creates a {@link Subtask Subtask} to represent the <em>forked 972 * subtask</em>. It invokes the joiner's {@link Joiner#onFork(Subtask) onFork} method 973 * with the {@code Subtask} object. If the {@code onFork} completes with an exception 974 * or error then it is propagated by the {@code fork} method. If execution is 975 * {@linkplain #isCancelled() cancelled}, or {@code onFork} returns {@code true} to 976 * cancel execution, then this method returns the {@code Subtask} (in the {@link 977 * Subtask.State#UNAVAILABLE UNAVAILABLE} state) without creating a thread to execute 978 * the subtask. If execution is not cancelled then a thread is created with the 979 * {@link ThreadFactory} configured when the task scope was created, and the thread is 980 * started. Forking a subtask inherits the current thread's {@linkplain ScopedValue 981 * scoped value} bindings. The bindings must match the bindings captured when the 982 * task scope was opened. If the subtask completes (successfully or with an exception) 983 * before execution is cancelled, then the thread invokes the joiner's 984 * {@link Joiner#onComplete(Subtask) onComplete} method with subtask in the 985 * {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} state. 986 * 987 * <p> This method returns the {@link Subtask Subtask} object. In some usages, this 988 * object may be used to get its result. In other cases it may be used for correlation 989 * or just discarded. To ensure correct usage, the {@link Subtask#get() Subtask.get()} 990 * method may only be called by the task scope owner to get the result after it has 991 * waited for subtasks to complete with the {@link #join() join} method and the subtask 992 * completed successfully. Similarly, the {@link Subtask#exception() Subtask.exception()} 993 * method may only be called by the task scope owner after it has joined and the subtask 994 * failed. 
If execution was cancelled before the subtask was forked, or before it 995 * completes, then neither method can be used to obtain the outcome. 996 * 997 * <p> This method may only be invoked by the task scope owner. 998 * 999 * @param task the value-returning task for the thread to execute 1000 * @param <U> the result type 1001 * @return the subtask 1002 * @throws IllegalStateException if this task scope is closed or the owner has already 1003 * joined 1004 * @throws WrongThreadException if the current thread is not the task scope owner 1005 * @throws StructureViolationException if the current scoped value bindings are not 1006 * the same as when the task scope was created 1007 * @throws RejectedExecutionException if the thread factory rejected creating a 1008 * thread to run the subtask 1009 */ 1010 public <U extends T> Subtask<U> fork(Callable<? extends U> task) { 1011 Objects.requireNonNull(task); 1012 ensureOwner(); 1013 ensureOpen(); 1014 if (joined) { 1015 throw new IllegalStateException("Already joined"); 1016 } 1017 1018 var subtask = new SubtaskImpl<U>(this, task); 1019 1020 // notify joiner, even if cancelled 1021 if (joiner.onFork(subtask)) { 1022 cancelExecution(); 1023 } 1024 1025 if (!cancelled) { 1026 // create thread to run task 1027 Thread thread = threadFactory.newThread(subtask); 1028 if (thread == null) { 1029 throw new RejectedExecutionException("Rejected by thread factory"); 1030 } 1031 1032 // attempt to start the thread 1033 try { 1034 flock.start(thread); 1035 } catch (IllegalStateException e) { 1036 // shutdown by another thread, or underlying flock is shutdown due 1037 // to unstructured use 1038 } 1039 } 1040 1041 needToJoin = true; 1042 return subtask; 1043 } 1044 1045 /** 1046 * Starts a new thread in this task scope to execute a task that does not return a 1047 * result, creating a <em>subtask</em>. 
1048 * 1049 * <p> This method works exactly the same as {@link #fork(Callable)} except that 1050 * the task is provided to this method as a {@link Runnable}, the thread executes 1051 * the task's {@link Runnable#run() run} method, and its result is {@code null}. 1052 * 1053 * @param task the task for the thread to execute 1054 * @return the subtask 1055 * @throws IllegalStateException if this task scope is closed or the owner has already 1056 * joined 1057 * @throws WrongThreadException if the current thread is not the task scope owner 1058 * @throws StructureViolationException if the current scoped value bindings are not 1059 * the same as when the task scope was created 1060 * @throws RejectedExecutionException if the thread factory rejected creating a 1061 * thread to run the subtask 1062 * @since 24 1063 */ 1064 public Subtask<? extends T> fork(Runnable task) { 1065 Objects.requireNonNull(task); 1066 return fork(() -> { task.run(); return null; }); 1067 } 1068 1069 /** 1070 * Waits for all subtasks started in this task scope to complete or execution to be 1071 * cancelled. If a {@linkplain Config#withTimeout(Duration) timeout} has been set 1072 * then execution will be cancelled if the timeout expires before or while waiting. 1073 * Once finished waiting, the {@code Joiner}'s {@link Joiner#result() result} 1074 * method is invoked to get the result or throw an exception. If the {@code result} 1075 * method throws then this method throws {@code ExecutionException} with the 1076 * exception thrown by the {@code result()} method as the cause. 1077 * 1078 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 1079 * #fork(Callable) started} in this task scope to finish execution. 
It stops waiting 1080 * when all threads finish, the {@code Joiner}'s {@link Joiner#onFork(Subtask) 1081 * onFork} or {@link Joiner#onComplete(Subtask) onComplete} returns {@code true} 1082 * to cancel execution, the timeout (if set) expires, or the current thread is 1083 * {@linkplain Thread#interrupt() interrupted}. 1084 * 1085 * <p> This method may only be invoked by the task scope owner. 1086 * 1087 * @return the {@link Joiner#result() result} 1088 * @throws IllegalStateException if this task scope is closed 1089 * @throws WrongThreadException if the current thread is not the task scope owner 1090 * @throws ExecutionException if the joiner's {@code result} method throws, or with 1091 * cause {@link TimeoutException} if a timeout is set and the timeout expires 1092 * @throws InterruptedException if interrupted while waiting 1093 * @since 24 1094 */ 1095 public R join() throws ExecutionException, InterruptedException { 1096 ensureOwner(); 1097 ensureOpen(); 1098 1099 if (!joined) { 1100 // owner has attempted to join 1101 needToJoin = false; 1102 1103 // wait for all threads, execution to be cancelled, or interrupt 1104 flock.awaitAll(); 1105 1106 // throw if timeout expired while waiting 1107 if (timeoutExpired) { 1108 throw new ExecutionException(new TimeoutException()); 1109 } 1110 1111 // join completed successfully 1112 cancelTimeout(); 1113 joined = true; 1114 } 1115 1116 // invoke joiner to get result 1117 try { 1118 return joiner.result(); 1119 } catch (Throwable e) { 1120 throw new ExecutionException(e); 1121 } 1122 } 1123 1124 /** 1125 * Cancels execution. This method allows the task scope owner to explicitly 1126 * <a href="#CancelExecution">cancel execution</a>. If not already cancelled, this 1127 * method {@linkplain Thread#interrupt() interrupts} the threads executing subtasks 1128 * that have not completed, and prevents new threads from being started in the task 1129 * scope. 
1130 * 1131 * @apiNote This method is intended for cases where a task scope is created with a 1132 * {@link Joiner Joiner} that doesn't cancel execution or where the code in the main 1133 * task needs to cancel execution due to some exception or other condition in the main 1134 * task. The following example accepts network connections indefinitely, forking a 1135 * subtask to handle each connection. If the {@code accept()} method in the example 1136 * throws then the main task cancels execution before joining and closing the scope. 1137 * The {@link #close() close} method waits for the interrupted threads to finish. 1138 * 1139 * {@snippet lang=java : 1140 * // @link substring="awaitAll" target="Joiner#awaitAll()" : 1141 * try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { 1142 * 1143 * try { 1144 * while (true) { 1145 * Socket peer = listener.accept(); 1146 * // @link substring="fork" target="#fork(Runnable)" : 1147 * scope.fork(() -> handle(peer)); 1148 * } 1149 * } finally { 1150 * scope.cancel(); 1151 * scope.join(); // completes immediately 1152 * } 1153 * 1154 * } 1155 * } 1156 * 1157 * @throws IllegalStateException if this task scope is closed 1158 * @throws WrongThreadException if the current thread is not the task scope owner 1159 * @since 24 1160 */ 1161 public void cancel() { 1162 ensureOwner(); 1163 ensureOpen(); 1164 cancelExecution(); 1165 } 1166 1167 /** 1168 * {@return {@code true} if <a href="#CancelExecution">execution is cancelled</a>, 1169 * or in the process of being cancelled, otherwise {@code false}} 1170 * 1171 * <p> Cancelling execution prevents new threads from starting in the task scope and 1172 * {@linkplain Thread#interrupt() interrupts} threads executing unfinished subtasks. 1173 * It may take some time before the interrupted threads finish execution; this 1174 * method may return {@code true} before all threads have been interrupted or before 1175 * all threads have finished. 
1176 * 1177 * @apiNote A main task with a lengthy "forking phase" (the code that executes before 1178 * the main task invokes {@link #join() join}) may use this method to avoid doing work 1179 * in cases where execution was cancelled by the completion of a previously forked 1180 * subtask or timeout. 1181 * 1182 * @since 24 1183 */ 1184 public boolean isCancelled() { 1185 return cancelled; 1186 } 1187 1188 /** 1189 * Closes this task scope. 1190 * 1191 * <p> This method first <a href="#CancelExecution">cancels execution</a>, if not 1192 * already cancelled. This interrupts the threads executing unfinished subtasks. This 1193 * method then waits for all threads to finish. If interrupted while waiting then it 1194 * will continue to wait until the threads finish, before completing with the interrupt 1195 * status set. 1196 * 1197 * <p> This method may only be invoked by the task scope owner. If the task scope 1198 * is already closed then the task scope owner invoking this method has no effect. 1199 * 1200 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured 1201 * manner</em>. If this method is called to close a task scope before nested task 1202 * scopes are closed then it closes the underlying construct of each nested task scope 1203 * (in the reverse order that they were created in), closes this task scope, and then 1204 * throws {@link StructureViolationException}. 1205 * Similarly, if this method is called to close a task scope while executing with 1206 * {@linkplain ScopedValue scoped value} bindings, and the task scope was created 1207 * before the scoped values were bound, then {@code StructureViolationException} is 1208 * thrown after closing the task scope. 1209 * If a thread terminates without first closing task scopes that it owns then 1210 * termination will cause the underlying construct of each of its open tasks scopes to 1211 * be closed. Closing is performed in the reverse order that the task scopes were 1212 * created in. 
Thread termination may therefore be delayed when the task scope owner 1213 * has to wait for threads forked in these task scopes to finish. 1214 * 1215 * @throws IllegalStateException thrown after closing the task scope if the task scope 1216 * owner did not attempt to join after forking 1217 * @throws WrongThreadException if the current thread is not the task scope owner 1218 * @throws StructureViolationException if a structure violation was detected 1219 */ 1220 @Override 1221 public void close() { 1222 ensureOwner(); 1223 if (closed) { 1224 return; 1225 } 1226 1227 // cancel execution if not already joined 1228 if (!joined) { 1229 cancelExecution(); 1230 cancelTimeout(); 1231 } 1232 1233 // wait for stragglers 1234 try { 1235 flock.close(); 1236 } finally { 1237 closed = true; 1238 } 1239 1240 // throw ISE if the owner didn't join after forking 1241 if (needToJoin) { 1242 needToJoin = false; 1243 throw new IllegalStateException("Owner did not join"); 1244 } 1245 } 1246 1247 /** 1248 * {@inheritDoc} If a {@link Config#withName(String) name} for monitoring and 1249 * monitoring purposes has been set then the string representation includes the name. 1250 */ 1251 @Override 1252 public String toString() { 1253 return flock.name(); 1254 } 1255 1256 /** 1257 * Subtask implementation, runs the task specified to the fork method. 1258 */ 1259 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable { 1260 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS); 1261 1262 private record AltResult(Subtask.State state, Throwable exception) { 1263 AltResult(Subtask.State state) { 1264 this(state, null); 1265 } 1266 } 1267 1268 private final StructuredTaskScope<? super T, ?> scope; 1269 private final Callable<? extends T> task; 1270 private volatile Object result; 1271 1272 SubtaskImpl(StructuredTaskScope<? super T, ?> scope, Callable<? 
extends T> task) { 1273 this.scope = scope; 1274 this.task = task; 1275 } 1276 1277 @Override 1278 public void run() { 1279 T result = null; 1280 Throwable ex = null; 1281 try { 1282 result = task.call(); 1283 } catch (Throwable e) { 1284 ex = e; 1285 } 1286 1287 // nothing to do if task scope is cancelled 1288 if (scope.isCancelled()) 1289 return; 1290 1291 // set result/exception and invoke onComplete 1292 if (ex == null) { 1293 this.result = (result != null) ? result : RESULT_NULL; 1294 } else { 1295 this.result = new AltResult(State.FAILED, ex); 1296 } 1297 scope.onComplete(this); 1298 } 1299 1300 @Override 1301 public Subtask.State state() { 1302 Object result = this.result; 1303 if (result == null) { 1304 return State.UNAVAILABLE; 1305 } else if (result instanceof AltResult alt) { 1306 // null or failed 1307 return alt.state(); 1308 } else { 1309 return State.SUCCESS; 1310 } 1311 } 1312 1313 1314 @Override 1315 public T get() { 1316 scope.ensureJoinedIfOwner(); 1317 Object result = this.result; 1318 if (result instanceof AltResult) { 1319 if (result == RESULT_NULL) return null; 1320 } else if (result != null) { 1321 @SuppressWarnings("unchecked") 1322 T r = (T) result; 1323 return r; 1324 } 1325 throw new IllegalStateException( 1326 "Result is unavailable or subtask did not complete successfully"); 1327 } 1328 1329 @Override 1330 public Throwable exception() { 1331 scope.ensureJoinedIfOwner(); 1332 Object result = this.result; 1333 if (result instanceof AltResult alt && alt.state() == State.FAILED) { 1334 return alt.exception(); 1335 } 1336 throw new IllegalStateException( 1337 "Exception is unavailable or subtask did not complete with exception"); 1338 } 1339 1340 @Override 1341 public String toString() { 1342 String stateAsString = switch (state()) { 1343 case UNAVAILABLE -> "[Unavailable]"; 1344 case SUCCESS -> "[Completed successfully]"; 1345 case FAILED -> { 1346 Throwable ex = ((AltResult) result).exception(); 1347 yield "[Failed: " + ex + "]"; 1348 } 
1349 }; 1350 return Objects.toIdentityString(this) + stateAsString; 1351 } 1352 } 1353 1354 /** 1355 * A joiner that returns a stream of all subtasks when all subtasks complete 1356 * successfully. If any subtask fails then execution is cancelled. 1357 */ 1358 private static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> { 1359 private static final VarHandle FIRST_EXCEPTION; 1360 static { 1361 try { 1362 MethodHandles.Lookup l = MethodHandles.lookup(); 1363 FIRST_EXCEPTION = l.findVarHandle(AllSuccessful.class, "firstException", Throwable.class); 1364 } catch (Exception e) { 1365 throw new ExceptionInInitializerError(e); 1366 } 1367 } 1368 // list of forked subtasks, only accessed by owner thread 1369 private final List<Subtask<T>> subtasks = new ArrayList<>(); 1370 private volatile Throwable firstException; 1371 1372 @Override 1373 public boolean onFork(Subtask<? extends T> subtask) { 1374 @SuppressWarnings("unchecked") 1375 var tmp = (Subtask<T>) subtask; 1376 subtasks.add(tmp); 1377 return false; 1378 } 1379 1380 @Override 1381 public boolean onComplete(Subtask<? extends T> subtask) { 1382 return (subtask.state() == Subtask.State.FAILED) 1383 && (firstException == null) 1384 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 1385 } 1386 1387 @Override 1388 public Stream<Subtask<T>> result() throws Throwable { 1389 Throwable ex = firstException; 1390 if (ex != null) { 1391 throw ex; 1392 } else { 1393 return subtasks.stream(); 1394 } 1395 } 1396 } 1397 1398 /** 1399 * A joiner that returns the result of the first subtask to complete successfully. 1400 * If any subtask completes successfully then execution is cancelled. 
1401 */ 1402 private static final class AnySuccessful<T> implements Joiner<T, T> { 1403 private static final VarHandle FIRST_SUCCESS; 1404 private static final VarHandle FIRST_EXCEPTION; 1405 static { 1406 try { 1407 MethodHandles.Lookup l = MethodHandles.lookup(); 1408 FIRST_SUCCESS = l.findVarHandle(AnySuccessful.class, "firstSuccess", Subtask.class); 1409 FIRST_EXCEPTION = l.findVarHandle(AnySuccessful.class, "firstException", Throwable.class); 1410 } catch (Exception e) { 1411 throw new ExceptionInInitializerError(e); 1412 } 1413 } 1414 private volatile Subtask<T> firstSuccess; 1415 private volatile Throwable firstException; 1416 1417 @Override 1418 public boolean onComplete(Subtask<? extends T> subtask) { 1419 if (firstSuccess == null) { 1420 if (subtask.state() == Subtask.State.SUCCESS) { 1421 // capture the first subtask that completes successfully 1422 return FIRST_SUCCESS.compareAndSet(this, null, subtask); 1423 } else if (firstException == null) { 1424 // capture the exception thrown by the first task to fail 1425 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 1426 } 1427 } 1428 return false; 1429 } 1430 1431 @Override 1432 public T result() throws Throwable { 1433 Subtask<T> firstSuccess = this.firstSuccess; 1434 if (firstSuccess != null) { 1435 return firstSuccess.get(); 1436 } 1437 Throwable firstException = this.firstException; 1438 if (firstException != null) { 1439 throw firstException; 1440 } else { 1441 throw new NoSuchElementException("No subtasks completed"); 1442 } 1443 } 1444 } 1445 1446 /** 1447 * A joiner that that waits for all successful subtasks. If any subtask fails the 1448 * execution is cancelled. 
1449 */ 1450 private static final class AwaitSuccessful<T> implements Joiner<T, Void> { 1451 private static final VarHandle FIRST_EXCEPTION; 1452 static { 1453 try { 1454 MethodHandles.Lookup l = MethodHandles.lookup(); 1455 FIRST_EXCEPTION = l.findVarHandle(AwaitSuccessful.class, "firstException", Throwable.class); 1456 } catch (Exception e) { 1457 throw new ExceptionInInitializerError(e); 1458 } 1459 } 1460 private volatile Throwable firstException; 1461 1462 @Override 1463 public boolean onComplete(Subtask<? extends T> subtask) { 1464 return (subtask.state() == Subtask.State.FAILED) 1465 && (firstException == null) 1466 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 1467 } 1468 1469 @Override 1470 public Void result() throws Throwable { 1471 Throwable ex = firstException; 1472 if (ex != null) { 1473 throw ex; 1474 } else { 1475 return null; 1476 } 1477 } 1478 } 1479 1480 /** 1481 * A joiner that returns a stream of all subtasks. 1482 */ 1483 private static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> { 1484 private final Predicate<Subtask<? extends T>> isDone; 1485 // list of forked subtasks, only accessed by owner thread 1486 private final List<Subtask<T>> subtasks = new ArrayList<>(); 1487 1488 AllSubtasks(Predicate<Subtask<? extends T>> isDone) { 1489 this.isDone = Objects.requireNonNull(isDone); 1490 } 1491 1492 @Override 1493 public boolean onFork(Subtask<? extends T> subtask) { 1494 @SuppressWarnings("unchecked") 1495 var tmp = (Subtask<T>) subtask; 1496 subtasks.add(tmp); 1497 return false; 1498 } 1499 1500 @Override 1501 public boolean onComplete(Subtask<? extends T> subtask) { 1502 return isDone.test(subtask); 1503 } 1504 1505 @Override 1506 public Stream<Subtask<T>> result() { 1507 return subtasks.stream(); 1508 } 1509 } 1510 1511 /** 1512 * Implementation of Config. 
1513 */ 1514 private record ConfigImpl(ThreadFactory threadFactory, 1515 String name, 1516 Duration timeout) implements Config { 1517 static Config defaultConfig() { 1518 return new ConfigImpl(Thread.ofVirtual().factory(), null, null); 1519 } 1520 1521 @Override 1522 public Config withThreadFactory(ThreadFactory threadFactory) { 1523 return new ConfigImpl(Objects.requireNonNull(threadFactory), name, timeout); 1524 } 1525 1526 @Override 1527 public Config withName(String name) { 1528 return new ConfigImpl(threadFactory, Objects.requireNonNull(name), timeout); 1529 } 1530 1531 @Override 1532 public Config withTimeout(Duration timeout) { 1533 return new ConfigImpl(threadFactory, name, Objects.requireNonNull(timeout)); 1534 } 1535 } 1536 1537 /** 1538 * Used to schedule a task to cancel exception when a timeout expires. 1539 */ 1540 private static class TimerSupport { 1541 private static final ScheduledExecutorService DELAYED_TASK_SCHEDULER; 1542 static { 1543 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1544 Executors.newScheduledThreadPool(1, task -> { 1545 Thread t = InnocuousThread.newThread("StructuredTaskScope-Timer", task); 1546 t.setDaemon(true); 1547 return t; 1548 }); 1549 stpe.setRemoveOnCancelPolicy(true); 1550 DELAYED_TASK_SCHEDULER = stpe; 1551 } 1552 1553 static Future<?> schedule(Duration timeout, Runnable task) { 1554 long nanos = TimeUnit.NANOSECONDS.convert(timeout); 1555 return DELAYED_TASK_SCHEDULER.schedule(task, nanos, TimeUnit.NANOSECONDS); 1556 } 1557 } 1558 }