1 /*
   2  * Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package jdk.incubator.concurrent;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.time.Instant;
  33 import java.util.Comparator;
  34 import java.util.Objects;
  35 import java.util.Optional;
  36 import java.util.Set;
  37 import java.util.concurrent.Callable;
  38 import java.util.concurrent.CancellationException;
  39 import java.util.concurrent.ConcurrentHashMap;
  40 import java.util.concurrent.ExecutionException;
  41 import java.util.concurrent.Future;
  42 import java.util.concurrent.FutureTask;
  43 import java.util.concurrent.RejectedExecutionException;
  44 import java.util.concurrent.ThreadFactory;
  45 import java.util.concurrent.TimeoutException;
  46 import java.util.concurrent.TimeUnit;
  47 import java.util.concurrent.locks.ReentrantLock;
  48 import java.util.function.Function;
  49 import jdk.internal.misc.PreviewFeatures;
  50 import jdk.internal.misc.ThreadFlock;
  51 
  52 /**
  53  * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports
  54  * cases where a task splits into several concurrent subtasks, to be executed in their
  55  * own threads, and where the subtasks must complete before the main task continues. A
  56  * {@code StructuredTaskScope} can be used to ensure that the lifetime of a concurrent
  57  * operation is confined by a <em>syntax block</em>, just like that of a sequential
  58  * operation in structured programming.
  59  *
  60  * <h2>Basic usage</h2>
  61  *
  62  * A {@code StructuredTaskScope} is created with one of its public constructors. It defines
  63  * the {@link #fork(Callable) fork} method to start a thread to execute a task, the {@link
  64  * #join() join} method to wait for all threads to finish, and the {@link #close() close}
  65  * method to close the task scope. The API is intended to be used with the {@code
  66  * try-with-resources} construct. The intention is that code in the <em>block</em> uses
  67  * the {@code fork} method to fork threads to execute the subtasks, wait for the threads
  68  * to finish with the {@code join} method, and then <em>process the results</em>.
  69  * Processing of results may include handling or re-throwing of exceptions.
  70  * {@snippet lang=java :
  71  *     try (var scope = new StructuredTaskScope<Object>()) {
  72  *
  73  *         Future<Integer> future1 = scope.fork(task1);   // @highlight substring="fork"
  74  *         Future<String> future2 = scope.fork(task2);    // @highlight substring="fork"
  75  *
  76  *         scope.join();                                  // @highlight substring="join"
  77  *
  78  *         ... process results/exceptions ...
  79  *
  80  *     } // close                                         // @highlight substring="close"
  81  * }
  82  * To ensure correct usage, the {@code join} and {@code close} methods may only be invoked
  83  * by the <em>owner</em> (the thread that opened/created the task scope}, and the
  84  * {@code close} method throws an exception after closing if the owner did not invoke the
  85  * {@code join} method after forking.
  86  *
  87  * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut
  88  * down a task scope without closing it. Shutdown is useful for cases where a subtask
  89  * completes with a result (or exception) and the results of other unfinished subtasks are
  90  * no longer needed. If a subtask invokes {@code shutdown} while the owner is waiting in
  91  * the {@code join} method then it will cause {@code join} to wakeup, all unfinished
  92  * threads to be {@linkplain Thread#interrupt() interrupted} and prevents new threads
  93  * from starting in the task scope.
  94  *
  95  * <h2>Subclasses with policies for common cases</h2>
  96  *
  97  * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for
  98  * common cases:
  99  * <ol>
 100  *   <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the first result and
 101  *   shuts down the task scope to interrupt unfinished threads and wakeup the owner. This
 102  *   class is intended for cases where the result of any subtask will do ("invoke any")
 103  *   and where there is no need to wait for results of other unfinished tasks. It defines
 104  *   methods to get the first result or throw an exception if all subtasks fail.
 105  *   <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the first exception and
 106  *   shuts down the task scope. This class is intended for cases where the results of all
 107  *   subtasks are required ("invoke all"); if any subtask fails then the results of other
 108  *   unfinished subtasks are no longer needed. If defines methods to throw an exception if
 109  *   any of the subtasks fail.
 110  * </ol>
 111  *
 112  * <p> The following are two examples that use the two classes. In both cases, a pair of
 113  * subtasks are forked to fetch resources from two URL locations "left" and "right". The
 114  * first example creates a ShutdownOnSuccess object to capture the result of the first
 115  * subtask to complete normally, cancelling the other by way of shutting down the task
 116  * scope. The main task waits in {@code join} until either subtask completes with a result
 117  * or both subtasks fail. It invokes {@link ShutdownOnSuccess#result(Function)
 118  * result(Function)} method to get the captured result. If both subtasks fail then this
 119  * method throws a {@code WebApplicationException} with the exception from one of the
 120  * subtasks as the cause.
 121  * {@snippet lang=java :
 122  *     try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
 123  *
 124  *         scope.fork(() -> fetch(left));
 125  *         scope.fork(() -> fetch(right));
 126  *
 127  *         scope.join();
 128  *
 129  *         // @link regex="result(?=\()" target="ShutdownOnSuccess#result" :
 130  *         String result = scope.result(e -> new WebApplicationException(e));
 131  *
 132  *         ...
 133  *     }
 134  * }
 135  * The second example creates a ShutdownOnFailure object to capture the exception of the
 136  * first subtask to fail, cancelling the other by way of shutting down the task scope. The
 137  * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a
 138  * result, either fails, or a deadline is reached. It invokes {@link
 139  * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception
 140  * when either subtask fails. This method is a no-op if no subtasks fail. The main task
 141  * uses {@code Future}'s {@link Future#resultNow() resultNow()} method to retrieve the
 142  * results.
 143  *
 144  * {@snippet lang=java :
 145  *    Instant deadline = ...
 146  *
 147  *    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
 148  *
 149  *         Future<String> future1 = scope.fork(() -> query(left));
 150  *         Future<String> future2 = scope.fork(() -> query(right));
 151  *
 152  *         scope.joinUntil(deadline);
 153  *
 154  *         // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" :
 155  *         scope.throwIfFailed(e -> new WebApplicationException(e));
 156  *
 157  *         // both subtasks completed successfully
 158  *         String result = Stream.of(future1, future2)
 159  *                 // @link substring="Future::resultNow" target="Future#resultNow" :
 160  *                 .map(Future::resultNow)
 161  *                 .collect(Collectors.joining(", ", "{ ", " }"));
 162  *
 163  *         ...
 164  *     }
 165  * }
 166  *
 167  * <h2>Extending StructuredTaskScope</h2>
 168  *
 169  * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Future)
 170  * handleComplete} overridden, to implement policies other than those implemented by
 171  * {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. The method may be overridden
 172  * to, for example, collect the results of subtasks that complete with a result and ignore
 173  * subtasks that fail. It may collect exceptions when subtasks fail. It may invoke the
 174  * {@link #shutdown() shutdown} method to shut down and cause {@link #join() join} to
 175  * wakeup when some condition arises.
 176  *
 177  * <p> A subclass will typically define methods to make available results, state, or other
 178  * outcome to code that executes after the {@code join} method. A subclass that collects
 179  * results and ignores subtasks that fail may define a method that returns a collection of
 180  * results. A subclass that implements a policy to shut down when a subtask fails may
 181  * define a method to retrieve the exception of the first subtask to fail.
 182  *
 183  * <p> The following is an example of a {@code StructuredTaskScope} implementation that
 184  * collects the results of subtasks that complete successfully. It defines the method
 185  * <b>{@code results()}</b> to be used by the main task to retrieve the results.
 186  *
 187  * {@snippet lang=java :
 188  *     class MyScope<T> extends StructuredTaskScope<T> {
 189  *         private final Queue<T> results = new ConcurrentLinkedQueue<>();
 190  *
 191  *         MyScope() {
 192  *             super(null, Thread.ofVirtual().factory());
 193  *         }
 194  *
 195  *         @Override
 196  *         // @link substring="handleComplete" target="handleComplete" :
 197  *         protected void handleComplete(Future<T> future) {
 198  *             if (future.state() == Future.State.SUCCESS) {
 199  *                 T result = future.resultNow();
 200  *                 results.add(result);
 201  *             }
 202  *         }
 203  *
 204  *         // Returns a stream of results from the subtasks that completed successfully
 205  *         public Stream<T> results() {     // @highlight substring="results"
 206  *             return results.stream();
 207  *         }
 208  *     }
 209  *  }
 210  *
 211  * <h2><a id="TreeStructure">Tree structure</a></h2>
 212  *
 213  * StructuredTaskScopes form a tree where parent-child relations are established
 214  * implicitly when opening a new task scope:
 215  * <ul>
 216  *   <li> A parent-child relation is established when a thread started in a task scope
 217  *   opens its own task scope. A thread started in task scope "A" that opens task scope
 218  *   "B" establishes a parent-child relation where task scope "A" is the parent of task
 219  *   scope "B".
 220  *   <li> A parent-child relation is established with nesting. If a thread opens task
 221  *   scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task
 222  *   scope "B" is the parent of the nested task scope "C".
 223  * </ul>
 224  *
 225  * <p> The tree structure supports confinement checks. The phrase "threads contained in
 226  * the task scope" in method descriptions means threads started in the task scope or
 227  * descendant scopes. {@code StructuredTaskScope} does not define APIs that exposes the
 228  * tree structure at this time.































 229  *
 230  * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
 231  * or method in this class will cause a {@link NullPointerException} to be thrown.
 232  *
 233  * <h2>Memory consistency effects</h2>
 234  *
 235  * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to
 236  * {@linkplain #fork forking} of a {@code Callable} task
 237  * <a href="../../../../java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
 238  * <i>happen-before</i></a> any actions taken by that task, which in turn <i>happen-before</i>
 239  * the task result is retrieved via its {@code Future}, or <i>happen-before</i> any actions
 240  * taken in a thread after {@linkplain #join() joining} of the task scope.
 241  *
 242  * @jls 17.4.5 Happens-before Order
 243  *
 244  * @param <T> the result type of tasks executed in the scope
 245  * @since 19
 246  */
 247 public class StructuredTaskScope<T> implements AutoCloseable {
 248     private static final VarHandle FUTURES;
 249     static {
 250         try {
 251             MethodHandles.Lookup l = MethodHandles.lookup();
 252             FUTURES = l.findVarHandle(StructuredTaskScope.class, "futures", Set.class);
 253         } catch (Exception e) {
 254             throw new InternalError(e);
 255         }
 256     }
 257 
 258     private final ThreadFactory factory;
 259     private final ThreadFlock flock;
 260     private final ReentrantLock shutdownLock = new ReentrantLock();
 261 
 262     // lazily created set of Future objects with threads waiting in Future::get
 263     private volatile Set<Future<?>> futures;
 264 
 265     // set by owner when it forks, reset by owner when it joins
 266     private boolean needJoin;
 267 
 268     // states: OPEN -> SHUTDOWN -> CLOSED
 269     private static final int OPEN     = 0;   // initial state
 270     private static final int SHUTDOWN = 1;
 271     private static final int CLOSED   = 2;
 272 
 273     // scope state, set by owner, read by any thread
 274     private volatile int state;
 275 
 276     /**
 277      * Creates a structured task scope with the given name and thread factory. The task
 278      * scope is optionally named for the purposes of monitoring and management. The thread
 279      * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
 280      * tasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
 281      * current thread.
 282      *






 283      * @param name the name of the task scope, can be null
 284      * @param factory the thread factory
 285      */
 286     public StructuredTaskScope(String name, ThreadFactory factory) {
 287         this.factory = Objects.requireNonNull(factory, "'factory' is null");
 288         this.flock = ThreadFlock.open(name);
 289     }
 290 
 291     /**
 292      * Creates an unnamed structured task scope that creates virtual threads. The task
 293      * scope is owned by the current thread.
 294      *
 295      * <p> This constructor is equivalent to invoking the 2-arg constructor with a name
 296      * of {@code null} and a thread factory that creates virtual threads.
 297      *
 298      * @throws UnsupportedOperationException if preview features are not enabled
 299      */
 300     public StructuredTaskScope() {
 301         PreviewFeatures.ensureEnabled();
 302         this.factory = Thread.ofVirtual().factory();
 303         this.flock = ThreadFlock.open(null);
 304     }
 305 
 306     /**
 307      * Throws WrongThreadException if the current thread is not the owner.
 308      */
 309     private void ensureOwner() {
 310         if (Thread.currentThread() != flock.owner())
 311             throw new WrongThreadException("Current thread not owner");
 312     }
 313 
 314     /**
 315      * Throws WrongThreadException if the current thread is not the owner
 316      * or a thread contained in the tree.
 317      */
 318     private void ensureOwnerOrContainsThread() {
 319         Thread currentThread = Thread.currentThread();
 320         if (currentThread != flock.owner() && !flock.containsThread(currentThread))
 321             throw new WrongThreadException("Current thread not owner or thread in the tree");
 322     }
 323 
 324     /**
 325      * Tests if the task scope is shutdown.
 326      */
 327     private boolean isShutdown() {
 328         return state >= SHUTDOWN;
 329     }
 330 
 331     /**
 332      * Track the given Future.
 333      */
 334     private void track(Future<?> future) {
 335         // create the set of Futures if not already created
 336         Set<Future<?>> futures = this.futures;
 337         if (futures == null) {
 338             futures = ConcurrentHashMap.newKeySet();
 339             if (!FUTURES.compareAndSet(this, null, futures)) {
 340                 // lost the race
 341                 futures = this.futures;
 342             }
 343         }
 344         futures.add(future);
 345     }
 346 
 347     /**
 348      * Stop tracking the Future.
 349      */
 350     private void untrack(Future<?> future) {
 351         assert futures != null;
 352         futures.remove(future);
 353     }
 354 
 355     /**
 356      * Invoked when a task completes before the scope is shut down.
 357      *
 358      * <p> The {@code handleComplete} method should be thread safe. It may be invoked by
 359      * several threads concurrently.
 360      *
 361      * @implSpec The default implementation does nothing.
 362      *
 363      * @param future the completed task
 364      */
 365     protected void handleComplete(Future<T> future) { }
 366 
 367     /**
 368      * Starts a new thread to run the given task.
 369      *
 370      * <p> The new thread is created with the task scope's {@link ThreadFactory}.


 371      *
 372      * <p> If the task completes before the task scope is {@link #shutdown() shutdown}
 373      * then the {@link #handleComplete(Future) handle} method is invoked to consume the
 374      * completed task. The {@code handleComplete} method is run when the task completes
 375      * with a result or exception. If the {@code Future} {@link Future#cancel(boolean)
 376      * cancel} method is used the cancel a task before the task scope is shut down, then
 377      * the {@code handleComplete} method is run by the thread that invokes {@code cancel}.
 378      * If the task scope shuts down at or around the same time that the task completes or
 379      * is cancelled then the {@code handleComplete} method may or may not be invoked.
 380      *
 381      * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process
 382      * of shutting down) then {@code fork} returns a {@code Future} representing a {@link
 383      * Future.State#CANCELLED cancelled} task that was not run.
 384      *
 385      * <p> This method may only be invoked by the task scope owner or threads contained
 386      * in the task scope. The {@link Future#cancel(boolean) cancel} method of the returned
 387      * {@code Future} object is also restricted to the task scope owner or threads contained
 388      * in the task scope. The {@code cancel} method throws {@link WrongThreadException}
 389      * if invoked from another thread. All other methods on the returned {@code Future}
 390      * object, such as {@link Future#get() get}, are not restricted.
 391      *
 392      * @param task the task to run
 393      * @param <U> the result type
 394      * @return a future
 395      * @throws IllegalStateException if this task scope is closed
 396      * @throws WrongThreadException if the current thread is not the owner or a thread
 397      * contained in the task scope


 398      * @throws RejectedExecutionException if the thread factory rejected creating a
 399      * thread to run the task
 400      */
 401     public <U extends T> Future<U> fork(Callable<? extends U> task) {
 402         Objects.requireNonNull(task, "'task' is null");
 403 
 404         // create future
 405         var future = new FutureImpl<U>(this, task);
 406 
 407         boolean shutdown = (state >= SHUTDOWN);
 408 
 409         if (!shutdown) {
 410             // create thread
 411             Thread thread = factory.newThread(future);
 412             if (thread == null) {
 413                 throw new RejectedExecutionException("Rejected by thread factory");
 414             }
 415 
 416             // attempt to start the thread
 417             try {
 418                 flock.start(thread);
 419             } catch (IllegalStateException e) {
 420                 // shutdown or in the process of shutting down
 421                 shutdown = true;
 422             }
 423         }
 424 
 425         if (shutdown) {
 426             if (state == CLOSED) {
 427                 throw new IllegalStateException("Task scope is closed");
 428             } else {
 429                 future.cancel(false);
 430             }
 431         }
 432 
 433         // if owner forks then it will need to join
 434         if (Thread.currentThread() == flock.owner() && !needJoin) {
 435             needJoin = true;
 436         }
 437 
 438         return future;
 439     }
 440 
 441     /**
 442      * Wait for all threads to finish or the task scope to shut down.
 443      */
 444     private void implJoin(Duration timeout)
 445         throws InterruptedException, TimeoutException
 446     {
 447         ensureOwner();
 448         needJoin = false;
 449         int s = state;
 450         if (s >= SHUTDOWN) {
 451             if (s == CLOSED)
 452                 throw new IllegalStateException("Task scope is closed");
 453             return;
 454         }
 455 
 456         // wait for all threads, wakeup, interrupt, or timeout
 457         if (timeout != null) {
 458             flock.awaitAll(timeout);
 459         } else {
 460             flock.awaitAll();
 461         }
 462     }
 463 
 464     /**
 465      * Wait for all threads to finish or the task scope to shut down. This method waits
 466      * until all threads started in the task scope finish execution (of both task and
 467      * {@link #handleComplete(Future) handleComplete} method), the {@link #shutdown()
 468      * shutdown} method is invoked to shut down the task scope, or the current thread
 469      * is {@linkplain Thread#interrupt() interrupted}.
 470      *
 471      * <p> This method may only be invoked by the task scope owner.
 472      *
 473      * @return this task scope
 474      * @throws IllegalStateException if this task scope is closed
 475      * @throws WrongThreadException if the current thread is not the owner
 476      * @throws InterruptedException if interrupted while waiting
 477      */
 478     public StructuredTaskScope<T> join() throws InterruptedException {
 479         try {
 480             implJoin(null);
 481         } catch (TimeoutException e) {
 482             throw new InternalError();
 483         }
 484         return this;
 485     }
 486 
 487     /**
 488      * Wait for all threads to finish or the task scope to shut down, up to the given
 489      * deadline. This method waits until all threads started in the task scope finish
 490      * execution (of both task and {@link #handleComplete(Future) handleComplete} method),
 491      * the {@link #shutdown() shutdown} method is invoked to shut down the task scope,
 492      * the current thread is {@linkplain Thread#interrupt() interrupted}, or the deadline
 493      * is reached.
 494      *
 495      * <p> This method may only be invoked by the task scope owner.
 496      *
 497      * @param deadline the deadline
 498      * @return this task scope
 499      * @throws IllegalStateException if this task scope is closed
 500      * @throws WrongThreadException if the current thread is not the owner
 501      * @throws InterruptedException if interrupted while waiting
 502      * @throws TimeoutException if the deadline is reached while waiting
 503      */
 504     public StructuredTaskScope<T> joinUntil(Instant deadline)
 505         throws InterruptedException, TimeoutException
 506     {
 507         Duration timeout = Duration.between(Instant.now(), deadline);
 508         implJoin(timeout);
 509         return this;
 510     }
 511 
 512     /**
 513      * Cancel all tracked Future objects.
 514      */
 515     private void cancelTrackedFutures() {
 516         Set<Future<?>> futures = this.futures;
 517         if (futures != null) {
 518             futures.forEach(f -> f.cancel(false));
 519         }
 520     }
 521 
 522     /**
 523      * Interrupt all unfinished threads.
 524      */
 525     private void implInterruptAll() {
 526         flock.threads().forEach(t -> {
 527             if (t != Thread.currentThread()) {
 528                 t.interrupt();
 529             }
 530         });
 531     }
 532 
 533     @SuppressWarnings("removal")
 534     private void interruptAll() {
 535         if (System.getSecurityManager() == null) {
 536             implInterruptAll();
 537         } else {
 538             PrivilegedAction<Void> pa = () -> {
 539                 implInterruptAll();
 540                 return null;
 541             };
 542             AccessController.doPrivileged(pa);
 543         }
 544     }
 545 
 546     /**
 547      * Shutdown the task scope if not already shutdown. Return true if this method
 548      * shutdowns the task scope, false if already shutdown.
 549      */
 550     private boolean implShutdown() {
 551         if (state < SHUTDOWN) {
 552             shutdownLock.lock();
 553             try {
 554                 if (state < SHUTDOWN) {
 555 
 556                     // prevent new threads from starting
 557                     flock.shutdown();
 558 
 559                     // wakeup any threads waiting in Future::get
 560                     cancelTrackedFutures();
 561 
 562                     // interrupt all unfinished threads
 563                     interruptAll();
 564 
 565                     state = SHUTDOWN;
 566                     return true;
 567                 }
 568             } finally {
 569                 shutdownLock.unlock();
 570             }
 571         }
 572         assert state >= SHUTDOWN;
 573         return false;
 574     }
 575 
 576     /**
 577      * Shut down the task scope without closing it. Shutting down a task scope prevents
 578      * new threads from starting, interrupts all unfinished threads, and causes the
 579      * {@link #join() join} method to wakeup. Shutdown is useful for cases where the
 580      * results of unfinished subtasks are no longer needed.
 581      *
 582      * <p> More specifically, this method:
 583      * <ul>
 584      * <li> {@linkplain Future#cancel(boolean) Cancels} the tasks that have threads
 585      * {@linkplain Future#get() waiting} on a result so that the waiting threads wakeup.
 586      * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the
 587      * task scope (except the current thread).
 588      * <li> Wakes up the owner if it is waiting in {@link #join()} or {@link
 589      * #joinUntil(Instant)}. If the owner is not waiting then its next call to {@code
 590      * join} or {@code joinUntil} will return immediately.
 591      * </ul>
 592      *
 593      * <p> When this method completes then the {@code Future} objects for all tasks will
 594      * be {@linkplain Future#isDone() done}, normally or abnormally. There may still
 595      * be threads that have not finished because they are executing code that did not
 596      * respond (or respond promptly) to thread interrupt. This method does not wait
 597      * for these threads. When the owner invokes the {@link #close() close} method
 598      * to close the task scope then it will wait for the remaining threads to finish.
 599      *
 600      * <p> This method may only be invoked by the task scope owner or threads contained
 601      * in the task scope.
 602      *
 603      * @throws IllegalStateException if this task scope is closed
 604      * @throws WrongThreadException if the current thread is not the owner or
 605      * a thread contained in the task scope
 606      */
 607     public void shutdown() {
 608         ensureOwnerOrContainsThread();
 609         if (state == CLOSED)
 610             throw new IllegalStateException("Task scope is closed");
 611         if (implShutdown())
 612             flock.wakeup();
 613     }
 614 
 615     /**
 616      * Closes this task scope.
 617      *
 618      * <p> This method first shuts down the task scope (as if by invoking the {@link
 619      * #shutdown() shutdown} method). It then waits for the threads executing any
 620      * unfinished tasks to finish. If interrupted then this method will continue to
 621      * wait for the threads to finish before completing with the interrupt status set.
 622      *
 623      * <p> This method may only be invoked by the task scope owner. If the task scope
 624      * is already closed then the owner invoking this method has no effect.
 625      *
 626      * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
 627      * manner</em>. If this method is called to close a task scope before nested task
 628      * scopes are closed then it closes the underlying construct of each nested task scope
 629      * (in the reverse order that they were created in), closes this task scope, and then
 630      * throws {@link StructureViolationException}.





 631      * If a thread terminates without first closing task scopes that it owns then
 632      * termination will cause the underlying construct of each of its open tasks scopes to
 633      * be closed. Closing is performed in the reverse order that the task scopes were
 634      * created in. Thread termination may therefore be delayed when the owner has to wait
 635      * for threads forked in these task scopes to finish.
 636      *
 637      * @throws IllegalStateException thrown after closing the task scope if the owner
 638      * did not invoke join after forking
 639      * @throws WrongThreadException if the current thread is not the owner
 640      * @throws StructureViolationException if a structure violation was detected
 641      */
 642     @Override
 643     public void close() {
 644         ensureOwner();
 645         if (state == CLOSED)
 646             return;
 647 
 648         try {
 649             implShutdown();
 650             flock.close();
 651         } finally {
 652             state = CLOSED;
 653         }
 654 
 655         if (needJoin) {
 656             throw new IllegalStateException("Owner did not invoke join or joinUntil after fork");
 657         }
 658     }
 659 
 660     @Override
 661     public String toString() {
 662         StringBuilder sb = new StringBuilder();
 663         String name = flock.name();
 664         if (name != null) {
 665             sb.append(name);
 666             sb.append('/');
 667         }
 668         sb.append(Objects.toIdentityString(this));
 669         int s = state;
 670         if (s == CLOSED)
 671             sb.append("/closed");
 672         else if (s == SHUTDOWN)
 673             sb.append("/shutdown");
 674         return sb.toString();
 675     }
 676 
 677     /**
 678      * The Future implementation returned by the fork methods. Most methods are
 679      * overridden to support cancellation when the task scope is shutdown.
 680      * The blocking get methods register the Future with the task scope so that they
 681      * are cancelled when the task scope shuts down.
 682      */
 683     private static final class FutureImpl<V> extends FutureTask<V> {
 684         private final StructuredTaskScope<V> scope;
 685 
 686         @SuppressWarnings("unchecked")
 687         FutureImpl(StructuredTaskScope<? super V> scope, Callable<? extends V> task) {
 688             super((Callable<V>) task);
 689             this.scope = (StructuredTaskScope<V>) scope;
 690         }
 691 
 692         @Override
 693         protected void done() {
 694             if (!scope.isShutdown()) {
 695                 scope.handleComplete(this);
 696             }
 697         }
 698 
 699         private void cancelIfShutdown() {
 700             if (scope.isShutdown() && !super.isDone()) {
 701                 super.cancel(false);
 702             }
 703         }
 704 
 705         @Override
 706         public boolean isDone() {
 707             cancelIfShutdown();
 708             return super.isDone();
 709         }
 710 
 711         @Override
 712         public boolean isCancelled() {
 713             cancelIfShutdown();
 714             return super.isCancelled();
 715         }
 716 
 717         @Override
 718         public boolean cancel(boolean mayInterruptIfRunning) {
 719             scope.ensureOwnerOrContainsThread();
 720             cancelIfShutdown();
 721             return super.cancel(mayInterruptIfRunning);
 722         }
 723 
 724         @Override
 725         public V get() throws InterruptedException, ExecutionException {
 726             if (super.isDone())
 727                 return super.get();
 728             scope.track(this);
 729             try {
 730                 cancelIfShutdown();
 731                 return super.get();
 732             } finally {
 733                 scope.untrack(this);
 734             }
 735         }
 736 
 737         @Override
 738         public V get(long timeout, TimeUnit unit)
 739                 throws InterruptedException, ExecutionException, TimeoutException {
 740             Objects.requireNonNull(unit);
 741             if (super.isDone())
 742                 return super.get();
 743             scope.track(this);
 744             try {
 745                 cancelIfShutdown();
 746                 return super.get(timeout, unit);
 747             } finally {
 748                 scope.untrack(this);
 749             }
 750         }
 751 
 752         @Override
 753         public V resultNow() {
 754             cancelIfShutdown();
 755             return super.resultNow();
 756         }
 757 
 758         @Override
 759         public Throwable exceptionNow() {
 760             cancelIfShutdown();
 761             return super.exceptionNow();
 762         }
 763 
 764         @Override
 765         public State state() {
 766             cancelIfShutdown();
 767             return super.state();
 768         }
 769 
 770         @Override
 771         public String toString() {
 772             cancelIfShutdown();
 773             return super.toString();
 774         }
 775     }
 776 
 777     /**
 778      * Maps a Future.State to an int that can be compared.
 779      * RUNNING < CANCELLED < FAILED < SUCCESS.
 780      */
 781     private static int futureStateToInt(Future.State s) {
 782         return switch (s) {
 783             case RUNNING   -> 0;
 784             case CANCELLED -> 1;
 785             case FAILED    -> 2;
 786             case SUCCESS   -> 3;
 787         };
 788     }
 789 
 790     // RUNNING < CANCELLED < FAILED < SUCCESS
 791     private static final Comparator<Future.State> FUTURE_STATE_COMPARATOR =
 792             Comparator.comparingInt(StructuredTaskScope::futureStateToInt);
 793 
 794     /**
 795      * A {@code StructuredTaskScope} that captures the result of the first subtask to
 796      * complete successfully. Once captured, it invokes the {@linkplain #shutdown() shutdown}
 797      * method to interrupt unfinished threads and wakeup the owner. The policy
 798      * implemented by this class is intended for cases where the result of any subtask
 799      * will do ("invoke any") and where the results of other unfinished subtask are no
 800      * longer needed.
 801      *
 802      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 803      * in this class will cause a {@link NullPointerException} to be thrown.
 804      *
 805      * @param <T> the result type
 806      * @since 19
 807      */
 808     public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
 809         private static final VarHandle FUTURE;
 810         static {
 811             try {
 812                 MethodHandles.Lookup l = MethodHandles.lookup();
 813                 FUTURE = l.findVarHandle(ShutdownOnSuccess.class, "future", Future.class);
 814             } catch (Exception e) {
 815                 throw new InternalError(e);
 816             }
 817         }
 818         private volatile Future<T> future;
 819 
 820         /**
 821          * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
 822          * The task scope is optionally named for the purposes of monitoring and management.
 823          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
 824          * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope is
 825          * owned by the current thread.
 826          *






 827          * @param name the name of the task scope, can be null
 828          * @param factory the thread factory
 829          */
 830         public ShutdownOnSuccess(String name, ThreadFactory factory) {
 831             super(name, factory);
 832         }
 833 
 834         /**
 835          * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
 836          *
 837          * <p> This constructor is equivalent to invoking the 2-arg constructor with a
 838          * name of {@code null} and a thread factory that creates virtual threads.
 839          */
 840         public ShutdownOnSuccess() {
 841             super(null, Thread.ofVirtual().factory());
 842         }
 843 
 844         /**
 845          * Shut down the given task scope when invoked for the first time with a {@code
 846          * Future} for a task that completed with a result.
 847          *
 848          * @param future the completed task
 849          * @see #shutdown()
 850          * @see Future.State#SUCCESS
 851          */
 852         @Override
 853         protected void handleComplete(Future<T> future) {
 854             Future.State state = future.state();
 855             if (state == Future.State.RUNNING) {
 856                 throw new IllegalArgumentException("Task is not completed");
 857             }
 858 
 859             Future<T> f;
 860             while (((f = this.future) == null)
 861                     || FUTURE_STATE_COMPARATOR.compare(f.state(), state) < 0) {
 862                 if (FUTURE.compareAndSet(this, f, future)) {
 863                     if (state == Future.State.SUCCESS)
 864                         shutdown();
 865                     break;
 866                 }
 867             }
 868         }
 869 
 870         /**
 871          * {@inheritDoc}
 872          * @return this task scope
 873          * @throws IllegalStateException {@inheritDoc}
 874          * @throws WrongThreadException {@inheritDoc}
 875          */
 876         @Override
 877         public ShutdownOnSuccess<T> join() throws InterruptedException {
 878             super.join();
 879             return this;
 880         }
 881 
 882         /**
 883          * {@inheritDoc}
 884          * @return this task scope
 885          * @throws IllegalStateException {@inheritDoc}
 886          * @throws WrongThreadException {@inheritDoc}
 887          */
 888         @Override
 889         public ShutdownOnSuccess<T> joinUntil(Instant deadline)
 890             throws InterruptedException, TimeoutException
 891         {
 892             super.joinUntil(deadline);
 893             return this;
 894         }
 895 
 896         /**
 897          * {@return the result of the first subtask that completed with a result}
 898          *
 899          * <p> When no subtask completed with a result but a task completed with an
 900          * exception then {@code ExecutionException} is thrown with the exception as the
 901          * {@linkplain Throwable#getCause() cause}. If only cancelled subtasks were
 902          * notified to the {@code handleComplete} method then {@code CancellationException}
 903          * is thrown.
 904          *
 905          * @apiNote This method is intended to be invoked by the task scope owner after it
 906          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
 907          * A future release may add enforcement to prevent the method being called by
 908          * other threads or before joining.
 909          *
 910          * @throws ExecutionException if no subtasks completed with a result but a subtask
 911          * completed with an exception
 912          * @throws CancellationException if all subtasks were cancelled
 913          * @throws IllegalStateException if the handle method was not invoked with a
 914          * completed subtask
 915          */
 916         public T result() throws ExecutionException {
 917             Future<T> f = future;
 918             if (f == null) {
 919                 throw new IllegalStateException("No completed subtasks");
 920             }
 921             return switch (f.state()) {
 922                 case SUCCESS   -> f.resultNow();
 923                 case FAILED    -> throw new ExecutionException(f.exceptionNow());
 924                 case CANCELLED -> throw new CancellationException();
 925                 default        -> throw new InternalError("Unexpected state: " + f);
 926             };
 927 
 928         }
 929 
 930         /**
 931          * Returns the result of the first subtask that completed with a result, otherwise
 932          * throws an exception produced by the given exception supplying function.
 933          *
 934          * <p> When no subtask completed with a result but a subtask completed with an
 935          * exception then the exception supplying function is invoked with the exception.
 936          * If only cancelled subtasks were notified to the {@code handleComplete} method
 937          * then the exception supplying function is invoked with a {@code CancellationException}.
 938          *
 939          * @apiNote This method is intended to be invoked by the task scope owner after it
 940          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
 941          * A future release may add enforcement to prevent the method being called by
 942          * other threads or before joining.
 943          *
 944          * @param esf the exception supplying function
 945          * @param <X> type of the exception to be thrown
 946          * @return the result of the first subtask that completed with a result
 947          * @throws X if no subtask completed with a result
 948          * @throws IllegalStateException if the handle method was not invoked with a
 949          * completed subtask
 950          */
 951         public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X {
 952             Objects.requireNonNull(esf);
 953             Future<T> f = future;
 954             if (f == null) {
 955                 throw new IllegalStateException("No completed subtasks");
 956             }
 957             Future.State state = f.state();
 958             if (state == Future.State.SUCCESS) {
 959                 return f.resultNow();
 960             } else {
 961                 Throwable throwable = (state == Future.State.FAILED)
 962                         ? f.exceptionNow()
 963                         : new CancellationException();
 964                 X ex = esf.apply(throwable);
 965                 Objects.requireNonNull(ex, "esf returned null");
 966                 throw ex;
 967             }
 968         }
 969     }
 970 
 971     /**
 972      * A {@code StructuredTaskScope} that captures the exception of the first subtask to
 973      * complete abnormally. Once captured, it invokes the {@linkplain #shutdown() shutdown}
 974      * method to interrupt unfinished threads and wakeup the owner. The policy implemented
 975      * by this class is intended for cases where the results for all subtasks are required
 976      * ("invoke all"); if any subtask fails then the results of other unfinished subtasks
 977      * are no longer needed.
 978      *
 979      * <p> Unless otherwise specified, passing a {@code null} argument to a method
 980      * in this class will cause a {@link NullPointerException} to be thrown.
 981      *
 982      * @since 19
 983      */
 984     public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
 985         private static final VarHandle FUTURE;
 986         static {
 987             try {
 988                 MethodHandles.Lookup l = MethodHandles.lookup();
 989                 FUTURE = l.findVarHandle(ShutdownOnFailure.class, "future", Future.class);
 990             } catch (Exception e) {
 991                 throw new InternalError(e);
 992             }
 993         }
 994         private volatile Future<Object> future;
 995 
 996         /**
 997          * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
 998          * The task scope is optionally named for the purposes of monitoring and management.
 999          * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1000          * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope
1001          * is owned by the current thread.
1002          *






1003          * @param name the name of the task scope, can be null
1004          * @param factory the thread factory
1005          */
1006         public ShutdownOnFailure(String name, ThreadFactory factory) {
1007             super(name, factory);
1008         }
1009 
1010         /**
1011          * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
1012          *
1013          * <p> This constructor is equivalent to invoking the 2-arg constructor with a
1014          * name of {@code null} and a thread factory that creates virtual threads.
1015          */
1016         public ShutdownOnFailure() {
1017             super(null, Thread.ofVirtual().factory());
1018         }
1019 
1020         /**
1021          * Shut down the given task scope when invoked for the first time with a {@code
1022          * Future} for a task that completed abnormally (exception or cancelled).
1023          *
1024          * @param future the completed task
1025          * @see #shutdown()
1026          * @see Future.State#FAILED
1027          * @see Future.State#CANCELLED
1028          */
1029         @Override
1030         protected void handleComplete(Future<Object> future) {
1031             Future.State state = future.state();
1032             if (state == Future.State.RUNNING) {
1033                 throw new IllegalArgumentException("Task is not completed");
1034             } else if (state == Future.State.SUCCESS) {
1035                 return;
1036             }
1037 
1038             // A failed task overrides a cancelled task.
1039             // The first failed or cancelled task causes the scope to shutdown.
1040             Future<Object> f;
1041             while (((f = this.future) == null)
1042                     || FUTURE_STATE_COMPARATOR.compare(f.state(), state) < 0) {
1043                 if (FUTURE.compareAndSet(this, f, future)) {
1044                     shutdown();
1045                     break;
1046                 }
1047             }
1048         }
1049 
1050         /**
1051          * {@inheritDoc}
1052          * @return this task scope
1053          * @throws IllegalStateException {@inheritDoc}
1054          * @throws WrongThreadException {@inheritDoc}
1055          */
1056         @Override
1057         public ShutdownOnFailure join() throws InterruptedException {
1058             super.join();
1059             return this;
1060         }
1061 
1062         /**
1063          * {@inheritDoc}
1064          * @return this task scope
1065          * @throws IllegalStateException {@inheritDoc}
1066          * @throws WrongThreadException {@inheritDoc}
1067          */
1068         @Override
1069         public ShutdownOnFailure joinUntil(Instant deadline)
1070             throws InterruptedException, TimeoutException
1071         {
1072             super.joinUntil(deadline);
1073             return this;
1074         }
1075 
1076         /**
1077          * Returns the exception for the first subtask that completed with an exception.
1078          * If no subtask completed with an exception but cancelled subtasks were notified
1079          * to the {@code handleComplete} method then a {@code CancellationException}
1080          * is returned. If no subtasks completed abnormally then an empty {@code Optional}
1081          * is returned.
1082          *
1083          * @apiNote This method is intended to be invoked by the task scope owner after it
1084          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
1085          * A future release may add enforcement to prevent the method being called by
1086          * other threads or before joining.
1087          *
1088          * @return the exception for a subtask that completed abnormally or an empty
1089          * optional if no subtasks completed abnormally
1090          */
1091         public Optional<Throwable> exception() {
1092             Future<Object> f = future;
1093             if (f != null) {
1094                 Throwable throwable = (f.state() == Future.State.FAILED)
1095                         ? f.exceptionNow()
1096                         : new CancellationException();
1097                 return Optional.of(throwable);
1098             } else {
1099                 return Optional.empty();
1100             }
1101         }
1102 
1103         /**
1104          * Throws if a subtask completed abnormally. If any subtask completed with an
1105          * exception then {@code ExecutionException} is thrown with the exception of the
1106          * first subtask to fail as the {@linkplain Throwable#getCause() cause}. If no
1107          * subtask completed with an exception but cancelled subtasks were notified to the
1108          * {@code handleComplete} method then {@code CancellationException} is thrown.
1109          * This method does nothing if no subtasks completed abnormally.
1110          *
1111          * @apiNote This method is intended to be invoked by the task scope owner after it
1112          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
1113          * A future release may add enforcement to prevent the method being called by
1114          * other threads or before joining.
1115          *
1116          * @throws ExecutionException if a subtask completed with an exception
1117          * @throws CancellationException if no subtasks completed with an exception but
1118          * subtasks were cancelled
1119          */
1120         public void throwIfFailed() throws ExecutionException {
1121             Future<Object> f = future;
1122             if (f != null) {
1123                 if (f.state() == Future.State.FAILED) {
1124                     throw new ExecutionException(f.exceptionNow());
1125                 } else {
1126                     throw new CancellationException();
1127                 }
1128             }
1129         }
1130 
1131         /**
1132          * Throws the exception produced by the given exception supplying function if
1133          * a subtask completed abnormally. If any subtask completed with an exception then
1134          * the function is invoked with the exception of the first subtask to fail.
1135          * If no subtask completed with an exception but cancelled subtasks were notified
1136          * to the {@code handleComplete} method then the function is called with a {@code
1137          * CancellationException}. The exception returned by the function is thrown.
1138          * This method does nothing if no subtasks completed abnormally.
1139          *
1140          * @apiNote This method is intended to be invoked by the task scope owner after it
1141          * has invoked {@link #join() join} (or {@link #joinUntil(Instant) joinUntil}).
1142          * A future release may add enforcement to prevent the method being called by
1143          * other threads or before joining.
1144          *
1145          * @param esf the exception supplying function
1146          * @param <X> type of the exception to be thrown
1147          * @throws X produced by the exception supplying function
1148          */
1149         public <X extends Throwable>
1150         void throwIfFailed(Function<Throwable, ? extends X> esf) throws X {
1151             Objects.requireNonNull(esf);
1152             Future<Object> f = future;
1153             if (f != null) {
1154                 Throwable throwable = (f.state() == Future.State.FAILED)
1155                         ? f.exceptionNow()
1156                         : new CancellationException();
1157                 X ex = esf.apply(throwable);
1158                 Objects.requireNonNull(ex, "esf returned null");
1159                 throw ex;
1160             }
1161         }
1162     }
1163 }
--- EOF ---