1 /*
   2  * Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package jdk.incubator.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.time.Instant;
  33 import java.util.Comparator;
  34 import java.util.Objects;
  35 import java.util.Optional;
  36 import java.util.Set;
  37 import java.util.concurrent.Callable;
  38 import java.util.concurrent.CancellationException;
  39 import java.util.concurrent.ConcurrentHashMap;
  40 import java.util.concurrent.ExecutionException;
  41 import java.util.concurrent.Future;
  42 import java.util.concurrent.FutureTask;
  43 import java.util.concurrent.RejectedExecutionException;
  44 import java.util.concurrent.ThreadFactory;
  45 import java.util.concurrent.TimeoutException;
  46 import java.util.concurrent.TimeUnit;
  47 import java.util.concurrent.locks.ReentrantLock;
  48 import java.util.function.Function;
  49 import jdk.internal.misc.ThreadFlock;
  50 
  51 /**
  52  * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports
  53  * cases where a task splits into several concurrent subtasks, to be executed in their
  54  * own threads, and where the subtasks must complete before the main task continues. A
  55  * {@code StructuredTaskScope} can be used to ensure that the lifetime of a concurrent
  56  * operation is confined by a <em>syntax block</em>, just like that of a sequential
  57  * operation in structured programming.
  58  *
  59  * <h2>Basic usage</h2>
  60  *
  61  * A {@code StructuredTaskScope} is created with one of its public constructors. It defines
  62  * the {@link #fork(Callable) fork} method to start a thread to execute a task, the {@link
  63  * #join() join} method to wait for all threads to finish, and the {@link #close() close}
  64  * method to close the task scope. The API is intended to be used with the {@code
  65  * try-with-resources} construct. The intention is that code in the <em>block</em> uses
  66  * the {@code fork} method to fork threads to execute the subtasks, wait for the threads
  67  * to finish with the {@code join} method, and then <em>process the results</em>.
  68  * Processing of results may include handling or re-throwing of exceptions.
  69  * {@snippet lang=java :
  70  *     try (var scope = new StructuredTaskScope<Object>()) {
  71  *
  72  *         Future<Integer> future1 = scope.fork(task1);   // @highlight substring="fork"
  73  *         Future<String> future2 = scope.fork(task2);    // @highlight substring="fork"
  74  *
  75  *         scope.join();                                  // @highlight substring="join"
  76  *
  77  *         ... process results/exceptions ...
  78  *
  79  *     } // close                                         // @highlight substring="close"
  80  * }
  81  * To ensure correct usage, the {@code join} and {@code close} methods may only be invoked
  82  * by the <em>owner</em> (the thread that opened/created the task scope}, and the
  83  * {@code close} method throws an exception after closing if the owner did not invoke the
  84  * {@code join} method after forking.
  85  *
  86  * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut
  87  * down a task scope without closing it. Shutdown is useful for cases where a subtask
  88  * completes with a result (or exception) and the results of other unfinished subtasks are
  89  * no longer needed. If a subtask invokes {@code shutdown} while the owner is waiting in
  90  * the {@code join} method then it will cause {@code join} to wakeup, all unfinished
  91  * threads to be {@linkplain Thread#interrupt() interrupted} and prevents new threads
  92  * from starting in the task scope.
  93  *
  94  * <h2>Subclasses with policies for common cases</h2>
  95  *
  96  * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for
  97  * common cases:
  98  * <ol>
  99  *   <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the first result and
 100  *   shuts down the task scope to interrupt unfinished threads and wakeup the owner. This
 101  *   class is intended for cases where the result of any subtask will do ("invoke any")
 102  *   and where there is no need to wait for results of other unfinished tasks. It defines
 103  *   methods to get the first result or throw an exception if all subtasks fail.
 104  *   <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the first exception and
 105  *   shuts down the task scope. This class is intended for cases where the results of all
 106  *   subtasks are required ("invoke all"); if any subtask fails then the results of other
 107  *   unfinished subtasks are no longer needed. If defines methods to throw an exception if
 108  *   any of the subtasks fail.
 109  * </ol>
 110  *
 111  * <p> The following are two examples that use the two classes. In both cases, a pair of
 112  * subtasks are forked to fetch resources from two URL locations "left" and "right". The
 113  * first example creates a ShutdownOnSuccess object to capture the result of the first
 114  * subtask to complete normally, cancelling the other by way of shutting down the task
 115  * scope. The main task waits in {@code join} until either subtask completes with a result
 116  * or both subtasks fail. It invokes {@link ShutdownOnSuccess#result(Function)
 117  * result(Function)} method to get the captured result. If both subtasks fail then this
 118  * method throws a {@code WebApplicationException} with the exception from one of the
 119  * subtasks as the cause.
 120  * {@snippet lang=java :
 121  *     try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
 122  *
 123  *         scope.fork(() -> fetch(left));
 124  *         scope.fork(() -> fetch(right));
 125  *
 126  *         scope.join();
 127  *
 128  *         // @link regex="result(?=\()" target="ShutdownOnSuccess#result" :
 129  *         String result = scope.result(e -> new WebApplicationException(e));
 130  *
 131  *         ...
 132  *     }
 133  * }
 134  * The second example creates a ShutdownOnFailure object to capture the exception of the
 135  * first subtask to fail, cancelling the other by way of shutting down the task scope. The
 136  * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a
 137  * result, either fails, or a deadline is reached. It invokes {@link
 138  * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception
 139  * when either subtask fails. This method is a no-op if no subtasks fail. The main task
 140  * uses {@code Future}'s {@link Future#resultNow() resultNow()} method to retrieve the
 141  * results.
 142  *
 143  * {@snippet lang=java :
 144  *    Instant deadline = ...
 145  *
 146  *    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
 147  *
 148  *         Future<String> future1 = scope.fork(() -> query(left));
 149  *         Future<String> future2 = scope.fork(() -> query(right));
 150  *
 151  *         scope.joinUntil(deadline);
 152  *
 153  *         // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" :
 154  *         scope.throwIfFailed(e -> new WebApplicationException(e));
 155  *
 156  *         // both subtasks completed successfully
 157  *         String result = Stream.of(future1, future2)
 158  *                 // @link substring="Future::resultNow" target="Future#resultNow" :
 159  *                 .map(Future::resultNow)
 160  *                 .collect(Collectors.joining(", ", "{ ", " }"));
 161  *
 162  *         ...
 163  *     }
 164  * }
 165  *
 166  * <h2>Extending StructuredTaskScope</h2>
 167  *
 168  * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Future)
 169  * handleComplete} overridden, to implement policies other than those implemented by
 170  * {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. The method may be overridden
 171  * to, for example, collect the results of subtasks that complete with a result and ignore
 172  * subtasks that fail. It may collect exceptions when subtasks fail. It may invoke the
 173  * {@link #shutdown() shutdown} method to shut down and cause {@link #join() join} to
 174  * wakeup when some condition arises.
 175  *
 176  * <p> A subclass will typically define methods to make available results, state, or other
 177  * outcome to code that executes after the {@code join} method. A subclass that collects
 178  * results and ignores subtasks that fail may define a method that returns a collection of
 179  * results. A subclass that implements a policy to shut down when a subtask fails may
 180  * define a method to retrieve the exception of the first subtask to fail.
 181  *
 182  * <p> The following is an example of a {@code StructuredTaskScope} implementation that
 183  * collects the results of subtasks that complete successfully. It defines the method
 184  * <b>{@code results()}</b> to be used by the main task to retrieve the results.
 185  *
 186  * {@snippet lang=java :
 187  *     class MyScope<T> extends StructuredTaskScope<T> {
 188  *         private final Queue<T> results = new ConcurrentLinkedQueue<>();
 189  *
 190  *         MyScope() {
 191  *             super(null, Thread.ofVirtual().factory());
 192  *         }
 193  *
 194  *         @Override
 195  *         // @link substring="handleComplete" target="handleComplete" :
 196  *         protected void handleComplete(Future<T> future) {
 197  *             if (future.state() == Future.State.SUCCESS) {
 198  *                 T result = future.resultNow();
 199  *                 results.add(result);
 200  *             }
 201  *         }
 202  *
 203  *         // Returns a stream of results from the subtasks that completed successfully
 204  *         public Stream<T> results() {     // @highlight substring="results"
 205  *             return results.stream();
 206  *         }
 207  *     }
 208  *  }
 209  *
 210  * <h2><a id="TreeStructure">Tree structure</a></h2>
 211  *
 212  * Task scopes form a tree where parent-child relations are established implicitly when
 213  * opening a new task scope:
 214  * <ul>
 215  *   <li> A parent-child relation is established when a thread started in a task scope
 216  *   opens its own task scope. A thread started in task scope "A" that opens task scope
 217  *   "B" establishes a parent-child relation where task scope "A" is the parent of task
 218  *   scope "B".
 219  *   <li> A parent-child relation is established with nesting. If a thread opens task
 220  *   scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task
 221  *   scope "B" is the parent of the nested task scope "C".
 222  * </ul>
 223  *
 224  * The <i>descendants</i> of a task scope are the child task scopes that it is a parent
 225  * of, plus the descendants of the child task scopes, recursively.
 226  *
 227  * <p> The tree structure supports:
 228  * <ul>
 229  *   <li> Inheritance of {@linkplain ScopedValue scoped values} across threads.
 230  *   <li> Confinement checks. The phrase "threads contained in the task scope" in method
 231  *   descriptions means threads started in the task scope or descendant scopes.
 232  * </ul>
 233  *
 234  * <p> The following example demonstrates the inheritance of a scoped value. A scoped
 235  * value {@code USERNAME} is bound to the value "{@code duke}". A {@code StructuredTaskScope}
 236  * is created and its {@code fork} method invoked to start a thread to execute {@code
 237  * childTask}. The thread inherits the scoped value <em>bindings</em> captured when
 238  * creating the task scope. The code in {@code childTask} uses the value of the scoped
 239  * value and so reads the value "{@code duke}".
 240  * {@snippet lang=java :
 241  *     private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
 242  *
 243  *     // @link substring="where" target="ScopedValue#where(ScopedValue, Object, Runnable)" :
 244  *     ScopedValue.where(USERNAME, "duke", () -> {
 245  *         try (var scope = new StructuredTaskScope<String>()) {
 246  *
 247  *             scope.fork(() -> childTask());           // @highlight substring="fork"
 248  *             ...
 249  *          }
 250  *     });
 251  *
 252  *     ...
 253  *
 254  *     String childTask() {
 255  *         // @link substring="get" target="ScopedValue#get()" :
 256  *         String name = USERNAME.get();   // "duke"
 257  *         ...
 258  *     }
 259  * }
 260  *
 261  * <p> {@code StructuredTaskScope} does not define APIs that exposes the tree structure
 262  * at this time.
 263  *
 264  * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
 265  * or method in this class will cause a {@link NullPointerException} to be thrown.
 266  *
 267  * <h2>Memory consistency effects</h2>
 268  *
 269  * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to
 270  * {@linkplain #fork forking} of a {@code Callable} task
 271  * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
 272  * <i>happen-before</i></a> any actions taken by that task, which in turn <i>happen-before</i>
 273  * the task result is retrieved via its {@code Future}, or <i>happen-before</i> any actions
 274  * taken in a thread after {@linkplain #join() joining} of the task scope.
 275  *
 276  * @jls 17.4.5 Happens-before Order
 277  *
 278  * @param <T> the result type of tasks executed in the scope
 279  * @since 19
 280  */
 281 public class StructuredTaskScope<T> implements AutoCloseable {
 282     private static final VarHandle FUTURES;
 283     static {
 284         try {
 285             MethodHandles.Lookup l = MethodHandles.lookup();
 286             FUTURES = l.findVarHandle(StructuredTaskScope.class, "futures", Set.class);
 287         } catch (Exception e) {
 288             throw new InternalError(e);
 289         }
 290     }
 291 
 292     private final ThreadFactory factory;
 293     private final ThreadFlock flock;
 294     private final ReentrantLock shutdownLock = new ReentrantLock();
 295 
 296     // lazily created set of Future objects with threads waiting in Future::get
 297     private volatile Set<Future<?>> futures;
 298 
 299     // set by owner when it forks, reset by owner when it joins
 300     private boolean needJoin;
 301 
 302     // states: OPEN -> SHUTDOWN -> CLOSED
 303     private static final int OPEN     = 0;   // initial state
 304     private static final int SHUTDOWN = 1;
 305     private static final int CLOSED   = 2;
 306 
 307     // scope state, set by owner, read by any thread
 308     private volatile int state;
 309 
 310     /**
 311      * Creates a structured task scope with the given name and thread factory. The task
 312      * scope is optionally named for the purposes of monitoring and management. The thread
 313      * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
 314      * tasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
 315      * current thread.
 316      *
 317      * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
 318      * bindings for inheritance by threads created in the task scope. The
 319      * <a href="#TreeStructure">Tree Structure</a> section in the class description
 320      * details how parent-child relations are established implicitly for the purpose of
 321      * inheritance of scoped value bindings.
 322      *
 323      * @param name the name of the task scope, can be null
 324      * @param factory the thread factory
 325      */
 326     public StructuredTaskScope(String name, ThreadFactory factory) {
 327         this.factory = Objects.requireNonNull(factory, "'factory' is null");
 328         this.flock = ThreadFlock.open(name);
 329     }
 330 
 331     /**
 332      * Creates an unnamed structured task scope that creates virtual threads. The task
 333      * scope is owned by the current thread.
 334      *
 335      * <p> This constructor is equivalent to invoking the 2-arg constructor with a name
 336      * of {@code null} and a thread factory that creates virtual threads.
 337      */
 338     public StructuredTaskScope() {
 339         this.factory = Thread.ofVirtual().factory();
 340         this.flock = ThreadFlock.open(null);
 341     }
 342 
 343     /**
 344      * Throws WrongThreadException if the current thread is not the owner.
 345      */
 346     private void ensureOwner() {
 347         if (Thread.currentThread() != flock.owner())
 348             throw new WrongThreadException("Current thread not owner");
 349     }
 350 
 351     /**
 352      * Throws WrongThreadException if the current thread is not the owner
 353      * or a thread contained in the tree.
 354      */
 355     private void ensureOwnerOrContainsThread() {
 356         Thread currentThread = Thread.currentThread();
 357         if (currentThread != flock.owner() && !flock.containsThread(currentThread))
 358             throw new WrongThreadException("Current thread not owner or thread in the tree");
 359     }
 360 
 361     /**
 362      * Tests if the task scope is shutdown.
 363      */
 364     private boolean isShutdown() {
 365         return state >= SHUTDOWN;
 366     }
 367 
 368     /**
 369      * Track the given Future.
 370      */
 371     private void track(Future<?> future) {
 372         // create the set of Futures if not already created
 373         Set<Future<?>> futures = this.futures;
 374         if (futures == null) {
 375             futures = ConcurrentHashMap.newKeySet();
 376             if (!FUTURES.compareAndSet(this, null, futures)) {
 377                 // lost the race
 378                 futures = this.futures;
 379             }
 380         }
 381         futures.add(future);
 382     }
 383 
 384     /**
 385      * Stop tracking the Future.
 386      */
 387     private void untrack(Future<?> future) {
 388         assert futures != null;
 389         futures.remove(future);
 390     }
 391 
 392     /**
 393      * Invoked when a task completes before the scope is shut down.
 394      *
 395      * <p> The {@code handleComplete} method should be thread safe. It may be invoked by
 396      * several threads concurrently.
 397      *
 398      * @implSpec The default implementation does nothing.
 399      *
 400      * @param future the completed task
 401      */
 402     protected void handleComplete(Future<T> future) { }
 403 
 404     /**
 405      * Starts a new thread to run the given task.
 406      *
 407      * <p> The new thread is created with the task scope's {@link ThreadFactory}. It
 408      * inherits the current thread's {@linkplain ScopedValue scoped value} bindings. The
 409      * bindings must match the bindings captured when the task scope was created.
 410      *
 411      * <p> If the task completes before the task scope is {@link #shutdown() shutdown}
 412      * then the {@link #handleComplete(Future) handleComplete} method is invoked to
 413      * consume the completed task. The {@code handleComplete} method is run when the task
 414      * completes with a result or exception. If the {@code Future}'s {@link
 415      * Future#cancel(boolean) cancel} method is used to cancel a task before the task scope
 416      * is shut down, then the {@code handleComplete} method is run by the thread that
 417      * invokes {@code cancel}. If the task scope shuts down at or around the same time
 418      * that the task completes or is cancelled then the {@code handleComplete} method may
 419      * or may not be invoked.
 420      *
 421      * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process
 422      * of shutting down) then {@code fork} returns a {@code Future} representing a {@link
 423      * Future.State#CANCELLED cancelled} task that was not run.
 424      *
 425      * <p> This method may only be invoked by the task scope owner or threads contained
 426      * in the task scope. The {@link Future#cancel(boolean) cancel} method of the returned
 427      * {@code Future} object is also restricted to the task scope owner or threads contained
 428      * in the task scope. The {@code cancel} method throws {@link WrongThreadException}
 429      * if invoked from another thread. All other methods on the returned {@code Future}
 430      * object, such as {@link Future#get() get}, are not restricted.
 431      *
 432      * @param task the task to run
 433      * @param <U> the result type
 434      * @return a future
 435      * @throws IllegalStateException if this task scope is closed
 436      * @throws WrongThreadException if the current thread is not the owner or a thread
 437      * contained in the task scope
 438      * @throws StructureViolationException if the current scoped value bindings are not
 439      * the same as when the task scope was created
 440      * @throws RejectedExecutionException if the thread factory rejected creating a
 441      * thread to run the task
 442      */
 443     public <U extends T> Future<U> fork(Callable<? extends U> task) {
 444         Objects.requireNonNull(task, "'task' is null");
 445 
 446         // create future
 447         var future = new FutureImpl<U>(this, task);
 448 
 449         boolean shutdown = (state >= SHUTDOWN);
 450 
 451         if (!shutdown) {
 452             // create thread
 453             Thread thread = factory.newThread(future);
 454             if (thread == null) {
 455                 throw new RejectedExecutionException("Rejected by thread factory");
 456             }
 457 
 458             // attempt to start the thread
 459             try {
 460                 flock.start(thread);
 461             } catch (IllegalStateException e) {
 462                 // shutdown or in the process of shutting down
 463                 shutdown = true;
 464             }
 465         }
 466 
 467         if (shutdown) {
 468             if (state == CLOSED) {
 469                 throw new IllegalStateException("Task scope is closed");
 470             } else {
 471                 future.cancel(false);
 472             }
 473         }
 474 
 475         // if owner forks then it will need to join
 476         if (Thread.currentThread() == flock.owner() && !needJoin) {
 477             needJoin = true;
 478         }
 479 
 480         return future;
 481     }
 482 
 483     /**
 484      * Wait for all threads to finish or the task scope to shut down.
 485      */
 486     private void implJoin(Duration timeout)
 487         throws InterruptedException, TimeoutException
 488     {
 489         ensureOwner();
 490         needJoin = false;
 491         int s = state;
 492         if (s >= SHUTDOWN) {
 493             if (s == CLOSED)
 494                 throw new IllegalStateException("Task scope is closed");
 495             return;
 496         }
 497 
 498         // wait for all threads, wakeup, interrupt, or timeout
 499         if (timeout != null) {
 500             flock.awaitAll(timeout);
 501         } else {
 502             flock.awaitAll();
 503         }
 504     }
 505 
 506     /**
 507      * Wait for all threads to finish or the task scope to shut down. This method waits
 508      * until all threads started in the task scope finish execution (of both task and
 509      * {@link #handleComplete(Future) handleComplete} method), the {@link #shutdown()
 510      * shutdown} method is invoked to shut down the task scope, or the current thread
 511      * is {@linkplain Thread#interrupt() interrupted}.
 512      *
 513      * <p> This method may only be invoked by the task scope owner.
 514      *
 515      * @return this task scope
 516      * @throws IllegalStateException if this task scope is closed
 517      * @throws WrongThreadException if the current thread is not the owner
 518      * @throws InterruptedException if interrupted while waiting
 519      */
 520     public StructuredTaskScope<T> join() throws InterruptedException {
 521         try {
 522             implJoin(null);
 523         } catch (TimeoutException e) {
 524             throw new InternalError();
 525         }
 526         return this;
 527     }
 528 
 529     /**
 530      * Wait for all threads to finish or the task scope to shut down, up to the given
 531      * deadline. This method waits until all threads started in the task scope finish
 532      * execution (of both task and {@link #handleComplete(Future) handleComplete} method),
 533      * the {@link #shutdown() shutdown} method is invoked to shut down the task scope,
 534      * the current thread is {@linkplain Thread#interrupt() interrupted}, or the deadline
 535      * is reached.
 536      *
 537      * <p> This method may only be invoked by the task scope owner.
 538      *
 539      * @param deadline the deadline
 540      * @return this task scope
 541      * @throws IllegalStateException if this task scope is closed
 542      * @throws WrongThreadException if the current thread is not the owner
 543      * @throws InterruptedException if interrupted while waiting
 544      * @throws TimeoutException if the deadline is reached while waiting
 545      */
 546     public StructuredTaskScope<T> joinUntil(Instant deadline)
 547         throws InterruptedException, TimeoutException
 548     {
 549         Duration timeout = Duration.between(Instant.now(), deadline);
 550         implJoin(timeout);
 551         return this;
 552     }
 553 
 554     /**
 555      * Cancel all tracked Future objects.
 556      */
 557     private void cancelTrackedFutures() {
 558         Set<Future<?>> futures = this.futures;
 559         if (futures != null) {
 560             futures.forEach(f -> f.cancel(false));
 561         }
 562     }
 563 
 564     /**
 565      * Interrupt all unfinished threads.
 566      */
 567     private void implInterruptAll() {
 568         flock.threads().forEach(t -> {
 569             if (t != Thread.currentThread()) {
 570                 t.interrupt();
 571             }
 572         });
 573     }
 574 
 575     @SuppressWarnings("removal")
 576     private void interruptAll() {
 577         if (System.getSecurityManager() == null) {
 578             implInterruptAll();
 579         } else {
 580             PrivilegedAction<Void> pa = () -> {
 581                 implInterruptAll();
 582                 return null;
 583             };
 584             AccessController.doPrivileged(pa);
 585         }
 586     }
 587 
 588     /**
 589      * Shutdown the task scope if not already shutdown. Return true if this method
 590      * shutdowns the task scope, false if already shutdown.
 591      */
 592     private boolean implShutdown() {
 593         if (state < SHUTDOWN) {
 594             shutdownLock.lock();
 595             try {
 596                 if (state < SHUTDOWN) {
 597 
 598                     // prevent new threads from starting
 599                     flock.shutdown();
 600 
 601                     // wakeup any threads waiting in Future::get
 602                     cancelTrackedFutures();
 603 
 604                     // interrupt all unfinished threads
 605                     interruptAll();
 606 
 607                     state = SHUTDOWN;
 608                     return true;
 609                 }
 610             } finally {
 611                 shutdownLock.unlock();
 612             }
 613         }
 614         assert state >= SHUTDOWN;
 615         return false;
 616     }
 617 
 618     /**
 619      * Shut down the task scope without closing it. Shutting down a task scope prevents
 620      * new threads from starting, interrupts all unfinished threads, and causes the
 621      * {@link #join() join} method to wakeup. Shutdown is useful for cases where the
 622      * results of unfinished subtasks are no longer needed.
 623      *
 624      * <p> More specifically, this method:
 625      * <ul>
 626      * <li> {@linkplain Future#cancel(boolean) Cancels} the tasks that have threads
 627      * {@linkplain Future#get() waiting} on a result so that the waiting threads wakeup.
 628      * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the
 629      * task scope (except the current thread).
 630      * <li> Wakes up the owner if it is waiting in {@link #join()} or {@link
 631      * #joinUntil(Instant)}. If the owner is not waiting then its next call to {@code
 632      * join} or {@code joinUntil} will return immediately.
 633      * </ul>
 634      *
 635      * <p> When this method completes then the {@code Future} objects for all tasks will
 636      * be {@linkplain Future#isDone() done}, normally or abnormally. There may still
 637      * be threads that have not finished because they are executing code that did not
 638      * respond (or respond promptly) to thread interrupt. This method does not wait
 639      * for these threads. When the owner invokes the {@link #close() close} method
 640      * to close the task scope then it will wait for the remaining threads to finish.
 641      *
 642      * <p> This method may only be invoked by the task scope owner or threads contained
 643      * in the task scope.
 644      *
 645      * @throws IllegalStateException if this task scope is closed
 646      * @throws WrongThreadException if the current thread is not the owner or
 647      * a thread contained in the task scope
 648      */
 649     public void shutdown() {
 650         ensureOwnerOrContainsThread();
 651         if (state == CLOSED)
 652             throw new IllegalStateException("Task scope is closed");
 653         if (implShutdown())
 654             flock.wakeup();
 655     }
 656 
 657     /**
 658      * Closes this task scope.
 659      *
 660      * <p> This method first shuts down the task scope (as if by invoking the {@link
 661      * #shutdown() shutdown} method). It then waits for the threads executing any
 662      * unfinished tasks to finish. If interrupted then this method will continue to
 663      * wait for the threads to finish before completing with the interrupt status set.
 664      *
 665      * <p> This method may only be invoked by the task scope owner. If the task scope
 666      * is already closed then the owner invoking this method has no effect.
 667      *
 668      * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
 669      * manner</em>. If this method is called to close a task scope before nested task
 670      * scopes are closed then it closes the underlying construct of each nested task scope
 671      * (in the reverse order that they were created in), closes this task scope, and then
 672      * throws {@link StructureViolationException}.
 673      *
 674      * Similarly, if this method is called to close a task scope while executing with
 675      * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
 676      * before the scoped values were bound, then {@code StructureViolationException} is
 677      * thrown after closing the task scope.
 678      *
 679      * If a thread terminates without first closing task scopes that it owns then
 680      * termination will cause the underlying construct of each of its open tasks scopes to
 681      * be closed. Closing is performed in the reverse order that the task scopes were
 682      * created in. Thread termination may therefore be delayed when the owner has to wait
 683      * for threads forked in these task scopes to finish.
 684      *
 685      * @throws IllegalStateException thrown after closing the task scope if the owner
 686      * did not invoke join after forking
 687      * @throws WrongThreadException if the current thread is not the owner
 688      * @throws StructureViolationException if a structure violation was detected
 689      */
 690     @Override
 691     public void close() {
 692         ensureOwner();
 693         if (state == CLOSED)
 694             return;
 695 
 696         try {
 697             implShutdown();
 698             flock.close();
 699         } finally {
 700             state = CLOSED;
 701         }
 702 
 703         if (needJoin) {
 704             throw new IllegalStateException("Owner did not invoke join or joinUntil after fork");
 705         }
 706     }
 707 
 708     @Override
 709     public String toString() {
 710         StringBuilder sb = new StringBuilder();
 711         String name = flock.name();
 712         if (name != null) {
 713             sb.append(name);
 714             sb.append('/');
 715         }
 716         sb.append(Objects.toIdentityString(this));
 717         int s = state;
 718         if (s == CLOSED)
 719             sb.append("/closed");
 720         else if (s == SHUTDOWN)
 721             sb.append("/shutdown");
 722         return sb.toString();
 723     }
 724 
 725     /**
 726      * The Future implementation returned by the fork methods. Most methods are
 727      * overridden to support cancellation when the task scope is shutdown.
 728      * The blocking get methods register the Future with the task scope so that they
 729      * are cancelled when the task scope shuts down.
 730      */
 731     private static final class FutureImpl<V> extends FutureTask<V> {
 732         private final StructuredTaskScope<V> scope;
 733 
 734         @SuppressWarnings("unchecked")
 735         FutureImpl(StructuredTaskScope<? super V> scope, Callable<? extends V> task) {
 736             super((Callable<V>) task);
 737             this.scope = (StructuredTaskScope<V>) scope;
 738         }
 739 
 740         @Override
 741         protected void done() {
 742             if (!scope.isShutdown()) {
 743                 scope.handleComplete(this);
 744             }
 745         }
 746 
 747         private void cancelIfShutdown() {
 748             if (scope.isShutdown() && !super.isDone()) {
 749                 super.cancel(false);
 750             }
 751         }
 752 
 753         @Override
 754         public boolean isDone() {
 755             cancelIfShutdown();
 756             return super.isDone();
 757         }
 758 
 759         @Override
 760         public boolean isCancelled() {
 761             cancelIfShutdown();
 762             return super.isCancelled();
 763         }
 764 
 765         @Override
 766         public boolean cancel(boolean mayInterruptIfRunning) {
 767             scope.ensureOwnerOrContainsThread();
 768             cancelIfShutdown();
 769             return super.cancel(mayInterruptIfRunning);
 770         }
 771 
 772         @Override
 773         public V get() throws InterruptedException, ExecutionException {
 774             if (super.isDone())
 775                 return super.get();
 776             scope.track(this);
 777             try {
 778                 cancelIfShutdown();
 779                 return super.get();
 780             } finally {
 781                 scope.untrack(this);
 782             }
 783         }
 784 
 785         @Override
 786         public V get(long timeout, TimeUnit unit)
 787                 throws InterruptedException, ExecutionException, TimeoutException {
 788             Objects.requireNonNull(unit);
 789             if (super.isDone())
 790                 return super.get();
 791             scope.track(this);
 792             try {
 793                 cancelIfShutdown();
 794                 return super.get(timeout, unit);
 795             } finally {
 796                 scope.untrack(this);
 797             }
 798         }
 799 
 800         @Override
 801         public V resultNow() {
 802             cancelIfShutdown();
 803             return super.resultNow();
 804         }
 805 
 806         @Override
 807         public Throwable exceptionNow() {
 808             cancelIfShutdown();
 809             return super.exceptionNow();
 810         }
 811 
 812         @Override
 813         public State state() {
 814             cancelIfShutdown();
 815             return super.state();
 816         }
 817 
 818         @Override
 819         public String toString() {
 820             cancelIfShutdown();
 821             return super.toString();
 822         }
 823     }
 824 
 825     /**
 826      * Maps a Future.State to an int that can be compared.
 827      * RUNNING < CANCELLED < FAILED < SUCCESS.
 828      */
 829     private static int futureStateToInt(Future.State s) {
 830         return switch (s) {
 831             case RUNNING   -> 0;
 832             case CANCELLED -> 1;
 833             case FAILED    -> 2;
 834             case SUCCESS   -> 3;
 835         };
 836     }
 837 
 838     // RUNNING < CANCELLED < FAILED < SUCCESS
 839     private static final Comparator<Future.State> FUTURE_STATE_COMPARATOR =
 840             Comparator.comparingInt(StructuredTaskScope::futureStateToInt);
 841 
 842     /**
 843      * A {@code StructuredTaskScope} that captures the result of the first subtask to
 844      * complete successfully. Once captured, it invokes the {@linkplain #shutdown() shutdown}
 845      * method to interrupt unfinished threads and wakeup the owner. The policy
 846      * implemented by this class is intended for cases where the result of any subtask
 847      * will do ("invoke any") and where the results of other unfinished subtask are no
 848      * longer needed.
 849      *
 850      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 851      * in this class will cause a {@link NullPointerException} to be thrown.
 852      *
 853      * @param <T> the result type
 854      * @since 19
 855      */
 856     public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
 857         private static final VarHandle FUTURE;
 858         static {
 859             try {
 860                 MethodHandles.Lookup l = MethodHandles.lookup();
 861                 FUTURE = l.findVarHandle(ShutdownOnSuccess.class, "future", Future.class);
 862             } catch (Exception e) {
 863                 throw new InternalError(e);
 864             }
 865         }
 866         private volatile Future<T> future;
 867 
 868         /**
 869          * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
 870          * The task scope is optionally named for the purposes of monitoring and management.
 871          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
 872          * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope is
 873          * owned by the current thread.
 874          *
 875          * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
 876          * bindings for inheritance by threads created in the task scope. The
 877          * <a href="StructuredTaskScope.html#TreeStructure">Tree Structure</a> section in
 878          * the class description details how parent-child relations are established
 879          * implicitly for the purpose of inheritance of scoped value bindings.
 880          *
 881          * @param name the name of the task scope, can be null
 882          * @param factory the thread factory
 883          */
 884         public ShutdownOnSuccess(String name, ThreadFactory factory) {
 885             super(name, factory);
 886         }
 887 
 888         /**
 889          * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
 890          *
 891          * <p> This constructor is equivalent to invoking the 2-arg constructor with a
 892          * name of {@code null} and a thread factory that creates virtual threads.
 893          */
 894         public ShutdownOnSuccess() {
 895             super(null, Thread.ofVirtual().factory());
 896         }
 897 
 898         /**
 899          * Shut down the given task scope when invoked for the first time with a {@code
 900          * Future} for a task that completed with a result.
 901          *
 902          * @param future the completed task
 903          * @see #shutdown()
 904          * @see Future.State#SUCCESS
 905          */
 906         @Override
 907         protected void handleComplete(Future<T> future) {
 908             Future.State state = future.state();
 909             if (state == Future.State.RUNNING) {
 910                 throw new IllegalArgumentException("Task is not completed");
 911             }
 912 
 913             Future<T> f;
 914             while (((f = this.future) == null)
 915                     || FUTURE_STATE_COMPARATOR.compare(f.state(), state) < 0) {
 916                 if (FUTURE.compareAndSet(this, f, future)) {
 917                     if (state == Future.State.SUCCESS)
 918                         shutdown();
 919                     break;
 920                 }
 921             }
 922         }
 923 
 924         /**
 925          * {@inheritDoc}
 926          * @return this task scope
 927          * @throws IllegalStateException {@inheritDoc}
 928          * @throws WrongThreadException {@inheritDoc}
 929          */
 930         @Override
 931         public ShutdownOnSuccess<T> join() throws InterruptedException {
 932             super.join();
 933             return this;
 934         }
 935 
 936         /**
 937          * {@inheritDoc}
 938          * @return this task scope
 939          * @throws IllegalStateException {@inheritDoc}
 940          * @throws WrongThreadException {@inheritDoc}
 941          */
 942         @Override
 943         public ShutdownOnSuccess<T> joinUntil(Instant deadline)
 944             throws InterruptedException, TimeoutException
 945         {
 946             super.joinUntil(deadline);
 947             return this;
 948         }
 949 
 950         /**
 951          * {@return the result of the first subtask that completed with a result}
 952          *
 953          * <p> When no subtask completed with a result but a task completed with an
 954          * exception then {@code ExecutionException} is thrown with the exception as the
 955          * {@linkplain Throwable#getCause() cause}. If only cancelled subtasks were
 956          * notified to the {@code handleComplete} method then {@code CancellationException}
 957          * is thrown.
 958          *
 959          * @apiNote This method is intended to be invoked by the task scope owner after it
 960          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
 961          * A future release may add enforcement to prevent the method being called by
 962          * other threads or before joining.
 963          *
 964          * @throws ExecutionException if no subtasks completed with a result but a subtask
 965          * completed with an exception
 966          * @throws CancellationException if all subtasks were cancelled
 967          * @throws IllegalStateException if the handle method was not invoked with a
 968          * completed subtask
 969          */
 970         public T result() throws ExecutionException {
 971             Future<T> f = future;
 972             if (f == null) {
 973                 throw new IllegalStateException("No completed subtasks");
 974             }
 975             return switch (f.state()) {
 976                 case SUCCESS   -> f.resultNow();
 977                 case FAILED    -> throw new ExecutionException(f.exceptionNow());
 978                 case CANCELLED -> throw new CancellationException();
 979                 default        -> throw new InternalError("Unexpected state: " + f);
 980             };
 981 
 982         }
 983 
 984         /**
 985          * Returns the result of the first subtask that completed with a result, otherwise
 986          * throws an exception produced by the given exception supplying function.
 987          *
 988          * <p> When no subtask completed with a result but a subtask completed with an
 989          * exception then the exception supplying function is invoked with the exception.
 990          * If only cancelled subtasks were notified to the {@code handleComplete} method
 991          * then the exception supplying function is invoked with a {@code CancellationException}.
 992          *
 993          * @apiNote This method is intended to be invoked by the task scope owner after it
 994          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
 995          * A future release may add enforcement to prevent the method being called by
 996          * other threads or before joining.
 997          *
 998          * @param esf the exception supplying function
 999          * @param <X> type of the exception to be thrown
1000          * @return the result of the first subtask that completed with a result
1001          * @throws X if no subtask completed with a result
1002          * @throws IllegalStateException if the handle method was not invoked with a
1003          * completed subtask
1004          */
1005         public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X {
1006             Objects.requireNonNull(esf);
1007             Future<T> f = future;
1008             if (f == null) {
1009                 throw new IllegalStateException("No completed subtasks");
1010             }
1011             Future.State state = f.state();
1012             if (state == Future.State.SUCCESS) {
1013                 return f.resultNow();
1014             } else {
1015                 Throwable throwable = (state == Future.State.FAILED)
1016                         ? f.exceptionNow()
1017                         : new CancellationException();
1018                 X ex = esf.apply(throwable);
1019                 Objects.requireNonNull(ex, "esf returned null");
1020                 throw ex;
1021             }
1022         }
1023     }
1024 
1025     /**
1026      * A {@code StructuredTaskScope} that captures the exception of the first subtask to
1027      * complete abnormally. Once captured, it invokes the {@linkplain #shutdown() shutdown}
1028      * method to interrupt unfinished threads and wakeup the owner. The policy implemented
1029      * by this class is intended for cases where the results for all subtasks are required
1030      * ("invoke all"); if any subtask fails then the results of other unfinished subtasks
1031      * are no longer needed.
1032      *
1033      * <p> Unless otherwise specified, passing a {@code null} argument to a method
1034      * in this class will cause a {@link NullPointerException} to be thrown.
1035      *
1036      * @since 19
1037      */
1038     public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
1039         private static final VarHandle FUTURE;
1040         static {
1041             try {
1042                 MethodHandles.Lookup l = MethodHandles.lookup();
1043                 FUTURE = l.findVarHandle(ShutdownOnFailure.class, "future", Future.class);
1044             } catch (Exception e) {
1045                 throw new InternalError(e);
1046             }
1047         }
1048         private volatile Future<Object> future;
1049 
1050         /**
1051          * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
1052          * The task scope is optionally named for the purposes of monitoring and management.
1053          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1054          * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope
1055          * is owned by the current thread.
1056          *
1057          * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
1058          * bindings for inheritance by threads created in the task scope. The
1059          * <a href="StructuredTaskScope.html#TreeStructure">Tree Structure</a> section in
1060          * the class description details how parent-child relations are established
1061          * implicitly for the purpose of inheritance of scoped value bindings.
1062          *
1063          * @param name the name of the task scope, can be null
1064          * @param factory the thread factory
1065          */
1066         public ShutdownOnFailure(String name, ThreadFactory factory) {
1067             super(name, factory);
1068         }
1069 
1070         /**
1071          * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
1072          *
1073          * <p> This constructor is equivalent to invoking the 2-arg constructor with a
1074          * name of {@code null} and a thread factory that creates virtual threads.
1075          */
1076         public ShutdownOnFailure() {
1077             super(null, Thread.ofVirtual().factory());
1078         }
1079 
1080         /**
1081          * Shut down the given task scope when invoked for the first time with a {@code
1082          * Future} for a task that completed abnormally (exception or cancelled).
1083          *
1084          * @param future the completed task
1085          * @see #shutdown()
1086          * @see Future.State#FAILED
1087          * @see Future.State#CANCELLED
1088          */
1089         @Override
1090         protected void handleComplete(Future<Object> future) {
1091             Future.State state = future.state();
1092             if (state == Future.State.RUNNING) {
1093                 throw new IllegalArgumentException("Task is not completed");
1094             } else if (state == Future.State.SUCCESS) {
1095                 return;
1096             }
1097 
1098             // A failed task overrides a cancelled task.
1099             // The first failed or cancelled task causes the scope to shutdown.
1100             Future<Object> f;
1101             while (((f = this.future) == null)
1102                     || FUTURE_STATE_COMPARATOR.compare(f.state(), state) < 0) {
1103                 if (FUTURE.compareAndSet(this, f, future)) {
1104                     shutdown();
1105                     break;
1106                 }
1107             }
1108         }
1109 
1110         /**
1111          * {@inheritDoc}
1112          * @return this task scope
1113          * @throws IllegalStateException {@inheritDoc}
1114          * @throws WrongThreadException {@inheritDoc}
1115          */
1116         @Override
1117         public ShutdownOnFailure join() throws InterruptedException {
1118             super.join();
1119             return this;
1120         }
1121 
1122         /**
1123          * {@inheritDoc}
1124          * @return this task scope
1125          * @throws IllegalStateException {@inheritDoc}
1126          * @throws WrongThreadException {@inheritDoc}
1127          */
1128         @Override
1129         public ShutdownOnFailure joinUntil(Instant deadline)
1130             throws InterruptedException, TimeoutException
1131         {
1132             super.joinUntil(deadline);
1133             return this;
1134         }
1135 
1136         /**
1137          * Returns the exception for the first subtask that completed with an exception.
1138          * If no subtask completed with an exception but cancelled subtasks were notified
1139          * to the {@code handleComplete} method then a {@code CancellationException}
1140          * is returned. If no subtasks completed abnormally then an empty {@code Optional}
1141          * is returned.
1142          *
1143          * @apiNote This method is intended to be invoked by the task scope owner after it
1144          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
1145          * A future release may add enforcement to prevent the method being called by
1146          * other threads or before joining.
1147          *
1148          * @return the exception for a subtask that completed abnormally or an empty
1149          * optional if no subtasks completed abnormally
1150          */
1151         public Optional<Throwable> exception() {
1152             Future<Object> f = future;
1153             if (f != null) {
1154                 Throwable throwable = (f.state() == Future.State.FAILED)
1155                         ? f.exceptionNow()
1156                         : new CancellationException();
1157                 return Optional.of(throwable);
1158             } else {
1159                 return Optional.empty();
1160             }
1161         }
1162 
1163         /**
1164          * Throws if a subtask completed abnormally. If any subtask completed with an
1165          * exception then {@code ExecutionException} is thrown with the exception of the
1166          * first subtask to fail as the {@linkplain Throwable#getCause() cause}. If no
1167          * subtask completed with an exception but cancelled subtasks were notified to the
1168          * {@code handleComplete} method then {@code CancellationException} is thrown.
1169          * This method does nothing if no subtasks completed abnormally.
1170          *
1171          * @apiNote This method is intended to be invoked by the task scope owner after it
1172          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
1173          * A future release may add enforcement to prevent the method being called by
1174          * other threads or before joining.
1175          *
1176          * @throws ExecutionException if a subtask completed with an exception
1177          * @throws CancellationException if no subtasks completed with an exception but
1178          * subtasks were cancelled
1179          */
1180         public void throwIfFailed() throws ExecutionException {
1181             Future<Object> f = future;
1182             if (f != null) {
1183                 if (f.state() == Future.State.FAILED) {
1184                     throw new ExecutionException(f.exceptionNow());
1185                 } else {
1186                     throw new CancellationException();
1187                 }
1188             }
1189         }
1190 
1191         /**
1192          * Throws the exception produced by the given exception supplying function if
1193          * a subtask completed abnormally. If any subtask completed with an exception then
1194          * the function is invoked with the exception of the first subtask to fail.
1195          * If no subtask completed with an exception but cancelled subtasks were notified
1196          * to the {@code handleComplete} method then the function is called with a {@code
1197          * CancellationException}. The exception returned by the function is thrown.
1198          * This method does nothing if no subtasks completed abnormally.
1199          *
1200          * @apiNote This method is intended to be invoked by the task scope owner after it
1201          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
1202          * A future release may add enforcement to prevent the method being called by
1203          * other threads or before joining.
1204          *
1205          * @param esf the exception supplying function
1206          * @param <X> type of the exception to be thrown
1207          * @throws X produced by the exception supplying function
1208          */
1209         public <X extends Throwable>
1210         void throwIfFailed(Function<Throwable, ? extends X> esf) throws X {
1211             Objects.requireNonNull(esf);
1212             Future<Object> f = future;
1213             if (f != null) {
1214                 Throwable throwable = (f.state() == Future.State.FAILED)
1215                         ? f.exceptionNow()
1216                         : new CancellationException();
1217                 X ex = esf.apply(throwable);
1218                 Objects.requireNonNull(ex, "esf returned null");
1219                 throw ex;
1220             }
1221         }
1222     }
1223 }