1 /* 2 * Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package jdk.incubator.concurrent; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.security.AccessController; 30 import java.security.PrivilegedAction; 31 import java.time.Duration; 32 import java.time.Instant; 33 import java.util.Comparator; 34 import java.util.Objects; 35 import java.util.Optional; 36 import java.util.Set; 37 import java.util.concurrent.Callable; 38 import java.util.concurrent.CancellationException; 39 import java.util.concurrent.ConcurrentHashMap; 40 import java.util.concurrent.ExecutionException; 41 import java.util.concurrent.Future; 42 import java.util.concurrent.FutureTask; 43 import java.util.concurrent.RejectedExecutionException; 44 import java.util.concurrent.ThreadFactory; 45 import java.util.concurrent.TimeoutException; 46 import java.util.concurrent.TimeUnit; 47 import java.util.concurrent.locks.ReentrantLock; 48 import java.util.function.Function; 49 import jdk.internal.misc.PreviewFeatures; 50 import jdk.internal.misc.ThreadFlock; 51 52 /** 53 * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports 54 * cases where a task splits into several concurrent subtasks, to be executed in their 55 * own threads, and where the subtasks must complete before the main task continues. A 56 * {@code StructuredTaskScope} can be used to ensure that the lifetime of a concurrent 57 * operation is confined by a <em>syntax block</em>, just like that of a sequential 58 * operation in structured programming. 59 * 60 * <h2>Basic usage</h2> 61 * 62 * A {@code StructuredTaskScope} is created with one of its public constructors. It defines 63 * the {@link #fork(Callable) fork} method to start a thread to execute a task, the {@link 64 * #join() join} method to wait for all threads to finish, and the {@link #close() close} 65 * method to close the task scope. The API is intended to be used with the {@code 66 * try-with-resources} construct. The intention is that code in the <em>block</em> uses 67 * the {@code fork} method to fork threads to execute the subtasks, wait for the threads 68 * to finish with the {@code join} method, and then <em>process the results</em>. 69 * Processing of results may include handling or re-throwing of exceptions. 70 * {@snippet lang=java : 71 * try (var scope = new StructuredTaskScope<Object>()) { 72 * 73 * Future<Integer> future1 = scope.fork(task1); // @highlight substring="fork" 74 * Future<String> future2 = scope.fork(task2); // @highlight substring="fork" 75 * 76 * scope.join(); // @highlight substring="join" 77 * 78 * ... process results/exceptions ... 79 * 80 * } // close // @highlight substring="close" 81 * } 82 * To ensure correct usage, the {@code join} and {@code close} methods may only be invoked 83 * by the <em>owner</em> (the thread that opened/created the task scope}, and the 84 * {@code close} method throws an exception after closing if the owner did not invoke the 85 * {@code join} method after forking. 86 * 87 * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut 88 * down a task scope without closing it. Shutdown is useful for cases where a subtask 89 * completes with a result (or exception) and the results of other unfinished subtasks are 90 * no longer needed. If a subtask invokes {@code shutdown} while the owner is waiting in 91 * the {@code join} method then it will cause {@code join} to wakeup, all unfinished 92 * threads to be {@linkplain Thread#interrupt() interrupted} and prevents new threads 93 * from starting in the task scope. 94 * 95 * <h2>Subclasses with policies for common cases</h2> 96 * 97 * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for 98 * common cases: 99 * <ol> 100 * <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the first result and 101 * shuts down the task scope to interrupt unfinished threads and wakeup the owner. This 102 * class is intended for cases where the result of any subtask will do ("invoke any") 103 * and where there is no need to wait for results of other unfinished tasks. It defines 104 * methods to get the first result or throw an exception if all subtasks fail. 105 * <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the first exception and 106 * shuts down the task scope. This class is intended for cases where the results of all 107 * subtasks are required ("invoke all"); if any subtask fails then the results of other 108 * unfinished subtasks are no longer needed. If defines methods to throw an exception if 109 * any of the subtasks fail. 110 * </ol> 111 * 112 * <p> The following are two examples that use the two classes. In both cases, a pair of 113 * subtasks are forked to fetch resources from two URL locations "left" and "right". The 114 * first example creates a ShutdownOnSuccess object to capture the result of the first 115 * subtask to complete normally, cancelling the other by way of shutting down the task 116 * scope. The main task waits in {@code join} until either subtask completes with a result 117 * or both subtasks fail. It invokes {@link ShutdownOnSuccess#result(Function) 118 * result(Function)} method to get the captured result. If both subtasks fail then this 119 * method throws a {@code WebApplicationException} with the exception from one of the 120 * subtasks as the cause. 121 * {@snippet lang=java : 122 * try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) { 123 * 124 * scope.fork(() -> fetch(left)); 125 * scope.fork(() -> fetch(right)); 126 * 127 * scope.join(); 128 * 129 * // @link regex="result(?=\()" target="ShutdownOnSuccess#result" : 130 * String result = scope.result(e -> new WebApplicationException(e)); 131 * 132 * ... 133 * } 134 * } 135 * The second example creates a ShutdownOnFailure object to capture the exception of the 136 * first subtask to fail, cancelling the other by way of shutting down the task scope. The 137 * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a 138 * result, either fails, or a deadline is reached. It invokes {@link 139 * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception 140 * when either subtask fails. This method is a no-op if no subtasks fail. The main task 141 * uses {@code Future}'s {@link Future#resultNow() resultNow()} method to retrieve the 142 * results. 143 * 144 * {@snippet lang=java : 145 * Instant deadline = ... 146 * 147 * try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { 148 * 149 * Future<String> future1 = scope.fork(() -> query(left)); 150 * Future<String> future2 = scope.fork(() -> query(right)); 151 * 152 * scope.joinUntil(deadline); 153 * 154 * // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" : 155 * scope.throwIfFailed(e -> new WebApplicationException(e)); 156 * 157 * // both subtasks completed successfully 158 * String result = Stream.of(future1, future2) 159 * // @link substring="Future::resultNow" target="Future#resultNow" : 160 * .map(Future::resultNow) 161 * .collect(Collectors.joining(", ", "{ ", " }")); 162 * 163 * ... 164 * } 165 * } 166 * 167 * <h2>Extending StructuredTaskScope</h2> 168 * 169 * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Future) 170 * handleComplete} overridden, to implement policies other than those implemented by 171 * {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. The method may be overridden 172 * to, for example, collect the results of subtasks that complete with a result and ignore 173 * subtasks that fail. It may collect exceptions when subtasks fail. It may invoke the 174 * {@link #shutdown() shutdown} method to shut down and cause {@link #join() join} to 175 * wakeup when some condition arises. 176 * 177 * <p> A subclass will typically define methods to make available results, state, or other 178 * outcome to code that executes after the {@code join} method. A subclass that collects 179 * results and ignores subtasks that fail may define a method that returns a collection of 180 * results. A subclass that implements a policy to shut down when a subtask fails may 181 * define a method to retrieve the exception of the first subtask to fail. 182 * 183 * <p> The following is an example of a {@code StructuredTaskScope} implementation that 184 * collects the results of subtasks that complete successfully. It defines the method 185 * <b>{@code results()}</b> to be used by the main task to retrieve the results. 186 * 187 * {@snippet lang=java : 188 * class MyScope<T> extends StructuredTaskScope<T> { 189 * private final Queue<T> results = new ConcurrentLinkedQueue<>(); 190 * 191 * MyScope() { 192 * super(null, Thread.ofVirtual().factory()); 193 * } 194 * 195 * @Override 196 * // @link substring="handleComplete" target="handleComplete" : 197 * protected void handleComplete(Future<T> future) { 198 * if (future.state() == Future.State.SUCCESS) { 199 * T result = future.resultNow(); 200 * results.add(result); 201 * } 202 * } 203 * 204 * // Returns a stream of results from the subtasks that completed successfully 205 * public Stream<T> results() { // @highlight substring="results" 206 * return results.stream(); 207 * } 208 * } 209 * } 210 * 211 * <h2><a id="TreeStructure">Tree structure</a></h2> 212 * 213 * StructuredTaskScopes form a tree where parent-child relations are established 214 * implicitly when opening a new task scope: 215 * <ul> 216 * <li> A parent-child relation is established when a thread started in a task scope 217 * opens its own task scope. A thread started in task scope "A" that opens task scope 218 * "B" establishes a parent-child relation where task scope "A" is the parent of task 219 * scope "B". 220 * <li> A parent-child relation is established with nesting. If a thread opens task 221 * scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task 222 * scope "B" is the parent of the nested task scope "C". 223 * </ul> 224 * 225 * <p> The tree structure supports: 226 * <ul> 227 * <li> Inheritance of {@linkplain ExtentLocal extent-local} variables across threads. 228 * <li> Confinement checks. The phrase "threads contained in the task scope" in method 229 * descriptions means threads started in the task scope or descendant scopes. 230 * </ul> 231 * 232 * <p> The following example demonstrates the inheritance of an extent-local variable. An 233 * extent local {@code NAME} is bound to the value "duke". A StructuredTaskScope is created 234 * and its {@code fork} method invoked to start a thread to execute {@code childTask}. 235 * The thread inherits the extent-local {@linkplain ExtentLocal.Carrier bindings} captured 236 * when creating the task scope. The code in {@code childTask} uses the value of the 237 * extent-local and so reads the value "duke". 238 * {@snippet lang=java : 239 * private static final ExtentLocal<String> NAME = ExtentLocal.newInstance(); 240 * 241 * // @link substring="where" target="ExtentLocal#where" : 242 * ExtentLocal.where(NAME, "duke").run(() -> { 243 * try (var scope = new StructuredTaskScope<String>()) { 244 * 245 * scope.fork(() -> childTask()); // @highlight substring="fork" 246 * ... 247 * } 248 * }); 249 * 250 * ... 251 * 252 * String childTask() { 253 * String name = NAME.get(); // "duke" // @highlight substring="get" 254 * ... 255 * } 256 * } 257 * 258 * <p> {@code StructuredTaskScope} does not define APIs that exposes the tree structure 259 * at this time. 260 * 261 * <p> Unless otherwise specified, passing a {@code null} argument to a constructor 262 * or method in this class will cause a {@link NullPointerException} to be thrown. 263 * 264 * <h2>Memory consistency effects</h2> 265 * 266 * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to 267 * {@linkplain #fork forking} of a {@code Callable} task 268 * <a href="../../../../java.base/java/util/concurrent/package-summary.html#MemoryVisibility"> 269 * <i>happen-before</i></a> any actions taken by that task, which in turn <i>happen-before</i> 270 * the task result is retrieved via its {@code Future}, or <i>happen-before</i> any actions 271 * taken in a thread after {@linkplain #join() joining} of the task scope. 272 * 273 * @jls 17.4.5 Happens-before Order 274 * 275 * @param <T> the result type of tasks executed in the scope 276 * @since 19 277 */ 278 public class StructuredTaskScope<T> implements AutoCloseable { 279 private static final VarHandle FUTURES; 280 static { 281 try { 282 MethodHandles.Lookup l = MethodHandles.lookup(); 283 FUTURES = l.findVarHandle(StructuredTaskScope.class, "futures", Set.class); 284 } catch (Exception e) { 285 throw new InternalError(e); 286 } 287 } 288 289 private final ThreadFactory factory; 290 private final ThreadFlock flock; 291 private final ReentrantLock shutdownLock = new ReentrantLock(); 292 293 // lazily created set of Future objects with threads waiting in Future::get 294 private volatile Set<Future<?>> futures; 295 296 // set by owner when it forks, reset by owner when it joins 297 private boolean needJoin; 298 299 // states: OPEN -> SHUTDOWN -> CLOSED 300 private static final int OPEN = 0; // initial state 301 private static final int SHUTDOWN = 1; 302 private static final int CLOSED = 2; 303 304 // scope state, set by owner, read by any thread 305 private volatile int state; 306 307 /** 308 * Creates a structured task scope with the given name and thread factory. The task 309 * scope is optionally named for the purposes of monitoring and management. The thread 310 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when 311 * tasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the 312 * current thread. 313 * 314 * <p> This method captures the current thread's {@linkplain ExtentLocal extent-local} 315 * bindings for inheritance by threads created in the task scope. The 316 * <a href="#TreeStructure">Tree Structure</a> section in the class description 317 * details how parent-child relations are established implicitly for the purpose of 318 * inheritance of extent-local bindings. 319 * 320 * @param name the name of the task scope, can be null 321 * @param factory the thread factory 322 */ 323 public StructuredTaskScope(String name, ThreadFactory factory) { 324 this.factory = Objects.requireNonNull(factory, "'factory' is null"); 325 this.flock = ThreadFlock.open(name); 326 } 327 328 /** 329 * Creates an unnamed structured task scope that creates virtual threads. The task 330 * scope is owned by the current thread. 331 * 332 * <p> This constructor is equivalent to invoking the 2-arg constructor with a name 333 * of {@code null} and a thread factory that creates virtual threads. 334 * 335 * @throws UnsupportedOperationException if preview features are not enabled 336 */ 337 public StructuredTaskScope() { 338 PreviewFeatures.ensureEnabled(); 339 this.factory = Thread.ofVirtual().factory(); 340 this.flock = ThreadFlock.open(null); 341 } 342 343 /** 344 * Throws WrongThreadException if the current thread is not the owner. 345 */ 346 private void ensureOwner() { 347 if (Thread.currentThread() != flock.owner()) 348 throw new WrongThreadException("Current thread not owner"); 349 } 350 351 /** 352 * Throws WrongThreadException if the current thread is not the owner 353 * or a thread contained in the tree. 354 */ 355 private void ensureOwnerOrContainsThread() { 356 Thread currentThread = Thread.currentThread(); 357 if (currentThread != flock.owner() && !flock.containsThread(currentThread)) 358 throw new WrongThreadException("Current thread not owner or thread in the tree"); 359 } 360 361 /** 362 * Tests if the task scope is shutdown. 363 */ 364 private boolean isShutdown() { 365 return state >= SHUTDOWN; 366 } 367 368 /** 369 * Track the given Future. 370 */ 371 private void track(Future<?> future) { 372 // create the set of Futures if not already created 373 Set<Future<?>> futures = this.futures; 374 if (futures == null) { 375 futures = ConcurrentHashMap.newKeySet(); 376 if (!FUTURES.compareAndSet(this, null, futures)) { 377 // lost the race 378 futures = this.futures; 379 } 380 } 381 futures.add(future); 382 } 383 384 /** 385 * Stop tracking the Future. 386 */ 387 private void untrack(Future<?> future) { 388 assert futures != null; 389 futures.remove(future); 390 } 391 392 /** 393 * Invoked when a task completes before the scope is shut down. 394 * 395 * <p> The {@code handleComplete} method should be thread safe. It may be invoked by 396 * several threads concurrently. 397 * 398 * @implSpec The default implementation does nothing. 399 * 400 * @param future the completed task 401 */ 402 protected void handleComplete(Future<T> future) { } 403 404 /** 405 * Starts a new thread to run the given task. 406 * 407 * <p> The new thread is created with the task scope's {@link ThreadFactory}. It 408 * inherits the current thread's {@linkplain ExtentLocal extent-local} bindings. The 409 * bindings must match the bindings captured when the task scope was created. 410 * 411 * <p> If the task completes before the task scope is {@link #shutdown() shutdown} 412 * then the {@link #handleComplete(Future) handle} method is invoked to consume the 413 * completed task. The {@code handleComplete} method is run when the task completes 414 * with a result or exception. If the {@code Future} {@link Future#cancel(boolean) 415 * cancel} method is used the cancel a task before the task scope is shut down, then 416 * the {@code handleComplete} method is run by the thread that invokes {@code cancel}. 417 * If the task scope shuts down at or around the same time that the task completes or 418 * is cancelled then the {@code handleComplete} method may or may not be invoked. 419 * 420 * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process 421 * of shutting down) then {@code fork} returns a {@code Future} representing a {@link 422 * Future.State#CANCELLED cancelled} task that was not run. 423 * 424 * <p> This method may only be invoked by the task scope owner or threads contained 425 * in the task scope. The {@link Future#cancel(boolean) cancel} method of the returned 426 * {@code Future} object is also restricted to the task scope owner or threads contained 427 * in the task scope. The {@code cancel} method throws {@link WrongThreadException} 428 * if invoked from another thread. All other methods on the returned {@code Future} 429 * object, such as {@link Future#get() get}, are not restricted. 430 * 431 * @param task the task to run 432 * @param <U> the result type 433 * @return a future 434 * @throws IllegalStateException if this task scope is closed 435 * @throws WrongThreadException if the current thread is not the owner or a thread 436 * contained in the task scope 437 * @throws StructureViolationException if the current extent-local bindings are not 438 * the same as when the task scope was created 439 * @throws RejectedExecutionException if the thread factory rejected creating a 440 * thread to run the task 441 */ 442 public <U extends T> Future<U> fork(Callable<? extends U> task) { 443 Objects.requireNonNull(task, "'task' is null"); 444 445 // create future 446 var future = new FutureImpl<U>(this, task); 447 448 boolean shutdown = (state >= SHUTDOWN); 449 450 if (!shutdown) { 451 // create thread 452 Thread thread = factory.newThread(future); 453 if (thread == null) { 454 throw new RejectedExecutionException("Rejected by thread factory"); 455 } 456 457 // attempt to start the thread 458 try { 459 flock.start(thread); 460 } catch (IllegalStateException e) { 461 // shutdown or in the process of shutting down 462 shutdown = true; 463 } 464 } 465 466 if (shutdown) { 467 if (state == CLOSED) { 468 throw new IllegalStateException("Task scope is closed"); 469 } else { 470 future.cancel(false); 471 } 472 } 473 474 // if owner forks then it will need to join 475 if (Thread.currentThread() == flock.owner() && !needJoin) { 476 needJoin = true; 477 } 478 479 return future; 480 } 481 482 /** 483 * Wait for all threads to finish or the task scope to shut down. 484 */ 485 private void implJoin(Duration timeout) 486 throws InterruptedException, TimeoutException 487 { 488 ensureOwner(); 489 needJoin = false; 490 int s = state; 491 if (s >= SHUTDOWN) { 492 if (s == CLOSED) 493 throw new IllegalStateException("Task scope is closed"); 494 return; 495 } 496 497 // wait for all threads, wakeup, interrupt, or timeout 498 if (timeout != null) { 499 flock.awaitAll(timeout); 500 } else { 501 flock.awaitAll(); 502 } 503 } 504 505 /** 506 * Wait for all threads to finish or the task scope to shut down. This method waits 507 * until all threads started in the task scope finish execution (of both task and 508 * {@link #handleComplete(Future) handleComplete} method), the {@link #shutdown() 509 * shutdown} method is invoked to shut down the task scope, or the current thread 510 * is {@linkplain Thread#interrupt() interrupted}. 511 * 512 * <p> This method may only be invoked by the task scope owner. 513 * 514 * @return this task scope 515 * @throws IllegalStateException if this task scope is closed 516 * @throws WrongThreadException if the current thread is not the owner 517 * @throws InterruptedException if interrupted while waiting 518 */ 519 public StructuredTaskScope<T> join() throws InterruptedException { 520 try { 521 implJoin(null); 522 } catch (TimeoutException e) { 523 throw new InternalError(); 524 } 525 return this; 526 } 527 528 /** 529 * Wait for all threads to finish or the task scope to shut down, up to the given 530 * deadline. This method waits until all threads started in the task scope finish 531 * execution (of both task and {@link #handleComplete(Future) handleComplete} method), 532 * the {@link #shutdown() shutdown} method is invoked to shut down the task scope, 533 * the current thread is {@linkplain Thread#interrupt() interrupted}, or the deadline 534 * is reached. 535 * 536 * <p> This method may only be invoked by the task scope owner. 537 * 538 * @param deadline the deadline 539 * @return this task scope 540 * @throws IllegalStateException if this task scope is closed 541 * @throws WrongThreadException if the current thread is not the owner 542 * @throws InterruptedException if interrupted while waiting 543 * @throws TimeoutException if the deadline is reached while waiting 544 */ 545 public StructuredTaskScope<T> joinUntil(Instant deadline) 546 throws InterruptedException, TimeoutException 547 { 548 Duration timeout = Duration.between(Instant.now(), deadline); 549 implJoin(timeout); 550 return this; 551 } 552 553 /** 554 * Cancel all tracked Future objects. 555 */ 556 private void cancelTrackedFutures() { 557 Set<Future<?>> futures = this.futures; 558 if (futures != null) { 559 futures.forEach(f -> f.cancel(false)); 560 } 561 } 562 563 /** 564 * Interrupt all unfinished threads. 565 */ 566 private void implInterruptAll() { 567 flock.threads().forEach(t -> { 568 if (t != Thread.currentThread()) { 569 t.interrupt(); 570 } 571 }); 572 } 573 574 @SuppressWarnings("removal") 575 private void interruptAll() { 576 if (System.getSecurityManager() == null) { 577 implInterruptAll(); 578 } else { 579 PrivilegedAction<Void> pa = () -> { 580 implInterruptAll(); 581 return null; 582 }; 583 AccessController.doPrivileged(pa); 584 } 585 } 586 587 /** 588 * Shutdown the task scope if not already shutdown. Return true if this method 589 * shutdowns the task scope, false if already shutdown. 590 */ 591 private boolean implShutdown() { 592 if (state < SHUTDOWN) { 593 shutdownLock.lock(); 594 try { 595 if (state < SHUTDOWN) { 596 597 // prevent new threads from starting 598 flock.shutdown(); 599 600 // wakeup any threads waiting in Future::get 601 cancelTrackedFutures(); 602 603 // interrupt all unfinished threads 604 interruptAll(); 605 606 state = SHUTDOWN; 607 return true; 608 } 609 } finally { 610 shutdownLock.unlock(); 611 } 612 } 613 assert state >= SHUTDOWN; 614 return false; 615 } 616 617 /** 618 * Shut down the task scope without closing it. Shutting down a task scope prevents 619 * new threads from starting, interrupts all unfinished threads, and causes the 620 * {@link #join() join} method to wakeup. Shutdown is useful for cases where the 621 * results of unfinished subtasks are no longer needed. 622 * 623 * <p> More specifically, this method: 624 * <ul> 625 * <li> {@linkplain Future#cancel(boolean) Cancels} the tasks that have threads 626 * {@linkplain Future#get() waiting} on a result so that the waiting threads wakeup. 627 * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the 628 * task scope (except the current thread). 629 * <li> Wakes up the owner if it is waiting in {@link #join()} or {@link 630 * #joinUntil(Instant)}. If the owner is not waiting then its next call to {@code 631 * join} or {@code joinUntil} will return immediately. 632 * </ul> 633 * 634 * <p> When this method completes then the {@code Future} objects for all tasks will 635 * be {@linkplain Future#isDone() done}, normally or abnormally. There may still 636 * be threads that have not finished because they are executing code that did not 637 * respond (or respond promptly) to thread interrupt. This method does not wait 638 * for these threads. When the owner invokes the {@link #close() close} method 639 * to close the task scope then it will wait for the remaining threads to finish. 640 * 641 * <p> This method may only be invoked by the task scope owner or threads contained 642 * in the task scope. 643 * 644 * @throws IllegalStateException if this task scope is closed 645 * @throws WrongThreadException if the current thread is not the owner or 646 * a thread contained in the task scope 647 */ 648 public void shutdown() { 649 ensureOwnerOrContainsThread(); 650 if (state == CLOSED) 651 throw new IllegalStateException("Task scope is closed"); 652 if (implShutdown()) 653 flock.wakeup(); 654 } 655 656 /** 657 * Closes this task scope. 658 * 659 * <p> This method first shuts down the task scope (as if by invoking the {@link 660 * #shutdown() shutdown} method). It then waits for the threads executing any 661 * unfinished tasks to finish. If interrupted then this method will continue to 662 * wait for the threads to finish before completing with the interrupt status set. 663 * 664 * <p> This method may only be invoked by the task scope owner. If the task scope 665 * is already closed then the owner invoking this method has no effect. 666 * 667 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured 668 * manner</em>. If this method is called to close a task scope before nested task 669 * scopes are closed then it closes the underlying construct of each nested task scope 670 * (in the reverse order that they were created in), closes this task scope, and then 671 * throws {@link StructureViolationException}. 672 * 673 * Similarly, if called to close a task scope that <em>encloses</em> {@linkplain 674 * ExtentLocal.Carrier#run(Runnable) operations} with extent-local bindings then 675 * it also throws {@code StructureViolationException} after closing the task scope. 676 * 677 * If a thread terminates without first closing task scopes that it owns then 678 * termination will cause the underlying construct of each of its open tasks scopes to 679 * be closed. Closing is performed in the reverse order that the task scopes were 680 * created in. Thread termination may therefore be delayed when the owner has to wait 681 * for threads forked in these task scopes to finish. 682 * 683 * @throws IllegalStateException thrown after closing the task scope if the owner 684 * did not invoke join after forking 685 * @throws WrongThreadException if the current thread is not the owner 686 * @throws StructureViolationException if a structure violation was detected 687 */ 688 @Override 689 public void close() { 690 ensureOwner(); 691 if (state == CLOSED) 692 return; 693 694 try { 695 implShutdown(); 696 flock.close(); 697 } finally { 698 state = CLOSED; 699 } 700 701 if (needJoin) { 702 throw new IllegalStateException("Owner did not invoke join or joinUntil after fork"); 703 } 704 } 705 706 @Override 707 public String toString() { 708 StringBuilder sb = new StringBuilder(); 709 String name = flock.name(); 710 if (name != null) { 711 sb.append(name); 712 sb.append('/'); 713 } 714 sb.append(Objects.toIdentityString(this)); 715 int s = state; 716 if (s == CLOSED) 717 sb.append("/closed"); 718 else if (s == SHUTDOWN) 719 sb.append("/shutdown"); 720 return sb.toString(); 721 } 722 723 /** 724 * The Future implementation returned by the fork methods. Most methods are 725 * overridden to support cancellation when the task scope is shutdown. 726 * The blocking get methods register the Future with the task scope so that they 727 * are cancelled when the task scope shuts down. 728 */ 729 private static final class FutureImpl<V> extends FutureTask<V> { 730 private final StructuredTaskScope<V> scope; 731 732 @SuppressWarnings("unchecked") 733 FutureImpl(StructuredTaskScope<? super V> scope, Callable<? extends V> task) { 734 super((Callable<V>) task); 735 this.scope = (StructuredTaskScope<V>) scope; 736 } 737 738 @Override 739 protected void done() { 740 if (!scope.isShutdown()) { 741 scope.handleComplete(this); 742 } 743 } 744 745 private void cancelIfShutdown() { 746 if (scope.isShutdown() && !super.isDone()) { 747 super.cancel(false); 748 } 749 } 750 751 @Override 752 public boolean isDone() { 753 cancelIfShutdown(); 754 return super.isDone(); 755 } 756 757 @Override 758 public boolean isCancelled() { 759 cancelIfShutdown(); 760 return super.isCancelled(); 761 } 762 763 @Override 764 public boolean cancel(boolean mayInterruptIfRunning) { 765 scope.ensureOwnerOrContainsThread(); 766 cancelIfShutdown(); 767 return super.cancel(mayInterruptIfRunning); 768 } 769 770 @Override 771 public V get() throws InterruptedException, ExecutionException { 772 if (super.isDone()) 773 return super.get(); 774 scope.track(this); 775 try { 776 cancelIfShutdown(); 777 return super.get(); 778 } finally { 779 scope.untrack(this); 780 } 781 } 782 783 @Override 784 public V get(long timeout, TimeUnit unit) 785 throws InterruptedException, ExecutionException, TimeoutException { 786 Objects.requireNonNull(unit); 787 if (super.isDone()) 788 return super.get(); 789 scope.track(this); 790 try { 791 cancelIfShutdown(); 792 return super.get(timeout, unit); 793 } finally { 794 scope.untrack(this); 795 } 796 } 797 798 @Override 799 public V resultNow() { 800 cancelIfShutdown(); 801 return super.resultNow(); 802 } 803 804 @Override 805 public Throwable exceptionNow() { 806 cancelIfShutdown(); 807 return super.exceptionNow(); 808 } 809 810 @Override 811 public State state() { 812 cancelIfShutdown(); 813 return super.state(); 814 } 815 816 @Override 817 public String toString() { 818 cancelIfShutdown(); 819 return super.toString(); 820 } 821 } 822 823 /** 824 * Maps a Future.State to an int that can be compared. 825 * RUNNING < CANCELLED < FAILED < SUCCESS. 826 */ 827 private static int futureStateToInt(Future.State s) { 828 return switch (s) { 829 case RUNNING -> 0; 830 case CANCELLED -> 1; 831 case FAILED -> 2; 832 case SUCCESS -> 3; 833 }; 834 } 835 836 // RUNNING < CANCELLED < FAILED < SUCCESS 837 private static final Comparator<Future.State> FUTURE_STATE_COMPARATOR = 838 Comparator.comparingInt(StructuredTaskScope::futureStateToInt); 839 840 /** 841 * A {@code StructuredTaskScope} that captures the result of the first subtask to 842 * complete successfully. Once captured, it invokes the {@linkplain #shutdown() shutdown} 843 * method to interrupt unfinished threads and wakeup the owner. The policy 844 * implemented by this class is intended for cases where the result of any subtask 845 * will do ("invoke any") and where the results of other unfinished subtask are no 846 * longer needed. 847 * 848 * <p> Unless otherwise specified, passing a {@code null} argument to a method 849 * in this class will cause a {@link NullPointerException} to be thrown. 850 * 851 * @param <T> the result type 852 * @since 19 853 */ 854 public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> { 855 private static final VarHandle FUTURE; 856 static { 857 try { 858 MethodHandles.Lookup l = MethodHandles.lookup(); 859 FUTURE = l.findVarHandle(ShutdownOnSuccess.class, "future", Future.class); 860 } catch (Exception e) { 861 throw new InternalError(e); 862 } 863 } 864 private volatile Future<T> future; 865 866 /** 867 * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory. 868 * The task scope is optionally named for the purposes of monitoring and management. 869 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create} 870 * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope is 871 * owned by the current thread. 872 * 873 * <p> This method captures the current thread's {@linkplain ExtentLocal extent-local} 874 * bindings for inheritance by threads created in the task scope. The 875 * <a href="StructuredTaskScope.html#TreeStructure">Tree Structure</a> section in 876 * the class description details how parent-child relations are established 877 * implicitly for the purpose of inheritance of extent-local bindings. 878 * 879 * @param name the name of the task scope, can be null 880 * @param factory the thread factory 881 */ 882 public ShutdownOnSuccess(String name, ThreadFactory factory) { 883 super(name, factory); 884 } 885 886 /** 887 * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads. 888 * 889 * <p> This constructor is equivalent to invoking the 2-arg constructor with a 890 * name of {@code null} and a thread factory that creates virtual threads. 891 */ 892 public ShutdownOnSuccess() { 893 super(null, Thread.ofVirtual().factory()); 894 } 895 896 /** 897 * Shut down the given task scope when invoked for the first time with a {@code 898 * Future} for a task that completed with a result. 899 * 900 * @param future the completed task 901 * @see #shutdown() 902 * @see Future.State#SUCCESS 903 */ 904 @Override 905 protected void handleComplete(Future<T> future) { 906 Future.State state = future.state(); 907 if (state == Future.State.RUNNING) { 908 throw new IllegalArgumentException("Task is not completed"); 909 } 910 911 Future<T> f; 912 while (((f = this.future) == null) 913 || FUTURE_STATE_COMPARATOR.compare(f.state(), state) < 0) { 914 if (FUTURE.compareAndSet(this, f, future)) { 915 if (state == Future.State.SUCCESS) 916 shutdown(); 917 break; 918 } 919 } 920 } 921 922 /** 923 * {@inheritDoc} 924 * @return this task scope 925 * @throws IllegalStateException {@inheritDoc} 926 * @throws WrongThreadException {@inheritDoc} 927 */ 928 @Override 929 public ShutdownOnSuccess<T> join() throws InterruptedException { 930 super.join(); 931 return this; 932 } 933 934 /** 935 * {@inheritDoc} 936 * @return this task scope 937 * @throws IllegalStateException {@inheritDoc} 938 * @throws WrongThreadException {@inheritDoc} 939 */ 940 @Override 941 public ShutdownOnSuccess<T> joinUntil(Instant deadline) 942 throws InterruptedException, TimeoutException 943 { 944 super.joinUntil(deadline); 945 return this; 946 } 947 948 /** 949 * {@return the result of the first subtask that completed with a result} 950 * 951 * <p> When no subtask completed with a result but a task completed with an 952 * exception then {@code ExecutionException} is thrown with the exception as the 953 * {@linkplain Throwable#getCause() cause}. If only cancelled subtasks were 954 * notified to the {@code handleComplete} method then {@code CancellationException} 955 * is thrown. 956 * 957 * @apiNote This method is intended to be invoked by the task scope owner after it 958 * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}). 959 * A future release may add enforcement to prevent the method being called by 960 * other threads or before joining. 961 * 962 * @throws ExecutionException if no subtasks completed with a result but a subtask 963 * completed with an exception 964 * @throws CancellationException if all subtasks were cancelled 965 * @throws IllegalStateException if the handle method was not invoked with a 966 * completed subtask 967 */ 968 public T result() throws ExecutionException { 969 Future<T> f = future; 970 if (f == null) { 971 throw new IllegalStateException("No completed subtasks"); 972 } 973 return switch (f.state()) { 974 case SUCCESS -> f.resultNow(); 975 case FAILED -> throw new ExecutionException(f.exceptionNow()); 976 case CANCELLED -> throw new CancellationException(); 977 default -> throw new InternalError("Unexpected state: " + f); 978 }; 979 980 } 981 982 /** 983 * Returns the result of the first subtask that completed with a result, otherwise 984 * throws an exception produced by the given exception supplying function. 985 * 986 * <p> When no subtask completed with a result but a subtask completed with an 987 * exception then the exception supplying function is invoked with the exception. 988 * If only cancelled subtasks were notified to the {@code handleComplete} method 989 * then the exception supplying function is invoked with a {@code CancellationException}. 990 * 991 * @apiNote This method is intended to be invoked by the task scope owner after it 992 * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}). 993 * A future release may add enforcement to prevent the method being called by 994 * other threads or before joining. 995 * 996 * @param esf the exception supplying function 997 * @param <X> type of the exception to be thrown 998 * @return the result of the first subtask that completed with a result 999 * @throws X if no subtask completed with a result 1000 * @throws IllegalStateException if the handle method was not invoked with a 1001 * completed subtask 1002 */ 1003 public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X { 1004 Objects.requireNonNull(esf); 1005 Future<T> f = future; 1006 if (f == null) { 1007 throw new IllegalStateException("No completed subtasks"); 1008 } 1009 Future.State state = f.state(); 1010 if (state == Future.State.SUCCESS) { 1011 return f.resultNow(); 1012 } else { 1013 Throwable throwable = (state == Future.State.FAILED) 1014 ? f.exceptionNow() 1015 : new CancellationException(); 1016 X ex = esf.apply(throwable); 1017 Objects.requireNonNull(ex, "esf returned null"); 1018 throw ex; 1019 } 1020 } 1021 } 1022 1023 /** 1024 * A {@code StructuredTaskScope} that captures the exception of the first subtask to 1025 * complete abnormally. Once captured, it invokes the {@linkplain #shutdown() shutdown} 1026 * method to interrupt unfinished threads and wakeup the owner. The policy implemented 1027 * by this class is intended for cases where the results for all subtasks are required 1028 * ("invoke all"); if any subtask fails then the results of other unfinished subtasks 1029 * are no longer needed. 1030 * 1031 * <p> Unless otherwise specified, passing a {@code null} argument to a method 1032 * in this class will cause a {@link NullPointerException} to be thrown. 1033 * 1034 * @since 19 1035 */ 1036 public static final class ShutdownOnFailure extends StructuredTaskScope<Object> { 1037 private static final VarHandle FUTURE; 1038 static { 1039 try { 1040 MethodHandles.Lookup l = MethodHandles.lookup(); 1041 FUTURE = l.findVarHandle(ShutdownOnFailure.class, "future", Future.class); 1042 } catch (Exception e) { 1043 throw new InternalError(e); 1044 } 1045 } 1046 private volatile Future<Object> future; 1047 1048 /** 1049 * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory. 1050 * The task scope is optionally named for the purposes of monitoring and management. 1051 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create} 1052 * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope 1053 * is owned by the current thread. 1054 * 1055 * <p> This method captures the current thread's {@linkplain ExtentLocal extent-local} 1056 * bindings for inheritance by threads created in the task scope. The 1057 * <a href="StructuredTaskScope.html#TreeStructure">Tree Structure</a> section in 1058 * the class description details how parent-child relations are established 1059 * implicitly for the purpose of inheritance of extent-local bindings. 1060 * 1061 * @param name the name of the task scope, can be null 1062 * @param factory the thread factory 1063 */ 1064 public ShutdownOnFailure(String name, ThreadFactory factory) { 1065 super(name, factory); 1066 } 1067 1068 /** 1069 * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads. 1070 * 1071 * <p> This constructor is equivalent to invoking the 2-arg constructor with a 1072 * name of {@code null} and a thread factory that creates virtual threads. 1073 */ 1074 public ShutdownOnFailure() { 1075 super(null, Thread.ofVirtual().factory()); 1076 } 1077 1078 /** 1079 * Shut down the given task scope when invoked for the first time with a {@code 1080 * Future} for a task that completed abnormally (exception or cancelled). 1081 * 1082 * @param future the completed task 1083 * @see #shutdown() 1084 * @see Future.State#FAILED 1085 * @see Future.State#CANCELLED 1086 */ 1087 @Override 1088 protected void handleComplete(Future<Object> future) { 1089 Future.State state = future.state(); 1090 if (state == Future.State.RUNNING) { 1091 throw new IllegalArgumentException("Task is not completed"); 1092 } else if (state == Future.State.SUCCESS) { 1093 return; 1094 } 1095 1096 // A failed task overrides a cancelled task. 1097 // The first failed or cancelled task causes the scope to shutdown. 1098 Future<Object> f; 1099 while (((f = this.future) == null) 1100 || FUTURE_STATE_COMPARATOR.compare(f.state(), state) < 0) { 1101 if (FUTURE.compareAndSet(this, f, future)) { 1102 shutdown(); 1103 break; 1104 } 1105 } 1106 } 1107 1108 /** 1109 * {@inheritDoc} 1110 * @return this task scope 1111 * @throws IllegalStateException {@inheritDoc} 1112 * @throws WrongThreadException {@inheritDoc} 1113 */ 1114 @Override 1115 public ShutdownOnFailure join() throws InterruptedException { 1116 super.join(); 1117 return this; 1118 } 1119 1120 /** 1121 * {@inheritDoc} 1122 * @return this task scope 1123 * @throws IllegalStateException {@inheritDoc} 1124 * @throws WrongThreadException {@inheritDoc} 1125 */ 1126 @Override 1127 public ShutdownOnFailure joinUntil(Instant deadline) 1128 throws InterruptedException, TimeoutException 1129 { 1130 super.joinUntil(deadline); 1131 return this; 1132 } 1133 1134 /** 1135 * Returns the exception for the first subtask that completed with an exception. 1136 * If no subtask completed with an exception but cancelled subtasks were notified 1137 * to the {@code handleComplete} method then a {@code CancellationException} 1138 * is returned. If no subtasks completed abnormally then an empty {@code Optional} 1139 * is returned. 1140 * 1141 * @apiNote This method is intended to be invoked by the task scope owner after it 1142 * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}). 1143 * A future release may add enforcement to prevent the method being called by 1144 * other threads or before joining. 1145 * 1146 * @return the exception for a subtask that completed abnormally or an empty 1147 * optional if no subtasks completed abnormally 1148 */ 1149 public Optional<Throwable> exception() { 1150 Future<Object> f = future; 1151 if (f != null) { 1152 Throwable throwable = (f.state() == Future.State.FAILED) 1153 ? f.exceptionNow() 1154 : new CancellationException(); 1155 return Optional.of(throwable); 1156 } else { 1157 return Optional.empty(); 1158 } 1159 } 1160 1161 /** 1162 * Throws if a subtask completed abnormally. If any subtask completed with an 1163 * exception then {@code ExecutionException} is thrown with the exception of the 1164 * first subtask to fail as the {@linkplain Throwable#getCause() cause}. If no 1165 * subtask completed with an exception but cancelled subtasks were notified to the 1166 * {@code handleComplete} method then {@code CancellationException} is thrown. 1167 * This method does nothing if no subtasks completed abnormally. 1168 * 1169 * @apiNote This method is intended to be invoked by the task scope owner after it 1170 * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}). 1171 * A future release may add enforcement to prevent the method being called by 1172 * other threads or before joining. 1173 * 1174 * @throws ExecutionException if a subtask completed with an exception 1175 * @throws CancellationException if no subtasks completed with an exception but 1176 * subtasks were cancelled 1177 */ 1178 public void throwIfFailed() throws ExecutionException { 1179 Future<Object> f = future; 1180 if (f != null) { 1181 if (f.state() == Future.State.FAILED) { 1182 throw new ExecutionException(f.exceptionNow()); 1183 } else { 1184 throw new CancellationException(); 1185 } 1186 } 1187 } 1188 1189 /** 1190 * Throws the exception produced by the given exception supplying function if 1191 * a subtask completed abnormally. If any subtask completed with an exception then 1192 * the function is invoked with the exception of the first subtask to fail. 1193 * If no subtask completed with an exception but cancelled subtasks were notified 1194 * to the {@code handleComplete} method then the function is called with a {@code 1195 * CancellationException}. The exception returned by the function is thrown. 1196 * This method does nothing if no subtasks completed abnormally. 1197 * 1198 * @apiNote This method is intended to be invoked by the task scope owner after it 1199 * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}). 1200 * A future release may add enforcement to prevent the method being called by 1201 * other threads or before joining. 1202 * 1203 * @param esf the exception supplying function 1204 * @param <X> type of the exception to be thrown 1205 * @throws X produced by the exception supplying function 1206 */ 1207 public <X extends Throwable> 1208 void throwIfFailed(Function<Throwable, ? extends X> esf) throws X { 1209 Objects.requireNonNull(esf); 1210 Future<Object> f = future; 1211 if (f != null) { 1212 Throwable throwable = (f.state() == Future.State.FAILED) 1213 ? f.exceptionNow() 1214 : new CancellationException(); 1215 X ex = esf.apply(throwable); 1216 Objects.requireNonNull(ex, "esf returned null"); 1217 throw ex; 1218 } 1219 } 1220 } 1221 }