1 /* 2 * Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package jdk.incubator.concurrent; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.security.AccessController; 30 import java.security.PrivilegedAction; 31 import java.time.Duration; 32 import java.time.Instant; 33 import java.util.Comparator; 34 import java.util.Objects; 35 import java.util.Optional; 36 import java.util.Set; 37 import java.util.concurrent.Callable; 38 import java.util.concurrent.CancellationException; 39 import java.util.concurrent.ConcurrentHashMap; 40 import java.util.concurrent.ExecutionException; 41 import java.util.concurrent.Future; 42 import java.util.concurrent.FutureTask; 43 import java.util.concurrent.RejectedExecutionException; 44 import java.util.concurrent.ThreadFactory; 45 import java.util.concurrent.TimeoutException; 46 import java.util.concurrent.TimeUnit; 47 import java.util.concurrent.locks.ReentrantLock; 48 import java.util.function.Function; 49 import jdk.internal.misc.ThreadFlock; 50 51 /** 52 * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports 53 * cases where a task splits into several concurrent subtasks, to be executed in their 54 * own threads, and where the subtasks must complete before the main task continues. A 55 * {@code StructuredTaskScope} can be used to ensure that the lifetime of a concurrent 56 * operation is confined by a <em>syntax block</em>, just like that of a sequential 57 * operation in structured programming. 58 * 59 * <h2>Basic usage</h2> 60 * 61 * A {@code StructuredTaskScope} is created with one of its public constructors. It defines 62 * the {@link #fork(Callable) fork} method to start a thread to execute a task, the {@link 63 * #join() join} method to wait for all threads to finish, and the {@link #close() close} 64 * method to close the task scope. The API is intended to be used with the {@code 65 * try-with-resources} construct. 
The intention is that code in the <em>block</em> uses 66 * the {@code fork} method to fork threads to execute the subtasks, wait for the threads 67 * to finish with the {@code join} method, and then <em>process the results</em>. 68 * Processing of results may include handling or re-throwing of exceptions. 69 * {@snippet lang=java : 70 * try (var scope = new StructuredTaskScope<Object>()) { 71 * 72 * Future<Integer> future1 = scope.fork(task1); // @highlight substring="fork" 73 * Future<String> future2 = scope.fork(task2); // @highlight substring="fork" 74 * 75 * scope.join(); // @highlight substring="join" 76 * 77 * ... process results/exceptions ... 78 * 79 * } // close // @highlight substring="close" 80 * } 81 * To ensure correct usage, the {@code join} and {@code close} methods may only be invoked 82 * by the <em>owner</em> (the thread that opened/created the task scope), and the 83 * {@code close} method throws an exception after closing if the owner did not invoke the 84 * {@code join} method after forking. 85 * 86 * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut 87 * down a task scope without closing it. Shutdown is useful for cases where a subtask 88 * completes with a result (or exception) and the results of other unfinished subtasks are 89 * no longer needed. If a subtask invokes {@code shutdown} while the owner is waiting in 90 * the {@code join} method then it will cause {@code join} to wakeup, all unfinished 91 * threads to be {@linkplain Thread#interrupt() interrupted} and prevents new threads 92 * from starting in the task scope. 93 * 94 * <h2>Subclasses with policies for common cases</h2> 95 * 96 * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for 97 * common cases: 98 * <ol> 99 * <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the first result and 100 * shuts down the task scope to interrupt unfinished threads and wakeup the owner. 
This 101 * class is intended for cases where the result of any subtask will do ("invoke any") 102 * and where there is no need to wait for results of other unfinished tasks. It defines 103 * methods to get the first result or throw an exception if all subtasks fail. 104 * <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the first exception and 105 * shuts down the task scope. This class is intended for cases where the results of all 106 * subtasks are required ("invoke all"); if any subtask fails then the results of other 107 * unfinished subtasks are no longer needed. It defines methods to throw an exception if 108 * any of the subtasks fail. 109 * </ol> 110 * 111 * <p> The following are two examples that use the two classes. In both cases, a pair of 112 * subtasks are forked to fetch resources from two URL locations "left" and "right". The 113 * first example creates a ShutdownOnSuccess object to capture the result of the first 114 * subtask to complete normally, cancelling the other by way of shutting down the task 115 * scope. The main task waits in {@code join} until either subtask completes with a result 116 * or both subtasks fail. It invokes {@link ShutdownOnSuccess#result(Function) 117 * result(Function)} method to get the captured result. If both subtasks fail then this 118 * method throws a {@code WebApplicationException} with the exception from one of the 119 * subtasks as the cause. 120 * {@snippet lang=java : 121 * try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) { 122 * 123 * scope.fork(() -> fetch(left)); 124 * scope.fork(() -> fetch(right)); 125 * 126 * scope.join(); 127 * 128 * // @link regex="result(?=\()" target="ShutdownOnSuccess#result" : 129 * String result = scope.result(e -> new WebApplicationException(e)); 130 * 131 * ... 
132 * } 133 * } 134 * The second example creates a ShutdownOnFailure object to capture the exception of the 135 * first subtask to fail, cancelling the other by way of shutting down the task scope. The 136 * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a 137 * result, either fails, or a deadline is reached. It invokes {@link 138 * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception 139 * when either subtask fails. This method is a no-op if no subtasks fail. The main task 140 * uses {@code Future}'s {@link Future#resultNow() resultNow()} method to retrieve the 141 * results. 142 * 143 * {@snippet lang=java : 144 * Instant deadline = ... 145 * 146 * try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { 147 * 148 * Future<String> future1 = scope.fork(() -> query(left)); 149 * Future<String> future2 = scope.fork(() -> query(right)); 150 * 151 * scope.joinUntil(deadline); 152 * 153 * // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" : 154 * scope.throwIfFailed(e -> new WebApplicationException(e)); 155 * 156 * // both subtasks completed successfully 157 * String result = Stream.of(future1, future2) 158 * // @link substring="Future::resultNow" target="Future#resultNow" : 159 * .map(Future::resultNow) 160 * .collect(Collectors.joining(", ", "{ ", " }")); 161 * 162 * ... 163 * } 164 * } 165 * 166 * <h2>Extending StructuredTaskScope</h2> 167 * 168 * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Future) 169 * handleComplete} overridden, to implement policies other than those implemented by 170 * {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. The method may be overridden 171 * to, for example, collect the results of subtasks that complete with a result and ignore 172 * subtasks that fail. It may collect exceptions when subtasks fail. 
It may invoke the 173 * {@link #shutdown() shutdown} method to shut down and cause {@link #join() join} to 174 * wakeup when some condition arises. 175 * 176 * <p> A subclass will typically define methods to make available results, state, or other 177 * outcome to code that executes after the {@code join} method. A subclass that collects 178 * results and ignores subtasks that fail may define a method that returns a collection of 179 * results. A subclass that implements a policy to shut down when a subtask fails may 180 * define a method to retrieve the exception of the first subtask to fail. 181 * 182 * <p> The following is an example of a {@code StructuredTaskScope} implementation that 183 * collects the results of subtasks that complete successfully. It defines the method 184 * <b>{@code results()}</b> to be used by the main task to retrieve the results. 185 * 186 * {@snippet lang=java : 187 * class MyScope<T> extends StructuredTaskScope<T> { 188 * private final Queue<T> results = new ConcurrentLinkedQueue<>(); 189 * 190 * MyScope() { 191 * super(null, Thread.ofVirtual().factory()); 192 * } 193 * 194 * @Override 195 * // @link substring="handleComplete" target="handleComplete" : 196 * protected void handleComplete(Future<T> future) { 197 * if (future.state() == Future.State.SUCCESS) { 198 * T result = future.resultNow(); 199 * results.add(result); 200 * } 201 * } 202 * 203 * // Returns a stream of results from the subtasks that completed successfully 204 * public Stream<T> results() { // @highlight substring="results" 205 * return results.stream(); 206 * } 207 * } 208 * } 209 * 210 * <h2><a id="TreeStructure">Tree structure</a></h2> 211 * 212 * Task scopes form a tree where parent-child relations are established implicitly when 213 * opening a new task scope: 214 * <ul> 215 * <li> A parent-child relation is established when a thread started in a task scope 216 * opens its own task scope. 
A thread started in task scope "A" that opens task scope 217 * "B" establishes a parent-child relation where task scope "A" is the parent of task 218 * scope "B". 219 * <li> A parent-child relation is established with nesting. If a thread opens task 220 * scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task 221 * scope "B" is the parent of the nested task scope "C". 222 * </ul> 223 * 224 * The <i>descendants</i> of a task scope are the child task scopes that it is a parent 225 * of, plus the descendants of the child task scopes, recursively. 226 * 227 * <p> The tree structure supports: 228 * <ul> 229 * <li> Inheritance of {@linkplain ScopedValue scoped values} across threads. 230 * <li> Confinement checks. The phrase "threads contained in the task scope" in method 231 * descriptions means threads started in the task scope or descendant scopes. 232 * </ul> 233 * 234 * <p> The following example demonstrates the inheritance of a scoped value. A scoped 235 * value {@code USERNAME} is bound to the value "{@code duke}". A {@code StructuredTaskScope} 236 * is created and its {@code fork} method invoked to start a thread to execute {@code 237 * childTask}. The thread inherits the scoped value <em>bindings</em> captured when 238 * creating the task scope. The code in {@code childTask} uses the value of the scoped 239 * value and so reads the value "{@code duke}". 240 * {@snippet lang=java : 241 * private static final ScopedValue<String> USERNAME = ScopedValue.newInstance(); 242 * 243 * // @link substring="where" target="ScopedValue#where(ScopedValue, Object, Runnable)" : 244 * ScopedValue.where(USERNAME, "duke", () -> { 245 * try (var scope = new StructuredTaskScope<String>()) { 246 * 247 * scope.fork(() -> childTask()); // @highlight substring="fork" 248 * ... 249 * } 250 * }); 251 * 252 * ... 253 * 254 * String childTask() { 255 * // @link substring="get" target="ScopedValue#get()" : 256 * String name = USERNAME.get(); // "duke" 257 * ... 
258 * } 259 * } 260 * 261 * <p> {@code StructuredTaskScope} does not define APIs that exposes the tree structure 262 * at this time. 263 * 264 * <p> Unless otherwise specified, passing a {@code null} argument to a constructor 265 * or method in this class will cause a {@link NullPointerException} to be thrown. 266 * 267 * <h2>Memory consistency effects</h2> 268 * 269 * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to 270 * {@linkplain #fork forking} of a {@code Callable} task 271 * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility"> 272 * <i>happen-before</i></a> any actions taken by that task, which in turn <i>happen-before</i> 273 * the task result is retrieved via its {@code Future}, or <i>happen-before</i> any actions 274 * taken in a thread after {@linkplain #join() joining} of the task scope. 275 * 276 * @jls 17.4.5 Happens-before Order 277 * 278 * @param <T> the result type of tasks executed in the scope 279 * @since 19 280 */ 281 public class StructuredTaskScope<T> implements AutoCloseable { 282 private static final VarHandle FUTURES; 283 static { 284 try { 285 MethodHandles.Lookup l = MethodHandles.lookup(); 286 FUTURES = l.findVarHandle(StructuredTaskScope.class, "futures", Set.class); 287 } catch (Exception e) { 288 throw new InternalError(e); 289 } 290 } 291 292 private final ThreadFactory factory; 293 private final ThreadFlock flock; 294 private final ReentrantLock shutdownLock = new ReentrantLock(); 295 296 // lazily created set of Future objects with threads waiting in Future::get 297 private volatile Set<Future<?>> futures; 298 299 // set by owner when it forks, reset by owner when it joins 300 private boolean needJoin; 301 302 // states: OPEN -> SHUTDOWN -> CLOSED 303 private static final int OPEN = 0; // initial state 304 private static final int SHUTDOWN = 1; 305 private static final int CLOSED = 2; 306 307 // scope state, set by owner, read by any thread 308 private 
volatile int state; 309 310 /** 311 * Creates a structured task scope with the given name and thread factory. The task 312 * scope is optionally named for the purposes of monitoring and management. The thread 313 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when 314 * tasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the 315 * current thread. 316 * 317 * <p> This method captures the current thread's {@linkplain ScopedValue scoped value} 318 * bindings for inheritance by threads created in the task scope. The 319 * <a href="#TreeStructure">Tree Structure</a> section in the class description 320 * details how parent-child relations are established implicitly for the purpose of 321 * inheritance of scoped value bindings. 322 * 323 * @param name the name of the task scope, can be null 324 * @param factory the thread factory 325 */ 326 public StructuredTaskScope(String name, ThreadFactory factory) { 327 this.factory = Objects.requireNonNull(factory, "'factory' is null"); 328 this.flock = ThreadFlock.open(name); 329 } 330 331 /** 332 * Creates an unnamed structured task scope that creates virtual threads. The task 333 * scope is owned by the current thread. 334 * 335 * <p> This constructor is equivalent to invoking the 2-arg constructor with a name 336 * of {@code null} and a thread factory that creates virtual threads. 337 */ 338 public StructuredTaskScope() { 339 this.factory = Thread.ofVirtual().factory(); 340 this.flock = ThreadFlock.open(null); 341 } 342 343 /** 344 * Throws WrongThreadException if the current thread is not the owner. 345 */ 346 private void ensureOwner() { 347 if (Thread.currentThread() != flock.owner()) 348 throw new WrongThreadException("Current thread not owner"); 349 } 350 351 /** 352 * Throws WrongThreadException if the current thread is not the owner 353 * or a thread contained in the tree. 
354 */ 355 private void ensureOwnerOrContainsThread() { 356 Thread currentThread = Thread.currentThread(); 357 if (currentThread != flock.owner() && !flock.containsThread(currentThread)) 358 throw new WrongThreadException("Current thread not owner or thread in the tree"); 359 } 360 361 /** 362 * Tests if the task scope is shutdown. 363 */ 364 private boolean isShutdown() { 365 return state >= SHUTDOWN; 366 } 367 368 /** 369 * Track the given Future. 370 */ 371 private void track(Future<?> future) { 372 // create the set of Futures if not already created 373 Set<Future<?>> futures = this.futures; 374 if (futures == null) { 375 futures = ConcurrentHashMap.newKeySet(); 376 if (!FUTURES.compareAndSet(this, null, futures)) { 377 // lost the race 378 futures = this.futures; 379 } 380 } 381 futures.add(future); 382 } 383 384 /** 385 * Stop tracking the Future. 386 */ 387 private void untrack(Future<?> future) { 388 assert futures != null; 389 futures.remove(future); 390 } 391 392 /** 393 * Invoked when a task completes before the scope is shut down. 394 * 395 * <p> The {@code handleComplete} method should be thread safe. It may be invoked by 396 * several threads concurrently. 397 * 398 * @implSpec The default implementation does nothing. 399 * 400 * @param future the completed task 401 */ 402 protected void handleComplete(Future<T> future) { } 403 404 /** 405 * Starts a new thread to run the given task. 406 * 407 * <p> The new thread is created with the task scope's {@link ThreadFactory}. It 408 * inherits the current thread's {@linkplain ScopedValue scoped value} bindings. The 409 * bindings must match the bindings captured when the task scope was created. 410 * 411 * <p> If the task completes before the task scope is {@link #shutdown() shutdown} 412 * then the {@link #handleComplete(Future) handleComplete} method is invoked to 413 * consume the completed task. The {@code handleComplete} method is run when the task 414 * completes with a result or exception. 
If the {@code Future}'s {@link 415 * Future#cancel(boolean) cancel} method is used to cancel a task before the task scope 416 * is shut down, then the {@code handleComplete} method is run by the thread that 417 * invokes {@code cancel}. If the task scope shuts down at or around the same time 418 * that the task completes or is cancelled then the {@code handleComplete} method may 419 * or may not be invoked. 420 * 421 * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process 422 * of shutting down) then {@code fork} returns a {@code Future} representing a {@link 423 * Future.State#CANCELLED cancelled} task that was not run. 424 * 425 * <p> This method may only be invoked by the task scope owner or threads contained 426 * in the task scope. The {@link Future#cancel(boolean) cancel} method of the returned 427 * {@code Future} object is also restricted to the task scope owner or threads contained 428 * in the task scope. The {@code cancel} method throws {@link WrongThreadException} 429 * if invoked from another thread. All other methods on the returned {@code Future} 430 * object, such as {@link Future#get() get}, are not restricted. 431 * 432 * @param task the task to run 433 * @param <U> the result type 434 * @return a future 435 * @throws IllegalStateException if this task scope is closed 436 * @throws WrongThreadException if the current thread is not the owner or a thread 437 * contained in the task scope 438 * @throws StructureViolationException if the current scoped value bindings are not 439 * the same as when the task scope was created 440 * @throws RejectedExecutionException if the thread factory rejected creating a 441 * thread to run the task 442 */ 443 public <U extends T> Future<U> fork(Callable<? 
extends U> task) { 444 Objects.requireNonNull(task, "'task' is null"); 445 446 // create future 447 var future = new FutureImpl<U>(this, task); 448 449 boolean shutdown = (state >= SHUTDOWN); 450 451 if (!shutdown) { 452 // create thread 453 Thread thread = factory.newThread(future); 454 if (thread == null) { 455 throw new RejectedExecutionException("Rejected by thread factory"); 456 } 457 458 // attempt to start the thread 459 try { 460 flock.start(thread); 461 } catch (IllegalStateException e) { 462 // shutdown or in the process of shutting down 463 shutdown = true; 464 } 465 } 466 467 if (shutdown) { 468 if (state == CLOSED) { 469 throw new IllegalStateException("Task scope is closed"); 470 } else { 471 future.cancel(false); 472 } 473 } 474 475 // if owner forks then it will need to join 476 if (Thread.currentThread() == flock.owner() && !needJoin) { 477 needJoin = true; 478 } 479 480 return future; 481 } 482 483 /** 484 * Wait for all threads to finish or the task scope to shut down. 485 */ 486 private void implJoin(Duration timeout) 487 throws InterruptedException, TimeoutException 488 { 489 ensureOwner(); 490 needJoin = false; 491 int s = state; 492 if (s >= SHUTDOWN) { 493 if (s == CLOSED) 494 throw new IllegalStateException("Task scope is closed"); 495 return; 496 } 497 498 // wait for all threads, wakeup, interrupt, or timeout 499 if (timeout != null) { 500 flock.awaitAll(timeout); 501 } else { 502 flock.awaitAll(); 503 } 504 } 505 506 /** 507 * Wait for all threads to finish or the task scope to shut down. This method waits 508 * until all threads started in the task scope finish execution (of both task and 509 * {@link #handleComplete(Future) handleComplete} method), the {@link #shutdown() 510 * shutdown} method is invoked to shut down the task scope, or the current thread 511 * is {@linkplain Thread#interrupt() interrupted}. 512 * 513 * <p> This method may only be invoked by the task scope owner. 
514 * 515 * @return this task scope 516 * @throws IllegalStateException if this task scope is closed 517 * @throws WrongThreadException if the current thread is not the owner 518 * @throws InterruptedException if interrupted while waiting 519 */ 520 public StructuredTaskScope<T> join() throws InterruptedException { 521 try { 522 implJoin(null); 523 } catch (TimeoutException e) { 524 throw new InternalError(); 525 } 526 return this; 527 } 528 529 /** 530 * Wait for all threads to finish or the task scope to shut down, up to the given 531 * deadline. This method waits until all threads started in the task scope finish 532 * execution (of both task and {@link #handleComplete(Future) handleComplete} method), 533 * the {@link #shutdown() shutdown} method is invoked to shut down the task scope, 534 * the current thread is {@linkplain Thread#interrupt() interrupted}, or the deadline 535 * is reached. 536 * 537 * <p> This method may only be invoked by the task scope owner. 538 * 539 * @param deadline the deadline 540 * @return this task scope 541 * @throws IllegalStateException if this task scope is closed 542 * @throws WrongThreadException if the current thread is not the owner 543 * @throws InterruptedException if interrupted while waiting 544 * @throws TimeoutException if the deadline is reached while waiting 545 */ 546 public StructuredTaskScope<T> joinUntil(Instant deadline) 547 throws InterruptedException, TimeoutException 548 { 549 Duration timeout = Duration.between(Instant.now(), deadline); 550 implJoin(timeout); 551 return this; 552 } 553 554 /** 555 * Cancel all tracked Future objects. 556 */ 557 private void cancelTrackedFutures() { 558 Set<Future<?>> futures = this.futures; 559 if (futures != null) { 560 futures.forEach(f -> f.cancel(false)); 561 } 562 } 563 564 /** 565 * Interrupt all unfinished threads. 
566 */ 567 private void implInterruptAll() { 568 flock.threads().forEach(t -> { 569 if (t != Thread.currentThread()) { 570 t.interrupt(); 571 } 572 }); 573 } 574 575 @SuppressWarnings("removal") 576 private void interruptAll() { 577 if (System.getSecurityManager() == null) { 578 implInterruptAll(); 579 } else { 580 PrivilegedAction<Void> pa = () -> { 581 implInterruptAll(); 582 return null; 583 }; 584 AccessController.doPrivileged(pa); 585 } 586 } 587 588 /** 589 * Shutdown the task scope if not already shutdown. Return true if this method 590 * shutdowns the task scope, false if already shutdown. 591 */ 592 private boolean implShutdown() { 593 if (state < SHUTDOWN) { 594 shutdownLock.lock(); 595 try { 596 if (state < SHUTDOWN) { 597 598 // prevent new threads from starting 599 flock.shutdown(); 600 601 // wakeup any threads waiting in Future::get 602 cancelTrackedFutures(); 603 604 // interrupt all unfinished threads 605 interruptAll(); 606 607 state = SHUTDOWN; 608 return true; 609 } 610 } finally { 611 shutdownLock.unlock(); 612 } 613 } 614 assert state >= SHUTDOWN; 615 return false; 616 } 617 618 /** 619 * Shut down the task scope without closing it. Shutting down a task scope prevents 620 * new threads from starting, interrupts all unfinished threads, and causes the 621 * {@link #join() join} method to wakeup. Shutdown is useful for cases where the 622 * results of unfinished subtasks are no longer needed. 623 * 624 * <p> More specifically, this method: 625 * <ul> 626 * <li> {@linkplain Future#cancel(boolean) Cancels} the tasks that have threads 627 * {@linkplain Future#get() waiting} on a result so that the waiting threads wakeup. 628 * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the 629 * task scope (except the current thread). 630 * <li> Wakes up the owner if it is waiting in {@link #join()} or {@link 631 * #joinUntil(Instant)}. 
If the owner is not waiting then its next call to {@code 632 * join} or {@code joinUntil} will return immediately. 633 * </ul> 634 * 635 * <p> When this method completes then the {@code Future} objects for all tasks will 636 * be {@linkplain Future#isDone() done}, normally or abnormally. There may still 637 * be threads that have not finished because they are executing code that did not 638 * respond (or respond promptly) to thread interrupt. This method does not wait 639 * for these threads. When the owner invokes the {@link #close() close} method 640 * to close the task scope then it will wait for the remaining threads to finish. 641 * 642 * <p> This method may only be invoked by the task scope owner or threads contained 643 * in the task scope. 644 * 645 * @throws IllegalStateException if this task scope is closed 646 * @throws WrongThreadException if the current thread is not the owner or 647 * a thread contained in the task scope 648 */ 649 public void shutdown() { 650 ensureOwnerOrContainsThread(); 651 if (state == CLOSED) 652 throw new IllegalStateException("Task scope is closed"); 653 if (implShutdown()) 654 flock.wakeup(); 655 } 656 657 /** 658 * Closes this task scope. 659 * 660 * <p> This method first shuts down the task scope (as if by invoking the {@link 661 * #shutdown() shutdown} method). It then waits for the threads executing any 662 * unfinished tasks to finish. If interrupted then this method will continue to 663 * wait for the threads to finish before completing with the interrupt status set. 664 * 665 * <p> This method may only be invoked by the task scope owner. If the task scope 666 * is already closed then the owner invoking this method has no effect. 667 * 668 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured 669 * manner</em>. 
If this method is called to close a task scope before nested task 670 * scopes are closed then it closes the underlying construct of each nested task scope 671 * (in the reverse order that they were created in), closes this task scope, and then 672 * throws {@link StructureViolationException}. 673 * 674 * Similarly, if this method is called to close a task scope while executing with 675 * {@linkplain ScopedValue scoped value} bindings, and the task scope was created 676 * before the scoped values were bound, then {@code StructureViolationException} is 677 * thrown after closing the task scope. 678 * 679 * If a thread terminates without first closing task scopes that it owns then 680 * termination will cause the underlying construct of each of its open task scopes to 681 * be closed. Closing is performed in the reverse order that the task scopes were 682 * created in. Thread termination may therefore be delayed when the owner has to wait 683 * for threads forked in these task scopes to finish. 
     *
     * @throws IllegalStateException thrown after closing the task scope if the owner
     * did not invoke join after forking
     * @throws WrongThreadException if the current thread is not the owner
     * @throws StructureViolationException if a structure violation was detected
     */
    @Override
    public void close() {
        ensureOwner();
        // Closing an already closed task scope is a no-op.
        if (state == CLOSED)
            return;

        try {
            implShutdown();
            flock.close();
        } finally {
            // The scope is marked closed even if shutting down or closing the flock throws.
            state = CLOSED;
        }

        // Reported only once the scope has been closed, as documented above
        // ("thrown after closing the task scope").
        if (needJoin) {
            throw new IllegalStateException("Owner did not invoke join or joinUntil after fork");
        }
    }

    // Produces "[name/]identity-string[/closed|/shutdown]": the flock name is included
    // only when it is non-null, and a suffix is appended only in the CLOSED or SHUTDOWN
    // states.
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        String name = flock.name();
        if (name != null) {
            sb.append(name);
            sb.append('/');
        }
        sb.append(Objects.toIdentityString(this));
        // Read the state field once so that both comparisons below see the same value.
        int s = state;
        if (s == CLOSED)
            sb.append("/closed");
        else if (s == SHUTDOWN)
            sb.append("/shutdown");
        return sb.toString();
    }

    /**
     * The Future implementation returned by the fork methods. Most methods are
     * overridden to support cancellation when the task scope is shut down.
     * The blocking get methods register the Future with the task scope so that they
     * are cancelled when the task scope shuts down.
     */
    private static final class FutureImpl<V> extends FutureTask<V> {
        // The task scope that this Future reports to and whose shutdown state it observes.
        private final StructuredTaskScope<V> scope;

        // Unchecked casts: the wildcard-typed scope and task are narrowed to V here so
        // that the rest of the class can work with a single type parameter.
        @SuppressWarnings("unchecked")
        FutureImpl(StructuredTaskScope<? super V> scope, Callable<? extends V> task) {
            super((Callable<V>) task);
            this.scope = (StructuredTaskScope<V>) scope;
        }

        // FutureTask completion hook. Completion is reported to the scope's
        // handleComplete method unless the scope has already been shut down.
        @Override
        protected void done() {
            if (!scope.isShutdown()) {
                scope.handleComplete(this);
            }
        }

        // Cancels this task, without requesting an interrupt, when the scope has been
        // shut down and the task has not yet completed. Invoked at the start of most of
        // the overridden Future methods so that they observe the task as cancelled once
        // the scope is shut down.
        private void cancelIfShutdown() {
            if (scope.isShutdown() && !super.isDone()) {
                super.cancel(false);
            }
        }

        @Override
        public boolean isDone() {
            cancelIfShutdown();
            return super.isDone();
        }

        @Override
        public boolean isCancelled() {
            cancelIfShutdown();
            return super.isCancelled();
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            // NOTE(review): ensureOwnerOrContainsThread is defined elsewhere in this file;
            // presumably it rejects callers that are neither the owner nor a thread in
            // the scope - confirm against its definition.
            scope.ensureOwnerOrContainsThread();
            cancelIfShutdown();
            return super.cancel(mayInterruptIfRunning);
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            // Already completed: no need to register with the scope.
            if (super.isDone())
                return super.get();
            // Register with the scope for the duration of the blocking wait (see the
            // class description); always unregister, however the wait ends.
            scope.track(this);
            try {
                // Checked after registering, before blocking.
                cancelIfShutdown();
                return super.get();
            } finally {
                scope.untrack(this);
            }
        }

        @Override
        public V get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            Objects.requireNonNull(unit);
            // Already completed: the untimed get is used as there is nothing to wait for.
            if (super.isDone())
                return super.get();
            // Register with the scope for the duration of the blocking wait (see the
            // class description); always unregister, however the wait ends.
            scope.track(this);
            try {
                // Checked after registering, before blocking.
                cancelIfShutdown();
                return super.get(timeout, unit);
            } finally {
                scope.untrack(this);
            }
        }

        @Override
        public V resultNow() {
            cancelIfShutdown();
            return super.resultNow();
        }

        @Override
        public Throwable exceptionNow() {
            cancelIfShutdown();
            return super.exceptionNow();
        }

        @Override
        public State state() {
            cancelIfShutdown();
            return super.state();
        }

        @Override
        public String toString() {
            cancelIfShutdown();
            return super.toString();
        }
    }

    /**
     * Maps a Future.State to an int that can be compared.
     * {@code RUNNING < CANCELLED < FAILED < SUCCESS}.
     */
    private static int futureStateToInt(Future.State s) {
        return switch (s) {
            case RUNNING -> 0;
            case CANCELLED -> 1;
            case FAILED -> 2;
            case SUCCESS -> 3;
        };
    }

    // RUNNING < CANCELLED < FAILED < SUCCESS
    private static final Comparator<Future.State> FUTURE_STATE_COMPARATOR =
            Comparator.comparingInt(StructuredTaskScope::futureStateToInt);

    /**
     * A {@code StructuredTaskScope} that captures the result of the first subtask to
     * complete successfully. Once captured, it invokes the {@linkplain #shutdown() shutdown}
     * method to interrupt unfinished threads and wake up the owner. The policy
     * implemented by this class is intended for cases where the result of any subtask
     * will do ("invoke any") and where the results of other unfinished subtasks are no
     * longer needed.
     *
     * <p> Unless otherwise specified, passing a {@code null} argument to a method
     * in this class will cause a {@link NullPointerException} to be thrown.
     *
     * @param <T> the result type
     * @since 19
     */
    public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
        // VarHandle used to compare-and-set the "future" field declared below; the
        // string passed to findVarHandle must match that field's name.
        private static final VarHandle FUTURE;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                FUTURE = l.findVarHandle(ShutdownOnSuccess.class, "future", Future.class);
            } catch (Exception e) {
                throw new InternalError(e);
            }
        }
        // The highest ranked completed subtask captured so far (ranking per
        // FUTURE_STATE_COMPARATOR), or null if handleComplete has not captured one.
        private volatile Future<T> future;

        /**
         * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
         * The task scope is optionally named for the purposes of monitoring and management.
         * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
         * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope is
         * owned by the current thread.
         *
         * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
         * bindings for inheritance by threads created in the task scope. The
         * <a href="StructuredTaskScope.html#TreeStructure">Tree Structure</a> section in
         * the class description details how parent-child relations are established
         * implicitly for the purpose of inheritance of scoped value bindings.
         *
         * @param name the name of the task scope, can be null
         * @param factory the thread factory
         */
        public ShutdownOnSuccess(String name, ThreadFactory factory) {
            super(name, factory);
        }

        /**
         * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
         *
         * <p> This constructor is equivalent to invoking the 2-arg constructor with a
         * name of {@code null} and a thread factory that creates virtual threads.
         */
        public ShutdownOnSuccess() {
            super(null, Thread.ofVirtual().factory());
        }

        /**
         * Shut down the given task scope when invoked for the first time with a {@code
         * Future} for a task that completed with a result.
         *
         * @param future the completed task
         * @throws IllegalArgumentException if the task has not completed
         * @see #shutdown()
         * @see Future.State#SUCCESS
         */
        @Override
        protected void handleComplete(Future<T> future) {
            Future.State state = future.state();
            if (state == Future.State.RUNNING) {
                throw new IllegalArgumentException("Task is not completed");
            }

            // Lock-free update: install the given future if none has been captured yet
            // or if it ranks higher than the captured one
            // (RUNNING < CANCELLED < FAILED < SUCCESS). If the compare-and-set loses a
            // race with another thread, the field is re-read and the check is repeated.
            Future<T> f;
            while (((f = this.future) == null)
                    || FUTURE_STATE_COMPARATOR.compare(f.state(), state) < 0) {
                if (FUTURE.compareAndSet(this, f, future)) {
                    // Only the thread that installs a successful result shuts down the scope.
                    if (state == Future.State.SUCCESS)
                        shutdown();
                    break;
                }
            }
        }

        /**
         * {@inheritDoc}
         * @return this task scope
         * @throws IllegalStateException {@inheritDoc}
         * @throws WrongThreadException {@inheritDoc}
         */
        @Override
        public ShutdownOnSuccess<T> join() throws InterruptedException {
            super.join();
            return this;
        }

        /**
         * {@inheritDoc}
         * @return this task scope
         * @throws IllegalStateException {@inheritDoc}
         * @throws WrongThreadException {@inheritDoc}
         */
        @Override
        public ShutdownOnSuccess<T> joinUntil(Instant deadline)
            throws InterruptedException, TimeoutException
        {
            super.joinUntil(deadline);
            return this;
        }

        /**
         * {@return the result of the first subtask that completed with a result}
         *
         * <p> When no subtask completed with a result but a task completed with an
         * exception then {@code ExecutionException} is thrown with the exception as the
         * {@linkplain Throwable#getCause() cause}. If only cancelled subtasks were
         * notified to the {@code handleComplete} method then {@code CancellationException}
         * is thrown.
         *
         * @apiNote This method is intended to be invoked by the task scope owner after it
         * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
         * A future release may add enforcement to prevent the method being called by
         * other threads or before joining.
         *
         * @throws ExecutionException if no subtasks completed with a result but a subtask
         * completed with an exception
         * @throws CancellationException if all subtasks were cancelled
         * @throws IllegalStateException if the {@code handleComplete} method was not
         * invoked with a completed subtask
         */
        public T result() throws ExecutionException {
            Future<T> f = future;
            if (f == null) {
                throw new IllegalStateException("No completed subtasks");
            }
            return switch (f.state()) {
                case SUCCESS -> f.resultNow();
                case FAILED -> throw new ExecutionException(f.exceptionNow());
                case CANCELLED -> throw new CancellationException();
                // Not expected: handleComplete never stores a future that reported RUNNING.
                default -> throw new InternalError("Unexpected state: " + f);
            };

        }

        /**
         * Returns the result of the first subtask that completed with a result, otherwise
         * throws an exception produced by the given exception supplying function.
         *
         * <p> When no subtask completed with a result but a subtask completed with an
         * exception then the exception supplying function is invoked with the exception.
         * If only cancelled subtasks were notified to the {@code handleComplete} method
         * then the exception supplying function is invoked with a {@code CancellationException}.
         *
         * @apiNote This method is intended to be invoked by the task scope owner after it
         * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
         * A future release may add enforcement to prevent the method being called by
         * other threads or before joining.
         *
         * @param esf the exception supplying function
         * @param <X> type of the exception to be thrown
         * @return the result of the first subtask that completed with a result
         * @throws X if no subtask completed with a result
         * @throws IllegalStateException if the {@code handleComplete} method was not
         * invoked with a completed subtask
         */
        public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X {
            Objects.requireNonNull(esf);
            Future<T> f = future;
            if (f == null) {
                throw new IllegalStateException("No completed subtasks");
            }
            Future.State state = f.state();
            if (state == Future.State.SUCCESS) {
                return f.resultNow();
            } else {
                // Any state other than SUCCESS or FAILED is reported to the function as
                // a cancellation.
                Throwable throwable = (state == Future.State.FAILED)
                        ? f.exceptionNow()
                        : new CancellationException();
                X ex = esf.apply(throwable);
                // A function that returns null is rejected with NullPointerException.
                Objects.requireNonNull(ex, "esf returned null");
                throw ex;
            }
        }
    }

    /**
     * A {@code StructuredTaskScope} that captures the exception of the first subtask to
     * complete abnormally. Once captured, it invokes the {@linkplain #shutdown() shutdown}
     * method to interrupt unfinished threads and wake up the owner. The policy implemented
     * by this class is intended for cases where the results for all subtasks are required
     * ("invoke all"); if any subtask fails then the results of other unfinished subtasks
     * are no longer needed.
     *
     * <p> Unless otherwise specified, passing a {@code null} argument to a method
     * in this class will cause a {@link NullPointerException} to be thrown.
     *
     * @since 19
     */
    public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
        // VarHandle used to compare-and-set the "future" field declared below; the
        // string passed to findVarHandle must match that field's name.
        private static final VarHandle FUTURE;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                FUTURE = l.findVarHandle(ShutdownOnFailure.class, "future", Future.class);
            } catch (Exception e) {
                throw new InternalError(e);
            }
        }
        // The failed or cancelled subtask captured by handleComplete (a failed subtask
        // replaces a cancelled one), or null if no subtask has been captured.
        private volatile Future<Object> future;

        /**
         * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
         * The task scope is optionally named for the purposes of monitoring and management.
         * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
         * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope
         * is owned by the current thread.
         *
         * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
         * bindings for inheritance by threads created in the task scope. The
         * <a href="StructuredTaskScope.html#TreeStructure">Tree Structure</a> section in
         * the class description details how parent-child relations are established
         * implicitly for the purpose of inheritance of scoped value bindings.
         *
         * @param name the name of the task scope, can be null
         * @param factory the thread factory
         */
        public ShutdownOnFailure(String name, ThreadFactory factory) {
            super(name, factory);
        }

        /**
         * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
         *
         * <p> This constructor is equivalent to invoking the 2-arg constructor with a
         * name of {@code null} and a thread factory that creates virtual threads.
         */
        public ShutdownOnFailure() {
            super(null, Thread.ofVirtual().factory());
        }

        /**
         * Shut down the given task scope when invoked for the first time with a {@code
         * Future} for a task that completed abnormally (exception or cancelled).
         *
         * @param future the completed task
         * @throws IllegalArgumentException if the task has not completed
         * @see #shutdown()
         * @see Future.State#FAILED
         * @see Future.State#CANCELLED
         */
        @Override
        protected void handleComplete(Future<Object> future) {
            Future.State state = future.state();
            if (state == Future.State.RUNNING) {
                throw new IllegalArgumentException("Task is not completed");
            } else if (state == Future.State.SUCCESS) {
                // Successful subtasks are not captured by this policy.
                return;
            }

            // A failed task overrides a cancelled task.
            // The first failed or cancelled task causes the scope to shutdown.
            // If the compare-and-set loses a race with another thread, the field is
            // re-read and the check is repeated.
            Future<Object> f;
            while (((f = this.future) == null)
                    || FUTURE_STATE_COMPARATOR.compare(f.state(), state) < 0) {
                if (FUTURE.compareAndSet(this, f, future)) {
                    shutdown();
                    break;
                }
            }
        }

        /**
         * {@inheritDoc}
         * @return this task scope
         * @throws IllegalStateException {@inheritDoc}
         * @throws WrongThreadException {@inheritDoc}
         */
        @Override
        public ShutdownOnFailure join() throws InterruptedException {
            super.join();
            return this;
        }

        /**
         * {@inheritDoc}
         * @return this task scope
         * @throws IllegalStateException {@inheritDoc}
         * @throws WrongThreadException {@inheritDoc}
         */
        @Override
        public ShutdownOnFailure joinUntil(Instant deadline)
            throws InterruptedException, TimeoutException
        {
            super.joinUntil(deadline);
            return this;
        }

        /**
         * Returns the exception for the first subtask that completed with an exception.
         * If no subtask completed with an exception but cancelled subtasks were notified
         * to the {@code handleComplete} method then a {@code CancellationException}
         * is returned. If no subtasks completed abnormally then an empty {@code Optional}
         * is returned.
         *
         * @apiNote This method is intended to be invoked by the task scope owner after it
         * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
         * A future release may add enforcement to prevent the method being called by
         * other threads or before joining.
         *
         * @return the exception for a subtask that completed abnormally or an empty
         * optional if no subtasks completed abnormally
         */
        public Optional<Throwable> exception() {
            Future<Object> f = future;
            if (f != null) {
                // handleComplete only captures subtasks that reported FAILED or
                // CANCELLED, so anything that is not FAILED is treated as cancelled.
                Throwable throwable = (f.state() == Future.State.FAILED)
                        ? f.exceptionNow()
                        : new CancellationException();
                return Optional.of(throwable);
            } else {
                return Optional.empty();
            }
        }

        /**
         * Throws if a subtask completed abnormally. If any subtask completed with an
         * exception then {@code ExecutionException} is thrown with the exception of the
         * first subtask to fail as the {@linkplain Throwable#getCause() cause}. If no
         * subtask completed with an exception but cancelled subtasks were notified to the
         * {@code handleComplete} method then {@code CancellationException} is thrown.
         * This method does nothing if no subtasks completed abnormally.
         *
         * @apiNote This method is intended to be invoked by the task scope owner after it
         * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
         * A future release may add enforcement to prevent the method being called by
         * other threads or before joining.
         *
         * @throws ExecutionException if a subtask completed with an exception
         * @throws CancellationException if no subtasks completed with an exception but
         * subtasks were cancelled
         */
        public void throwIfFailed() throws ExecutionException {
            Future<Object> f = future;
            if (f != null) {
                // handleComplete only captures subtasks that reported FAILED or
                // CANCELLED, so anything that is not FAILED is treated as cancelled.
                if (f.state() == Future.State.FAILED) {
                    throw new ExecutionException(f.exceptionNow());
                } else {
                    throw new CancellationException();
                }
            }
        }

        /**
         * Throws the exception produced by the given exception supplying function if
         * a subtask completed abnormally. If any subtask completed with an exception then
         * the function is invoked with the exception of the first subtask to fail.
         * If no subtask completed with an exception but cancelled subtasks were notified
         * to the {@code handleComplete} method then the function is called with a {@code
         * CancellationException}. The exception returned by the function is thrown.
         * This method does nothing if no subtasks completed abnormally.
         *
         * @apiNote This method is intended to be invoked by the task scope owner after it
         * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
         * A future release may add enforcement to prevent the method being called by
         * other threads or before joining.
         *
         * @param esf the exception supplying function
         * @param <X> type of the exception to be thrown
         * @throws X produced by the exception supplying function
         */
        public <X extends Throwable>
        void throwIfFailed(Function<Throwable, ? extends X> esf) throws X {
            Objects.requireNonNull(esf);
            Future<Object> f = future;
            if (f != null) {
                // handleComplete only captures subtasks that reported FAILED or
                // CANCELLED, so anything that is not FAILED is treated as cancelled.
                Throwable throwable = (f.state() == Future.State.FAILED)
                        ? f.exceptionNow()
                        : new CancellationException();
                X ex = esf.apply(throwable);
                // A function that returns null is rejected with NullPointerException.
                Objects.requireNonNull(ex, "esf returned null");
                throw ex;
            }
        }
    }
}