1 /* 2 * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.concurrent; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.time.Duration; 30 import java.time.Instant; 31 import java.util.Objects; 32 import java.util.Optional; 33 import java.util.concurrent.locks.ReentrantLock; 34 import java.util.function.Function; 35 import java.util.function.Supplier; 36 import jdk.internal.javac.PreviewFeature; 37 import jdk.internal.misc.ThreadFlock; 38 import jdk.internal.invoke.MhUtil; 39 40 /** 41 * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports 42 * cases where a task splits into several concurrent subtasks, and where the subtasks must 43 * complete before the main task continues. A {@code StructuredTaskScope} can be used to 44 * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>, 45 * just like that of a sequential operation in structured programming. 46 * 47 * <h2>Basic operation</h2> 48 * 49 * A {@code StructuredTaskScope} is created with one of its public constructors. It defines 50 * the {@link #fork(Callable) fork} method to start a thread to execute a subtask, the {@link 51 * #join() join} method to wait for all subtasks to finish, and the {@link #close() close} 52 * method to close the task scope. The API is intended to be used with the {@code 53 * try-with-resources} statement. The intention is that code in the try <em>block</em> 54 * uses the {@code fork} method to fork threads to execute the subtasks, wait for the 55 * subtasks to finish with the {@code join} method, and then <em>process the results</em>. 56 * A call to the {@code fork} method returns a {@link Subtask Subtask} to representing 57 * the <em>forked subtask</em>. Once {@code join} is called, the {@code Subtask} can be 58 * used to get the result completed successfully, or the exception if the subtask failed. 59 * {@snippet lang=java : 60 * Callable<String> task1 = ... 61 * Callable<Integer> task2 = ... 62 * 63 * try (var scope = new StructuredTaskScope<Object>()) { 64 * 65 * Subtask<String> subtask1 = scope.fork(task1); // @highlight substring="fork" 66 * Subtask<Integer> subtask2 = scope.fork(task2); // @highlight substring="fork" 67 * 68 * scope.join(); // @highlight substring="join" 69 * 70 * ... process results/exceptions ... 71 * 72 * } // close // @highlight substring="close" 73 * } 74 * <p> The following example forks a collection of homogeneous subtasks, waits for all of 75 * them to complete with the {@code join} method, and uses the {@link Subtask.State 76 * Subtask.State} to partition the subtasks into a set of the subtasks that completed 77 * successfully and another for the subtasks that failed. 78 * {@snippet lang=java : 79 * List<Callable<String>> callables = ... 80 * 81 * try (var scope = new StructuredTaskScope<String>()) { 82 * 83 * List<Subtask<String>> subtasks = callables.stream().map(scope::fork).toList(); 84 * 85 * scope.join(); 86 * 87 * Map<Boolean, Set<Subtask<String>>> map = subtasks.stream() 88 * .collect(Collectors.partitioningBy(h -> h.state() == Subtask.State.SUCCESS, 89 * Collectors.toSet())); 90 * 91 * } // close 92 * } 93 * 94 * <p> To ensure correct usage, the {@code join} and {@code close} methods may only be 95 * invoked by the <em>owner</em> (the thread that opened/created the task scope), and the 96 * {@code close} method throws an exception after closing if the owner did not invoke the 97 * {@code join} method after forking. 98 * 99 * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut 100 * down a task scope without closing it. The {@code shutdown()} method <em>cancels</em> all 101 * unfinished subtasks by {@linkplain Thread#interrupt() interrupting} the threads. It 102 * prevents new threads from starting in the task scope. If the owner is waiting in the 103 * {@code join} method then it will wakeup. 104 * 105 * <p> Shutdown is used for <em>short-circuiting</em> and allow subclasses to implement 106 * <em>policy</em> that does not require all subtasks to finish. 107 * 108 * <h2>Subclasses with policies for common cases</h2> 109 * 110 * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for 111 * common cases: 112 * <ol> 113 * <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the result of the first 114 * subtask to complete successfully. Once captured, it shuts down the task scope to 115 * interrupt unfinished threads and wakeup the owner. This class is intended for cases 116 * where the result of any subtask will do ("invoke any") and where there is no need to 117 * wait for results of other unfinished subtasks. It defines methods to get the first 118 * result or throw an exception if all subtasks fail. 119 * <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the exception of the first 120 * subtask to fail. Once captured, it shuts down the task scope to interrupt unfinished 121 * threads and wakeup the owner. This class is intended for cases where the results of all 122 * subtasks are required ("invoke all"); if any subtask fails then the results of other 123 * unfinished subtasks are no longer needed. If defines methods to throw an exception if 124 * any of the subtasks fail. 125 * </ol> 126 * 127 * <p> The following are two examples that use the two classes. In both cases, a pair of 128 * subtasks are forked to fetch resources from two URL locations "left" and "right". The 129 * first example creates a ShutdownOnSuccess object to capture the result of the first 130 * subtask to complete successfully, cancelling the other by way of shutting down the task 131 * scope. The main task waits in {@code join} until either subtask completes with a result 132 * or both subtasks fail. It invokes {@link ShutdownOnSuccess#result(Function) 133 * result(Function)} method to get the captured result. If both subtasks fail then this 134 * method throws a {@code WebApplicationException} with the exception from one of the 135 * subtasks as the cause. 136 * {@snippet lang=java : 137 * try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) { 138 * 139 * scope.fork(() -> fetch(left)); 140 * scope.fork(() -> fetch(right)); 141 * 142 * scope.join(); 143 * 144 * // @link regex="result(?=\()" target="ShutdownOnSuccess#result" : 145 * String result = scope.result(e -> new WebApplicationException(e)); 146 * 147 * ... 148 * } 149 * } 150 * The second example creates a ShutdownOnFailure object to capture the exception of the 151 * first subtask to fail, cancelling the other by way of shutting down the task scope. The 152 * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a 153 * result, either fails, or a deadline is reached. It invokes {@link 154 * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception 155 * if either subtask fails. This method is a no-op if both subtasks complete successfully. 156 * The example uses {@link Supplier#get()} to get the result of each subtask. Using 157 * {@code Supplier} instead of {@code Subtask} is preferred for common cases where the 158 * object returned by fork is only used to get the result of a subtask that completed 159 * successfully. 160 * {@snippet lang=java : 161 * Instant deadline = ... 162 * 163 * try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { 164 * 165 * Supplier<String> supplier1 = scope.fork(() -> query(left)); 166 * Supplier<String> supplier2 = scope.fork(() -> query(right)); 167 * 168 * scope.joinUntil(deadline); 169 * 170 * // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" : 171 * scope.throwIfFailed(e -> new WebApplicationException(e)); 172 * 173 * // both subtasks completed successfully 174 * String result = Stream.of(supplier1, supplier2) 175 * .map(Supplier::get) 176 * .collect(Collectors.joining(", ", "{ ", " }")); 177 * 178 * ... 179 * } 180 * } 181 * 182 * <h2>Extending StructuredTaskScope</h2> 183 * 184 * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Subtask) 185 * handleComplete} method overridden, to implement policies other than those implemented 186 * by {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. A subclass may, for example, 187 * collect the results of subtasks that complete successfully and ignore subtasks that 188 * fail. It may collect exceptions when subtasks fail. It may invoke the {@link #shutdown() 189 * shutdown} method to shut down and cause {@link #join() join} to wakeup when some 190 * condition arises. 191 * 192 * <p> A subclass will typically define methods to make available results, state, or other 193 * outcome to code that executes after the {@code join} method. A subclass that collects 194 * results and ignores subtasks that fail may define a method that returns the results. 195 * A subclass that implements a policy to shut down when a subtask fails may define a 196 * method to get the exception of the first subtask to fail. 197 * 198 * <p> The following is an example of a simple {@code StructuredTaskScope} implementation 199 * that collects homogenous subtasks that complete successfully. It defines the method 200 * "{@code completedSuccessfully()}" that the main task can invoke after it joins. 201 * {@snippet lang=java : 202 * class CollectingScope<T> extends StructuredTaskScope<T> { 203 * private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>(); 204 * 205 * @Override 206 * protected void handleComplete(Subtask<? extends T> subtask) { 207 * if (subtask.state() == Subtask.State.SUCCESS) { 208 * subtasks.add(subtask); 209 * } 210 * } 211 * 212 * @Override 213 * public CollectingScope<T> join() throws InterruptedException { 214 * super.join(); 215 * return this; 216 * } 217 * 218 * public Stream<Subtask<? extends T>> completedSuccessfully() { 219 * // @link substring="ensureOwnerAndJoined" target="ensureOwnerAndJoined" : 220 * super.ensureOwnerAndJoined(); 221 * return subtasks.stream(); 222 * } 223 * } 224 * } 225 * <p> The implementations of the {@code completedSuccessfully()} method in the example 226 * invokes {@link #ensureOwnerAndJoined()} to ensure that the method can only be invoked 227 * by the owner thread and only after it has joined. 228 * 229 * <h2><a id="TreeStructure">Tree structure</a></h2> 230 * 231 * Task scopes form a tree where parent-child relations are established implicitly when 232 * opening a new task scope: 233 * <ul> 234 * <li> A parent-child relation is established when a thread started in a task scope 235 * opens its own task scope. A thread started in task scope "A" that opens task scope 236 * "B" establishes a parent-child relation where task scope "A" is the parent of task 237 * scope "B". 238 * <li> A parent-child relation is established with nesting. If a thread opens task 239 * scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task 240 * scope "B" is the parent of the nested task scope "C". 241 * </ul> 242 * 243 * The <i>descendants</i> of a task scope are the child task scopes that it is a parent 244 * of, plus the descendants of the child task scopes, recursively. 245 * 246 * <p> The tree structure supports: 247 * <ul> 248 * <li> Inheritance of {@linkplain ScopedValue scoped values} across threads. 249 * <li> Confinement checks. The phrase "threads contained in the task scope" in method 250 * descriptions means threads started in the task scope or descendant scopes. 251 * </ul> 252 * 253 * <p> The following example demonstrates the inheritance of a scoped value. A scoped 254 * value {@code USERNAME} is bound to the value "{@code duke}". A {@code StructuredTaskScope} 255 * is created and its {@code fork} method invoked to start a thread to execute {@code 256 * childTask}. The thread inherits the scoped value <em>bindings</em> captured when 257 * creating the task scope. The code in {@code childTask} uses the value of the scoped 258 * value and so reads the value "{@code duke}". 259 * {@snippet lang=java : 260 * private static final ScopedValue<String> USERNAME = ScopedValue.newInstance(); 261 * 262 * // @link substring="run" target="ScopedValue.Carrier#run(Runnable)" : 263 * ScopedValue.where(USERNAME, "duke").run(() -> { 264 * try (var scope = new StructuredTaskScope<String>()) { 265 * 266 * scope.fork(() -> childTask()); // @highlight substring="fork" 267 * ... 268 * } 269 * }); 270 * 271 * ... 272 * 273 * String childTask() { 274 * // @link substring="get" target="ScopedValue#get()" : 275 * String name = USERNAME.get(); // "duke" 276 * ... 277 * } 278 * } 279 * 280 * <p> {@code StructuredTaskScope} does not define APIs that exposes the tree structure 281 * at this time. 282 * 283 * <p> Unless otherwise specified, passing a {@code null} argument to a constructor 284 * or method in this class will cause a {@link NullPointerException} to be thrown. 285 * 286 * <h2>Memory consistency effects</h2> 287 * 288 * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to 289 * {@linkplain #fork forking} of a subtask 290 * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility"> 291 * <i>happen-before</i></a> any actions taken by that subtask, which in turn <i>happen-before</i> 292 * the subtask result is {@linkplain Subtask#get() retrieved} or <i>happen-before</i> any 293 * actions taken in a thread after {@linkplain #join() joining} of the task scope. 294 * 295 * @jls 17.4.5 Happens-before Order 296 * 297 * @param <T> the result type of tasks executed in the task scope 298 * @since 21 299 */ 300 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 301 public class StructuredTaskScope<T> implements AutoCloseable { 302 private final ThreadFactory factory; 303 private final ThreadFlock flock; 304 private final ReentrantLock shutdownLock = new ReentrantLock(); 305 306 // states: OPEN -> SHUTDOWN -> CLOSED 307 private static final int OPEN = 0; // initial state 308 private static final int SHUTDOWN = 1; 309 private static final int CLOSED = 2; 310 311 // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread 312 private volatile int state; 313 314 // Counters to support checking that the task scope owner joins before processing 315 // results and attempts join before closing the task scope. These counters are 316 // accessed only by the owner thread. 317 private int forkRound; // incremented when the first subtask is forked after join 318 private int lastJoinAttempted; // set to the current fork round when join is attempted 319 private int lastJoinCompleted; // set to the current fork round when join completes 320 321 /** 322 * Represents a subtask forked with {@link #fork(Callable)}. 323 * @param <T> the result type 324 * @since 21 325 */ 326 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 327 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl { 328 /** 329 * {@return the value returning task provided to the {@code fork} method} 330 * 331 * @apiNote Task objects with unique identity may be used for correlation by 332 * implementations of {@link #handleComplete(Subtask) handleComplete}. 333 */ 334 Callable<? extends T> task(); 335 336 /** 337 * Represents the state of a subtask. 338 * @see Subtask#state() 339 * @since 21 340 */ 341 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 342 enum State { 343 /** 344 * The subtask result or exception is not available. This state indicates that 345 * the subtask was forked but has not completed, it completed after the task 346 * scope was {@linkplain #shutdown() shut down}, or it was forked after the 347 * task scope was shut down. 348 */ 349 UNAVAILABLE, 350 /** 351 * The subtask completed successfully with a result. The {@link Subtask#get() 352 * Subtask.get()} method can be used to obtain the result. This is a terminal 353 * state. 354 */ 355 SUCCESS, 356 /** 357 * The subtask failed with an exception. The {@link Subtask#exception() 358 * Subtask.exception()} method can be used to obtain the exception. This is a 359 * terminal state. 360 */ 361 FAILED, 362 } 363 364 /** 365 * {@return the state of the subtask} 366 */ 367 State state(); 368 369 /** 370 * Returns the result of the subtask. 371 * 372 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks} 373 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant) 374 * joinUntil}) before it can obtain the result of the subtask. 375 * 376 * @return the possibly-null result 377 * @throws IllegalStateException if the subtask has not completed, did not complete 378 * successfully, or the current thread is the task scope owner and did not join 379 * after forking 380 * @see State#SUCCESS 381 */ 382 T get(); 383 384 /** 385 * {@return the exception thrown by the subtask} 386 * 387 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks} 388 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant) 389 * joinUntil}) before it can obtain the exception thrown by the subtask. 390 * 391 * @throws IllegalStateException if the subtask has not completed, completed with 392 * a result, or the current thread is the task scope owner and did not join after 393 * forking 394 * @see State#FAILED 395 */ 396 Throwable exception(); 397 } 398 399 /** 400 * Creates a structured task scope with the given name and thread factory. The task 401 * scope is optionally named for the purposes of monitoring and management. The thread 402 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when 403 * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the 404 * current thread. 405 * 406 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value} 407 * bindings for inheritance by threads started in the task scope. The 408 * <a href="#TreeStructure">Tree Structure</a> section in the class description details 409 * how parent-child relations are established implicitly for the purpose of inheritance 410 * of scoped value bindings. 411 * 412 * @param name the name of the task scope, can be null 413 * @param factory the thread factory 414 */ 415 @SuppressWarnings("this-escape") 416 public StructuredTaskScope(String name, ThreadFactory factory) { 417 this.factory = Objects.requireNonNull(factory, "'factory' is null"); 418 if (name == null) 419 name = Objects.toIdentityString(this); 420 this.flock = ThreadFlock.open(name); 421 } 422 423 /** 424 * Creates an unnamed structured task scope that creates virtual threads. The task 425 * scope is owned by the current thread. 426 * 427 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with a 428 * name of {@code null} and a thread factory that creates virtual threads. 429 */ 430 public StructuredTaskScope() { 431 this(null, Thread.ofVirtual().factory()); 432 } 433 434 private IllegalStateException newIllegalStateExceptionScopeClosed() { 435 return new IllegalStateException("Task scope is closed"); 436 } 437 438 private IllegalStateException newIllegalStateExceptionNoJoin() { 439 return new IllegalStateException("Owner did not join after forking subtasks"); 440 } 441 442 /** 443 * Throws IllegalStateException if the scope is closed, returning the state if not 444 * closed. 445 */ 446 private int ensureOpen() { 447 int s = state; 448 if (s == CLOSED) 449 throw newIllegalStateExceptionScopeClosed(); 450 return s; 451 } 452 453 /** 454 * Throws WrongThreadException if the current thread is not the owner. 455 */ 456 private void ensureOwner() { 457 if (Thread.currentThread() != flock.owner()) 458 throw new WrongThreadException("Current thread not owner"); 459 } 460 461 /** 462 * Throws WrongThreadException if the current thread is not the owner 463 * or a thread contained in the tree. 464 */ 465 private void ensureOwnerOrContainsThread() { 466 Thread currentThread = Thread.currentThread(); 467 if (currentThread != flock.owner() && !flock.containsThread(currentThread)) 468 throw new WrongThreadException("Current thread not owner or thread in the tree"); 469 } 470 471 /** 472 * Throws IllegalStateException if the current thread is the owner, and the owner did 473 * not join after forking a subtask in the given fork round. 474 */ 475 private void ensureJoinedIfOwner(int round) { 476 if (Thread.currentThread() == flock.owner() && (round > lastJoinCompleted)) { 477 throw newIllegalStateExceptionNoJoin(); 478 } 479 } 480 481 /** 482 * Ensures that the current thread is the owner of this task scope and that it joined 483 * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable) 484 * forking} subtasks. 485 * 486 * @apiNote This method can be used by subclasses that define methods to make available 487 * results, state, or other outcome to code intended to execute after the join method. 488 * 489 * @throws WrongThreadException if the current thread is not the task scope owner 490 * @throws IllegalStateException if the task scope is open and task scope owner did 491 * not join after forking 492 */ 493 protected final void ensureOwnerAndJoined() { 494 ensureOwner(); 495 if (forkRound > lastJoinCompleted) { 496 throw newIllegalStateExceptionNoJoin(); 497 } 498 } 499 500 /** 501 * Invoked by a subtask when it completes successfully or fails in this task scope. 502 * This method is not invoked if a subtask completes after the task scope is 503 * {@linkplain #shutdown() shut down}. 504 * 505 * @implSpec The default implementation throws {@code NullPointerException} if the 506 * subtask is {@code null}. It throws {@link IllegalArgumentException} if the subtask 507 * has not completed. 508 * 509 * @apiNote The {@code handleComplete} method should be thread safe. It may be 510 * invoked by several threads concurrently. 511 * 512 * @param subtask the subtask 513 * 514 * @throws IllegalArgumentException if called with a subtask that has not completed 515 */ 516 protected void handleComplete(Subtask<? extends T> subtask) { 517 if (subtask.state() == Subtask.State.UNAVAILABLE) 518 throw new IllegalArgumentException(); 519 } 520 521 /** 522 * Starts a new thread in this task scope to execute a value-returning task, thus 523 * creating a <em>subtask</em> of this task scope. 524 * 525 * <p> The value-returning task is provided to this method as a {@link Callable}, the 526 * thread executes the task's {@link Callable#call() call} method. The thread is 527 * created with the task scope's {@link ThreadFactory}. It inherits the current thread's 528 * {@linkplain ScopedValue scoped value} bindings. The bindings must match the bindings 529 * captured when the task scope was created. 530 * 531 * <p> This method returns a {@link Subtask Subtask} to represent the <em>forked 532 * subtask</em>. The {@code Subtask} object can be used to obtain the result when 533 * the subtask completes successfully, or the exception when the subtask fails. To 534 * ensure correct usage, the {@link Subtask#get() get()} and {@link Subtask#exception() 535 * exception()} methods may only be called by the task scope owner after it has waited 536 * for all threads to finish with the {@link #join() join} or {@link #joinUntil(Instant)} 537 * methods. When the subtask completes, the thread invokes the {@link 538 * #handleComplete(Subtask) handleComplete} method to consume the completed subtask. 539 * If the task scope is {@linkplain #shutdown() shut down} before the subtask completes 540 * then the {@code handleComplete} method will not be invoked. 541 * 542 * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process of 543 * shutting down) then the subtask will not run and the {@code handleComplete} method 544 * will not be invoked. 545 * 546 * <p> This method may only be invoked by the task scope owner or threads contained 547 * in the task scope. 548 * 549 * @implSpec This method may be overridden for customization purposes, wrapping tasks 550 * for example. If overridden, the subclass must invoke {@code super.fork} to start a 551 * new thread in this task scope. 552 * 553 * @param task the value-returning task for the thread to execute 554 * @param <U> the result type 555 * @return the subtask 556 * @throws IllegalStateException if this task scope is closed 557 * @throws WrongThreadException if the current thread is not the task scope owner or a 558 * thread contained in the task scope 559 * @throws StructureViolationException if the current scoped value bindings are not 560 * the same as when the task scope was created 561 * @throws RejectedExecutionException if the thread factory rejected creating a 562 * thread to run the subtask 563 */ 564 public <U extends T> Subtask<U> fork(Callable<? extends U> task) { 565 Objects.requireNonNull(task, "'task' is null"); 566 int s = ensureOpen(); // throws ISE if closed 567 568 // when forked by the owner, the subtask is forked in the current or next round 569 int round = -1; 570 if (Thread.currentThread() == flock.owner()) { 571 round = forkRound; 572 if (forkRound == lastJoinCompleted) { 573 // new round if first fork after join 574 round++; 575 } 576 } 577 578 SubtaskImpl<U> subtask = new SubtaskImpl<>(this, task, round); 579 if (s < SHUTDOWN) { 580 // create thread to run task 581 Thread thread = factory.newThread(subtask); 582 if (thread == null) { 583 throw new RejectedExecutionException("Rejected by thread factory"); 584 } 585 586 // attempt to start the thread 587 try { 588 flock.start(thread); 589 } catch (IllegalStateException e) { 590 // shutdown by another thread, or underlying flock is shutdown due 591 // to unstructured use 592 } 593 } 594 595 // force owner to join if this is the first fork in the round 596 if (Thread.currentThread() == flock.owner() && round > forkRound) { 597 forkRound = round; 598 } 599 600 // return forked subtask or a subtask that did not run 601 return subtask; 602 } 603 604 /** 605 * Wait for all threads to finish or the task scope to shut down. 606 */ 607 private void implJoin(Duration timeout) 608 throws InterruptedException, TimeoutException 609 { 610 ensureOwner(); 611 lastJoinAttempted = forkRound; 612 int s = ensureOpen(); // throws ISE if closed 613 if (s == OPEN) { 614 // wait for all threads, wakeup, interrupt, or timeout 615 if (timeout != null) { 616 flock.awaitAll(timeout); 617 } else { 618 flock.awaitAll(); 619 } 620 } 621 lastJoinCompleted = forkRound; 622 } 623 624 /** 625 * Wait for all subtasks started in this task scope to finish or the task scope to 626 * shut down. 627 * 628 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 629 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 630 * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, or 631 * the current thread is {@linkplain Thread#interrupt() interrupted}. 632 * 633 * <p> This method may only be invoked by the task scope owner. 634 * 635 * @implSpec This method may be overridden for customization purposes or to return a 636 * more specific return type. If overridden, the subclass must invoke {@code 637 * super.join} to ensure that the method waits for threads in this task scope to 638 * finish. 639 * 640 * @return this task scope 641 * @throws IllegalStateException if this task scope is closed 642 * @throws WrongThreadException if the current thread is not the task scope owner 643 * @throws InterruptedException if interrupted while waiting 644 */ 645 public StructuredTaskScope<T> join() throws InterruptedException { 646 try { 647 implJoin(null); 648 } catch (TimeoutException e) { 649 throw new InternalError(); 650 } 651 return this; 652 } 653 654 /** 655 * Wait for all subtasks started in this task scope to finish or the task scope to 656 * shut down, up to the given deadline. 657 * 658 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 659 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 660 * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, the 661 * deadline is reached, or the current thread is {@linkplain Thread#interrupt() 662 * interrupted}. 663 * 664 * <p> This method may only be invoked by the task scope owner. 665 * 666 * @implSpec This method may be overridden for customization purposes or to return a 667 * more specific return type. If overridden, the subclass must invoke {@code 668 * super.joinUntil} to ensure that the method waits for threads in this task scope to 669 * finish. 670 * 671 * @param deadline the deadline 672 * @return this task scope 673 * @throws IllegalStateException if this task scope is closed 674 * @throws WrongThreadException if the current thread is not the task scope owner 675 * @throws InterruptedException if interrupted while waiting 676 * @throws TimeoutException if the deadline is reached while waiting 677 */ 678 public StructuredTaskScope<T> joinUntil(Instant deadline) 679 throws InterruptedException, TimeoutException 680 { 681 Duration timeout = Duration.between(Instant.now(), deadline); 682 implJoin(timeout); 683 return this; 684 } 685 686 /** 687 * Interrupt all unfinished threads. 688 */ 689 private void interruptAll() { 690 flock.threads() 691 .filter(t -> t != Thread.currentThread()) 692 .forEach(t -> { 693 try { 694 t.interrupt(); 695 } catch (Throwable ignore) { } 696 }); 697 } 698 699 /** 700 * Shutdown the task scope if not already shutdown. Return true if this method 701 * shutdowns the task scope, false if already shutdown. 702 */ 703 private boolean implShutdown() { 704 shutdownLock.lock(); 705 try { 706 if (state < SHUTDOWN) { 707 // prevent new threads from starting 708 flock.shutdown(); 709 710 // set status before interrupting tasks 711 state = SHUTDOWN; 712 713 // interrupt all unfinished threads 714 interruptAll(); 715 716 return true; 717 } else { 718 // already shutdown 719 return false; 720 } 721 } finally { 722 shutdownLock.unlock(); 723 } 724 } 725 726 /** 727 * Shut down this task scope without closing it. Shutting down a task scope prevents 728 * new threads from starting, interrupts all unfinished threads, and causes the 729 * {@link #join() join} method to wakeup. Shutdown is useful for cases where the 730 * results of unfinished subtasks are no longer needed. It will typically be called 731 * by the {@link #handleComplete(Subtask)} implementation of a subclass that 732 * implements a policy to discard unfinished tasks once some outcome is reached. 733 * 734 * <p> More specifically, this method: 735 * <ul> 736 * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the 737 * task scope (except the current thread). 738 * <li> Wakes up the task scope owner if it is waiting in {@link #join()} or {@link 739 * #joinUntil(Instant)}. If the task scope owner is not waiting then its next call to 740 * {@code join} or {@code joinUntil} will return immediately. 741 * </ul> 742 * 743 * <p> The {@linkplain Subtask.State state} of unfinished subtasks that complete at 744 * around the time that the task scope is shutdown is not defined. A subtask that 745 * completes successfully with a result, or fails with an exception, at around 746 * the time that the task scope is shutdown may or may not <i>transition</i> to a 747 * terminal state. 748 * 749 * <p> This method may only be invoked by the task scope owner or threads contained 750 * in the task scope. 751 * 752 * @implSpec This method may be overridden for customization purposes. If overridden, 753 * the subclass must invoke {@code super.shutdown} to ensure that the method shuts 754 * down the task scope. 755 * 756 * @apiNote 757 * There may be threads that have not finished because they are executing code that 758 * did not respond (or respond promptly) to thread interrupt. This method does not wait 759 * for these threads. When the owner invokes the {@link #close() close} method 760 * to close the task scope then it will wait for the remaining threads to finish. 761 * 762 * @throws IllegalStateException if this task scope is closed 763 * @throws WrongThreadException if the current thread is not the task scope owner or 764 * a thread contained in the task scope 765 * @see #isShutdown() 766 */ 767 public void shutdown() { 768 ensureOwnerOrContainsThread(); 769 int s = ensureOpen(); // throws ISE if closed 770 if (s < SHUTDOWN && implShutdown()) 771 flock.wakeup(); 772 } 773 774 /** 775 * {@return true if this task scope is shutdown, otherwise false} 776 * @see #shutdown() 777 */ 778 public final boolean isShutdown() { 779 return state >= SHUTDOWN; 780 } 781 782 /** 783 * Closes this task scope. 784 * 785 * <p> This method first shuts down the task scope (as if by invoking the {@link 786 * #shutdown() shutdown} method). It then waits for the threads executing any 787 * unfinished tasks to finish. If interrupted, this method will continue to wait for 788 * the threads to finish before completing with the interrupt status set. 789 * 790 * <p> This method may only be invoked by the task scope owner. If the task scope 791 * is already closed then the task scope owner invoking this method has no effect. 792 * 793 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured 794 * manner</em>. If this method is called to close a task scope before nested task 795 * scopes are closed then it closes the underlying construct of each nested task scope 796 * (in the reverse order that they were created in), closes this task scope, and then 797 * throws {@link StructureViolationException}. 798 * Similarly, if this method is called to close a task scope while executing with 799 * {@linkplain ScopedValue scoped value} bindings, and the task scope was created 800 * before the scoped values were bound, then {@code StructureViolationException} is 801 * thrown after closing the task scope. 802 * If a thread terminates without first closing task scopes that it owns then 803 * termination will cause the underlying construct of each of its open tasks scopes to 804 * be closed. Closing is performed in the reverse order that the task scopes were 805 * created in. Thread termination may therefore be delayed when the task scope owner 806 * has to wait for threads forked in these task scopes to finish. 807 * 808 * @implSpec This method may be overridden for customization purposes. If overridden, 809 * the subclass must invoke {@code super.close} to close the task scope. 810 * 811 * @throws IllegalStateException thrown after closing the task scope if the task scope 812 * owner did not attempt to join after forking 813 * @throws WrongThreadException if the current thread is not the task scope owner 814 * @throws StructureViolationException if a structure violation was detected 815 */ 816 @Override 817 public void close() { 818 ensureOwner(); 819 int s = state; 820 if (s == CLOSED) 821 return; 822 823 try { 824 if (s < SHUTDOWN) 825 implShutdown(); 826 flock.close(); 827 } finally { 828 state = CLOSED; 829 } 830 831 // throw ISE if the owner didn't attempt to join after forking 832 if (forkRound > lastJoinAttempted) { 833 lastJoinCompleted = forkRound; 834 throw newIllegalStateExceptionNoJoin(); 835 } 836 } 837 838 @Override 839 public String toString() { 840 String name = flock.name(); 841 return switch (state) { 842 case OPEN -> name; 843 case SHUTDOWN -> name + "/shutdown"; 844 case CLOSED -> name + "/closed"; 845 default -> throw new InternalError(); 846 }; 847 } 848 849 /** 850 * Subtask implementation, runs the task specified to the fork method. 851 */ 852 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable { 853 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS); 854 855 private record AltResult(Subtask.State state, Throwable exception) { 856 AltResult(Subtask.State state) { 857 this(state, null); 858 } 859 } 860 861 private final StructuredTaskScope<? super T> scope; 862 private final Callable<? extends T> task; 863 private final int round; 864 private volatile Object result; 865 866 SubtaskImpl(StructuredTaskScope<? super T> scope, 867 Callable<? extends T> task, 868 int round) { 869 this.scope = scope; 870 this.task = task; 871 this.round = round; 872 } 873 874 @Override 875 public void run() { 876 T result = null; 877 Throwable ex = null; 878 try { 879 result = task.call(); 880 } catch (Throwable e) { 881 ex = e; 882 } 883 884 // nothing to do if task scope is shutdown 885 if (scope.isShutdown()) 886 return; 887 888 // capture result or exception, invoke handleComplete 889 if (ex == null) { 890 this.result = (result != null) ? result : RESULT_NULL; 891 } else { 892 this.result = new AltResult(State.FAILED, ex); 893 } 894 scope.handleComplete(this); 895 } 896 897 @Override 898 public Callable<? extends T> task() { 899 return task; 900 } 901 902 @Override 903 public Subtask.State state() { 904 Object result = this.result; 905 if (result == null) { 906 return State.UNAVAILABLE; 907 } else if (result instanceof AltResult alt) { 908 // null or failed 909 return alt.state(); 910 } else { 911 return State.SUCCESS; 912 } 913 } 914 915 @Override 916 public T get() { 917 scope.ensureJoinedIfOwner(round); 918 Object result = this.result; 919 if (result instanceof AltResult) { 920 if (result == RESULT_NULL) return null; 921 } else if (result != null) { 922 @SuppressWarnings("unchecked") 923 T r = (T) result; 924 return r; 925 } 926 throw new IllegalStateException( 927 "Result is unavailable or subtask did not complete successfully"); 928 } 929 930 @Override 931 public Throwable exception() { 932 scope.ensureJoinedIfOwner(round); 933 Object result = this.result; 934 if (result instanceof AltResult alt && alt.state() == State.FAILED) { 935 return alt.exception(); 936 } 937 throw new IllegalStateException( 938 "Exception is unavailable or subtask did not complete with exception"); 939 } 940 941 @Override 942 public String toString() { 943 String stateAsString = switch (state()) { 944 case UNAVAILABLE -> "[Unavailable]"; 945 case SUCCESS -> "[Completed successfully]"; 946 case FAILED -> { 947 Throwable ex = ((AltResult) result).exception(); 948 yield "[Failed: " + ex + "]"; 949 } 950 }; 951 return Objects.toIdentityString(this) + stateAsString; 952 } 953 } 954 955 /** 956 * A {@code StructuredTaskScope} that captures the result of the first subtask to 957 * complete {@linkplain Subtask.State#SUCCESS successfully}. Once captured, it 958 * {@linkplain #shutdown() shuts down} the task scope to interrupt unfinished threads 959 * and wakeup the task scope owner. The policy implemented by this class is intended 960 * for cases where the result of any subtask will do ("invoke any") and where the 961 * results of other unfinished subtasks are no longer needed. 962 * 963 * <p> Unless otherwise specified, passing a {@code null} argument to a method 964 * in this class will cause a {@link NullPointerException} to be thrown. 965 * 966 * @apiNote This class implements a policy to shut down the task scope when a subtask 967 * completes successfully. There shouldn't be any need to directly shut down the task 968 * scope with the {@link #shutdown() shutdown} method. 969 * 970 * @param <T> the result type 971 * @since 21 972 */ 973 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 974 public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> { 975 private static final Object RESULT_NULL = new Object(); 976 private static final VarHandle FIRST_RESULT; 977 private static final VarHandle FIRST_EXCEPTION; 978 static { 979 MethodHandles.Lookup l = MethodHandles.lookup(); 980 FIRST_RESULT = MhUtil.findVarHandle(l, "firstResult", Object.class); 981 FIRST_EXCEPTION = MhUtil.findVarHandle(l, "firstException", Throwable.class); 982 } 983 private volatile Object firstResult; 984 private volatile Throwable firstException; 985 986 /** 987 * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory. 988 * The task scope is optionally named for the purposes of monitoring and management. 989 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create} 990 * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope 991 * is owned by the current thread. 992 * 993 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped 994 * value} bindings for inheritance by threads started in the task scope. The 995 * {@linkplain StructuredTaskScope##TreeStructure Tree Structure} section 996 * in the class description details how parent-child relations are established 997 * implicitly for the purpose of inheritance of scoped value bindings. 998 * 999 * @param name the name of the task scope, can be null 1000 * @param factory the thread factory 1001 */ 1002 public ShutdownOnSuccess(String name, ThreadFactory factory) { 1003 super(name, factory); 1004 } 1005 1006 /** 1007 * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads. 1008 * 1009 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with 1010 * a name of {@code null} and a thread factory that creates virtual threads. 1011 */ 1012 public ShutdownOnSuccess() { 1013 this(null, Thread.ofVirtual().factory()); 1014 } 1015 1016 @Override 1017 protected void handleComplete(Subtask<? extends T> subtask) { 1018 if (firstResult != null) { 1019 // already captured a result 1020 return; 1021 } 1022 1023 if (subtask.state() == Subtask.State.SUCCESS) { 1024 // task succeeded 1025 T result = subtask.get(); 1026 Object r = (result != null) ? result : RESULT_NULL; 1027 if (FIRST_RESULT.compareAndSet(this, null, r)) { 1028 super.shutdown(); 1029 } 1030 } else if (firstException == null) { 1031 // capture the exception thrown by the first subtask that failed 1032 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 1033 } 1034 } 1035 1036 /** 1037 * Wait for a subtask started in this task scope to complete {@linkplain 1038 * Subtask.State#SUCCESS successfully} or all subtasks to complete. 1039 * 1040 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 1041 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 1042 * when all threads finish, a subtask completes successfully, or the current 1043 * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting 1044 * if the {@link #shutdown() shutdown} method is invoked directly to shut down 1045 * this task scope. 1046 * 1047 * <p> This method may only be invoked by the task scope owner. 1048 * 1049 * @throws IllegalStateException {@inheritDoc} 1050 * @throws WrongThreadException {@inheritDoc} 1051 */ 1052 @Override 1053 public ShutdownOnSuccess<T> join() throws InterruptedException { 1054 super.join(); 1055 return this; 1056 } 1057 1058 /** 1059 * Wait for a subtask started in this task scope to complete {@linkplain 1060 * Subtask.State#SUCCESS successfully} or all subtasks to complete, up to the 1061 * given deadline. 1062 * 1063 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 1064 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 1065 * when all threads finish, a subtask completes successfully, the deadline is 1066 * reached, or the current thread is {@linkplain Thread#interrupt() interrupted}. 1067 * It also stops waiting if the {@link #shutdown() shutdown} method is invoked 1068 * directly to shut down this task scope. 1069 * 1070 * <p> This method may only be invoked by the task scope owner. 1071 * 1072 * @throws IllegalStateException {@inheritDoc} 1073 * @throws WrongThreadException {@inheritDoc} 1074 */ 1075 @Override 1076 public ShutdownOnSuccess<T> joinUntil(Instant deadline) 1077 throws InterruptedException, TimeoutException 1078 { 1079 super.joinUntil(deadline); 1080 return this; 1081 } 1082 1083 /** 1084 * {@return the result of the first subtask that completed {@linkplain 1085 * Subtask.State#SUCCESS successfully}} 1086 * 1087 * <p> When no subtask completed successfully, but a subtask {@linkplain 1088 * Subtask.State#FAILED failed} then {@code ExecutionException} is thrown with 1089 * the subtask's exception as the {@linkplain Throwable#getCause() cause}. 1090 * 1091 * @throws ExecutionException if no subtasks completed successfully but at least 1092 * one subtask failed 1093 * @throws IllegalStateException if no subtasks completed or the task scope owner 1094 * did not join after forking 1095 * @throws WrongThreadException if the current thread is not the task scope owner 1096 */ 1097 public T result() throws ExecutionException { 1098 return result(ExecutionException::new); 1099 } 1100 1101 /** 1102 * Returns the result of the first subtask that completed {@linkplain 1103 * Subtask.State#SUCCESS successfully}, otherwise throws an exception produced 1104 * by the given exception supplying function. 1105 * 1106 * <p> When no subtask completed successfully, but a subtask {@linkplain 1107 * Subtask.State#FAILED failed}, then the exception supplying function is invoked 1108 * with subtask's exception. 1109 * 1110 * @param esf the exception supplying function 1111 * @param <X> type of the exception to be thrown 1112 * @return the result of the first subtask that completed with a result 1113 * 1114 * @throws X if no subtasks completed successfully but at least one subtask failed 1115 * @throws IllegalStateException if no subtasks completed or the task scope owner 1116 * did not join after forking 1117 * @throws WrongThreadException if the current thread is not the task scope owner 1118 */ 1119 public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X { 1120 Objects.requireNonNull(esf); 1121 ensureOwnerAndJoined(); 1122 1123 Object result = firstResult; 1124 if (result == RESULT_NULL) { 1125 return null; 1126 } else if (result != null) { 1127 @SuppressWarnings("unchecked") 1128 T r = (T) result; 1129 return r; 1130 } 1131 1132 Throwable exception = firstException; 1133 if (exception != null) { 1134 X ex = esf.apply(exception); 1135 Objects.requireNonNull(ex, "esf returned null"); 1136 throw ex; 1137 } 1138 1139 throw new IllegalStateException("No completed subtasks"); 1140 } 1141 } 1142 1143 /** 1144 * A {@code StructuredTaskScope} that captures the exception of the first subtask to 1145 * {@linkplain Subtask.State#FAILED fail}. Once captured, it {@linkplain #shutdown() 1146 * shuts down} the task scope to interrupt unfinished threads and wakeup the task 1147 * scope owner. The policy implemented by this class is intended for cases where the 1148 * results for all subtasks are required ("invoke all"); if any subtask fails then the 1149 * results of other unfinished subtasks are no longer needed. 1150 * 1151 * <p> Unless otherwise specified, passing a {@code null} argument to a method 1152 * in this class will cause a {@link NullPointerException} to be thrown. 1153 * 1154 * @apiNote This class implements a policy to shut down the task scope when a subtask 1155 * fails. There shouldn't be any need to directly shut down the task scope with the 1156 * {@link #shutdown() shutdown} method. 1157 * 1158 * @since 21 1159 */ 1160 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 1161 public static final class ShutdownOnFailure extends StructuredTaskScope<Object> { 1162 private static final VarHandle FIRST_EXCEPTION = 1163 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class); 1164 private volatile Throwable firstException; 1165 1166 /** 1167 * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory. 1168 * The task scope is optionally named for the purposes of monitoring and management. 1169 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create} 1170 * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope 1171 * is owned by the current thread. 1172 * 1173 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped 1174 * value} bindings for inheritance by threads started in the task scope. The 1175 * {@linkplain StructuredTaskScope##TreeStructure Tree Structure} section in the class description 1176 * details how parent-child relations are established implicitly for the purpose 1177 * of inheritance of scoped value bindings. 1178 * 1179 * @param name the name of the task scope, can be null 1180 * @param factory the thread factory 1181 */ 1182 public ShutdownOnFailure(String name, ThreadFactory factory) { 1183 super(name, factory); 1184 } 1185 1186 /** 1187 * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads. 1188 * 1189 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with 1190 * a name of {@code null} and a thread factory that creates virtual threads. 1191 */ 1192 public ShutdownOnFailure() { 1193 this(null, Thread.ofVirtual().factory()); 1194 } 1195 1196 @Override 1197 protected void handleComplete(Subtask<?> subtask) { 1198 if (subtask.state() == Subtask.State.FAILED 1199 && firstException == null 1200 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception())) { 1201 super.shutdown(); 1202 } 1203 } 1204 1205 /** 1206 * Wait for all subtasks started in this task scope to complete or for a subtask 1207 * to {@linkplain Subtask.State#FAILED fail}. 1208 * 1209 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 1210 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 1211 * when all threads finish, a subtask fails, or the current thread is {@linkplain 1212 * Thread#interrupt() interrupted}. It also stops waiting if the {@link #shutdown() 1213 * shutdown} method is invoked directly to shut down this task scope. 1214 * 1215 * <p> This method may only be invoked by the task scope owner. 1216 * 1217 * @throws IllegalStateException {@inheritDoc} 1218 * @throws WrongThreadException {@inheritDoc} 1219 */ 1220 @Override 1221 public ShutdownOnFailure join() throws InterruptedException { 1222 super.join(); 1223 return this; 1224 } 1225 1226 /** 1227 * Wait for all subtasks started in this task scope to complete or for a subtask 1228 * to {@linkplain Subtask.State#FAILED fail}, up to the given deadline. 1229 * 1230 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 1231 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 1232 * when all threads finish, a subtask fails, the deadline is reached, or the current 1233 * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting 1234 * if the {@link #shutdown() shutdown} method is invoked directly to shut down 1235 * this task scope. 1236 * 1237 * <p> This method may only be invoked by the task scope owner. 1238 * 1239 * @throws IllegalStateException {@inheritDoc} 1240 * @throws WrongThreadException {@inheritDoc} 1241 */ 1242 @Override 1243 public ShutdownOnFailure joinUntil(Instant deadline) 1244 throws InterruptedException, TimeoutException 1245 { 1246 super.joinUntil(deadline); 1247 return this; 1248 } 1249 1250 /** 1251 * Returns the exception of the first subtask that {@linkplain Subtask.State#FAILED 1252 * failed}. If no subtasks failed then an empty {@code Optional} is returned. 1253 * 1254 * @return the exception for the first subtask to fail or an empty optional if no 1255 * subtasks failed 1256 * 1257 * @throws WrongThreadException if the current thread is not the task scope owner 1258 * @throws IllegalStateException if the task scope owner did not join after forking 1259 */ 1260 public Optional<Throwable> exception() { 1261 ensureOwnerAndJoined(); 1262 return Optional.ofNullable(firstException); 1263 } 1264 1265 /** 1266 * Throws if a subtask {@linkplain Subtask.State#FAILED failed}. 1267 * If any subtask failed with an exception then {@code ExecutionException} is 1268 * thrown with the exception of the first subtask to fail as the {@linkplain 1269 * Throwable#getCause() cause}. This method does nothing if no subtasks failed. 1270 * 1271 * @throws ExecutionException if a subtask failed 1272 * @throws WrongThreadException if the current thread is not the task scope owner 1273 * @throws IllegalStateException if the task scope owner did not join after forking 1274 */ 1275 public void throwIfFailed() throws ExecutionException { 1276 throwIfFailed(ExecutionException::new); 1277 } 1278 1279 /** 1280 * Throws the exception produced by the given exception supplying function if a 1281 * subtask {@linkplain Subtask.State#FAILED failed}. If any subtask failed with 1282 * an exception then the function is invoked with the exception of the first 1283 * subtask to fail. The exception returned by the function is thrown. This method 1284 * does nothing if no subtasks failed. 1285 * 1286 * @param esf the exception supplying function 1287 * @param <X> type of the exception to be thrown 1288 * 1289 * @throws X produced by the exception supplying function 1290 * @throws WrongThreadException if the current thread is not the task scope owner 1291 * @throws IllegalStateException if the task scope owner did not join after forking 1292 */ 1293 public <X extends Throwable> 1294 void throwIfFailed(Function<Throwable, ? extends X> esf) throws X { 1295 ensureOwnerAndJoined(); 1296 Objects.requireNonNull(esf); 1297 Throwable exception = firstException; 1298 if (exception != null) { 1299 X ex = esf.apply(exception); 1300 Objects.requireNonNull(ex, "esf returned null"); 1301 throw ex; 1302 } 1303 } 1304 } 1305 }