1 /*
   2  * Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package jdk.incubator.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.time.Instant;
  33 import java.util.Comparator;
  34 import java.util.Objects;
  35 import java.util.Optional;
  36 import java.util.Set;
  37 import java.util.concurrent.Callable;
  38 import java.util.concurrent.CancellationException;
  39 import java.util.concurrent.ConcurrentHashMap;
  40 import java.util.concurrent.ExecutionException;
  41 import java.util.concurrent.Future;
  42 import java.util.concurrent.FutureTask;
  43 import java.util.concurrent.RejectedExecutionException;
  44 import java.util.concurrent.ThreadFactory;
  45 import java.util.concurrent.TimeoutException;
  46 import java.util.concurrent.TimeUnit;
  47 import java.util.concurrent.locks.ReentrantLock;
  48 import java.util.function.Function;
  49 import jdk.internal.misc.PreviewFeatures;
  50 import jdk.internal.misc.ThreadFlock;
  51 
  52 /**
  53  * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports
  54  * cases where a task splits into several concurrent subtasks, to be executed in their
  55  * own threads, and where the subtasks must complete before the main task continues. A
  56  * {@code StructuredTaskScope} can be used to ensure that the lifetime of a concurrent
  57  * operation is confined by a <em>syntax block</em>, just like that of a sequential
  58  * operation in structured programming.
  59  *
  60  * <h2>Basic usage</h2>
  61  *
  62  * A {@code StructuredTaskScope} is created with one of its public constructors. It defines
  63  * the {@link #fork(Callable) fork} method to start a thread to execute a task, the {@link
  64  * #join() join} method to wait for all threads to finish, and the {@link #close() close}
  65  * method to close the task scope. The API is intended to be used with the {@code
  66  * try-with-resources} construct. The intention is that code in the <em>block</em> uses
  67  * the {@code fork} method to fork threads to execute the subtasks, wait for the threads
  68  * to finish with the {@code join} method, and then <em>process the results</em>.
  69  * Processing of results may include handling or re-throwing of exceptions.
  70  * {@snippet lang=java :
  71  *     try (var scope = new StructuredTaskScope<Object>()) {
  72  *
  73  *         Future<Integer> future1 = scope.fork(task1);   // @highlight substring="fork"
  74  *         Future<String> future2 = scope.fork(task2);    // @highlight substring="fork"
  75  *
  76  *         scope.join();                                  // @highlight substring="join"
  77  *
  78  *         ... process results/exceptions ...
  79  *
  80  *     } // close                                         // @highlight substring="close"
  81  * }
  82  * To ensure correct usage, the {@code join} and {@code close} methods may only be invoked
  83  * by the <em>owner</em> (the thread that opened/created the task scope}, and the
  84  * {@code close} method throws an exception after closing if the owner did not invoke the
  85  * {@code join} method after forking.
  86  *
  87  * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut
  88  * down a task scope without closing it. Shutdown is useful for cases where a subtask
  89  * completes with a result (or exception) and the results of other unfinished subtasks are
  90  * no longer needed. If a subtask invokes {@code shutdown} while the owner is waiting in
  91  * the {@code join} method then it will cause {@code join} to wakeup, all unfinished
  92  * threads to be {@linkplain Thread#interrupt() interrupted} and prevents new threads
  93  * from starting in the task scope.
  94  *
  95  * <h2>Subclasses with policies for common cases</h2>
  96  *
  97  * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for
  98  * common cases:
  99  * <ol>
 100  *   <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the first result and
 101  *   shuts down the task scope to interrupt unfinished threads and wakeup the owner. This
 102  *   class is intended for cases where the result of any subtask will do ("invoke any")
 103  *   and where there is no need to wait for results of other unfinished tasks. It defines
 104  *   methods to get the first result or throw an exception if all subtasks fail.
 105  *   <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the first exception and
 106  *   shuts down the task scope. This class is intended for cases where the results of all
 107  *   subtasks are required ("invoke all"); if any subtask fails then the results of other
 108  *   unfinished subtasks are no longer needed. If defines methods to throw an exception if
 109  *   any of the subtasks fail.
 110  * </ol>
 111  *
 112  * <p> The following are two examples that use the two classes. In both cases, a pair of
 113  * subtasks are forked to fetch resources from two URL locations "left" and "right". The
 114  * first example creates a ShutdownOnSuccess object to capture the result of the first
 115  * subtask to complete normally, cancelling the other by way of shutting down the task
 116  * scope. The main task waits in {@code join} until either subtask completes with a result
 117  * or both subtasks fail. It invokes {@link ShutdownOnSuccess#result(Function)
 118  * result(Function)} method to get the captured result. If both subtasks fail then this
 119  * method throws a {@code WebApplicationException} with the exception from one of the
 120  * subtasks as the cause.
 121  * {@snippet lang=java :
 122  *     try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
 123  *
 124  *         scope.fork(() -> fetch(left));
 125  *         scope.fork(() -> fetch(right));
 126  *
 127  *         scope.join();
 128  *
 129  *         // @link regex="result(?=\()" target="ShutdownOnSuccess#result" :
 130  *         String result = scope.result(e -> new WebApplicationException(e));
 131  *
 132  *         ...
 133  *     }
 134  * }
 135  * The second example creates a ShutdownOnFailure object to capture the exception of the
 136  * first subtask to fail, cancelling the other by way of shutting down the task scope. The
 137  * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a
 138  * result, either fails, or a deadline is reached. It invokes {@link
 139  * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception
 140  * when either subtask fails. This method is a no-op if no subtasks fail. The main task
 141  * uses {@code Future}'s {@link Future#resultNow() resultNow()} method to retrieve the
 142  * results.
 143  *
 144  * {@snippet lang=java :
 145  *    Instant deadline = ...
 146  *
 147  *    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
 148  *
 149  *         Future<String> future1 = scope.fork(() -> query(left));
 150  *         Future<String> future2 = scope.fork(() -> query(right));
 151  *
 152  *         scope.joinUntil(deadline);
 153  *
 154  *         // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" :
 155  *         scope.throwIfFailed(e -> new WebApplicationException(e));
 156  *
 157  *         // both subtasks completed successfully
 158  *         String result = Stream.of(future1, future2)
 159  *                 // @link substring="Future::resultNow" target="Future#resultNow" :
 160  *                 .map(Future::resultNow)
 161  *                 .collect(Collectors.joining(", ", "{ ", " }"));
 162  *
 163  *         ...
 164  *     }
 165  * }
 166  *
 167  * <h2>Extending StructuredTaskScope</h2>
 168  *
 169  * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Future)
 170  * handleComplete} overridden, to implement policies other than those implemented by
 171  * {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. The method may be overridden
 172  * to, for example, collect the results of subtasks that complete with a result and ignore
 173  * subtasks that fail. It may collect exceptions when subtasks fail. It may invoke the
 174  * {@link #shutdown() shutdown} method to shut down and cause {@link #join() join} to
 175  * wakeup when some condition arises.
 176  *
 177  * <p> A subclass will typically define methods to make available results, state, or other
 178  * outcome to code that executes after the {@code join} method. A subclass that collects
 179  * results and ignores subtasks that fail may define a method that returns a collection of
 180  * results. A subclass that implements a policy to shut down when a subtask fails may
 181  * define a method to retrieve the exception of the first subtask to fail.
 182  *
 183  * <p> The following is an example of a {@code StructuredTaskScope} implementation that
 184  * collects the results of subtasks that complete successfully. It defines the method
 185  * <b>{@code results()}</b> to be used by the main task to retrieve the results.
 186  *
 187  * {@snippet lang=java :
 188  *     class MyScope<T> extends StructuredTaskScope<T> {
 189  *         private final Queue<T> results = new ConcurrentLinkedQueue<>();
 190  *
 191  *         MyScope() {
 192  *             super(null, Thread.ofVirtual().factory());
 193  *         }
 194  *
 195  *         @Override
 196  *         // @link substring="handleComplete" target="handleComplete" :
 197  *         protected void handleComplete(Future<T> future) {
 198  *             if (future.state() == Future.State.SUCCESS) {
 199  *                 T result = future.resultNow();
 200  *                 results.add(result);
 201  *             }
 202  *         }
 203  *
 204  *         // Returns a stream of results from the subtasks that completed successfully
 205  *         public Stream<T> results() {     // @highlight substring="results"
 206  *             return results.stream();
 207  *         }
 208  *     }
 209  *  }
 210  *
 211  * <h2><a id="TreeStructure">Tree structure</a></h2>
 212  *
 213  * Task scopes form a tree where parent-child relations are established implicitly when
 214  * opening a new task scope:
 215  * <ul>
 216  *   <li> A parent-child relation is established when a thread started in a task scope
 217  *   opens its own task scope. A thread started in task scope "A" that opens task scope
 218  *   "B" establishes a parent-child relation where task scope "A" is the parent of task
 219  *   scope "B".
 220  *   <li> A parent-child relation is established with nesting. If a thread opens task
 221  *   scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task
 222  *   scope "B" is the parent of the nested task scope "C".
 223  * </ul>
 224  *
 225  * The <i>descendants</i> of a task scope are the child task scopes that it is a parent
 226  * of, plus the descendants of the child task scopes, recursively.
 227  *
 228  * <p> The tree structure supports:
 229  * <ul>
 230  *   <li> Inheritance of {@linkplain ScopedValue scoped values} across threads.
 231  *   <li> Confinement checks. The phrase "threads contained in the task scope" in method
 232  *   descriptions means threads started in the task scope or descendant scopes.
 233  * </ul>
 234  *
 235  * <p> The following example demonstrates the inheritance of a scoped value. A scoped
 236  * value {@code USERNAME} is bound to the value "{@code duke}". A {@code StructuredTaskScope}
 237  * is created and its {@code fork} method invoked to start a thread to execute {@code
 238  * childTask}. The thread inherits the scoped value <em>bindings</em> captured when
 239  * creating the task scope. The code in {@code childTask} uses the value of the scoped
 240  * value and so reads the value "{@code duke}".
 241  * {@snippet lang=java :
 242  *     private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
 243  *
 244  *     // @link substring="where" target="ScopedValue#where(ScopedValue, Object, Runnable)" :
 245  *     ScopedValue.where(USERNAME, "duke", () -> {
 246  *         try (var scope = new StructuredTaskScope<String>()) {
 247  *
 248  *             scope.fork(() -> childTask());           // @highlight substring="fork"
 249  *             ...
 250  *          }
 251  *     });
 252  *
 253  *     ...
 254  *
 255  *     String childTask() {
 256  *         // @link substring="get" target="ScopedValue#get()" :
 257  *         String name = USERNAME.get();   // "duke"
 258  *         ...
 259  *     }
 260  * }
 261  *
 262  * <p> {@code StructuredTaskScope} does not define APIs that exposes the tree structure
 263  * at this time.
 264  *
 265  * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
 266  * or method in this class will cause a {@link NullPointerException} to be thrown.
 267  *
 268  * <h2>Memory consistency effects</h2>
 269  *
 270  * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to
 271  * {@linkplain #fork forking} of a {@code Callable} task
 272  * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
 273  * <i>happen-before</i></a> any actions taken by that task, which in turn <i>happen-before</i>
 274  * the task result is retrieved via its {@code Future}, or <i>happen-before</i> any actions
 275  * taken in a thread after {@linkplain #join() joining} of the task scope.
 276  *
 277  * @jls 17.4.5 Happens-before Order
 278  *
 279  * @param <T> the result type of tasks executed in the scope
 280  * @since 19
 281  */
 282 public class StructuredTaskScope<T> implements AutoCloseable {
 283     private static final VarHandle FUTURES;
 284     static {
 285         try {
 286             MethodHandles.Lookup l = MethodHandles.lookup();
 287             FUTURES = l.findVarHandle(StructuredTaskScope.class, "futures", Set.class);
 288         } catch (Exception e) {
 289             throw new InternalError(e);
 290         }
 291     }
 292 
 293     private final ThreadFactory factory;
 294     private final ThreadFlock flock;
 295     private final ReentrantLock shutdownLock = new ReentrantLock();
 296 
 297     // lazily created set of Future objects with threads waiting in Future::get
 298     private volatile Set<Future<?>> futures;
 299 
 300     // set by owner when it forks, reset by owner when it joins
 301     private boolean needJoin;
 302 
 303     // states: OPEN -> SHUTDOWN -> CLOSED
 304     private static final int OPEN     = 0;   // initial state
 305     private static final int SHUTDOWN = 1;
 306     private static final int CLOSED   = 2;
 307 
 308     // scope state, set by owner, read by any thread
 309     private volatile int state;
 310 
 311     /**
 312      * Creates a structured task scope with the given name and thread factory. The task
 313      * scope is optionally named for the purposes of monitoring and management. The thread
 314      * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
 315      * tasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
 316      * current thread.
 317      *
 318      * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
 319      * bindings for inheritance by threads created in the task scope. The
 320      * <a href="#TreeStructure">Tree Structure</a> section in the class description
 321      * details how parent-child relations are established implicitly for the purpose of
 322      * inheritance of scoped value bindings.
 323      *
 324      * @param name the name of the task scope, can be null
 325      * @param factory the thread factory
 326      */
 327     public StructuredTaskScope(String name, ThreadFactory factory) {
 328         this.factory = Objects.requireNonNull(factory, "'factory' is null");
 329         this.flock = ThreadFlock.open(name);
 330     }
 331 
 332     /**
 333      * Creates an unnamed structured task scope that creates virtual threads. The task
 334      * scope is owned by the current thread.
 335      *
 336      * <p> This constructor is equivalent to invoking the 2-arg constructor with a name
 337      * of {@code null} and a thread factory that creates virtual threads.
 338      *
 339      * @throws UnsupportedOperationException if preview features are not enabled
 340      */
 341     public StructuredTaskScope() {
 342         PreviewFeatures.ensureEnabled();
 343         this.factory = Thread.ofVirtual().factory();
 344         this.flock = ThreadFlock.open(null);
 345     }
 346 
 347     /**
 348      * Throws WrongThreadException if the current thread is not the owner.
 349      */
 350     private void ensureOwner() {
 351         if (Thread.currentThread() != flock.owner())
 352             throw new WrongThreadException("Current thread not owner");
 353     }
 354 
 355     /**
 356      * Throws WrongThreadException if the current thread is not the owner
 357      * or a thread contained in the tree.
 358      */
 359     private void ensureOwnerOrContainsThread() {
 360         Thread currentThread = Thread.currentThread();
 361         if (currentThread != flock.owner() && !flock.containsThread(currentThread))
 362             throw new WrongThreadException("Current thread not owner or thread in the tree");
 363     }
 364 
 365     /**
 366      * Tests if the task scope is shutdown.
 367      */
 368     private boolean isShutdown() {
 369         return state >= SHUTDOWN;
 370     }
 371 
 372     /**
 373      * Track the given Future.
 374      */
 375     private void track(Future<?> future) {
 376         // create the set of Futures if not already created
 377         Set<Future<?>> futures = this.futures;
 378         if (futures == null) {
 379             futures = ConcurrentHashMap.newKeySet();
 380             if (!FUTURES.compareAndSet(this, null, futures)) {
 381                 // lost the race
 382                 futures = this.futures;
 383             }
 384         }
 385         futures.add(future);
 386     }
 387 
 388     /**
 389      * Stop tracking the Future.
 390      */
 391     private void untrack(Future<?> future) {
 392         assert futures != null;
 393         futures.remove(future);
 394     }
 395 
 396     /**
 397      * Invoked when a task completes before the scope is shut down.
 398      *
 399      * <p> The {@code handleComplete} method should be thread safe. It may be invoked by
 400      * several threads concurrently.
 401      *
 402      * @implSpec The default implementation does nothing.
 403      *
 404      * @param future the completed task
 405      */
 406     protected void handleComplete(Future<T> future) { }
 407 
 408     /**
 409      * Starts a new thread to run the given task.
 410      *
 411      * <p> The new thread is created with the task scope's {@link ThreadFactory}. It
 412      * inherits the current thread's {@linkplain ScopedValue scoped value} bindings. The
 413      * bindings must match the bindings captured when the task scope was created.
 414      *
 415      * <p> If the task completes before the task scope is {@link #shutdown() shutdown}
 416      * then the {@link #handleComplete(Future) handleComplete} method is invoked to
 417      * consume the completed task. The {@code handleComplete} method is run when the task
 418      * completes with a result or exception. If the {@code Future}'s {@link
 419      * Future#cancel(boolean) cancel} method is used to cancel a task before the task scope
 420      * is shut down, then the {@code handleComplete} method is run by the thread that
 421      * invokes {@code cancel}. If the task scope shuts down at or around the same time
 422      * that the task completes or is cancelled then the {@code handleComplete} method may
 423      * or may not be invoked.
 424      *
 425      * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process
 426      * of shutting down) then {@code fork} returns a {@code Future} representing a {@link
 427      * Future.State#CANCELLED cancelled} task that was not run.
 428      *
 429      * <p> This method may only be invoked by the task scope owner or threads contained
 430      * in the task scope. The {@link Future#cancel(boolean) cancel} method of the returned
 431      * {@code Future} object is also restricted to the task scope owner or threads contained
 432      * in the task scope. The {@code cancel} method throws {@link WrongThreadException}
 433      * if invoked from another thread. All other methods on the returned {@code Future}
 434      * object, such as {@link Future#get() get}, are not restricted.
 435      *
 436      * @param task the task to run
 437      * @param <U> the result type
 438      * @return a future
 439      * @throws IllegalStateException if this task scope is closed
 440      * @throws WrongThreadException if the current thread is not the owner or a thread
 441      * contained in the task scope
 442      * @throws StructureViolationException if the current scoped value bindings are not
 443      * the same as when the task scope was created
 444      * @throws RejectedExecutionException if the thread factory rejected creating a
 445      * thread to run the task
 446      */
 447     public <U extends T> Future<U> fork(Callable<? extends U> task) {
 448         Objects.requireNonNull(task, "'task' is null");
 449 
 450         // create future
 451         var future = new FutureImpl<U>(this, task);
 452 
 453         boolean shutdown = (state >= SHUTDOWN);
 454 
 455         if (!shutdown) {
 456             // create thread
 457             Thread thread = factory.newThread(future);
 458             if (thread == null) {
 459                 throw new RejectedExecutionException("Rejected by thread factory");
 460             }
 461 
 462             // attempt to start the thread
 463             try {
 464                 flock.start(thread);
 465             } catch (IllegalStateException e) {
 466                 // shutdown or in the process of shutting down
 467                 shutdown = true;
 468             }
 469         }
 470 
 471         if (shutdown) {
 472             if (state == CLOSED) {
 473                 throw new IllegalStateException("Task scope is closed");
 474             } else {
 475                 future.cancel(false);
 476             }
 477         }
 478 
 479         // if owner forks then it will need to join
 480         if (Thread.currentThread() == flock.owner() && !needJoin) {
 481             needJoin = true;
 482         }
 483 
 484         return future;
 485     }
 486 
 487     /**
 488      * Wait for all threads to finish or the task scope to shut down.
 489      */
 490     private void implJoin(Duration timeout)
 491         throws InterruptedException, TimeoutException
 492     {
 493         ensureOwner();
 494         needJoin = false;
 495         int s = state;
 496         if (s >= SHUTDOWN) {
 497             if (s == CLOSED)
 498                 throw new IllegalStateException("Task scope is closed");
 499             return;
 500         }
 501 
 502         // wait for all threads, wakeup, interrupt, or timeout
 503         if (timeout != null) {
 504             flock.awaitAll(timeout);
 505         } else {
 506             flock.awaitAll();
 507         }
 508     }
 509 
 510     /**
 511      * Wait for all threads to finish or the task scope to shut down. This method waits
 512      * until all threads started in the task scope finish execution (of both task and
 513      * {@link #handleComplete(Future) handleComplete} method), the {@link #shutdown()
 514      * shutdown} method is invoked to shut down the task scope, or the current thread
 515      * is {@linkplain Thread#interrupt() interrupted}.
 516      *
 517      * <p> This method may only be invoked by the task scope owner.
 518      *
 519      * @return this task scope
 520      * @throws IllegalStateException if this task scope is closed
 521      * @throws WrongThreadException if the current thread is not the owner
 522      * @throws InterruptedException if interrupted while waiting
 523      */
 524     public StructuredTaskScope<T> join() throws InterruptedException {
 525         try {
 526             implJoin(null);
 527         } catch (TimeoutException e) {
 528             throw new InternalError();
 529         }
 530         return this;
 531     }
 532 
 533     /**
 534      * Wait for all threads to finish or the task scope to shut down, up to the given
 535      * deadline. This method waits until all threads started in the task scope finish
 536      * execution (of both task and {@link #handleComplete(Future) handleComplete} method),
 537      * the {@link #shutdown() shutdown} method is invoked to shut down the task scope,
 538      * the current thread is {@linkplain Thread#interrupt() interrupted}, or the deadline
 539      * is reached.
 540      *
 541      * <p> This method may only be invoked by the task scope owner.
 542      *
 543      * @param deadline the deadline
 544      * @return this task scope
 545      * @throws IllegalStateException if this task scope is closed
 546      * @throws WrongThreadException if the current thread is not the owner
 547      * @throws InterruptedException if interrupted while waiting
 548      * @throws TimeoutException if the deadline is reached while waiting
 549      */
 550     public StructuredTaskScope<T> joinUntil(Instant deadline)
 551         throws InterruptedException, TimeoutException
 552     {
 553         Duration timeout = Duration.between(Instant.now(), deadline);
 554         implJoin(timeout);
 555         return this;
 556     }
 557 
 558     /**
 559      * Cancel all tracked Future objects.
 560      */
 561     private void cancelTrackedFutures() {
 562         Set<Future<?>> futures = this.futures;
 563         if (futures != null) {
 564             futures.forEach(f -> f.cancel(false));
 565         }
 566     }
 567 
 568     /**
 569      * Interrupt all unfinished threads.
 570      */
 571     private void implInterruptAll() {
 572         flock.threads().forEach(t -> {
 573             if (t != Thread.currentThread()) {
 574                 t.interrupt();
 575             }
 576         });
 577     }
 578 
 579     @SuppressWarnings("removal")
 580     private void interruptAll() {
 581         if (System.getSecurityManager() == null) {
 582             implInterruptAll();
 583         } else {
 584             PrivilegedAction<Void> pa = () -> {
 585                 implInterruptAll();
 586                 return null;
 587             };
 588             AccessController.doPrivileged(pa);
 589         }
 590     }
 591 
 592     /**
 593      * Shutdown the task scope if not already shutdown. Return true if this method
 594      * shutdowns the task scope, false if already shutdown.
 595      */
 596     private boolean implShutdown() {
 597         if (state < SHUTDOWN) {
 598             shutdownLock.lock();
 599             try {
 600                 if (state < SHUTDOWN) {
 601 
 602                     // prevent new threads from starting
 603                     flock.shutdown();
 604 
 605                     // wakeup any threads waiting in Future::get
 606                     cancelTrackedFutures();
 607 
 608                     // interrupt all unfinished threads
 609                     interruptAll();
 610 
 611                     state = SHUTDOWN;
 612                     return true;
 613                 }
 614             } finally {
 615                 shutdownLock.unlock();
 616             }
 617         }
 618         assert state >= SHUTDOWN;
 619         return false;
 620     }
 621 
 622     /**
 623      * Shut down the task scope without closing it. Shutting down a task scope prevents
 624      * new threads from starting, interrupts all unfinished threads, and causes the
 625      * {@link #join() join} method to wakeup. Shutdown is useful for cases where the
 626      * results of unfinished subtasks are no longer needed.
 627      *
 628      * <p> More specifically, this method:
 629      * <ul>
 630      * <li> {@linkplain Future#cancel(boolean) Cancels} the tasks that have threads
 631      * {@linkplain Future#get() waiting} on a result so that the waiting threads wakeup.
 632      * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the
 633      * task scope (except the current thread).
 634      * <li> Wakes up the owner if it is waiting in {@link #join()} or {@link
 635      * #joinUntil(Instant)}. If the owner is not waiting then its next call to {@code
 636      * join} or {@code joinUntil} will return immediately.
 637      * </ul>
 638      *
 639      * <p> When this method completes then the {@code Future} objects for all tasks will
 640      * be {@linkplain Future#isDone() done}, normally or abnormally. There may still
 641      * be threads that have not finished because they are executing code that did not
 642      * respond (or respond promptly) to thread interrupt. This method does not wait
 643      * for these threads. When the owner invokes the {@link #close() close} method
 644      * to close the task scope then it will wait for the remaining threads to finish.
 645      *
 646      * <p> This method may only be invoked by the task scope owner or threads contained
 647      * in the task scope.
 648      *
 649      * @throws IllegalStateException if this task scope is closed
 650      * @throws WrongThreadException if the current thread is not the owner or
 651      * a thread contained in the task scope
 652      */
 653     public void shutdown() {
 654         ensureOwnerOrContainsThread();
 655         if (state == CLOSED)
 656             throw new IllegalStateException("Task scope is closed");
 657         if (implShutdown())
 658             flock.wakeup();
 659     }
 660 
 661     /**
 662      * Closes this task scope.
 663      *
 664      * <p> This method first shuts down the task scope (as if by invoking the {@link
 665      * #shutdown() shutdown} method). It then waits for the threads executing any
 666      * unfinished tasks to finish. If interrupted then this method will continue to
 667      * wait for the threads to finish before completing with the interrupt status set.
 668      *
 669      * <p> This method may only be invoked by the task scope owner. If the task scope
 670      * is already closed then the owner invoking this method has no effect.
 671      *
 672      * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
 673      * manner</em>. If this method is called to close a task scope before nested task
 674      * scopes are closed then it closes the underlying construct of each nested task scope
 675      * (in the reverse order that they were created in), closes this task scope, and then
 676      * throws {@link StructureViolationException}.
 677      *
 678      * Similarly, if this method is called to close a task scope while executing with
 679      * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
 680      * before the scoped values were bound, then {@code StructureViolationException} is
 681      * thrown after closing the task scope.
 682      *
 683      * If a thread terminates without first closing task scopes that it owns then
 684      * termination will cause the underlying construct of each of its open tasks scopes to
 685      * be closed. Closing is performed in the reverse order that the task scopes were
 686      * created in. Thread termination may therefore be delayed when the owner has to wait
 687      * for threads forked in these task scopes to finish.
 688      *
 689      * @throws IllegalStateException thrown after closing the task scope if the owner
 690      * did not invoke join after forking
 691      * @throws WrongThreadException if the current thread is not the owner
 692      * @throws StructureViolationException if a structure violation was detected
 693      */
 694     @Override
 695     public void close() {
 696         ensureOwner();
 697         if (state == CLOSED)
 698             return;
 699 
 700         try {
 701             implShutdown();
 702             flock.close();
 703         } finally {
 704             state = CLOSED;
 705         }
 706 
 707         if (needJoin) {
 708             throw new IllegalStateException("Owner did not invoke join or joinUntil after fork");
 709         }
 710     }
 711 
 712     @Override
 713     public String toString() {
 714         StringBuilder sb = new StringBuilder();
 715         String name = flock.name();
 716         if (name != null) {
 717             sb.append(name);
 718             sb.append('/');
 719         }
 720         sb.append(Objects.toIdentityString(this));
 721         int s = state;
 722         if (s == CLOSED)
 723             sb.append("/closed");
 724         else if (s == SHUTDOWN)
 725             sb.append("/shutdown");
 726         return sb.toString();
 727     }
 728 
 729     /**
 730      * The Future implementation returned by the fork methods. Most methods are
 731      * overridden to support cancellation when the task scope is shutdown.
 732      * The blocking get methods register the Future with the task scope so that they
 733      * are cancelled when the task scope shuts down.
 734      */
 735     private static final class FutureImpl<V> extends FutureTask<V> {
 736         private final StructuredTaskScope<V> scope;
 737 
 738         @SuppressWarnings("unchecked")
 739         FutureImpl(StructuredTaskScope<? super V> scope, Callable<? extends V> task) {
 740             super((Callable<V>) task);
 741             this.scope = (StructuredTaskScope<V>) scope;
 742         }
 743 
 744         @Override
 745         protected void done() {
 746             if (!scope.isShutdown()) {
 747                 scope.handleComplete(this);
 748             }
 749         }
 750 
 751         private void cancelIfShutdown() {
 752             if (scope.isShutdown() && !super.isDone()) {
 753                 super.cancel(false);
 754             }
 755         }
 756 
 757         @Override
 758         public boolean isDone() {
 759             cancelIfShutdown();
 760             return super.isDone();
 761         }
 762 
 763         @Override
 764         public boolean isCancelled() {
 765             cancelIfShutdown();
 766             return super.isCancelled();
 767         }
 768 
 769         @Override
 770         public boolean cancel(boolean mayInterruptIfRunning) {
 771             scope.ensureOwnerOrContainsThread();
 772             cancelIfShutdown();
 773             return super.cancel(mayInterruptIfRunning);
 774         }
 775 
 776         @Override
 777         public V get() throws InterruptedException, ExecutionException {
 778             if (super.isDone())
 779                 return super.get();
 780             scope.track(this);
 781             try {
 782                 cancelIfShutdown();
 783                 return super.get();
 784             } finally {
 785                 scope.untrack(this);
 786             }
 787         }
 788 
 789         @Override
 790         public V get(long timeout, TimeUnit unit)
 791                 throws InterruptedException, ExecutionException, TimeoutException {
 792             Objects.requireNonNull(unit);
 793             if (super.isDone())
 794                 return super.get();
 795             scope.track(this);
 796             try {
 797                 cancelIfShutdown();
 798                 return super.get(timeout, unit);
 799             } finally {
 800                 scope.untrack(this);
 801             }
 802         }
 803 
 804         @Override
 805         public V resultNow() {
 806             cancelIfShutdown();
 807             return super.resultNow();
 808         }
 809 
 810         @Override
 811         public Throwable exceptionNow() {
 812             cancelIfShutdown();
 813             return super.exceptionNow();
 814         }
 815 
 816         @Override
 817         public State state() {
 818             cancelIfShutdown();
 819             return super.state();
 820         }
 821 
 822         @Override
 823         public String toString() {
 824             cancelIfShutdown();
 825             return super.toString();
 826         }
 827     }
 828 
 829     /**
 830      * Maps a Future.State to an int that can be compared.
 831      * RUNNING < CANCELLED < FAILED < SUCCESS.
 832      */
 833     private static int futureStateToInt(Future.State s) {
 834         return switch (s) {
 835             case RUNNING   -> 0;
 836             case CANCELLED -> 1;
 837             case FAILED    -> 2;
 838             case SUCCESS   -> 3;
 839         };
 840     }
 841 
 842     // RUNNING < CANCELLED < FAILED < SUCCESS
 843     private static final Comparator<Future.State> FUTURE_STATE_COMPARATOR =
 844             Comparator.comparingInt(StructuredTaskScope::futureStateToInt);
 845 
 846     /**
 847      * A {@code StructuredTaskScope} that captures the result of the first subtask to
 848      * complete successfully. Once captured, it invokes the {@linkplain #shutdown() shutdown}
 849      * method to interrupt unfinished threads and wakeup the owner. The policy
 850      * implemented by this class is intended for cases where the result of any subtask
 851      * will do ("invoke any") and where the results of other unfinished subtask are no
 852      * longer needed.
 853      *
 854      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 855      * in this class will cause a {@link NullPointerException} to be thrown.
 856      *
 857      * @param <T> the result type
 858      * @since 19
 859      */
 860     public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
 861         private static final VarHandle FUTURE;
 862         static {
 863             try {
 864                 MethodHandles.Lookup l = MethodHandles.lookup();
 865                 FUTURE = l.findVarHandle(ShutdownOnSuccess.class, "future", Future.class);
 866             } catch (Exception e) {
 867                 throw new InternalError(e);
 868             }
 869         }
 870         private volatile Future<T> future;
 871 
 872         /**
 873          * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
 874          * The task scope is optionally named for the purposes of monitoring and management.
 875          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
 876          * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope is
 877          * owned by the current thread.
 878          *
 879          * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
 880          * bindings for inheritance by threads created in the task scope. The
 881          * <a href="StructuredTaskScope.html#TreeStructure">Tree Structure</a> section in
 882          * the class description details how parent-child relations are established
 883          * implicitly for the purpose of inheritance of scoped value bindings.
 884          *
 885          * @param name the name of the task scope, can be null
 886          * @param factory the thread factory
 887          */
 888         public ShutdownOnSuccess(String name, ThreadFactory factory) {
 889             super(name, factory);
 890         }
 891 
 892         /**
 893          * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
 894          *
 895          * <p> This constructor is equivalent to invoking the 2-arg constructor with a
 896          * name of {@code null} and a thread factory that creates virtual threads.
 897          */
 898         public ShutdownOnSuccess() {
 899             super(null, Thread.ofVirtual().factory());
 900         }
 901 
 902         /**
 903          * Shut down the given task scope when invoked for the first time with a {@code
 904          * Future} for a task that completed with a result.
 905          *
 906          * @param future the completed task
 907          * @see #shutdown()
 908          * @see Future.State#SUCCESS
 909          */
 910         @Override
 911         protected void handleComplete(Future<T> future) {
 912             Future.State state = future.state();
 913             if (state == Future.State.RUNNING) {
 914                 throw new IllegalArgumentException("Task is not completed");
 915             }
 916 
 917             Future<T> f;
 918             while (((f = this.future) == null)
 919                     || FUTURE_STATE_COMPARATOR.compare(f.state(), state) < 0) {
 920                 if (FUTURE.compareAndSet(this, f, future)) {
 921                     if (state == Future.State.SUCCESS)
 922                         shutdown();
 923                     break;
 924                 }
 925             }
 926         }
 927 
 928         /**
 929          * {@inheritDoc}
 930          * @return this task scope
 931          * @throws IllegalStateException {@inheritDoc}
 932          * @throws WrongThreadException {@inheritDoc}
 933          */
 934         @Override
 935         public ShutdownOnSuccess<T> join() throws InterruptedException {
 936             super.join();
 937             return this;
 938         }
 939 
 940         /**
 941          * {@inheritDoc}
 942          * @return this task scope
 943          * @throws IllegalStateException {@inheritDoc}
 944          * @throws WrongThreadException {@inheritDoc}
 945          */
 946         @Override
 947         public ShutdownOnSuccess<T> joinUntil(Instant deadline)
 948             throws InterruptedException, TimeoutException
 949         {
 950             super.joinUntil(deadline);
 951             return this;
 952         }
 953 
 954         /**
 955          * {@return the result of the first subtask that completed with a result}
 956          *
 957          * <p> When no subtask completed with a result but a task completed with an
 958          * exception then {@code ExecutionException} is thrown with the exception as the
 959          * {@linkplain Throwable#getCause() cause}. If only cancelled subtasks were
 960          * notified to the {@code handleComplete} method then {@code CancellationException}
 961          * is thrown.
 962          *
 963          * @apiNote This method is intended to be invoked by the task scope owner after it
 964          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
 965          * A future release may add enforcement to prevent the method being called by
 966          * other threads or before joining.
 967          *
 968          * @throws ExecutionException if no subtasks completed with a result but a subtask
 969          * completed with an exception
 970          * @throws CancellationException if all subtasks were cancelled
 971          * @throws IllegalStateException if the handle method was not invoked with a
 972          * completed subtask
 973          */
 974         public T result() throws ExecutionException {
 975             Future<T> f = future;
 976             if (f == null) {
 977                 throw new IllegalStateException("No completed subtasks");
 978             }
 979             return switch (f.state()) {
 980                 case SUCCESS   -> f.resultNow();
 981                 case FAILED    -> throw new ExecutionException(f.exceptionNow());
 982                 case CANCELLED -> throw new CancellationException();
 983                 default        -> throw new InternalError("Unexpected state: " + f);
 984             };
 985 
 986         }
 987 
 988         /**
 989          * Returns the result of the first subtask that completed with a result, otherwise
 990          * throws an exception produced by the given exception supplying function.
 991          *
 992          * <p> When no subtask completed with a result but a subtask completed with an
 993          * exception then the exception supplying function is invoked with the exception.
 994          * If only cancelled subtasks were notified to the {@code handleComplete} method
 995          * then the exception supplying function is invoked with a {@code CancellationException}.
 996          *
 997          * @apiNote This method is intended to be invoked by the task scope owner after it
 998          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
 999          * A future release may add enforcement to prevent the method being called by
1000          * other threads or before joining.
1001          *
1002          * @param esf the exception supplying function
1003          * @param <X> type of the exception to be thrown
1004          * @return the result of the first subtask that completed with a result
1005          * @throws X if no subtask completed with a result
1006          * @throws IllegalStateException if the handle method was not invoked with a
1007          * completed subtask
1008          */
1009         public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X {
1010             Objects.requireNonNull(esf);
1011             Future<T> f = future;
1012             if (f == null) {
1013                 throw new IllegalStateException("No completed subtasks");
1014             }
1015             Future.State state = f.state();
1016             if (state == Future.State.SUCCESS) {
1017                 return f.resultNow();
1018             } else {
1019                 Throwable throwable = (state == Future.State.FAILED)
1020                         ? f.exceptionNow()
1021                         : new CancellationException();
1022                 X ex = esf.apply(throwable);
1023                 Objects.requireNonNull(ex, "esf returned null");
1024                 throw ex;
1025             }
1026         }
1027     }
1028 
1029     /**
1030      * A {@code StructuredTaskScope} that captures the exception of the first subtask to
1031      * complete abnormally. Once captured, it invokes the {@linkplain #shutdown() shutdown}
1032      * method to interrupt unfinished threads and wakeup the owner. The policy implemented
1033      * by this class is intended for cases where the results for all subtasks are required
1034      * ("invoke all"); if any subtask fails then the results of other unfinished subtasks
1035      * are no longer needed.
1036      *
1037      * <p> Unless otherwise specified, passing a {@code null} argument to a method
1038      * in this class will cause a {@link NullPointerException} to be thrown.
1039      *
1040      * @since 19
1041      */
1042     public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
1043         private static final VarHandle FUTURE;
1044         static {
1045             try {
1046                 MethodHandles.Lookup l = MethodHandles.lookup();
1047                 FUTURE = l.findVarHandle(ShutdownOnFailure.class, "future", Future.class);
1048             } catch (Exception e) {
1049                 throw new InternalError(e);
1050             }
1051         }
1052         private volatile Future<Object> future;
1053 
1054         /**
1055          * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
1056          * The task scope is optionally named for the purposes of monitoring and management.
1057          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1058          * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope
1059          * is owned by the current thread.
1060          *
1061          * <p> This method captures the current thread's {@linkplain ScopedValue scoped value}
1062          * bindings for inheritance by threads created in the task scope. The
1063          * <a href="StructuredTaskScope.html#TreeStructure">Tree Structure</a> section in
1064          * the class description details how parent-child relations are established
1065          * implicitly for the purpose of inheritance of scoped value bindings.
1066          *
1067          * @param name the name of the task scope, can be null
1068          * @param factory the thread factory
1069          */
1070         public ShutdownOnFailure(String name, ThreadFactory factory) {
1071             super(name, factory);
1072         }
1073 
1074         /**
1075          * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
1076          *
1077          * <p> This constructor is equivalent to invoking the 2-arg constructor with a
1078          * name of {@code null} and a thread factory that creates virtual threads.
1079          */
1080         public ShutdownOnFailure() {
1081             super(null, Thread.ofVirtual().factory());
1082         }
1083 
1084         /**
1085          * Shut down the given task scope when invoked for the first time with a {@code
1086          * Future} for a task that completed abnormally (exception or cancelled).
1087          *
1088          * @param future the completed task
1089          * @see #shutdown()
1090          * @see Future.State#FAILED
1091          * @see Future.State#CANCELLED
1092          */
1093         @Override
1094         protected void handleComplete(Future<Object> future) {
1095             Future.State state = future.state();
1096             if (state == Future.State.RUNNING) {
1097                 throw new IllegalArgumentException("Task is not completed");
1098             } else if (state == Future.State.SUCCESS) {
1099                 return;
1100             }
1101 
1102             // A failed task overrides a cancelled task.
1103             // The first failed or cancelled task causes the scope to shutdown.
1104             Future<Object> f;
1105             while (((f = this.future) == null)
1106                     || FUTURE_STATE_COMPARATOR.compare(f.state(), state) < 0) {
1107                 if (FUTURE.compareAndSet(this, f, future)) {
1108                     shutdown();
1109                     break;
1110                 }
1111             }
1112         }
1113 
1114         /**
1115          * {@inheritDoc}
1116          * @return this task scope
1117          * @throws IllegalStateException {@inheritDoc}
1118          * @throws WrongThreadException {@inheritDoc}
1119          */
1120         @Override
1121         public ShutdownOnFailure join() throws InterruptedException {
1122             super.join();
1123             return this;
1124         }
1125 
1126         /**
1127          * {@inheritDoc}
1128          * @return this task scope
1129          * @throws IllegalStateException {@inheritDoc}
1130          * @throws WrongThreadException {@inheritDoc}
1131          */
1132         @Override
1133         public ShutdownOnFailure joinUntil(Instant deadline)
1134             throws InterruptedException, TimeoutException
1135         {
1136             super.joinUntil(deadline);
1137             return this;
1138         }
1139 
1140         /**
1141          * Returns the exception for the first subtask that completed with an exception.
1142          * If no subtask completed with an exception but cancelled subtasks were notified
1143          * to the {@code handleComplete} method then a {@code CancellationException}
1144          * is returned. If no subtasks completed abnormally then an empty {@code Optional}
1145          * is returned.
1146          *
1147          * @apiNote This method is intended to be invoked by the task scope owner after it
1148          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
1149          * A future release may add enforcement to prevent the method being called by
1150          * other threads or before joining.
1151          *
1152          * @return the exception for a subtask that completed abnormally or an empty
1153          * optional if no subtasks completed abnormally
1154          */
1155         public Optional<Throwable> exception() {
1156             Future<Object> f = future;
1157             if (f != null) {
1158                 Throwable throwable = (f.state() == Future.State.FAILED)
1159                         ? f.exceptionNow()
1160                         : new CancellationException();
1161                 return Optional.of(throwable);
1162             } else {
1163                 return Optional.empty();
1164             }
1165         }
1166 
1167         /**
1168          * Throws if a subtask completed abnormally. If any subtask completed with an
1169          * exception then {@code ExecutionException} is thrown with the exception of the
1170          * first subtask to fail as the {@linkplain Throwable#getCause() cause}. If no
1171          * subtask completed with an exception but cancelled subtasks were notified to the
1172          * {@code handleComplete} method then {@code CancellationException} is thrown.
1173          * This method does nothing if no subtasks completed abnormally.
1174          *
1175          * @apiNote This method is intended to be invoked by the task scope owner after it
1176          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
1177          * A future release may add enforcement to prevent the method being called by
1178          * other threads or before joining.
1179          *
1180          * @throws ExecutionException if a subtask completed with an exception
1181          * @throws CancellationException if no subtasks completed with an exception but
1182          * subtasks were cancelled
1183          */
1184         public void throwIfFailed() throws ExecutionException {
1185             Future<Object> f = future;
1186             if (f != null) {
1187                 if (f.state() == Future.State.FAILED) {
1188                     throw new ExecutionException(f.exceptionNow());
1189                 } else {
1190                     throw new CancellationException();
1191                 }
1192             }
1193         }
1194 
1195         /**
1196          * Throws the exception produced by the given exception supplying function if
1197          * a subtask completed abnormally. If any subtask completed with an exception then
1198          * the function is invoked with the exception of the first subtask to fail.
1199          * If no subtask completed with an exception but cancelled subtasks were notified
1200          * to the {@code handleComplete} method then the function is called with a {@code
1201          * CancellationException}. The exception returned by the function is thrown.
1202          * This method does nothing if no subtasks completed abnormally.
1203          *
1204          * @apiNote This method is intended to be invoked by the task scope owner after it
1205          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
1206          * A future release may add enforcement to prevent the method being called by
1207          * other threads or before joining.
1208          *
1209          * @param esf the exception supplying function
1210          * @param <X> type of the exception to be thrown
1211          * @throws X produced by the exception supplying function
1212          */
1213         public <X extends Throwable>
1214         void throwIfFailed(Function<Throwable, ? extends X> esf) throws X {
1215             Objects.requireNonNull(esf);
1216             Future<Object> f = future;
1217             if (f != null) {
1218                 Throwable throwable = (f.state() == Future.State.FAILED)
1219                         ? f.exceptionNow()
1220                         : new CancellationException();
1221                 X ex = esf.apply(throwable);
1222                 Objects.requireNonNull(ex, "esf returned null");
1223                 throw ex;
1224             }
1225         }
1226     }
1227 }