1 /* 2 * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.concurrent; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.security.AccessController; 30 import java.security.PrivilegedAction; 31 import java.time.Duration; 32 import java.time.Instant; 33 import java.util.Objects; 34 import java.util.Optional; 35 import java.util.concurrent.locks.ReentrantLock; 36 import java.util.function.Function; 37 import java.util.function.Supplier; 38 import jdk.internal.javac.PreviewFeature; 39 import jdk.internal.misc.ThreadFlock; 40 41 /** 42 * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports 43 * cases where a task splits into several concurrent subtasks, and where the subtasks must 44 * complete before the main task continues. A {@code StructuredTaskScope} can be used to 45 * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>, 46 * just like that of a sequential operation in structured programming. 47 * 48 * <h2>Basic operation</h2> 49 * 50 * A {@code StructuredTaskScope} is created with one of its public constructors. It defines 51 * the {@link #fork(Callable) fork} method to start a thread to execute a subtask, the {@link 52 * #join() join} method to wait for all subtasks to finish, and the {@link #close() close} 53 * method to close the task scope. The API is intended to be used with the {@code 54 * try-with-resources} statement. The intention is that code in the try <em>block</em> 55 * uses the {@code fork} method to fork threads to execute the subtasks, wait for the 56 * subtasks to finish with the {@code join} method, and then <em>process the results</em>. 57 * A call to the {@code fork} method returns a {@link Subtask Subtask} to representing 58 * the <em>forked subtask</em>. Once {@code join} is called, the {@code Subtask} can be 59 * used to get the result completed successfully, or the exception if the subtask failed. 60 * {@snippet lang=java : 61 * Callable<String> task1 = ... 62 * Callable<Integer> task2 = ... 63 * 64 * try (var scope = new StructuredTaskScope<Object>()) { 65 * 66 * Subtask<String> subtask1 = scope.fork(task1); // @highlight substring="fork" 67 * Subtask<Integer> subtask2 = scope.fork(task2); // @highlight substring="fork" 68 * 69 * scope.join(); // @highlight substring="join" 70 * 71 * ... process results/exceptions ... 72 * 73 * } // close // @highlight substring="close" 74 * } 75 * <p> The following example forks a collection of homogeneous subtasks, waits for all of 76 * them to complete with the {@code join} method, and uses the {@link Subtask.State 77 * Subtask.State} to partition the subtasks into a set of the subtasks that completed 78 * successfully and another for the subtasks that failed. 79 * {@snippet lang=java : 80 * List<Callable<String>> callables = ... 81 * 82 * try (var scope = new StructuredTaskScope<String>()) { 83 * 84 * List<Subtask<String>> subtasks = callables.stream().map(scope::fork).toList(); 85 * 86 * scope.join(); 87 * 88 * Map<Boolean, Set<Subtask<String>>> map = subtasks.stream() 89 * .collect(Collectors.partitioningBy(h -> h.state() == Subtask.State.SUCCESS, 90 * Collectors.toSet())); 91 * 92 * } // close 93 * } 94 * 95 * <p> To ensure correct usage, the {@code join} and {@code close} methods may only be 96 * invoked by the <em>owner</em> (the thread that opened/created the task scope), and the 97 * {@code close} method throws an exception after closing if the owner did not invoke the 98 * {@code join} method after forking. 99 * 100 * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut 101 * down a task scope without closing it. The {@code shutdown()} method <em>cancels</em> all 102 * unfinished subtasks by {@linkplain Thread#interrupt() interrupting} the threads. It 103 * prevents new threads from starting in the task scope. If the owner is waiting in the 104 * {@code join} method then it will wakeup. 105 * 106 * <p> Shutdown is used for <em>short-circuiting</em> and allow subclasses to implement 107 * <em>policy</em> that does not require all subtasks to finish. 108 * 109 * <h2>Subclasses with policies for common cases</h2> 110 * 111 * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for 112 * common cases: 113 * <ol> 114 * <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the result of the first 115 * subtask to complete successfully. Once captured, it shuts down the task scope to 116 * interrupt unfinished threads and wakeup the owner. This class is intended for cases 117 * where the result of any subtask will do ("invoke any") and where there is no need to 118 * wait for results of other unfinished subtasks. It defines methods to get the first 119 * result or throw an exception if all subtasks fail. 120 * <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the exception of the first 121 * subtask to fail. Once captured, it shuts down the task scope to interrupt unfinished 122 * threads and wakeup the owner. This class is intended for cases where the results of all 123 * subtasks are required ("invoke all"); if any subtask fails then the results of other 124 * unfinished subtasks are no longer needed. If defines methods to throw an exception if 125 * any of the subtasks fail. 126 * </ol> 127 * 128 * <p> The following are two examples that use the two classes. In both cases, a pair of 129 * subtasks are forked to fetch resources from two URL locations "left" and "right". The 130 * first example creates a ShutdownOnSuccess object to capture the result of the first 131 * subtask to complete successfully, cancelling the other by way of shutting down the task 132 * scope. The main task waits in {@code join} until either subtask completes with a result 133 * or both subtasks fail. It invokes {@link ShutdownOnSuccess#result(Function) 134 * result(Function)} method to get the captured result. If both subtasks fail then this 135 * method throws a {@code WebApplicationException} with the exception from one of the 136 * subtasks as the cause. 137 * {@snippet lang=java : 138 * try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) { 139 * 140 * scope.fork(() -> fetch(left)); 141 * scope.fork(() -> fetch(right)); 142 * 143 * scope.join(); 144 * 145 * // @link regex="result(?=\()" target="ShutdownOnSuccess#result" : 146 * String result = scope.result(e -> new WebApplicationException(e)); 147 * 148 * ... 149 * } 150 * } 151 * The second example creates a ShutdownOnFailure object to capture the exception of the 152 * first subtask to fail, cancelling the other by way of shutting down the task scope. The 153 * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a 154 * result, either fails, or a deadline is reached. It invokes {@link 155 * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception 156 * if either subtask fails. This method is a no-op if both subtasks complete successfully. 157 * The example uses {@link Supplier#get()} to get the result of each subtask. Using 158 * {@code Supplier} instead of {@code Subtask} is preferred for common cases where the 159 * object returned by fork is only used to get the result of a subtask that completed 160 * successfully. 161 * {@snippet lang=java : 162 * Instant deadline = ... 163 * 164 * try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { 165 * 166 * Supplier<String> supplier1 = scope.fork(() -> query(left)); 167 * Supplier<String> supplier2 = scope.fork(() -> query(right)); 168 * 169 * scope.joinUntil(deadline); 170 * 171 * // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" : 172 * scope.throwIfFailed(e -> new WebApplicationException(e)); 173 * 174 * // both subtasks completed successfully 175 * String result = Stream.of(supplier1, supplier2) 176 * .map(Supplier::get) 177 * .collect(Collectors.joining(", ", "{ ", " }")); 178 * 179 * ... 180 * } 181 * } 182 * 183 * <h2>Extending StructuredTaskScope</h2> 184 * 185 * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Subtask) 186 * handleComplete} method overridden, to implement policies other than those implemented 187 * by {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. A subclass may, for example, 188 * collect the results of subtasks that complete successfully and ignore subtasks that 189 * fail. It may collect exceptions when subtasks fail. It may invoke the {@link #shutdown() 190 * shutdown} method to shut down and cause {@link #join() join} to wakeup when some 191 * condition arises. 192 * 193 * <p> A subclass will typically define methods to make available results, state, or other 194 * outcome to code that executes after the {@code join} method. A subclass that collects 195 * results and ignores subtasks that fail may define a method that returns the results. 196 * A subclass that implements a policy to shut down when a subtask fails may define a 197 * method to get the exception of the first subtask to fail. 198 * 199 * <p> The following is an example of a simple {@code StructuredTaskScope} implementation 200 * that collects homogenous subtasks that complete successfully. It defines the method 201 * "{@code completedSuccessfully()}" that the main task can invoke after it joins. 202 * {@snippet lang=java : 203 * class CollectingScope<T> extends StructuredTaskScope<T> { 204 * private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>(); 205 * 206 * @Override 207 * protected void handleComplete(Subtask<? extends T> subtask) { 208 * if (subtask.state() == Subtask.State.SUCCESS) { 209 * subtasks.add(subtask); 210 * } 211 * } 212 * 213 * @Override 214 * public CollectingScope<T> join() throws InterruptedException { 215 * super.join(); 216 * return this; 217 * } 218 * 219 * public Stream<Subtask<? extends T>> completedSuccessfully() { 220 * // @link substring="ensureOwnerAndJoined" target="ensureOwnerAndJoined" : 221 * super.ensureOwnerAndJoined(); 222 * return subtasks.stream(); 223 * } 224 * } 225 * } 226 * <p> The implementations of the {@code completedSuccessfully()} method in the example 227 * invokes {@link #ensureOwnerAndJoined()} to ensure that the method can only be invoked 228 * by the owner thread and only after it has joined. 229 * 230 * <h2><a id="TreeStructure">Tree structure</a></h2> 231 * 232 * Task scopes form a tree where parent-child relations are established implicitly when 233 * opening a new task scope: 234 * <ul> 235 * <li> A parent-child relation is established when a thread started in a task scope 236 * opens its own task scope. A thread started in task scope "A" that opens task scope 237 * "B" establishes a parent-child relation where task scope "A" is the parent of task 238 * scope "B". 239 * <li> A parent-child relation is established with nesting. If a thread opens task 240 * scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task 241 * scope "B" is the parent of the nested task scope "C". 242 * </ul> 243 * 244 * The <i>descendants</i> of a task scope are the child task scopes that it is a parent 245 * of, plus the descendants of the child task scopes, recursively. 246 * 247 * <p> The tree structure supports: 248 * <ul> 249 * <li> Inheritance of {@linkplain ScopedValue scoped values} across threads. 250 * <li> Confinement checks. The phrase "threads contained in the task scope" in method 251 * descriptions means threads started in the task scope or descendant scopes. 252 * </ul> 253 * 254 * <p> The following example demonstrates the inheritance of a scoped value. A scoped 255 * value {@code USERNAME} is bound to the value "{@code duke}". A {@code StructuredTaskScope} 256 * is created and its {@code fork} method invoked to start a thread to execute {@code 257 * childTask}. The thread inherits the scoped value <em>bindings</em> captured when 258 * creating the task scope. The code in {@code childTask} uses the value of the scoped 259 * value and so reads the value "{@code duke}". 260 * {@snippet lang=java : 261 * private static final ScopedValue<String> USERNAME = ScopedValue.newInstance(); 262 * 263 * // @link substring="runWhere" target="ScopedValue#runWhere(ScopedValue, Object, Runnable)" : 264 * ScopedValue.runWhere(USERNAME, "duke", () -> { 265 * try (var scope = new StructuredTaskScope<String>()) { 266 * 267 * scope.fork(() -> childTask()); // @highlight substring="fork" 268 * ... 269 * } 270 * }); 271 * 272 * ... 273 * 274 * String childTask() { 275 * // @link substring="get" target="ScopedValue#get()" : 276 * String name = USERNAME.get(); // "duke" 277 * ... 278 * } 279 * } 280 * 281 * <p> {@code StructuredTaskScope} does not define APIs that exposes the tree structure 282 * at this time. 283 * 284 * <p> Unless otherwise specified, passing a {@code null} argument to a constructor 285 * or method in this class will cause a {@link NullPointerException} to be thrown. 286 * 287 * <h2>Memory consistency effects</h2> 288 * 289 * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to 290 * {@linkplain #fork forking} of a subtask 291 * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility"> 292 * <i>happen-before</i></a> any actions taken by that subtask, which in turn <i>happen-before</i> 293 * the subtask result is {@linkplain Subtask#get() retrieved} or <i>happen-before</i> any 294 * actions taken in a thread after {@linkplain #join() joining} of the task scope. 295 * 296 * @jls 17.4.5 Happens-before Order 297 * 298 * @param <T> the result type of tasks executed in the task scope 299 * @since 21 300 */ 301 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 302 public class StructuredTaskScope<T> implements AutoCloseable { 303 private final ThreadFactory factory; 304 private final ThreadFlock flock; 305 private final ReentrantLock shutdownLock = new ReentrantLock(); 306 307 // states: OPEN -> SHUTDOWN -> CLOSED 308 private static final int OPEN = 0; // initial state 309 private static final int SHUTDOWN = 1; 310 private static final int CLOSED = 2; 311 312 // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread 313 private volatile int state; 314 315 // Counters to support checking that the task scope owner joins before processing 316 // results and attempts join before closing the task scope. These counters are 317 // accessed only by the owner thread. 318 private int forkRound; // incremented when the first subtask is forked after join 319 private int lastJoinAttempted; // set to the current fork round when join is attempted 320 private int lastJoinCompleted; // set to the current fork round when join completes 321 322 /** 323 * Represents a subtask forked with {@link #fork(Callable)}. 324 * @param <T> the result type 325 * @since 21 326 */ 327 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 328 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl { 329 /** 330 * {@return the value returning task provided to the {@code fork} method} 331 * 332 * @apiNote Task objects with unique identity may be used for correlation by 333 * implementations of {@link #handleComplete(Subtask) handleComplete}. 334 */ 335 Callable<? extends T> task(); 336 337 /** 338 * Represents the state of a subtask. 339 * @see Subtask#state() 340 * @since 21 341 */ 342 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 343 enum State { 344 /** 345 * The subtask result or exception is not available. This state indicates that 346 * the subtask was forked but has not completed, it completed after the task 347 * scope was {@linkplain #shutdown() shut down}, or it was forked after the 348 * task scope was shut down. 349 */ 350 UNAVAILABLE, 351 /** 352 * The subtask completed successfully with a result. The {@link Subtask#get() 353 * Subtask.get()} method can be used to obtain the result. This is a terminal 354 * state. 355 */ 356 SUCCESS, 357 /** 358 * The subtask failed with an exception. The {@link Subtask#exception() 359 * Subtask.exception()} method can be used to obtain the exception. This is a 360 * terminal state. 361 */ 362 FAILED, 363 } 364 365 /** 366 * {@return the state of the subtask} 367 */ 368 State state(); 369 370 /** 371 * Returns the result of the subtask. 372 * 373 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks} 374 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant) 375 * joinUntil}) before it can obtain the result of the subtask. 376 * 377 * @return the possibly-null result 378 * @throws IllegalStateException if the subtask has not completed, did not complete 379 * successfully, or the current thread is the task scope owner and did not join 380 * after forking 381 * @see State#SUCCESS 382 */ 383 T get(); 384 385 /** 386 * {@return the exception thrown by the subtask} 387 * 388 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks} 389 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant) 390 * joinUntil}) before it can obtain the exception thrown by the subtask. 391 * 392 * @throws IllegalStateException if the subtask has not completed, completed with 393 * a result, or the current thread is the task scope owner and did not join after 394 * forking 395 * @see State#FAILED 396 */ 397 Throwable exception(); 398 } 399 400 /** 401 * Creates a structured task scope with the given name and thread factory. The task 402 * scope is optionally named for the purposes of monitoring and management. The thread 403 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when 404 * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the 405 * current thread. 406 * 407 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value} 408 * bindings for inheritance by threads started in the task scope. The 409 * <a href="#TreeStructure">Tree Structure</a> section in the class description details 410 * how parent-child relations are established implicitly for the purpose of inheritance 411 * of scoped value bindings. 412 * 413 * @param name the name of the task scope, can be null 414 * @param factory the thread factory 415 */ 416 @SuppressWarnings("this-escape") 417 public StructuredTaskScope(String name, ThreadFactory factory) { 418 this.factory = Objects.requireNonNull(factory, "'factory' is null"); 419 if (name == null) 420 name = Objects.toIdentityString(this); 421 this.flock = ThreadFlock.open(name); 422 } 423 424 /** 425 * Creates an unnamed structured task scope that creates virtual threads. The task 426 * scope is owned by the current thread. 427 * 428 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with a 429 * name of {@code null} and a thread factory that creates virtual threads. 430 */ 431 public StructuredTaskScope() { 432 this(null, Thread.ofVirtual().factory()); 433 } 434 435 private IllegalStateException newIllegalStateExceptionScopeClosed() { 436 return new IllegalStateException("Task scope is closed"); 437 } 438 439 private IllegalStateException newIllegalStateExceptionNoJoin() { 440 return new IllegalStateException("Owner did not join after forking subtasks"); 441 } 442 443 /** 444 * Throws IllegalStateException if the scope is closed, returning the state if not 445 * closed. 446 */ 447 private int ensureOpen() { 448 int s = state; 449 if (s == CLOSED) 450 throw newIllegalStateExceptionScopeClosed(); 451 return s; 452 } 453 454 /** 455 * Throws WrongThreadException if the current thread is not the owner. 456 */ 457 private void ensureOwner() { 458 if (Thread.currentThread() != flock.owner()) 459 throw new WrongThreadException("Current thread not owner"); 460 } 461 462 /** 463 * Throws WrongThreadException if the current thread is not the owner 464 * or a thread contained in the tree. 465 */ 466 private void ensureOwnerOrContainsThread() { 467 Thread currentThread = Thread.currentThread(); 468 if (currentThread != flock.owner() && !flock.containsThread(currentThread)) 469 throw new WrongThreadException("Current thread not owner or thread in the tree"); 470 } 471 472 /** 473 * Throws IllegalStateException if the current thread is the owner, and the owner did 474 * not join after forking a subtask in the given fork round. 475 */ 476 private void ensureJoinedIfOwner(int round) { 477 if (Thread.currentThread() == flock.owner() && (round > lastJoinCompleted)) { 478 throw newIllegalStateExceptionNoJoin(); 479 } 480 } 481 482 /** 483 * Ensures that the current thread is the owner of this task scope and that it joined 484 * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable) 485 * forking} subtasks. 486 * 487 * @apiNote This method can be used by subclasses that define methods to make available 488 * results, state, or other outcome to code intended to execute after the join method. 489 * 490 * @throws WrongThreadException if the current thread is not the task scope owner 491 * @throws IllegalStateException if the task scope is open and task scope owner did 492 * not join after forking 493 */ 494 protected final void ensureOwnerAndJoined() { 495 ensureOwner(); 496 if (forkRound > lastJoinCompleted) { 497 throw newIllegalStateExceptionNoJoin(); 498 } 499 } 500 501 /** 502 * Invoked by a subtask when it completes successfully or fails in this task scope. 503 * This method is not invoked if a subtask completes after the task scope is 504 * {@linkplain #shutdown() shut down}. 505 * 506 * @implSpec The default implementation throws {@code NullPointerException} if the 507 * subtask is {@code null}. It throws {@link IllegalArgumentException} if the subtask 508 * has not completed. 509 * 510 * @apiNote The {@code handleComplete} method should be thread safe. It may be 511 * invoked by several threads concurrently. 512 * 513 * @param subtask the subtask 514 * 515 * @throws IllegalArgumentException if called with a subtask that has not completed 516 */ 517 protected void handleComplete(Subtask<? extends T> subtask) { 518 if (subtask.state() == Subtask.State.UNAVAILABLE) 519 throw new IllegalArgumentException(); 520 } 521 522 /** 523 * Starts a new thread in this task scope to execute a value-returning task, thus 524 * creating a <em>subtask</em> of this task scope. 525 * 526 * <p> The value-returning task is provided to this method as a {@link Callable}, the 527 * thread executes the task's {@link Callable#call() call} method. The thread is 528 * created with the task scope's {@link ThreadFactory}. It inherits the current thread's 529 * {@linkplain ScopedValue scoped value} bindings. The bindings must match the bindings 530 * captured when the task scope was created. 531 * 532 * <p> This method returns a {@link Subtask Subtask} to represent the <em>forked 533 * subtask</em>. The {@code Subtask} object can be used to obtain the result when 534 * the subtask completes successfully, or the exception when the subtask fails. To 535 * ensure correct usage, the {@link Subtask#get() get()} and {@link Subtask#exception() 536 * exception()} methods may only be called by the task scope owner after it has waited 537 * for all threads to finish with the {@link #join() join} or {@link #joinUntil(Instant)} 538 * methods. When the subtask completes, the thread invokes the {@link 539 * #handleComplete(Subtask) handleComplete} method to consume the completed subtask. 540 * If the task scope is {@linkplain #shutdown() shut down} before the subtask completes 541 * then the {@code handleComplete} method will not be invoked. 542 * 543 * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process of 544 * shutting down) then the subtask will not run and the {@code handleComplete} method 545 * will not be invoked. 546 * 547 * <p> This method may only be invoked by the task scope owner or threads contained 548 * in the task scope. 549 * 550 * @implSpec This method may be overridden for customization purposes, wrapping tasks 551 * for example. If overridden, the subclass must invoke {@code super.fork} to start a 552 * new thread in this task scope. 553 * 554 * @param task the value-returning task for the thread to execute 555 * @param <U> the result type 556 * @return the subtask 557 * @throws IllegalStateException if this task scope is closed 558 * @throws WrongThreadException if the current thread is not the task scope owner or a 559 * thread contained in the task scope 560 * @throws StructureViolationException if the current scoped value bindings are not 561 * the same as when the task scope was created 562 * @throws RejectedExecutionException if the thread factory rejected creating a 563 * thread to run the subtask 564 */ 565 public <U extends T> Subtask<U> fork(Callable<? extends U> task) { 566 Objects.requireNonNull(task, "'task' is null"); 567 int s = ensureOpen(); // throws ISE if closed 568 569 // when forked by the owner, the subtask is forked in the current or next round 570 int round = -1; 571 if (Thread.currentThread() == flock.owner()) { 572 round = forkRound; 573 if (forkRound == lastJoinCompleted) { 574 // new round if first fork after join 575 round++; 576 } 577 } 578 579 SubtaskImpl<U> subtask = new SubtaskImpl<>(this, task, round); 580 if (s < SHUTDOWN) { 581 // create thread to run task 582 Thread thread = factory.newThread(subtask); 583 if (thread == null) { 584 throw new RejectedExecutionException("Rejected by thread factory"); 585 } 586 587 // attempt to start the thread 588 try { 589 flock.start(thread); 590 } catch (IllegalStateException e) { 591 // shutdown by another thread, or underlying flock is shutdown due 592 // to unstructured use 593 } 594 } 595 596 // force owner to join if this is the first fork in the round 597 if (Thread.currentThread() == flock.owner() && round > forkRound) { 598 forkRound = round; 599 } 600 601 // return forked subtask or a subtask that did not run 602 return subtask; 603 } 604 605 /** 606 * Wait for all threads to finish or the task scope to shut down. 607 */ 608 private void implJoin(Duration timeout) 609 throws InterruptedException, TimeoutException 610 { 611 ensureOwner(); 612 lastJoinAttempted = forkRound; 613 int s = ensureOpen(); // throws ISE if closed 614 if (s == OPEN) { 615 // wait for all threads, wakeup, interrupt, or timeout 616 if (timeout != null) { 617 flock.awaitAll(timeout); 618 } else { 619 flock.awaitAll(); 620 } 621 } 622 lastJoinCompleted = forkRound; 623 } 624 625 /** 626 * Wait for all subtasks started in this task scope to finish or the task scope to 627 * shut down. 628 * 629 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 630 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 631 * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, or 632 * the current thread is {@linkplain Thread#interrupt() interrupted}. 633 * 634 * <p> This method may only be invoked by the task scope owner. 635 * 636 * @implSpec This method may be overridden for customization purposes or to return a 637 * more specific return type. If overridden, the subclass must invoke {@code 638 * super.join} to ensure that the method waits for threads in this task scope to 639 * finish. 640 * 641 * @return this task scope 642 * @throws IllegalStateException if this task scope is closed 643 * @throws WrongThreadException if the current thread is not the task scope owner 644 * @throws InterruptedException if interrupted while waiting 645 */ 646 public StructuredTaskScope<T> join() throws InterruptedException { 647 try { 648 implJoin(null); 649 } catch (TimeoutException e) { 650 throw new InternalError(); 651 } 652 return this; 653 } 654 655 /** 656 * Wait for all subtasks started in this task scope to finish or the task scope to 657 * shut down, up to the given deadline. 658 * 659 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 660 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 661 * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, the 662 * deadline is reached, or the current thread is {@linkplain Thread#interrupt() 663 * interrupted}. 664 * 665 * <p> This method may only be invoked by the task scope owner. 666 * 667 * @implSpec This method may be overridden for customization purposes or to return a 668 * more specific return type. If overridden, the subclass must invoke {@code 669 * super.joinUntil} to ensure that the method waits for threads in this task scope to 670 * finish. 671 * 672 * @param deadline the deadline 673 * @return this task scope 674 * @throws IllegalStateException if this task scope is closed 675 * @throws WrongThreadException if the current thread is not the task scope owner 676 * @throws InterruptedException if interrupted while waiting 677 * @throws TimeoutException if the deadline is reached while waiting 678 */ 679 public StructuredTaskScope<T> joinUntil(Instant deadline) 680 throws InterruptedException, TimeoutException 681 { 682 Duration timeout = Duration.between(Instant.now(), deadline); 683 implJoin(timeout); 684 return this; 685 } 686 687 /** 688 * Interrupt all unfinished threads. 689 */ 690 private void implInterruptAll() { 691 flock.threads() 692 .filter(t -> t != Thread.currentThread()) 693 .forEach(t -> { 694 try { 695 t.interrupt(); 696 } catch (Throwable ignore) { } 697 }); 698 } 699 700 @SuppressWarnings("removal") 701 private void interruptAll() { 702 if (System.getSecurityManager() == null) { 703 implInterruptAll(); 704 } else { 705 PrivilegedAction<Void> pa = () -> { 706 implInterruptAll(); 707 return null; 708 }; 709 AccessController.doPrivileged(pa); 710 } 711 } 712 713 /** 714 * Shutdown the task scope if not already shutdown. Return true if this method 715 * shutdowns the task scope, false if already shutdown. 716 */ 717 private boolean implShutdown() { 718 shutdownLock.lock(); 719 try { 720 if (state < SHUTDOWN) { 721 // prevent new threads from starting 722 flock.shutdown(); 723 724 // set status before interrupting tasks 725 state = SHUTDOWN; 726 727 // interrupt all unfinished threads 728 interruptAll(); 729 730 return true; 731 } else { 732 // already shutdown 733 return false; 734 } 735 } finally { 736 shutdownLock.unlock(); 737 } 738 } 739 740 /** 741 * Shut down this task scope without closing it. Shutting down a task scope prevents 742 * new threads from starting, interrupts all unfinished threads, and causes the 743 * {@link #join() join} method to wakeup. Shutdown is useful for cases where the 744 * results of unfinished subtasks are no longer needed. It will typically be called 745 * by the {@link #handleComplete(Subtask)} implementation of a subclass that 746 * implements a policy to discard unfinished tasks once some outcome is reached. 747 * 748 * <p> More specifically, this method: 749 * <ul> 750 * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the 751 * task scope (except the current thread). 752 * <li> Wakes up the task scope owner if it is waiting in {@link #join()} or {@link 753 * #joinUntil(Instant)}. If the task scope owner is not waiting then its next call to 754 * {@code join} or {@code joinUntil} will return immediately. 755 * </ul> 756 * 757 * <p> The {@linkplain Subtask.State state} of unfinished subtasks that complete at 758 * around the time that the task scope is shutdown is not defined. A subtask that 759 * completes successfully with a result, or fails with an exception, at around 760 * the time that the task scope is shutdown may or may not <i>transition</i> to a 761 * terminal state. 762 * 763 * <p> This method may only be invoked by the task scope owner or threads contained 764 * in the task scope. 765 * 766 * @implSpec This method may be overridden for customization purposes. If overridden, 767 * the subclass must invoke {@code super.shutdown} to ensure that the method shuts 768 * down the task scope. 769 * 770 * @apiNote 771 * There may be threads that have not finished because they are executing code that 772 * did not respond (or respond promptly) to thread interrupt. This method does not wait 773 * for these threads. When the owner invokes the {@link #close() close} method 774 * to close the task scope then it will wait for the remaining threads to finish. 775 * 776 * @throws IllegalStateException if this task scope is closed 777 * @throws WrongThreadException if the current thread is not the task scope owner or 778 * a thread contained in the task scope 779 * @see #isShutdown() 780 */ 781 public void shutdown() { 782 ensureOwnerOrContainsThread(); 783 int s = ensureOpen(); // throws ISE if closed 784 if (s < SHUTDOWN && implShutdown()) 785 flock.wakeup(); 786 } 787 788 /** 789 * {@return true if this task scope is shutdown, otherwise false} 790 * @see #shutdown() 791 */ 792 public final boolean isShutdown() { 793 return state >= SHUTDOWN; 794 } 795 796 /** 797 * Closes this task scope. 798 * 799 * <p> This method first shuts down the task scope (as if by invoking the {@link 800 * #shutdown() shutdown} method). It then waits for the threads executing any 801 * unfinished tasks to finish. If interrupted, this method will continue to wait for 802 * the threads to finish before completing with the interrupt status set. 803 * 804 * <p> This method may only be invoked by the task scope owner. If the task scope 805 * is already closed then the task scope owner invoking this method has no effect. 806 * 807 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured 808 * manner</em>. If this method is called to close a task scope before nested task 809 * scopes are closed then it closes the underlying construct of each nested task scope 810 * (in the reverse order that they were created in), closes this task scope, and then 811 * throws {@link StructureViolationException}. 812 * Similarly, if this method is called to close a task scope while executing with 813 * {@linkplain ScopedValue scoped value} bindings, and the task scope was created 814 * before the scoped values were bound, then {@code StructureViolationException} is 815 * thrown after closing the task scope. 816 * If a thread terminates without first closing task scopes that it owns then 817 * termination will cause the underlying construct of each of its open tasks scopes to 818 * be closed. Closing is performed in the reverse order that the task scopes were 819 * created in. Thread termination may therefore be delayed when the task scope owner 820 * has to wait for threads forked in these task scopes to finish. 821 * 822 * @implSpec This method may be overridden for customization purposes. If overridden, 823 * the subclass must invoke {@code super.close} to close the task scope. 824 * 825 * @throws IllegalStateException thrown after closing the task scope if the task scope 826 * owner did not attempt to join after forking 827 * @throws WrongThreadException if the current thread is not the task scope owner 828 * @throws StructureViolationException if a structure violation was detected 829 */ 830 @Override 831 public void close() { 832 ensureOwner(); 833 int s = state; 834 if (s == CLOSED) 835 return; 836 837 try { 838 if (s < SHUTDOWN) 839 implShutdown(); 840 flock.close(); 841 } finally { 842 state = CLOSED; 843 } 844 845 // throw ISE if the owner didn't attempt to join after forking 846 if (forkRound > lastJoinAttempted) { 847 lastJoinCompleted = forkRound; 848 throw newIllegalStateExceptionNoJoin(); 849 } 850 } 851 852 @Override 853 public String toString() { 854 String name = flock.name(); 855 return switch (state) { 856 case OPEN -> name; 857 case SHUTDOWN -> name + "/shutdown"; 858 case CLOSED -> name + "/closed"; 859 default -> throw new InternalError(); 860 }; 861 } 862 863 /** 864 * Subtask implementation, runs the task specified to the fork method. 865 */ 866 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable { 867 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS); 868 869 private record AltResult(Subtask.State state, Throwable exception) { 870 AltResult(Subtask.State state) { 871 this(state, null); 872 } 873 } 874 875 private final StructuredTaskScope<? super T> scope; 876 private final Callable<? extends T> task; 877 private final int round; 878 private volatile Object result; 879 880 SubtaskImpl(StructuredTaskScope<? super T> scope, 881 Callable<? extends T> task, 882 int round) { 883 this.scope = scope; 884 this.task = task; 885 this.round = round; 886 } 887 888 @Override 889 public void run() { 890 T result = null; 891 Throwable ex = null; 892 try { 893 result = task.call(); 894 } catch (Throwable e) { 895 ex = e; 896 } 897 898 // nothing to do if task scope is shutdown 899 if (scope.isShutdown()) 900 return; 901 902 // capture result or exception, invoke handleComplete 903 if (ex == null) { 904 this.result = (result != null) ? result : RESULT_NULL; 905 } else { 906 this.result = new AltResult(State.FAILED, ex); 907 } 908 scope.handleComplete(this); 909 } 910 911 @Override 912 public Callable<? extends T> task() { 913 return task; 914 } 915 916 @Override 917 public Subtask.State state() { 918 Object result = this.result; 919 if (result == null) { 920 return State.UNAVAILABLE; 921 } else if (result instanceof AltResult alt) { 922 // null or failed 923 return alt.state(); 924 } else { 925 return State.SUCCESS; 926 } 927 } 928 929 @Override 930 public T get() { 931 scope.ensureJoinedIfOwner(round); 932 Object result = this.result; 933 if (result instanceof AltResult) { 934 if (result == RESULT_NULL) return null; 935 } else if (result != null) { 936 @SuppressWarnings("unchecked") 937 T r = (T) result; 938 return r; 939 } 940 throw new IllegalStateException( 941 "Result is unavailable or subtask did not complete successfully"); 942 } 943 944 @Override 945 public Throwable exception() { 946 scope.ensureJoinedIfOwner(round); 947 Object result = this.result; 948 if (result instanceof AltResult alt && alt.state() == State.FAILED) { 949 return alt.exception(); 950 } 951 throw new IllegalStateException( 952 "Exception is unavailable or subtask did not complete with exception"); 953 } 954 955 @Override 956 public String toString() { 957 String stateAsString = switch (state()) { 958 case UNAVAILABLE -> "[Unavailable]"; 959 case SUCCESS -> "[Completed successfully]"; 960 case FAILED -> { 961 Throwable ex = ((AltResult) result).exception(); 962 yield "[Failed: " + ex + "]"; 963 } 964 }; 965 return Objects.toIdentityString(this) + stateAsString; 966 } 967 } 968 969 /** 970 * A {@code StructuredTaskScope} that captures the result of the first subtask to 971 * complete {@linkplain Subtask.State#SUCCESS successfully}. Once captured, it 972 * {@linkplain #shutdown() shuts down} the task scope to interrupt unfinished threads 973 * and wakeup the task scope owner. The policy implemented by this class is intended 974 * for cases where the result of any subtask will do ("invoke any") and where the 975 * results of other unfinished subtasks are no longer needed. 976 * 977 * <p> Unless otherwise specified, passing a {@code null} argument to a method 978 * in this class will cause a {@link NullPointerException} to be thrown. 979 * 980 * @apiNote This class implements a policy to shut down the task scope when a subtask 981 * completes successfully. There shouldn't be any need to directly shut down the task 982 * scope with the {@link #shutdown() shutdown} method. 983 * 984 * @param <T> the result type 985 * @since 21 986 */ 987 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 988 public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> { 989 private static final Object RESULT_NULL = new Object(); 990 private static final VarHandle FIRST_RESULT; 991 private static final VarHandle FIRST_EXCEPTION; 992 static { 993 try { 994 MethodHandles.Lookup l = MethodHandles.lookup(); 995 FIRST_RESULT = l.findVarHandle(ShutdownOnSuccess.class, "firstResult", Object.class); 996 FIRST_EXCEPTION = l.findVarHandle(ShutdownOnSuccess.class, "firstException", Throwable.class); 997 } catch (Exception e) { 998 throw new ExceptionInInitializerError(e); 999 } 1000 } 1001 private volatile Object firstResult; 1002 private volatile Throwable firstException; 1003 1004 /** 1005 * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory. 1006 * The task scope is optionally named for the purposes of monitoring and management. 1007 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create} 1008 * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope 1009 * is owned by the current thread. 1010 * 1011 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped 1012 * value} bindings for inheritance by threads started in the task scope. The 1013 * <a href="#TreeStructure">Tree Structure</a> section in the class description 1014 * details how parent-child relations are established implicitly for the purpose 1015 * of inheritance of scoped value bindings. 1016 * 1017 * @param name the name of the task scope, can be null 1018 * @param factory the thread factory 1019 */ 1020 public ShutdownOnSuccess(String name, ThreadFactory factory) { 1021 super(name, factory); 1022 } 1023 1024 /** 1025 * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads. 1026 * 1027 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with 1028 * a name of {@code null} and a thread factory that creates virtual threads. 1029 */ 1030 public ShutdownOnSuccess() { 1031 this(null, Thread.ofVirtual().factory()); 1032 } 1033 1034 @Override 1035 protected void handleComplete(Subtask<? extends T> subtask) { 1036 if (firstResult != null) { 1037 // already captured a result 1038 return; 1039 } 1040 1041 if (subtask.state() == Subtask.State.SUCCESS) { 1042 // task succeeded 1043 T result = subtask.get(); 1044 Object r = (result != null) ? result : RESULT_NULL; 1045 if (FIRST_RESULT.compareAndSet(this, null, r)) { 1046 super.shutdown(); 1047 } 1048 } else if (firstException == null) { 1049 // capture the exception thrown by the first subtask that failed 1050 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception()); 1051 } 1052 } 1053 1054 /** 1055 * Wait for a subtask started in this task scope to complete {@linkplain 1056 * Subtask.State#SUCCESS successfully} or all subtasks to complete. 1057 * 1058 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 1059 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 1060 * when all threads finish, a subtask completes successfully, or the current 1061 * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting 1062 * if the {@link #shutdown() shutdown} method is invoked directly to shut down 1063 * this task scope. 1064 * 1065 * <p> This method may only be invoked by the task scope owner. 1066 * 1067 * @throws IllegalStateException {@inheritDoc} 1068 * @throws WrongThreadException {@inheritDoc} 1069 */ 1070 @Override 1071 public ShutdownOnSuccess<T> join() throws InterruptedException { 1072 super.join(); 1073 return this; 1074 } 1075 1076 /** 1077 * Wait for a subtask started in this task scope to complete {@linkplain 1078 * Subtask.State#SUCCESS successfully} or all subtasks to complete, up to the 1079 * given deadline. 1080 * 1081 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 1082 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 1083 * when all threads finish, a subtask completes successfully, the deadline is 1084 * reached, or the current thread is {@linkplain Thread#interrupt() interrupted}. 1085 * It also stops waiting if the {@link #shutdown() shutdown} method is invoked 1086 * directly to shut down this task scope. 1087 * 1088 * <p> This method may only be invoked by the task scope owner. 1089 * 1090 * @throws IllegalStateException {@inheritDoc} 1091 * @throws WrongThreadException {@inheritDoc} 1092 */ 1093 @Override 1094 public ShutdownOnSuccess<T> joinUntil(Instant deadline) 1095 throws InterruptedException, TimeoutException 1096 { 1097 super.joinUntil(deadline); 1098 return this; 1099 } 1100 1101 /** 1102 * {@return the result of the first subtask that completed {@linkplain 1103 * Subtask.State#SUCCESS successfully}} 1104 * 1105 * <p> When no subtask completed successfully, but a subtask {@linkplain 1106 * Subtask.State#FAILED failed} then {@code ExecutionException} is thrown with 1107 * the subtask's exception as the {@linkplain Throwable#getCause() cause}. 1108 * 1109 * @throws ExecutionException if no subtasks completed successfully but at least 1110 * one subtask failed 1111 * @throws IllegalStateException if no subtasks completed or the task scope owner 1112 * did not join after forking 1113 * @throws WrongThreadException if the current thread is not the task scope owner 1114 */ 1115 public T result() throws ExecutionException { 1116 return result(ExecutionException::new); 1117 } 1118 1119 /** 1120 * Returns the result of the first subtask that completed {@linkplain 1121 * Subtask.State#SUCCESS successfully}, otherwise throws an exception produced 1122 * by the given exception supplying function. 1123 * 1124 * <p> When no subtask completed successfully, but a subtask {@linkplain 1125 * Subtask.State#FAILED failed}, then the exception supplying function is invoked 1126 * with subtask's exception. 1127 * 1128 * @param esf the exception supplying function 1129 * @param <X> type of the exception to be thrown 1130 * @return the result of the first subtask that completed with a result 1131 * 1132 * @throws X if no subtasks completed successfully but at least one subtask failed 1133 * @throws IllegalStateException if no subtasks completed or the task scope owner 1134 * did not join after forking 1135 * @throws WrongThreadException if the current thread is not the task scope owner 1136 */ 1137 public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X { 1138 Objects.requireNonNull(esf); 1139 ensureOwnerAndJoined(); 1140 1141 Object result = firstResult; 1142 if (result == RESULT_NULL) { 1143 return null; 1144 } else if (result != null) { 1145 @SuppressWarnings("unchecked") 1146 T r = (T) result; 1147 return r; 1148 } 1149 1150 Throwable exception = firstException; 1151 if (exception != null) { 1152 X ex = esf.apply(exception); 1153 Objects.requireNonNull(ex, "esf returned null"); 1154 throw ex; 1155 } 1156 1157 throw new IllegalStateException("No completed subtasks"); 1158 } 1159 } 1160 1161 /** 1162 * A {@code StructuredTaskScope} that captures the exception of the first subtask to 1163 * {@linkplain Subtask.State#FAILED fail}. Once captured, it {@linkplain #shutdown() 1164 * shuts down} the task scope to interrupt unfinished threads and wakeup the task 1165 * scope owner. The policy implemented by this class is intended for cases where the 1166 * results for all subtasks are required ("invoke all"); if any subtask fails then the 1167 * results of other unfinished subtasks are no longer needed. 1168 * 1169 * <p> Unless otherwise specified, passing a {@code null} argument to a method 1170 * in this class will cause a {@link NullPointerException} to be thrown. 1171 * 1172 * @apiNote This class implements a policy to shut down the task scope when a subtask 1173 * fails. There shouldn't be any need to directly shut down the task scope with the 1174 * {@link #shutdown() shutdown} method. 1175 * 1176 * @since 21 1177 */ 1178 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) 1179 public static final class ShutdownOnFailure extends StructuredTaskScope<Object> { 1180 private static final VarHandle FIRST_EXCEPTION; 1181 static { 1182 try { 1183 MethodHandles.Lookup l = MethodHandles.lookup(); 1184 FIRST_EXCEPTION = l.findVarHandle(ShutdownOnFailure.class, "firstException", Throwable.class); 1185 } catch (Exception e) { 1186 throw new ExceptionInInitializerError(e); 1187 } 1188 } 1189 private volatile Throwable firstException; 1190 1191 /** 1192 * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory. 1193 * The task scope is optionally named for the purposes of monitoring and management. 1194 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create} 1195 * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope 1196 * is owned by the current thread. 1197 * 1198 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped 1199 * value} bindings for inheritance by threads started in the task scope. The 1200 * <a href="#TreeStructure">Tree Structure</a> section in the class description 1201 * details how parent-child relations are established implicitly for the purpose 1202 * of inheritance of scoped value bindings. 1203 * 1204 * @param name the name of the task scope, can be null 1205 * @param factory the thread factory 1206 */ 1207 public ShutdownOnFailure(String name, ThreadFactory factory) { 1208 super(name, factory); 1209 } 1210 1211 /** 1212 * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads. 1213 * 1214 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with 1215 * a name of {@code null} and a thread factory that creates virtual threads. 1216 */ 1217 public ShutdownOnFailure() { 1218 this(null, Thread.ofVirtual().factory()); 1219 } 1220 1221 @Override 1222 protected void handleComplete(Subtask<?> subtask) { 1223 if (subtask.state() == Subtask.State.FAILED 1224 && firstException == null 1225 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception())) { 1226 super.shutdown(); 1227 } 1228 } 1229 1230 /** 1231 * Wait for all subtasks started in this task scope to complete or for a subtask 1232 * to {@linkplain Subtask.State#FAILED fail}. 1233 * 1234 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 1235 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 1236 * when all threads finish, a subtask fails, or the current thread is {@linkplain 1237 * Thread#interrupt() interrupted}. It also stops waiting if the {@link #shutdown() 1238 * shutdown} method is invoked directly to shut down this task scope. 1239 * 1240 * <p> This method may only be invoked by the task scope owner. 1241 * 1242 * @throws IllegalStateException {@inheritDoc} 1243 * @throws WrongThreadException {@inheritDoc} 1244 */ 1245 @Override 1246 public ShutdownOnFailure join() throws InterruptedException { 1247 super.join(); 1248 return this; 1249 } 1250 1251 /** 1252 * Wait for all subtasks started in this task scope to complete or for a subtask 1253 * to {@linkplain Subtask.State#FAILED fail}, up to the given deadline. 1254 * 1255 * <p> This method waits for all subtasks by waiting for all threads {@linkplain 1256 * #fork(Callable) started} in this task scope to finish execution. It stops waiting 1257 * when all threads finish, a subtask fails, the deadline is reached, or the current 1258 * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting 1259 * if the {@link #shutdown() shutdown} method is invoked directly to shut down 1260 * this task scope. 1261 * 1262 * <p> This method may only be invoked by the task scope owner. 1263 * 1264 * @throws IllegalStateException {@inheritDoc} 1265 * @throws WrongThreadException {@inheritDoc} 1266 */ 1267 @Override 1268 public ShutdownOnFailure joinUntil(Instant deadline) 1269 throws InterruptedException, TimeoutException 1270 { 1271 super.joinUntil(deadline); 1272 return this; 1273 } 1274 1275 /** 1276 * Returns the exception of the first subtask that {@linkplain Subtask.State#FAILED 1277 * failed}. If no subtasks failed then an empty {@code Optional} is returned. 1278 * 1279 * @return the exception for the first subtask to fail or an empty optional if no 1280 * subtasks failed 1281 * 1282 * @throws WrongThreadException if the current thread is not the task scope owner 1283 * @throws IllegalStateException if the task scope owner did not join after forking 1284 */ 1285 public Optional<Throwable> exception() { 1286 ensureOwnerAndJoined(); 1287 return Optional.ofNullable(firstException); 1288 } 1289 1290 /** 1291 * Throws if a subtask {@linkplain Subtask.State#FAILED failed}. 1292 * If any subtask failed with an exception then {@code ExecutionException} is 1293 * thrown with the exception of the first subtask to fail as the {@linkplain 1294 * Throwable#getCause() cause}. This method does nothing if no subtasks failed. 1295 * 1296 * @throws ExecutionException if a subtask failed 1297 * @throws WrongThreadException if the current thread is not the task scope owner 1298 * @throws IllegalStateException if the task scope owner did not join after forking 1299 */ 1300 public void throwIfFailed() throws ExecutionException { 1301 throwIfFailed(ExecutionException::new); 1302 } 1303 1304 /** 1305 * Throws the exception produced by the given exception supplying function if a 1306 * subtask {@linkplain Subtask.State#FAILED failed}. If any subtask failed with 1307 * an exception then the function is invoked with the exception of the first 1308 * subtask to fail. The exception returned by the function is thrown. This method 1309 * does nothing if no subtasks failed. 1310 * 1311 * @param esf the exception supplying function 1312 * @param <X> type of the exception to be thrown 1313 * 1314 * @throws X produced by the exception supplying function 1315 * @throws WrongThreadException if the current thread is not the task scope owner 1316 * @throws IllegalStateException if the task scope owner did not join after forking 1317 */ 1318 public <X extends Throwable> 1319 void throwIfFailed(Function<Throwable, ? extends X> esf) throws X { 1320 ensureOwnerAndJoined(); 1321 Objects.requireNonNull(esf); 1322 Throwable exception = firstException; 1323 if (exception != null) { 1324 X ex = esf.apply(exception); 1325 Objects.requireNonNull(ex, "esf returned null"); 1326 throw ex; 1327 } 1328 } 1329 } 1330 }