1 /*
   2  * Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package jdk.incubator.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.time.Instant;
  33 import java.util.Comparator;
  34 import java.util.Objects;
  35 import java.util.Optional;
  36 import java.util.Set;
  37 import java.util.concurrent.Callable;
  38 import java.util.concurrent.CancellationException;
  39 import java.util.concurrent.ConcurrentHashMap;
  40 import java.util.concurrent.ExecutionException;
  41 import java.util.concurrent.Future;
  42 import java.util.concurrent.FutureTask;
  43 import java.util.concurrent.RejectedExecutionException;
  44 import java.util.concurrent.ThreadFactory;
  45 import java.util.concurrent.TimeoutException;
  46 import java.util.concurrent.TimeUnit;
  47 import java.util.concurrent.locks.ReentrantLock;
  48 import java.util.function.Function;
  49 import jdk.internal.misc.PreviewFeatures;
  50 import jdk.internal.misc.ThreadFlock;
  51 
  52 /**
  53  * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports
  54  * cases where a task splits into several concurrent subtasks, to be executed in their
  55  * own threads, and where the subtasks must complete before the main task continues. A
  56  * {@code StructuredTaskScope} can be used to ensure that the lifetime of a concurrent
  57  * operation is confined by a <em>syntax block</em>, just like that of a sequential
  58  * operation in structured programming.
  59  *
  60  * <h2>Basic usage</h2>
  61  *
  62  * A {@code StructuredTaskScope} is created with one of its public constructors. It defines
  63  * the {@link #fork(Callable) fork} method to start a thread to execute a task, the {@link
  64  * #join() join} method to wait for all threads to finish, and the {@link #close() close}
  65  * method to close the task scope. The API is intended to be used with the {@code
  66  * try-with-resources} construct. The intention is that code in the <em>block</em> uses
  67  * the {@code fork} method to fork threads to execute the subtasks, wait for the threads
  68  * to finish with the {@code join} method, and then <em>process the results</em>.
  69  * Processing of results may include handling or re-throwing of exceptions.
  70  * {@snippet lang=java :
  71  *     try (var scope = new StructuredTaskScope<Object>()) {
  72  *
  73  *         Future<Integer> future1 = scope.fork(task1);   // @highlight substring="fork"
  74  *         Future<String> future2 = scope.fork(task2);    // @highlight substring="fork"
  75  *
  76  *         scope.join();                                  // @highlight substring="join"
  77  *
  78  *         ... process results/exceptions ...
  79  *
  80  *     } // close                                         // @highlight substring="close"
  81  * }
  82  * To ensure correct usage, the {@code join} and {@code close} methods may only be invoked
  83  * by the <em>owner</em> (the thread that opened/created the task scope}, and the
  84  * {@code close} method throws an exception after closing if the owner did not invoke the
  85  * {@code join} method after forking.
  86  *
  87  * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut
  88  * down a task scope without closing it. Shutdown is useful for cases where a subtask
  89  * completes with a result (or exception) and the results of other unfinished subtasks are
  90  * no longer needed. If a subtask invokes {@code shutdown} while the owner is waiting in
  91  * the {@code join} method then it will cause {@code join} to wakeup, all unfinished
  92  * threads to be {@linkplain Thread#interrupt() interrupted} and prevents new threads
  93  * from starting in the task scope.
  94  *
  95  * <h2>Subclasses with policies for common cases</h2>
  96  *
  97  * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for
  98  * common cases:
  99  * <ol>
 100  *   <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the first result and
 101  *   shuts down the task scope to interrupt unfinished threads and wakeup the owner. This
 102  *   class is intended for cases where the result of any subtask will do ("invoke any")
 103  *   and where there is no need to wait for results of other unfinished tasks. It defines
 104  *   methods to get the first result or throw an exception if all subtasks fail.
 105  *   <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the first exception and
 106  *   shuts down the task scope. This class is intended for cases where the results of all
 107  *   subtasks are required ("invoke all"); if any subtask fails then the results of other
 108  *   unfinished subtasks are no longer needed. If defines methods to throw an exception if
 109  *   any of the subtasks fail.
 110  * </ol>
 111  *
 112  * <p> The following are two examples that use the two classes. In both cases, a pair of
 113  * subtasks are forked to fetch resources from two URL locations "left" and "right". The
 114  * first example creates a ShutdownOnSuccess object to capture the result of the first
 115  * subtask to complete normally, cancelling the other by way of shutting down the task
 116  * scope. The main task waits in {@code join} until either subtask completes with a result
 117  * or both subtasks fail. It invokes {@link ShutdownOnSuccess#result(Function)
 118  * result(Function)} method to get the captured result. If both subtasks fail then this
 119  * method throws a {@code WebApplicationException} with the exception from one of the
 120  * subtasks as the cause.
 121  * {@snippet lang=java :
 122  *     try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
 123  *
 124  *         scope.fork(() -> fetch(left));
 125  *         scope.fork(() -> fetch(right));
 126  *
 127  *         scope.join();
 128  *
 129  *         // @link regex="result(?=\()" target="ShutdownOnSuccess#result" :
 130  *         String result = scope.result(e -> new WebApplicationException(e));
 131  *
 132  *         ...
 133  *     }
 134  * }
 135  * The second example creates a ShutdownOnFailure object to capture the exception of the
 136  * first subtask to fail, cancelling the other by way of shutting down the task scope. The
 137  * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a
 138  * result, either fails, or a deadline is reached. It invokes {@link
 139  * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception
 140  * when either subtask fails. This method is a no-op if no subtasks fail. The main task
 141  * uses {@code Future}'s {@link Future#resultNow() resultNow()} method to retrieve the
 142  * results.
 143  *
 144  * {@snippet lang=java :
 145  *    Instant deadline = ...
 146  *
 147  *    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
 148  *
 149  *         Future<String> future1 = scope.fork(() -> query(left));
 150  *         Future<String> future2 = scope.fork(() -> query(right));
 151  *
 152  *         scope.joinUntil(deadline);
 153  *
 154  *         // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" :
 155  *         scope.throwIfFailed(e -> new WebApplicationException(e));
 156  *
 157  *         // both subtasks completed successfully
 158  *         String result = Stream.of(future1, future2)
 159  *                 // @link substring="Future::resultNow" target="Future#resultNow" :
 160  *                 .map(Future::resultNow)
 161  *                 .collect(Collectors.joining(", ", "{ ", " }"));
 162  *
 163  *         ...
 164  *     }
 165  * }
 166  *
 167  * <h2>Extending StructuredTaskScope</h2>
 168  *
 169  * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Future)
 170  * handleComplete} overridden, to implement policies other than those implemented by
 171  * {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. The method may be overridden
 172  * to, for example, collect the results of subtasks that complete with a result and ignore
 173  * subtasks that fail. It may collect exceptions when subtasks fail. It may invoke the
 174  * {@link #shutdown() shutdown} method to shut down and cause {@link #join() join} to
 175  * wakeup when some condition arises.
 176  *
 177  * <p> A subclass will typically define methods to make available results, state, or other
 178  * outcome to code that executes after the {@code join} method. A subclass that collects
 179  * results and ignores subtasks that fail may define a method that returns a collection of
 180  * results. A subclass that implements a policy to shut down when a subtask fails may
 181  * define a method to retrieve the exception of the first subtask to fail.
 182  *
 183  * <p> The following is an example of a {@code StructuredTaskScope} implementation that
 184  * collects the results of subtasks that complete successfully. It defines the method
 185  * <b>{@code results()}</b> to be used by the main task to retrieve the results.
 186  *
 187  * {@snippet lang=java :
 188  *     class MyScope<T> extends StructuredTaskScope<T> {
 189  *         private final Queue<T> results = new ConcurrentLinkedQueue<>();
 190  *
 191  *         MyScope() {
 192  *             super(null, Thread.ofVirtual().factory());
 193  *         }
 194  *
 195  *         @Override
 196  *         // @link substring="handleComplete" target="handleComplete" :
 197  *         protected void handleComplete(Future<T> future) {
 198  *             if (future.state() == Future.State.SUCCESS) {
 199  *                 T result = future.resultNow();
 200  *                 results.add(result);
 201  *             }
 202  *         }
 203  *
 204  *         // Returns a stream of results from the subtasks that completed successfully
 205  *         public Stream<T> results() {     // @highlight substring="results"
 206  *             return results.stream();
 207  *         }
 208  *     }
 209  *  }
 210  *
 211  * <h2><a id="TreeStructure">Tree structure</a></h2>
 212  *
 213  * StructuredTaskScopes form a tree where parent-child relations are established
 214  * implicitly when opening a new task scope:
 215  * <ul>
 216  *   <li> A parent-child relation is established when a thread started in a task scope
 217  *   opens its own task scope. A thread started in task scope "A" that opens task scope
 218  *   "B" establishes a parent-child relation where task scope "A" is the parent of task
 219  *   scope "B".
 220  *   <li> A parent-child relation is established with nesting. If a thread opens task
 221  *   scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task
 222  *   scope "B" is the parent of the nested task scope "C".
 223  * </ul>
 224  *
 225  * <p> The tree structure supports:
 226  * <ul>
 227  *   <li> Inheritance of {@linkplain ExtentLocal extent-local} variables across threads.
 228  *   <li> Confinement checks. The phrase "threads contained in the task scope" in method
 229  *   descriptions means threads started in the task scope or descendant scopes.
 230  * </ul>
 231  *
 232  * <p> The following example demonstrates the inheritance of an extent-local variable. An
 233  * extent local {@code NAME} is bound to the value "duke". A StructuredTaskScope is created
 234  * and its {@code fork} method invoked to start a thread to execute {@code childTask}.
 235  * The thread inherits the extent-local {@linkplain ExtentLocal.Carrier bindings} captured
 236  * when creating the task scope. The code in {@code childTask} uses the value of the
 237  * extent-local and so reads the value "duke".
 238  * {@snippet lang=java :
 239  *     private static final ExtentLocal<String> NAME = ExtentLocal.newInstance();
 240  *
 241  *     // @link substring="where" target="ExtentLocal#where" :
 242  *     ExtentLocal.where(NAME, "duke").run(() -> {
 243  *         try (var scope = new StructuredTaskScope<String>()) {
 244  *
 245  *             scope.fork(() -> childTask());           // @highlight substring="fork"
 246  *             ...
 247  *          }
 248  *     });
 249  *
 250  *     ...
 251  *
 252  *     String childTask() {
 253  *         String name = NAME.get();   // "duke"    // @highlight substring="get"
 254  *         ...
 255  *     }
 256  * }
 257  *
 258  * <p> {@code StructuredTaskScope} does not define APIs that exposes the tree structure
 259  * at this time.
 260  *
 261  * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
 262  * or method in this class will cause a {@link NullPointerException} to be thrown.
 263  *
 264  * <h2>Memory consistency effects</h2>
 265  *
 266  * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to
 267  * {@linkplain #fork forking} of a {@code Callable} task
 268  * <a href="../../../../java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
 269  * <i>happen-before</i></a> any actions taken by that task, which in turn <i>happen-before</i>
 270  * the task result is retrieved via its {@code Future}, or <i>happen-before</i> any actions
 271  * taken in a thread after {@linkplain #join() joining} of the task scope.
 272  *
 273  * @jls 17.4.5 Happens-before Order
 274  *
 275  * @param <T> the result type of tasks executed in the scope
 276  * @since 19
 277  */
 278 public class StructuredTaskScope<T> implements AutoCloseable {
 279     private static final VarHandle FUTURES;
 280     static {
 281         try {
 282             MethodHandles.Lookup l = MethodHandles.lookup();
 283             FUTURES = l.findVarHandle(StructuredTaskScope.class, "futures", Set.class);
 284         } catch (Exception e) {
 285             throw new InternalError(e);
 286         }
 287     }
 288 
 289     private final ThreadFactory factory;
 290     private final ThreadFlock flock;
 291     private final ReentrantLock shutdownLock = new ReentrantLock();
 292 
 293     // lazily created set of Future objects with threads waiting in Future::get
 294     private volatile Set<Future<?>> futures;
 295 
 296     // set by owner when it forks, reset by owner when it joins
 297     private boolean needJoin;
 298 
 299     // states: OPEN -> SHUTDOWN -> CLOSED
 300     private static final int OPEN     = 0;   // initial state
 301     private static final int SHUTDOWN = 1;
 302     private static final int CLOSED   = 2;
 303 
 304     // scope state, set by owner, read by any thread
 305     private volatile int state;
 306 
 307     /**
 308      * Creates a structured task scope with the given name and thread factory. The task
 309      * scope is optionally named for the purposes of monitoring and management. The thread
 310      * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
 311      * tasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
 312      * current thread.
 313      *
 314      * <p> This method captures the current thread's {@linkplain ExtentLocal extent-local}
 315      * bindings for inheritance by threads created in the task scope. The
 316      * <a href="#TreeStructure">Tree Structure</a> section in the class description
 317      * details how parent-child relations are established implicitly for the purpose of
 318      * inheritance of extent-local bindings.
 319      *
 320      * @param name the name of the task scope, can be null
 321      * @param factory the thread factory
 322      */
 323     public StructuredTaskScope(String name, ThreadFactory factory) {
 324         this.factory = Objects.requireNonNull(factory, "'factory' is null");
 325         this.flock = ThreadFlock.open(name);
 326     }
 327 
 328     /**
 329      * Creates an unnamed structured task scope that creates virtual threads. The task
 330      * scope is owned by the current thread.
 331      *
 332      * <p> This constructor is equivalent to invoking the 2-arg constructor with a name
 333      * of {@code null} and a thread factory that creates virtual threads.
 334      *
 335      * @throws UnsupportedOperationException if preview features are not enabled
 336      */
 337     public StructuredTaskScope() {
 338         PreviewFeatures.ensureEnabled();
 339         this.factory = Thread.ofVirtual().factory();
 340         this.flock = ThreadFlock.open(null);
 341     }
 342 
 343     /**
 344      * Throws WrongThreadException if the current thread is not the owner.
 345      */
 346     private void ensureOwner() {
 347         if (Thread.currentThread() != flock.owner())
 348             throw new WrongThreadException("Current thread not owner");
 349     }
 350 
 351     /**
 352      * Throws WrongThreadException if the current thread is not the owner
 353      * or a thread contained in the tree.
 354      */
 355     private void ensureOwnerOrContainsThread() {
 356         Thread currentThread = Thread.currentThread();
 357         if (currentThread != flock.owner() && !flock.containsThread(currentThread))
 358             throw new WrongThreadException("Current thread not owner or thread in the tree");
 359     }
 360 
 361     /**
 362      * Tests if the task scope is shutdown.
 363      */
 364     private boolean isShutdown() {
 365         return state >= SHUTDOWN;
 366     }
 367 
 368     /**
 369      * Track the given Future.
 370      */
 371     private void track(Future<?> future) {
 372         // create the set of Futures if not already created
 373         Set<Future<?>> futures = this.futures;
 374         if (futures == null) {
 375             futures = ConcurrentHashMap.newKeySet();
 376             if (!FUTURES.compareAndSet(this, null, futures)) {
 377                 // lost the race
 378                 futures = this.futures;
 379             }
 380         }
 381         futures.add(future);
 382     }
 383 
 384     /**
 385      * Stop tracking the Future.
 386      */
 387     private void untrack(Future<?> future) {
 388         assert futures != null;
 389         futures.remove(future);
 390     }
 391 
 392     /**
 393      * Invoked when a task completes before the scope is shut down.
 394      *
 395      * <p> The {@code handleComplete} method should be thread safe. It may be invoked by
 396      * several threads concurrently.
 397      *
 398      * @implSpec The default implementation does nothing.
 399      *
 400      * @param future the completed task
 401      */
 402     protected void handleComplete(Future<T> future) { }
 403 
 404     /**
 405      * Starts a new thread to run the given task.
 406      *
 407      * <p> The new thread is created with the task scope's {@link ThreadFactory}. It
 408      * inherits the current thread's {@linkplain ExtentLocal extent-local} bindings. The
 409      * bindings must match the bindings captured when the task scope was created.
 410      *
 411      * <p> If the task completes before the task scope is {@link #shutdown() shutdown}
 412      * then the {@link #handleComplete(Future) handle} method is invoked to consume the
 413      * completed task. The {@code handleComplete} method is run when the task completes
 414      * with a result or exception. If the {@code Future} {@link Future#cancel(boolean)
 415      * cancel} method is used the cancel a task before the task scope is shut down, then
 416      * the {@code handleComplete} method is run by the thread that invokes {@code cancel}.
 417      * If the task scope shuts down at or around the same time that the task completes or
 418      * is cancelled then the {@code handleComplete} method may or may not be invoked.
 419      *
 420      * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process
 421      * of shutting down) then {@code fork} returns a {@code Future} representing a {@link
 422      * Future.State#CANCELLED cancelled} task that was not run.
 423      *
 424      * <p> This method may only be invoked by the task scope owner or threads contained
 425      * in the task scope. The {@link Future#cancel(boolean) cancel} method of the returned
 426      * {@code Future} object is also restricted to the task scope owner or threads contained
 427      * in the task scope. The {@code cancel} method throws {@link WrongThreadException}
 428      * if invoked from another thread. All other methods on the returned {@code Future}
 429      * object, such as {@link Future#get() get}, are not restricted.
 430      *
 431      * @param task the task to run
 432      * @param <U> the result type
 433      * @return a future
 434      * @throws IllegalStateException if this task scope is closed
 435      * @throws WrongThreadException if the current thread is not the owner or a thread
 436      * contained in the task scope
 437      * @throws StructureViolationException if the current extent-local bindings are not
 438      * the same as when the task scope was created
 439      * @throws RejectedExecutionException if the thread factory rejected creating a
 440      * thread to run the task
 441      */
 442     public <U extends T> Future<U> fork(Callable<? extends U> task) {
 443         Objects.requireNonNull(task, "'task' is null");
 444 
 445         // create future
 446         var future = new FutureImpl<U>(this, task);
 447 
 448         boolean shutdown = (state >= SHUTDOWN);
 449 
 450         if (!shutdown) {
 451             // create thread
 452             Thread thread = factory.newThread(future);
 453             if (thread == null) {
 454                 throw new RejectedExecutionException("Rejected by thread factory");
 455             }
 456 
 457             // attempt to start the thread
 458             try {
 459                 flock.start(thread);
 460             } catch (IllegalStateException e) {
 461                 // shutdown or in the process of shutting down
 462                 shutdown = true;
 463             }
 464         }
 465 
 466         if (shutdown) {
 467             if (state == CLOSED) {
 468                 throw new IllegalStateException("Task scope is closed");
 469             } else {
 470                 future.cancel(false);
 471             }
 472         }
 473 
 474         // if owner forks then it will need to join
 475         if (Thread.currentThread() == flock.owner() && !needJoin) {
 476             needJoin = true;
 477         }
 478 
 479         return future;
 480     }
 481 
 482     /**
 483      * Wait for all threads to finish or the task scope to shut down.
 484      */
 485     private void implJoin(Duration timeout)
 486         throws InterruptedException, TimeoutException
 487     {
 488         ensureOwner();
 489         needJoin = false;
 490         int s = state;
 491         if (s >= SHUTDOWN) {
 492             if (s == CLOSED)
 493                 throw new IllegalStateException("Task scope is closed");
 494             return;
 495         }
 496 
 497         // wait for all threads, wakeup, interrupt, or timeout
 498         if (timeout != null) {
 499             flock.awaitAll(timeout);
 500         } else {
 501             flock.awaitAll();
 502         }
 503     }
 504 
 505     /**
 506      * Wait for all threads to finish or the task scope to shut down. This method waits
 507      * until all threads started in the task scope finish execution (of both task and
 508      * {@link #handleComplete(Future) handleComplete} method), the {@link #shutdown()
 509      * shutdown} method is invoked to shut down the task scope, or the current thread
 510      * is {@linkplain Thread#interrupt() interrupted}.
 511      *
 512      * <p> This method may only be invoked by the task scope owner.
 513      *
 514      * @return this task scope
 515      * @throws IllegalStateException if this task scope is closed
 516      * @throws WrongThreadException if the current thread is not the owner
 517      * @throws InterruptedException if interrupted while waiting
 518      */
 519     public StructuredTaskScope<T> join() throws InterruptedException {
 520         try {
 521             implJoin(null);
 522         } catch (TimeoutException e) {
 523             throw new InternalError();
 524         }
 525         return this;
 526     }
 527 
 528     /**
 529      * Wait for all threads to finish or the task scope to shut down, up to the given
 530      * deadline. This method waits until all threads started in the task scope finish
 531      * execution (of both task and {@link #handleComplete(Future) handleComplete} method),
 532      * the {@link #shutdown() shutdown} method is invoked to shut down the task scope,
 533      * the current thread is {@linkplain Thread#interrupt() interrupted}, or the deadline
 534      * is reached.
 535      *
 536      * <p> This method may only be invoked by the task scope owner.
 537      *
 538      * @param deadline the deadline
 539      * @return this task scope
 540      * @throws IllegalStateException if this task scope is closed
 541      * @throws WrongThreadException if the current thread is not the owner
 542      * @throws InterruptedException if interrupted while waiting
 543      * @throws TimeoutException if the deadline is reached while waiting
 544      */
 545     public StructuredTaskScope<T> joinUntil(Instant deadline)
 546         throws InterruptedException, TimeoutException
 547     {
 548         Duration timeout = Duration.between(Instant.now(), deadline);
 549         implJoin(timeout);
 550         return this;
 551     }
 552 
 553     /**
 554      * Cancel all tracked Future objects.
 555      */
 556     private void cancelTrackedFutures() {
 557         Set<Future<?>> futures = this.futures;
 558         if (futures != null) {
 559             futures.forEach(f -> f.cancel(false));
 560         }
 561     }
 562 
 563     /**
 564      * Interrupt all unfinished threads.
 565      */
 566     private void implInterruptAll() {
 567         flock.threads().forEach(t -> {
 568             if (t != Thread.currentThread()) {
 569                 t.interrupt();
 570             }
 571         });
 572     }
 573 
 574     @SuppressWarnings("removal")
 575     private void interruptAll() {
 576         if (System.getSecurityManager() == null) {
 577             implInterruptAll();
 578         } else {
 579             PrivilegedAction<Void> pa = () -> {
 580                 implInterruptAll();
 581                 return null;
 582             };
 583             AccessController.doPrivileged(pa);
 584         }
 585     }
 586 
 587     /**
 588      * Shutdown the task scope if not already shutdown. Return true if this method
 589      * shutdowns the task scope, false if already shutdown.
 590      */
 591     private boolean implShutdown() {
 592         if (state < SHUTDOWN) {
 593             shutdownLock.lock();
 594             try {
 595                 if (state < SHUTDOWN) {
 596 
 597                     // prevent new threads from starting
 598                     flock.shutdown();
 599 
 600                     // wakeup any threads waiting in Future::get
 601                     cancelTrackedFutures();
 602 
 603                     // interrupt all unfinished threads
 604                     interruptAll();
 605 
 606                     state = SHUTDOWN;
 607                     return true;
 608                 }
 609             } finally {
 610                 shutdownLock.unlock();
 611             }
 612         }
 613         assert state >= SHUTDOWN;
 614         return false;
 615     }
 616 
 617     /**
 618      * Shut down the task scope without closing it. Shutting down a task scope prevents
 619      * new threads from starting, interrupts all unfinished threads, and causes the
 620      * {@link #join() join} method to wakeup. Shutdown is useful for cases where the
 621      * results of unfinished subtasks are no longer needed.
 622      *
 623      * <p> More specifically, this method:
 624      * <ul>
 625      * <li> {@linkplain Future#cancel(boolean) Cancels} the tasks that have threads
 626      * {@linkplain Future#get() waiting} on a result so that the waiting threads wakeup.
 627      * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the
 628      * task scope (except the current thread).
 629      * <li> Wakes up the owner if it is waiting in {@link #join()} or {@link
 630      * #joinUntil(Instant)}. If the owner is not waiting then its next call to {@code
 631      * join} or {@code joinUntil} will return immediately.
 632      * </ul>
 633      *
 634      * <p> When this method completes then the {@code Future} objects for all tasks will
 635      * be {@linkplain Future#isDone() done}, normally or abnormally. There may still
 636      * be threads that have not finished because they are executing code that did not
 637      * respond (or respond promptly) to thread interrupt. This method does not wait
 638      * for these threads. When the owner invokes the {@link #close() close} method
 639      * to close the task scope then it will wait for the remaining threads to finish.
 640      *
 641      * <p> This method may only be invoked by the task scope owner or threads contained
 642      * in the task scope.
 643      *
 644      * @throws IllegalStateException if this task scope is closed
 645      * @throws WrongThreadException if the current thread is not the owner or
 646      * a thread contained in the task scope
 647      */
 648     public void shutdown() {
 649         ensureOwnerOrContainsThread();
 650         if (state == CLOSED)
 651             throw new IllegalStateException("Task scope is closed");
 652         if (implShutdown())
 653             flock.wakeup();
 654     }
 655 
 656     /**
 657      * Closes this task scope.
 658      *
 659      * <p> This method first shuts down the task scope (as if by invoking the {@link
 660      * #shutdown() shutdown} method). It then waits for the threads executing any
 661      * unfinished tasks to finish. If interrupted then this method will continue to
 662      * wait for the threads to finish before completing with the interrupt status set.
 663      *
 664      * <p> This method may only be invoked by the task scope owner. If the task scope
 665      * is already closed then the owner invoking this method has no effect.
 666      *
 667      * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
 668      * manner</em>. If this method is called to close a task scope before nested task
 669      * scopes are closed then it closes the underlying construct of each nested task scope
 670      * (in the reverse order that they were created in), closes this task scope, and then
 671      * throws {@link StructureViolationException}.
 672      *
 673      * Similarly, if called to close a task scope that <em>encloses</em> {@linkplain
 674      * ExtentLocal.Carrier#run(Runnable) operations} with extent-local bindings then
 675      * it also throws {@code StructureViolationException} after closing the task scope.
 676      *
 677      * If a thread terminates without first closing task scopes that it owns then
 678      * termination will cause the underlying construct of each of its open tasks scopes to
 679      * be closed. Closing is performed in the reverse order that the task scopes were
 680      * created in. Thread termination may therefore be delayed when the owner has to wait
 681      * for threads forked in these task scopes to finish.
 682      *
 683      * @throws IllegalStateException thrown after closing the task scope if the owner
 684      * did not invoke join after forking
 685      * @throws WrongThreadException if the current thread is not the owner
 686      * @throws StructureViolationException if a structure violation was detected
 687      */
 688     @Override
 689     public void close() {
 690         ensureOwner();
 691         if (state == CLOSED)
 692             return;
 693 
 694         try {
 695             implShutdown();
 696             flock.close();
 697         } finally {
 698             state = CLOSED;
 699         }
 700 
 701         if (needJoin) {
 702             throw new IllegalStateException("Owner did not invoke join or joinUntil after fork");
 703         }
 704     }
 705 
 706     @Override
 707     public String toString() {
 708         StringBuilder sb = new StringBuilder();
 709         String name = flock.name();
 710         if (name != null) {
 711             sb.append(name);
 712             sb.append('/');
 713         }
 714         sb.append(Objects.toIdentityString(this));
 715         int s = state;
 716         if (s == CLOSED)
 717             sb.append("/closed");
 718         else if (s == SHUTDOWN)
 719             sb.append("/shutdown");
 720         return sb.toString();
 721     }
 722 
 723     /**
 724      * The Future implementation returned by the fork methods. Most methods are
 725      * overridden to support cancellation when the task scope is shutdown.
 726      * The blocking get methods register the Future with the task scope so that they
 727      * are cancelled when the task scope shuts down.
 728      */
 729     private static final class FutureImpl<V> extends FutureTask<V> {
 730         private final StructuredTaskScope<V> scope;
 731 
 732         @SuppressWarnings("unchecked")
 733         FutureImpl(StructuredTaskScope<? super V> scope, Callable<? extends V> task) {
 734             super((Callable<V>) task);
 735             this.scope = (StructuredTaskScope<V>) scope;
 736         }
 737 
 738         @Override
 739         protected void done() {
 740             if (!scope.isShutdown()) {
 741                 scope.handleComplete(this);
 742             }
 743         }
 744 
 745         private void cancelIfShutdown() {
 746             if (scope.isShutdown() && !super.isDone()) {
 747                 super.cancel(false);
 748             }
 749         }
 750 
 751         @Override
 752         public boolean isDone() {
 753             cancelIfShutdown();
 754             return super.isDone();
 755         }
 756 
 757         @Override
 758         public boolean isCancelled() {
 759             cancelIfShutdown();
 760             return super.isCancelled();
 761         }
 762 
 763         @Override
 764         public boolean cancel(boolean mayInterruptIfRunning) {
 765             scope.ensureOwnerOrContainsThread();
 766             cancelIfShutdown();
 767             return super.cancel(mayInterruptIfRunning);
 768         }
 769 
 770         @Override
 771         public V get() throws InterruptedException, ExecutionException {
 772             if (super.isDone())
 773                 return super.get();
 774             scope.track(this);
 775             try {
 776                 cancelIfShutdown();
 777                 return super.get();
 778             } finally {
 779                 scope.untrack(this);
 780             }
 781         }
 782 
 783         @Override
 784         public V get(long timeout, TimeUnit unit)
 785                 throws InterruptedException, ExecutionException, TimeoutException {
 786             Objects.requireNonNull(unit);
 787             if (super.isDone())
 788                 return super.get();
 789             scope.track(this);
 790             try {
 791                 cancelIfShutdown();
 792                 return super.get(timeout, unit);
 793             } finally {
 794                 scope.untrack(this);
 795             }
 796         }
 797 
 798         @Override
 799         public V resultNow() {
 800             cancelIfShutdown();
 801             return super.resultNow();
 802         }
 803 
 804         @Override
 805         public Throwable exceptionNow() {
 806             cancelIfShutdown();
 807             return super.exceptionNow();
 808         }
 809 
 810         @Override
 811         public State state() {
 812             cancelIfShutdown();
 813             return super.state();
 814         }
 815 
 816         @Override
 817         public String toString() {
 818             cancelIfShutdown();
 819             return super.toString();
 820         }
 821     }
 822 
 823     /**
 824      * Maps a Future.State to an int that can be compared.
 825      * RUNNING < CANCELLED < FAILED < SUCCESS.
 826      */
 827     private static int futureStateToInt(Future.State s) {
 828         return switch (s) {
 829             case RUNNING   -> 0;
 830             case CANCELLED -> 1;
 831             case FAILED    -> 2;
 832             case SUCCESS   -> 3;
 833         };
 834     }
 835 
 836     // RUNNING < CANCELLED < FAILED < SUCCESS
 837     private static final Comparator<Future.State> FUTURE_STATE_COMPARATOR =
 838             Comparator.comparingInt(StructuredTaskScope::futureStateToInt);
 839 
 840     /**
 841      * A {@code StructuredTaskScope} that captures the result of the first subtask to
 842      * complete successfully. Once captured, it invokes the {@linkplain #shutdown() shutdown}
 843      * method to interrupt unfinished threads and wakeup the owner. The policy
 844      * implemented by this class is intended for cases where the result of any subtask
 845      * will do ("invoke any") and where the results of other unfinished subtask are no
 846      * longer needed.
 847      *
 848      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 849      * in this class will cause a {@link NullPointerException} to be thrown.
 850      *
 851      * @param <T> the result type
 852      * @since 19
 853      */
 854     public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
 855         private static final VarHandle FUTURE;
 856         static {
 857             try {
 858                 MethodHandles.Lookup l = MethodHandles.lookup();
 859                 FUTURE = l.findVarHandle(ShutdownOnSuccess.class, "future", Future.class);
 860             } catch (Exception e) {
 861                 throw new InternalError(e);
 862             }
 863         }
 864         private volatile Future<T> future;
 865 
 866         /**
 867          * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
 868          * The task scope is optionally named for the purposes of monitoring and management.
 869          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
 870          * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope is
 871          * owned by the current thread.
 872          *
 873          * <p> This method captures the current thread's {@linkplain ExtentLocal extent-local}
 874          * bindings for inheritance by threads created in the task scope. The
 875          * <a href="StructuredTaskScope.html#TreeStructure">Tree Structure</a> section in
 876          * the class description details how parent-child relations are established
 877          * implicitly for the purpose of inheritance of extent-local bindings.
 878          *
 879          * @param name the name of the task scope, can be null
 880          * @param factory the thread factory
 881          */
 882         public ShutdownOnSuccess(String name, ThreadFactory factory) {
 883             super(name, factory);
 884         }
 885 
 886         /**
 887          * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
 888          *
 889          * <p> This constructor is equivalent to invoking the 2-arg constructor with a
 890          * name of {@code null} and a thread factory that creates virtual threads.
 891          */
 892         public ShutdownOnSuccess() {
 893             super(null, Thread.ofVirtual().factory());
 894         }
 895 
 896         /**
 897          * Shut down the given task scope when invoked for the first time with a {@code
 898          * Future} for a task that completed with a result.
 899          *
 900          * @param future the completed task
 901          * @see #shutdown()
 902          * @see Future.State#SUCCESS
 903          */
 904         @Override
 905         protected void handleComplete(Future<T> future) {
 906             Future.State state = future.state();
 907             if (state == Future.State.RUNNING) {
 908                 throw new IllegalArgumentException("Task is not completed");
 909             }
 910 
 911             Future<T> f;
 912             while (((f = this.future) == null)
 913                     || FUTURE_STATE_COMPARATOR.compare(f.state(), state) < 0) {
 914                 if (FUTURE.compareAndSet(this, f, future)) {
 915                     if (state == Future.State.SUCCESS)
 916                         shutdown();
 917                     break;
 918                 }
 919             }
 920         }
 921 
 922         /**
 923          * {@inheritDoc}
 924          * @return this task scope
 925          * @throws IllegalStateException {@inheritDoc}
 926          * @throws WrongThreadException {@inheritDoc}
 927          */
 928         @Override
 929         public ShutdownOnSuccess<T> join() throws InterruptedException {
 930             super.join();
 931             return this;
 932         }
 933 
 934         /**
 935          * {@inheritDoc}
 936          * @return this task scope
 937          * @throws IllegalStateException {@inheritDoc}
 938          * @throws WrongThreadException {@inheritDoc}
 939          */
 940         @Override
 941         public ShutdownOnSuccess<T> joinUntil(Instant deadline)
 942             throws InterruptedException, TimeoutException
 943         {
 944             super.joinUntil(deadline);
 945             return this;
 946         }
 947 
 948         /**
 949          * {@return the result of the first subtask that completed with a result}
 950          *
 951          * <p> When no subtask completed with a result but a task completed with an
 952          * exception then {@code ExecutionException} is thrown with the exception as the
 953          * {@linkplain Throwable#getCause() cause}. If only cancelled subtasks were
 954          * notified to the {@code handleComplete} method then {@code CancellationException}
 955          * is thrown.
 956          *
 957          * @apiNote This method is intended to be invoked by the task scope owner after it
 958          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
 959          * A future release may add enforcement to prevent the method being called by
 960          * other threads or before joining.
 961          *
 962          * @throws ExecutionException if no subtasks completed with a result but a subtask
 963          * completed with an exception
 964          * @throws CancellationException if all subtasks were cancelled
 965          * @throws IllegalStateException if the handle method was not invoked with a
 966          * completed subtask
 967          */
 968         public T result() throws ExecutionException {
 969             Future<T> f = future;
 970             if (f == null) {
 971                 throw new IllegalStateException("No completed subtasks");
 972             }
 973             return switch (f.state()) {
 974                 case SUCCESS   -> f.resultNow();
 975                 case FAILED    -> throw new ExecutionException(f.exceptionNow());
 976                 case CANCELLED -> throw new CancellationException();
 977                 default        -> throw new InternalError("Unexpected state: " + f);
 978             };
 979 
 980         }
 981 
 982         /**
 983          * Returns the result of the first subtask that completed with a result, otherwise
 984          * throws an exception produced by the given exception supplying function.
 985          *
 986          * <p> When no subtask completed with a result but a subtask completed with an
 987          * exception then the exception supplying function is invoked with the exception.
 988          * If only cancelled subtasks were notified to the {@code handleComplete} method
 989          * then the exception supplying function is invoked with a {@code CancellationException}.
 990          *
 991          * @apiNote This method is intended to be invoked by the task scope owner after it
 992          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
 993          * A future release may add enforcement to prevent the method being called by
 994          * other threads or before joining.
 995          *
 996          * @param esf the exception supplying function
 997          * @param <X> type of the exception to be thrown
 998          * @return the result of the first subtask that completed with a result
 999          * @throws X if no subtask completed with a result
1000          * @throws IllegalStateException if the handle method was not invoked with a
1001          * completed subtask
1002          */
1003         public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X {
1004             Objects.requireNonNull(esf);
1005             Future<T> f = future;
1006             if (f == null) {
1007                 throw new IllegalStateException("No completed subtasks");
1008             }
1009             Future.State state = f.state();
1010             if (state == Future.State.SUCCESS) {
1011                 return f.resultNow();
1012             } else {
1013                 Throwable throwable = (state == Future.State.FAILED)
1014                         ? f.exceptionNow()
1015                         : new CancellationException();
1016                 X ex = esf.apply(throwable);
1017                 Objects.requireNonNull(ex, "esf returned null");
1018                 throw ex;
1019             }
1020         }
1021     }
1022 
1023     /**
1024      * A {@code StructuredTaskScope} that captures the exception of the first subtask to
1025      * complete abnormally. Once captured, it invokes the {@linkplain #shutdown() shutdown}
1026      * method to interrupt unfinished threads and wakeup the owner. The policy implemented
1027      * by this class is intended for cases where the results for all subtasks are required
1028      * ("invoke all"); if any subtask fails then the results of other unfinished subtasks
1029      * are no longer needed.
1030      *
1031      * <p> Unless otherwise specified, passing a {@code null} argument to a method
1032      * in this class will cause a {@link NullPointerException} to be thrown.
1033      *
1034      * @since 19
1035      */
1036     public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
1037         private static final VarHandle FUTURE;
1038         static {
1039             try {
1040                 MethodHandles.Lookup l = MethodHandles.lookup();
1041                 FUTURE = l.findVarHandle(ShutdownOnFailure.class, "future", Future.class);
1042             } catch (Exception e) {
1043                 throw new InternalError(e);
1044             }
1045         }
1046         private volatile Future<Object> future;
1047 
1048         /**
1049          * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
1050          * The task scope is optionally named for the purposes of monitoring and management.
1051          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1052          * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope
1053          * is owned by the current thread.
1054          *
1055          * <p> This method captures the current thread's {@linkplain ExtentLocal extent-local}
1056          * bindings for inheritance by threads created in the task scope. The
1057          * <a href="StructuredTaskScope.html#TreeStructure">Tree Structure</a> section in
1058          * the class description details how parent-child relations are established
1059          * implicitly for the purpose of inheritance of extent-local bindings.
1060          *
1061          * @param name the name of the task scope, can be null
1062          * @param factory the thread factory
1063          */
1064         public ShutdownOnFailure(String name, ThreadFactory factory) {
1065             super(name, factory);
1066         }
1067 
1068         /**
1069          * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
1070          *
1071          * <p> This constructor is equivalent to invoking the 2-arg constructor with a
1072          * name of {@code null} and a thread factory that creates virtual threads.
1073          */
1074         public ShutdownOnFailure() {
1075             super(null, Thread.ofVirtual().factory());
1076         }
1077 
1078         /**
1079          * Shut down the given task scope when invoked for the first time with a {@code
1080          * Future} for a task that completed abnormally (exception or cancelled).
1081          *
1082          * @param future the completed task
1083          * @see #shutdown()
1084          * @see Future.State#FAILED
1085          * @see Future.State#CANCELLED
1086          */
1087         @Override
1088         protected void handleComplete(Future<Object> future) {
1089             Future.State state = future.state();
1090             if (state == Future.State.RUNNING) {
1091                 throw new IllegalArgumentException("Task is not completed");
1092             } else if (state == Future.State.SUCCESS) {
1093                 return;
1094             }
1095 
1096             // A failed task overrides a cancelled task.
1097             // The first failed or cancelled task causes the scope to shutdown.
1098             Future<Object> f;
1099             while (((f = this.future) == null)
1100                     || FUTURE_STATE_COMPARATOR.compare(f.state(), state) < 0) {
1101                 if (FUTURE.compareAndSet(this, f, future)) {
1102                     shutdown();
1103                     break;
1104                 }
1105             }
1106         }
1107 
1108         /**
1109          * {@inheritDoc}
1110          * @return this task scope
1111          * @throws IllegalStateException {@inheritDoc}
1112          * @throws WrongThreadException {@inheritDoc}
1113          */
1114         @Override
1115         public ShutdownOnFailure join() throws InterruptedException {
1116             super.join();
1117             return this;
1118         }
1119 
1120         /**
1121          * {@inheritDoc}
1122          * @return this task scope
1123          * @throws IllegalStateException {@inheritDoc}
1124          * @throws WrongThreadException {@inheritDoc}
1125          */
1126         @Override
1127         public ShutdownOnFailure joinUntil(Instant deadline)
1128             throws InterruptedException, TimeoutException
1129         {
1130             super.joinUntil(deadline);
1131             return this;
1132         }
1133 
1134         /**
1135          * Returns the exception for the first subtask that completed with an exception.
1136          * If no subtask completed with an exception but cancelled subtasks were notified
1137          * to the {@code handleComplete} method then a {@code CancellationException}
1138          * is returned. If no subtasks completed abnormally then an empty {@code Optional}
1139          * is returned.
1140          *
1141          * @apiNote This method is intended to be invoked by the task scope owner after it
1142          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
1143          * A future release may add enforcement to prevent the method being called by
1144          * other threads or before joining.
1145          *
1146          * @return the exception for a subtask that completed abnormally or an empty
1147          * optional if no subtasks completed abnormally
1148          */
1149         public Optional<Throwable> exception() {
1150             Future<Object> f = future;
1151             if (f != null) {
1152                 Throwable throwable = (f.state() == Future.State.FAILED)
1153                         ? f.exceptionNow()
1154                         : new CancellationException();
1155                 return Optional.of(throwable);
1156             } else {
1157                 return Optional.empty();
1158             }
1159         }
1160 
1161         /**
1162          * Throws if a subtask completed abnormally. If any subtask completed with an
1163          * exception then {@code ExecutionException} is thrown with the exception of the
1164          * first subtask to fail as the {@linkplain Throwable#getCause() cause}. If no
1165          * subtask completed with an exception but cancelled subtasks were notified to the
1166          * {@code handleComplete} method then {@code CancellationException} is thrown.
1167          * This method does nothing if no subtasks completed abnormally.
1168          *
1169          * @apiNote This method is intended to be invoked by the task scope owner after it
1170          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
1171          * A future release may add enforcement to prevent the method being called by
1172          * other threads or before joining.
1173          *
1174          * @throws ExecutionException if a subtask completed with an exception
1175          * @throws CancellationException if no subtasks completed with an exception but
1176          * subtasks were cancelled
1177          */
1178         public void throwIfFailed() throws ExecutionException {
1179             Future<Object> f = future;
1180             if (f != null) {
1181                 if (f.state() == Future.State.FAILED) {
1182                     throw new ExecutionException(f.exceptionNow());
1183                 } else {
1184                     throw new CancellationException();
1185                 }
1186             }
1187         }
1188 
1189         /**
1190          * Throws the exception produced by the given exception supplying function if
1191          * a subtask completed abnormally. If any subtask completed with an exception then
1192          * the function is invoked with the exception of the first subtask to fail.
1193          * If no subtask completed with an exception but cancelled subtasks were notified
1194          * to the {@code handleComplete} method then the function is called with a {@code
1195          * CancellationException}. The exception returned by the function is thrown.
1196          * This method does nothing if no subtasks completed abnormally.
1197          *
1198          * @apiNote This method is intended to be invoked by the task scope owner after it
1199          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
1200          * A future release may add enforcement to prevent the method being called by
1201          * other threads or before joining.
1202          *
1203          * @param esf the exception supplying function
1204          * @param <X> type of the exception to be thrown
1205          * @throws X produced by the exception supplying function
1206          */
1207         public <X extends Throwable>
1208         void throwIfFailed(Function<Throwable, ? extends X> esf) throws X {
1209             Objects.requireNonNull(esf);
1210             Future<Object> f = future;
1211             if (f != null) {
1212                 Throwable throwable = (f.state() == Future.State.FAILED)
1213                         ? f.exceptionNow()
1214                         : new CancellationException();
1215                 X ex = esf.apply(throwable);
1216                 Objects.requireNonNull(ex, "esf returned null");
1217                 throw ex;
1218             }
1219         }
1220     }
1221 }
--- EOF ---