8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.time.Duration;
28 import java.util.function.Function;
29 import java.util.function.Predicate;
30 import java.util.function.Supplier;
31 import java.util.stream.Stream;
32 import jdk.internal.javac.PreviewFeature;
33
34 /**
35 * An API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports cases
36 * where execution of a <em>task</em> (a unit of work) splits into several concurrent
37 * subtasks, and where the subtasks must complete before the task continues. A {@code
38 * StructuredTaskScope} can be used to ensure that the lifetime of a concurrent operation
39 * is confined by a <em>syntax block</em>, similar to that of a sequential operation in
40 * structured programming.
41 *
42 * <p> {@code StructuredTaskScope} defines the static method {@link #open() open} to open
43 * a new {@code StructuredTaskScope} and the {@link #close() close} method to close it.
44 * The API is designed to be used with the {@code try}-with-resources statement where
45 * the {@code StructuredTaskScope} is opened as a resource and then closed automatically.
46 * The code inside the block uses the {@link #fork(Callable) fork} method to fork subtasks.
47 * After forking, it uses the {@link #join() join} method to wait for all subtasks to
48 * finish (or some other outcome) as a single operation. Forking a subtask starts a new
49 * {@link Thread} to run the subtask. The thread executing the task does not continue
50 * beyond the {@code close} method until all threads started to execute subtasks have finished.
51 * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may
52 * only be invoked by the <em>owner thread</em> (the thread that opened the {@code
53 * StructuredTaskScope}), the {@code fork} method may not be called after {@code join},
54 * the {@code join} method may only be invoked once, and the {@code close} method throws
55 * an exception after closing if the owner did not invoke the {@code join} method after
56 * forking subtasks.
57 *
58 * <p> As a first example, consider a task that splits into two subtasks to concurrently
59 * fetch resources from two URL locations "left" and "right". Both subtasks may complete
60 * successfully, one subtask may succeed and the other may fail, or both subtasks may
61 * fail. The task in this example is interested in the successful result from both
62 * subtasks. It waits in the {@link #join() join} method for both subtasks to complete
63 * successfully or for either subtask to fail.
64 * {@snippet lang=java :
65 * // @link substring="open" target="#open()" :
66 * try (var scope = StructuredTaskScope.open()) {
67 *
68 * // @link substring="fork" target="#fork(Callable)" :
69 * Subtask<String> subtask1 = scope.fork(() -> query(left));
70 * Subtask<Integer> subtask2 = scope.fork(() -> query(right));
71 *
72 * // throws if either subtask fails
73 * scope.join(); // @link substring="join" target="#join()"
74 *
75 * // both subtasks completed successfully
76 * // @link substring="get" target="Subtask#get()" :
181 * } catch (StructuredTaskScope.FailedException e) {
182 *
183 * Throwable cause = e.getCause();
184 * switch (cause) {
185 * case IOException ioe -> ..
186 * default -> ..
187 * }
188 *
189 * }
190 * }
191 * In other cases it may be preferable not to catch {@code FailedException} and instead
192 * let it propagate to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught
193 * exception handler} for logging purposes.
194 *
195 * <p> Where a specific exception triggers the use of a default result, it may be more
196 * appropriate to handle the exception in the subtask itself rather than have the subtask
197 * fail and leave the scope owner to handle it.
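 *
 * <p> A minimal sketch of handling the exception in the subtask itself, assuming a
 * hypothetical helper named {@code withDefault} that is not part of this API. The helper
 * wraps a {@code Callable} so that one chosen exception type yields a default result
 * while any other exception still fails the subtask.
```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class SubtaskDefaults {
    // Hypothetical helper (not part of StructuredTaskScope): returns a Callable that
    // substitutes the fallback when the wrapped task throws an exception of the given type.
    public static <T> Callable<T> withDefault(Callable<T> task,
                                              Class<? extends Exception> type,
                                              T fallback) {
        return () -> {
            try {
                return task.call();
            } catch (Exception e) {
                if (type.isInstance(e)) {
                    return fallback;   // the chosen exception type yields the default result
                }
                throw e;               // any other exception still fails the subtask
            }
        };
    }

    public static void main(String[] args) throws Exception {
        Callable<String> failing = () -> { throw new IOException("unreachable host"); };
        String result = withDefault(failing, IOException.class, "n/a").call();
        System.out.println(result);   // prints "n/a"
    }
}
```
 * The wrapped task could then be forked in place of the original, for example
 * {@code scope.fork(withDefault(() -> query(left), IOException.class, "n/a"))}, where
 * {@code query} and {@code left} are the names used in the first example.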
198 *
199 * <h2>Configuration</h2>
200 *
201 *
202 * A {@code StructuredTaskScope} is opened with {@linkplain Configuration configuration}
203 * that consists of a {@link ThreadFactory} to create threads, an optional name for
204 * monitoring and management purposes, and an optional timeout.
205 *
206 * <p> The {@link #open()} and {@link #open(Joiner)} methods create a {@code StructuredTaskScope}
207 * with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default
208 * configuration has a {@code ThreadFactory} that creates unnamed
209 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
210 * is unnamed for monitoring and management purposes, and has no timeout.
211 *
212 * <p> The 2-arg {@link #open(Joiner, Function) open} method can be used to create a
213 * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for
214 * the purposes of monitoring and management, or has a timeout that cancels the scope if
215 * the timeout expires before or while waiting for subtasks to complete. The {@code open}
216 * method is called with a {@linkplain Function function} that is applied to the default
217 * configuration and returns a {@link Configuration Configuration} for the
218 * {@code StructuredTaskScope} under construction.
219 *
220 * <p> The following example opens a new {@code StructuredTaskScope} with a {@code
221 * ThreadFactory} that creates virtual threads {@linkplain Thread#setName(String) named}
222 * "duke-0", "duke-1" ...
223 * {@snippet lang=java :
224 * // @link substring="name" target="Thread.Builder#name(String, long)" :
225 * ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory();
226 *
227 * // @link substring="withThreadFactory" target="Configuration#withThreadFactory(ThreadFactory)" :
228 * try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
229 *
230 * scope.fork( .. ); // runs in a virtual thread with name "duke-0"
231 * scope.fork( .. ); // runs in a virtual thread with name "duke-1"
232 *
233 * scope.join();
234 *
235 * }
236 * }
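 *
 * <p> The naming performed by such a factory can be checked without opening a scope. This
 * is a minimal sketch, assuming Java 21 or later for {@code Thread.ofVirtual()}; the class
 * and method names are hypothetical and only illustrate how the name counter advances.
```java
import java.util.concurrent.ThreadFactory;

public class DukeThreadNames {
    // Creates two unstarted virtual threads from one factory and returns their names.
    public static String[] firstTwoNames() {
        ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory();
        Thread first = factory.newThread(() -> { });
        Thread second = factory.newThread(() -> { });
        return new String[] { first.getName(), second.getName() };
    }

    public static void main(String[] args) {
        String[] names = firstTwoNames();
        System.out.println(names[0] + " " + names[1]);   // prints "duke-0 duke-1"
    }
}
```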
237 *
238 * <p> A second example sets a timeout, represented by a {@link Duration}. The timeout
239 * starts when the new scope is opened. If the timeout expires before the {@code join}
240 * method has completed then the scope is <a href="#Cancallation">cancelled</a>. This
241 * interrupts the threads executing the two subtasks and causes the {@link #join() join}
242 * method to throw {@link TimeoutException}.
243 * {@snippet lang=java :
244 * Duration timeout = Duration.ofSeconds(10);
245 *
246 * // @link substring="allSuccessfulOrThrow" target="Joiner#allSuccessfulOrThrow()" :
247 * try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
248 * // @link substring="withTimeout" target="Configuration#withTimeout(Duration)" :
249 * cf -> cf.withTimeout(timeout))) {
250 *
251 * scope.fork(callable1);
252 * scope.fork(callable2);
253 *
254 * List<String> result = scope.join()
255 * .map(Subtask::get)
256 * .toList();
257 *
258 * }
259 * }
260 *
261 * <h2>Inheritance of scoped value bindings</h2>
262 *
297 * USERNAME, then value "duke" will be returned.
298 * {@snippet lang=java :
299 * // @link substring="newInstance" target="ScopedValue#newInstance()" :
300 * private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
301 *
302 * // @link substring="where" target="ScopedValue#where(ScopedValue, Object)" :
303 * MyResult result = ScopedValue.where(USERNAME, "duke").call(() -> {
304 *
305 * try (var scope = StructuredTaskScope.open()) {
306 *
307 * Subtask<String> subtask1 = scope.fork( .. ); // inherits binding
308 * Subtask<Integer> subtask2 = scope.fork( .. ); // inherits binding
309 *
310 * scope.join();
311 * return new MyResult(subtask1.get(), subtask2.get());
312 * }
313 *
314 * });
315 * }
316 *
317 * <p> A scoped value inherited into a subtask may be
318 * <a href="{@docRoot}/java.base/java/lang/ScopedValue.html#rebind">rebound</a> to a new
319 * value in the subtask for the bounded execution of some method executed in the subtask.
320 * When the method completes, the value of the {@code ScopedValue} reverts to its previous
321 * value, the value inherited from the thread executing the task.
322 *
323 * <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}.
324 * A task executing in thread T1 opens a {@code StructuredTaskScope} and forks a
325 * subtask that runs in thread T2. The scoped value bindings captured when T1 opens the
326 * scope are inherited into T2. The subtask (in thread T2) executes code that opens a
327 * new {@code StructuredTaskScope} and forks a subtask that runs in thread T3. The scoped
328 * value bindings captured when T2 opens the scope are inherited into T3. These
329 * include (or may be the same as) the bindings that were inherited from T1. In effect,
330 * scoped values are inherited into a tree of subtasks, not just one level of subtask.
331 *
332 * <h2>Memory consistency effects</h2>
333 *
334 * <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to
335 * {@linkplain #fork forking} of a subtask
336 * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
337 * <i>happen-before</i></a> any actions taken by that subtask, which in turn
338 * <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}.
339 *
340 * <h2>General exceptions</h2>
341 *
342 * <p> Unless otherwise specified, passing a {@code null} argument to a method in this
343 * class will cause a {@link NullPointerException} to be thrown.
344 *
345 * @param <T> the result type of subtasks executed in the scope
346 * @param <R> the result type of the scope
347 *
348 * @jls 17.4.5 Happens-before Order
349 * @since 21
350 */
351 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
352 public sealed interface StructuredTaskScope<T, R>
353 extends AutoCloseable
354 permits StructuredTaskScopeImpl {
355
356 /**
357 * Represents a subtask forked with {@link #fork(Callable)} or {@link #fork(Runnable)}.
388 /**
389 * The subtask failed with an exception. The {@link Subtask#exception()
390 * Subtask.exception()} method can be used to get the exception. This is a
391 * terminal state.
392 */
393 FAILED,
394 }
395
396 /**
397 * {@return the subtask state}
398 */
399 State state();
400
401 /**
402 * Returns the result of this subtask if it completed successfully. If the subtask
403 * was forked with {@link #fork(Callable) fork(Callable)} then the result from the
404 * {@link Callable#call() call} method is returned. If the subtask was forked with
405 * {@link #fork(Runnable) fork(Runnable)} then {@code null} is returned.
406 *
407 * <p> Code executing in the scope owner thread can use this method to get the
408 * result of a successful subtask only after it has {@linkplain #join() joined}.
409 *
410 * <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask)
411 * onComplete} method should test that the {@linkplain #state() subtask state} is
412 * {@link State#SUCCESS SUCCESS} before using this method to get the result.
413 *
414 * @return the possibly-null result
415 * @throws IllegalStateException if the subtask has not completed, did not complete
416 * successfully, or the current thread is the scope owner invoking this
417 * method before {@linkplain #join() joining}
418 * @see State#SUCCESS
419 */
420 T get();
421
422 /**
423 * {@return the exception or error thrown by this subtask if it failed}
424 * If the subtask was forked with {@link #fork(Callable) fork(Callable)} then the
425 * exception or error thrown by the {@link Callable#call() call} method is returned.
426 * If the subtask was forked with {@link #fork(Runnable) fork(Runnable)} then the
427 * exception or error thrown by the {@link Runnable#run() run} method is returned.
428 *
429 * <p> Code executing in the scope owner thread can use this method to get the
430 * exception thrown by a failed subtask only after it has {@linkplain #join() joined}.
431 *
432 * <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask)
433 * onComplete} method should test that the {@linkplain #state() subtask state} is
434 * {@link State#FAILED FAILED} before using this method to get the exception.
435 *
436 * @throws IllegalStateException if the subtask has not completed, completed with
437 * a result, or the current thread is the scope owner invoking this method
438 * before {@linkplain #join() joining}
439 * @see State#FAILED
440 */
441 Throwable exception();
442 }
443
444 /**
445 * An object used with a {@link StructuredTaskScope} to handle subtask completion and
446 * produce the result for the scope owner waiting in the {@link #join() join} method
447 * for subtasks to complete.
448 *
449 * <p> Joiner defines static methods to create {@code Joiner} objects for common cases:
450 * <ul>
451 * <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner}
452 * that yields a stream of the completed subtasks for {@code join} to return when
453 * all subtasks complete successfully. It cancels the scope and causes {@code join}
454 * to throw if any subtask fails.
455 * <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a
456 * {@code Joiner} that yields the result of the first subtask to succeed for {@code
457 * join} to return. It causes {@code join} to throw if all subtasks fail.
458 * <li> {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a
459 * {@code Joiner} that waits for all subtasks to complete successfully. It cancels the
460 * scope and causes {@code join} to throw if any subtask fails.
461 * <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all
462 * subtasks. It does not cancel the scope or cause {@code join} to throw.
463 * </ul>
464 *
465 * <p> In addition to the methods to create {@code Joiner} objects for common cases,
466 * the {@link #allUntil(Predicate) allUntil(Predicate)} method is defined to create a
467 * {@code Joiner} that yields a stream of all subtasks. It is created with a {@link
468 * Predicate Predicate} that determines if the scope should continue or be cancelled.
469 * This {@code Joiner} can be built upon to create custom policies that cancel the
470 * scope based on some condition.
471 *
472 * <p> More advanced policies can be developed by implementing the {@code Joiner}
473 * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked.
474 * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a
475 * result or exception. These methods return a {@code boolean} to indicate if the scope
476 * should be cancelled. These methods can be used to collect subtasks, results, or
477 * exceptions, and control when to cancel the scope. The {@link #result()} method
478 * must be implemented to produce the result (or exception) for the {@code join}
479 * method.
480 *
481 * <p> Unless otherwise specified, passing a {@code null} argument to a method
482 * in this class will cause a {@link NullPointerException} to be thrown.
483 *
484 * @implSpec Implementations of this interface must be thread safe. The {@link
485 * #onComplete(Subtask)} method defined by this interface may be invoked by several
486 * threads concurrently.
487 *
488 * @apiNote It is very important that a new {@code Joiner} object is created for each
489 * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with
490 * different scopes or re-used after a scope is closed.
491 *
492 * <p> Designing a {@code Joiner} should take into account the code at the use-site
493 * where the results from the {@link StructuredTaskScope#join() join} method are
494 * processed. It should be clear what the {@code Joiner} does vs. the application
495 * code at the use-site. In general, the {@code Joiner} implementation is not the
496 * place for "business logic". A {@code Joiner} should be designed to be as general
497 * purpose as possible.
498 *
499 * @param <T> the result type of subtasks executed in the scope
500 * @param <R> the result type of the scope
501 * @since 25
502 * @see #open(Joiner)
503 */
504 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
505 @FunctionalInterface
506 interface Joiner<T, R> {
507 /**
508 * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable)
509 * fork(Runnable)} when forking a subtask. The method is invoked from the task
510 * owner thread. The method is invoked before a thread is created to run the
511 * subtask.
512 *
513 * @implSpec The default implementation throws {@code NullPointerException} if the
514 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
515 * subtask is not in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state;
516 * otherwise it returns {@code false}.
517 *
518 * @apiNote This method is invoked by the {@code fork} methods. It should not be
519 * invoked directly.
520 *
521 * @param subtask the subtask
522 * @return {@code true} to cancel the scope, otherwise {@code false}
523 */
524 default boolean onFork(Subtask<? extends T> subtask) {
525 if (subtask.state() != Subtask.State.UNAVAILABLE) {
526 throw new IllegalArgumentException("Subtask not in UNAVAILABLE state");
527 }
528 return false;
529 }
530
531 /**
534 * subtask completes after the scope is cancelled.
535 *
536 * @implSpec The default implementation throws {@code NullPointerException} if the
537 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
538 * subtask is not in the {@link Subtask.State#SUCCESS SUCCESS} or {@link
539 * Subtask.State#FAILED FAILED} state; otherwise it returns {@code false}.
540 *
541 * @apiNote This method is invoked by subtasks when they complete. It should not
542 * be invoked directly.
543 *
544 * @param subtask the subtask
545 * @return {@code true} to cancel the scope, otherwise {@code false}
546 */
547 default boolean onComplete(Subtask<? extends T> subtask) {
548 if (subtask.state() == Subtask.State.UNAVAILABLE) {
549 throw new IllegalArgumentException("Subtask has not completed");
550 }
551 return false;
552 }
553
554 /**
555 * Invoked by the {@link #join() join()} method to produce the result (or exception)
556 * after waiting for all subtasks to complete or for the scope to be cancelled. The result
557 * from this method is returned by the {@code join} method. If this method throws,
558 * then {@code join} throws {@link FailedException} with the exception thrown by
559 * this method as the cause.
560 *
561 * <p> In normal usage, this method will be called at most once by the {@code join}
562 * method to produce the result (or exception). The behavior of this method when
563 * invoked directly, and invoked more than once, is undefined. Where possible, an
564 * implementation should return an equal result (or throw the same exception) on
565 * second or subsequent calls to produce the outcome.
566 *
567 * @apiNote This method is invoked by the {@code join} method. It should not be
568 * invoked directly.
569 *
570 * @return the result
571 * @throws Throwable the exception
572 */
573 R result() throws Throwable;
574
575 /**
576 * {@return a new Joiner object that yields a stream of all subtasks when all
577 * subtasks complete successfully}
578 * The {@code Joiner} <a href="StructuredTaskScope.html#Cancallation">cancels</a>
579 * the scope and causes {@code join} to throw if any subtask fails.
580 *
581 * <p> If all subtasks complete successfully, the joiner's {@link Joiner#result()}
582 * method returns a stream of all subtasks in the order that they were forked.
583 * If any subtask failed then the {@code result} method throws the exception from
584 * the first subtask to fail.
585 *
586 * @apiNote Joiners returned by this method are suited to cases where all subtasks
587 * return a result of the same type. Joiners returned by {@link
588 * #awaitAllSuccessfulOrThrow()} are suited to cases where the subtasks return
589 * results of different types.
590 *
591 * @param <T> the result type of subtasks
592 */
593 static <T> Joiner<T, Stream<Subtask<T>>> allSuccessfulOrThrow() {
594 return new Joiners.AllSuccessful<>();
595 }
596
597 /**
598 * {@return a new Joiner object that yields the result of any subtask that
599 * completed successfully}
600 * The {@code Joiner} causes {@code join} to throw if all subtasks fail.
601 *
602 * <p> The joiner's {@link Joiner#result()} method returns the result of a subtask
603 * that completed successfully. If all subtasks fail then the {@code result} method
604 * throws the exception from one of the failed subtasks. The {@code result} method
605 * throws {@code NoSuchElementException} if no subtasks were forked.
606 *
607 * @param <T> the result type of subtasks
608 */
609 static <T> Joiner<T, T> anySuccessfulResultOrThrow() {
610 return new Joiners.AnySuccessful<>();
611 }
612
613 /**
614 * {@return a new Joiner object that waits for subtasks to complete successfully}
615 * The {@code Joiner} <a href="StructuredTaskScope.html#Cancallation">cancels</a>
616 * the scope and causes {@code join} to throw if any subtask fails.
617 *
618 * <p> The joiner's {@link Joiner#result() result} method returns {@code null}
619 * if all subtasks complete successfully, or throws the exception from the first
620 * subtask to fail.
621 *
622 * @apiNote Joiners returned by this method are suited to cases where subtasks
623 * return results of different types. Joiners returned by {@link #allSuccessfulOrThrow()}
624 * are suited to cases where the subtasks return a result of the same type.
625 *
626 * @param <T> the result type of subtasks
627 */
628 static <T> Joiner<T, Void> awaitAllSuccessfulOrThrow() {
629 return new Joiners.AwaitSuccessful<>();
630 }
631
632 /**
633 * {@return a new Joiner object that waits for all subtasks to complete}
634 * The {@code Joiner} does not cancel the scope if a subtask fails.
635 *
636 * <p> The joiner's {@link Joiner#result() result} method returns {@code null}.
637 *
638 * @apiNote This Joiner is useful for cases where subtasks make use of
639 * <em>side-effects</em> rather than return results or fail with exceptions.
640 * The {@link #fork(Runnable) fork(Runnable)} method can be used to fork subtasks
641 * that do not return a result.
642 *
643 * <p> This Joiner can also be used for <em>fan-in</em> scenarios where subtasks
644 * are forked to handle incoming connections and the number of subtasks is unbounded.
645 * In this example, the thread executing the {@code acceptLoop} method will only
646 * stop when interrupted or the listener socket is closed asynchronously.
647 * {@snippet lang=java :
648 * void acceptLoop(ServerSocket listener) throws IOException, InterruptedException {
649 * try (var scope = StructuredTaskScope.open(Joiner.<Socket>awaitAll())) {
650 * while (true) {
651 * Socket socket = listener.accept();
652 * scope.fork(() -> handle(socket));
653 * }
654 * }
655 * }
656 * }
657 *
658 * @param <T> the result type of subtasks
659 */
660 static <T> Joiner<T, Void> awaitAll() {
661 // ensure that a new Joiner object is returned
662 return new Joiner<T, Void>() {
663 @Override
664 public Void result() {
665 return null;
666 }
667 };
668 }
669
670 /**
671 * {@return a new Joiner object that yields a stream of all subtasks when all
672 * subtasks complete or a predicate returns {@code true} to cancel the scope}
673 *
674 * <p> The joiner's {@link Joiner#onComplete(Subtask)} method invokes the
675 * predicate's {@link Predicate#test(Object) test} method with the subtask that
676 * completed successfully or failed with an exception. If the {@code test} method
677 * returns {@code true} then <a href="StructuredTaskScope.html#Cancallation">
678 * the scope is cancelled</a>. The {@code test} method must be thread safe as it
679 * may be invoked concurrently from several threads. If the {@code test} method
680 * completes with an exception or error, then the thread that executed the subtask
681 * invokes the {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler}
682 * with the exception or error before the thread terminates.
683 *
684 * <p> The joiner's {@link #result()} method returns the stream of all subtasks,
685 * in fork order. The stream may contain subtasks that have completed
686 * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED}
687 * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state
688 * if the scope was cancelled before all subtasks were forked or completed.
689 *
690 * <p> The following example uses this method to create a {@code Joiner} that
691 * <a href="StructuredTaskScope.html#Cancallation">cancels</a> the scope when
692 * two or more subtasks fail.
693 * {@snippet lang=java :
694 * class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
695 * private final AtomicInteger failedCount = new AtomicInteger();
696 * @Override
697 * public boolean test(Subtask<? extends T> subtask) {
698 * return subtask.state() == Subtask.State.FAILED
699 * && failedCount.incrementAndGet() >= 2;
700 * }
701 * }
702 *
703 * var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
704 * }
705 *
706 * <p> The following example uses {@code allUntil} to wait for all subtasks to
707 * complete without any cancellation. This is similar to {@link #awaitAll()}
708 * except that it yields a stream of the completed subtasks, collected here into a list.
709 * {@snippet lang=java :
710 * <T> List<Subtask<T>> invokeAll(Collection<Callable<T>> tasks) throws InterruptedException {
711 * try (var scope = StructuredTaskScope.open(Joiner.<T>allUntil(_ -> false))) {
712 * tasks.forEach(scope::fork);
713 * return scope.join().toList();
714 * }
715 * }
716 * }
717 *
718 * @param isDone the predicate to evaluate completed subtasks
719 * @param <T> the result type of subtasks
720 */
721 static <T> Joiner<T, Stream<Subtask<T>>> allUntil(Predicate<Subtask<? extends T>> isDone) {
722 return new Joiners.AllSubtasks<>(isDone);
723 }
724 }
725
726 /**
727 * Represents the configuration for a {@code StructuredTaskScope}.
728 *
729 * <p> The configuration for a {@code StructuredTaskScope} consists of a {@link
730 * ThreadFactory} to create threads, an optional name for the purposes of monitoring
731 * and management, and an optional timeout.
732 *
733 * <p> Creating a {@code StructuredTaskScope} with {@link #open()} or {@link #open(Joiner)}
734 * uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default
735 * configuration</a>. The default configuration consists of a thread factory that
736 * creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">
737 * virtual threads</a>, no name for monitoring and management purposes, and no timeout.
738 *
739 * <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, Function)
740 * open} method allows a different configuration to be used. The function specified
741 * to the {@code open} method is applied to the default configuration and returns the
742 * configuration for the {@code StructuredTaskScope} under construction. The function
743 * can use the {@code with-} prefixed methods defined here to specify the components
744 * of the configuration to use.
745 *
746 * <p> Unless otherwise specified, passing a {@code null} argument to a method
747 * in this class will cause a {@link NullPointerException} to be thrown.
748 *
749 * @since 25
750 */
751 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
752 sealed interface Configuration permits StructuredTaskScopeImpl.ConfigImpl {
753 /**
754 * {@return a new {@code Configuration} object with the given thread factory}
755 * The other components are the same as this object. The thread factory is used by
756 * a scope to create threads when {@linkplain #fork(Callable) forking} subtasks.
757 * @param threadFactory the thread factory
758 *
759 * @apiNote The thread factory will typically create
760 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
761 * possibly with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler
762 * uncaught exception handler}, or other properties configured.
763 *
764 * @see #fork(Callable)
765 */
766 Configuration withThreadFactory(ThreadFactory threadFactory);
767
768 /**
769 * {@return a new {@code Configuration} object with the given name}
770 * The other components are the same as this object. A scope is optionally
771 * named for the purposes of monitoring and management.
772 * @param name the name
773 */
774 Configuration withName(String name);
775
776 /**
777 * {@return a new {@code Configuration} object with the given timeout}
778 * The other components are the same as this object.
779 * @param timeout the timeout
780 *
781 * @apiNote Applications using deadlines, expressed as an {@link java.time.Instant},
782 * can use {@link Duration#between Duration.between(Instant.now(), deadline)} to
783 * compute the timeout for this method.
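 *
 * <p> A minimal sketch of that computation, assuming a hypothetical helper named
 * {@code timeoutUntil}. The current time is passed in so that the result is reproducible;
 * application code would typically pass {@code Instant.now()}.
```java
import java.time.Duration;
import java.time.Instant;

public class DeadlineTimeout {
    // Converts an absolute deadline into a relative timeout measured from 'now'.
    // The result is negative if the deadline has already passed; how to handle
    // that case is left to the caller in this sketch.
    public static Duration timeoutUntil(Instant now, Instant deadline) {
        return Duration.between(now, deadline);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2025-01-01T00:00:00Z");
        Instant deadline = Instant.parse("2025-01-01T00:00:10Z");
        System.out.println(timeoutUntil(now, deadline));   // prints "PT10S"
    }
}
```
 * The returned {@code Duration} is the value that would be passed to this method, for
 * example {@code cf -> cf.withTimeout(timeoutUntil(Instant.now(), deadline))}.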
784 *
785 * @see #join()
786 */
787 Configuration withTimeout(Duration timeout);
788 }
789
790 /**
791 * Exception thrown by {@link #join()} when the outcome is an exception rather than a
792 * result.
793 *
794 * @since 25
795 */
796 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
797 final class FailedException extends RuntimeException {
798 @java.io.Serial
799 static final long serialVersionUID = -1533055100078459923L;
800
801 /**
802 * Constructs a {@code FailedException} with the specified cause.
803 *
804 * @param cause the cause, can be {@code null}
805 */
806 FailedException(Throwable cause) {
807 super(cause);
808 }
809 }
810
811 /**
812 * Exception thrown by {@link #join()} if the scope was created with a timeout and
813 * the timeout expired before or while waiting in {@code join}.
814 *
815 * @since 25
816 * @see Configuration#withTimeout(Duration)
817 */
818 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
819 final class TimeoutException extends RuntimeException {
820 @java.io.Serial
821 static final long serialVersionUID = 705788143955048766L;
822
823 /**
824 * Constructs a {@code TimeoutException} with no detail message.
825 */
826 TimeoutException() { }
827 }
828
829 /**
830 * Opens a new {@code StructuredTaskScope} to use the given {@code Joiner} object and
831 * with configuration that is the result of applying the given function to the
832 * <a href="#DefaultConfiguration">default configuration</a>.
833 *
834 * <p> The {@code configFunction} is called with the default configuration and returns
835 * the configuration for the new scope. The function may, for example, set the
836 * {@linkplain Configuration#withThreadFactory(ThreadFactory) ThreadFactory} or set a
837 * {@linkplain Configuration#withTimeout(Duration) timeout}. If the function completes
838 * with an exception or error then it is propagated by this method. If the function
839 * returns {@code null} then {@code NullPointerException} is thrown.
840 *
841 * <p> If a {@code ThreadFactory} is set then its {@link ThreadFactory#newThread(Runnable)
842 * newThread} method will be called to create threads when {@linkplain #fork(Callable)
843 * forking} subtasks in this scope. If a {@code ThreadFactory} is not set then
844 * forking subtasks will create an unnamed virtual thread for each subtask.
845 *
846 * <p> If a {@linkplain Configuration#withTimeout(Duration) timeout} is set then it
847 * starts when the scope is opened. If the timeout expires before the scope has
848 * {@linkplain #join() joined} then the scope is <a href="#Cancallation">cancelled</a>
849 * and the {@code join} method throws {@link TimeoutException}.
850 *
851 * <p> The new scope is owned by the current thread. Only code executing in this
852 * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or
853 * {@linkplain #close close} the scope.
854 *
855 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
856 * value} bindings for inheritance by threads started in the scope.
857 *
858 * @param joiner the joiner
859 * @param configFunction a function to produce the configuration
860 * @return a new scope
861 * @param <T> the result type of subtasks executed in the scope
862 * @param <R> the result type of the scope
863 * @since 25
864 */
865 static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
866 Function<Configuration, Configuration> configFunction) {
867 return StructuredTaskScopeImpl.open(joiner, configFunction);
868 }
869
870 /**
871 * Opens a new {@code StructuredTaskScope} to use the given {@code Joiner} object. The
872 * scope is created with the <a href="#DefaultConfiguration">default configuration</a>.
873 * The default configuration has a {@code ThreadFactory} that creates unnamed
874 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
875 * is unnamed for monitoring and management purposes, and has no timeout.
876 *
877 * @implSpec
878 * This factory method is equivalent to invoking the 2-arg open method with the given
879 * joiner and the {@linkplain Function#identity() identity function}.
880 *
881 * @param joiner the joiner
882 * @return a new scope
883 * @param <T> the result type of subtasks executed in the scope
884 * @param <R> the result type of the scope
885 * @since 25
886 */
887 static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner) {
888 return open(joiner, Function.identity());
889 }
890
891 /**
892 * Opens a new {@code StructuredTaskScope} that can be used to fork subtasks that return
893 * results of any type. The scope's {@link #join()} method waits for all subtasks to
894 * succeed or any subtask to fail.
895 *
896 * <p> The {@code join} method returns {@code null} if all subtasks complete successfully.
897 * It throws {@link FailedException} if any subtask fails, with the exception from
898 * the first subtask to fail as the cause.
899 *
900 * <p> The scope is created with the <a href="#DefaultConfiguration">default
901 * configuration</a>. The default configuration has a {@code ThreadFactory} that creates
902 * unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual
903 * threads</a>, is unnamed for monitoring and management purposes, and has no timeout.
904 *
905 * @implSpec
906 * This factory method is equivalent to invoking the 2-arg open method with a joiner
907 * created with {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()}
908 * and the {@linkplain Function#identity() identity function}.
909 *
910 * @param <T> the result type of subtasks
911 * @return a new scope
912 * @since 25
913 */
914 static <T> StructuredTaskScope<T, Void> open() {
915 return open(Joiner.awaitAllSuccessfulOrThrow(), Function.identity());
916 }
917
918 /**
919 * Fork a subtask by starting a new thread in this scope to execute a value-returning
920 * method. The new thread executes the subtask concurrently with the current thread.
921 * The parameter to this method is a {@link Callable}; the new thread executes its
922 * {@link Callable#call() call()} method.
923 *
924 * <p> This method first creates a {@link Subtask Subtask} object to represent the
925 * <em>forked subtask</em>. It invokes the joiner's {@link Joiner#onFork(Subtask) onFork}
926 * method with the subtask in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state.
927 * If the {@code onFork} completes with an exception or error then it is propagated by
928 * the {@code fork} method without creating a thread. If the scope is already
929 * <a href="#Cancallation">cancelled</a>, or {@code onFork} returns {@code true} to
930 * cancel the scope, then this method returns the {@code Subtask}, in the
931 * {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, without creating a thread to
932 * execute the subtask.
933 *
934 * <p> If the scope is not cancelled, and the {@code onFork} method returns {@code false},
935 * then a thread is created with the {@link ThreadFactory} configured when the scope
936 * was opened, and the thread is started. Forking a subtask inherits the current thread's
937 * {@linkplain ScopedValue scoped value} bindings. The bindings must match the bindings
938 * captured when the scope was opened. If the subtask completes (successfully or with
939 * an exception) before the scope is cancelled, then the thread invokes the joiner's
940 * {@link Joiner#onComplete(Subtask) onComplete} method with the subtask in the
941 * {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} state.
942 * If the {@code onComplete} method completes with an exception or error, then the
943 * {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler} is invoked
944 * with the exception or error before the thread terminates.
945 *
946 * <p> This method returns the {@link Subtask Subtask} object. In some usages, this
947 * object may be used to get its result. In other cases it may be used for correlation
948 * or be discarded. To ensure correct usage, the {@link Subtask#get() Subtask.get()}
949 * method may only be called by the scope owner to get the result after it has
976 * parameter to this method is a {@link Runnable}; the new thread executes its
977 * {@link Runnable#run() run} method, and {@link Subtask#get() Subtask.get()} returns
978 * {@code null} if the subtask completes successfully.
979 *
980 * @param task the task for the thread to execute
981 * @param <U> the result type
982 * @return the subtask
983 * @throws WrongThreadException if the current thread is not the scope owner
984 * @throws IllegalStateException if the owner has already {@linkplain #join() joined}
985 * or the scope is closed
986 * @throws StructureViolationException if the current scoped value bindings are not
987 * the same as when the scope was created
988 * @throws RejectedExecutionException if the thread factory rejected creating a
989 * thread to run the subtask
990 * @since 25
991 */
992 <U extends T> Subtask<U> fork(Runnable task);
993
994 /**
995 * Returns the result, or throws, after waiting for all subtasks to complete or
996 * the scope to be <a href="#Cancallation">cancelled</a>.
997 *
998 * <p> This method waits for all subtasks started in this scope to complete or the
999 * scope to be cancelled. If a {@linkplain Configuration#withTimeout(Duration) timeout}
1000 * is configured and the timeout expires before or while waiting, then the scope is
1001 * cancelled and {@link TimeoutException TimeoutException} is thrown. Once finished
1002 * waiting, the {@code Joiner}'s {@link Joiner#result() result()} method is invoked
1003 * to get the result or throw an exception. If the {@code result()} method throws
1004 * then this method throws {@code FailedException} with the exception as the cause.
1005 *
1006 * <p> This method may only be invoked by the scope owner, and only once.
1007 *
1008 * @return the result
1009 * @throws WrongThreadException if the current thread is not the scope owner
1010 * @throws IllegalStateException if already joined or this scope is closed
1011 * @throws FailedException if the <i>outcome</i> is an exception, thrown with the
1012 * exception from {@link Joiner#result() Joiner.result()} as the cause
1013 * @throws TimeoutException if a timeout is set and the timeout expires before or
1014 * while waiting
1015 * @throws InterruptedException if interrupted while waiting
1016 * @since 25
1017 */
1018 R join() throws InterruptedException;
1019
1020 /**
1021 * {@return {@code true} if this scope is <a href="#Cancallation">cancelled</a> or in
1022 * the process of being cancelled, otherwise {@code false}}
1023 *
1024 * <p> Cancelling the scope prevents new threads from starting in the scope and
1025 * {@linkplain Thread#interrupt() interrupts} threads executing unfinished subtasks.
1026 * It may take some time before the interrupted threads finish execution; this
1027 * method may return {@code true} before all threads have been interrupted or before
1028 * all threads have finished.
1029 *
1030 * @apiNote A task with a lengthy "forking phase" (the code that executes before
1031 * it invokes {@link #join() join}) may use this method to avoid doing work in cases
1032 * where the scope is cancelled by the completion of a previously forked subtask or by a timeout.
1033 *
1034 * @since 25
1035 */
1036 boolean isCancelled();
1037
1038 /**
1039 * Closes this scope.
1040 *
1041 * <p> This method first <a href="#Cancallation">cancels</a> the scope, if not
1042 * already cancelled. This interrupts the threads executing unfinished subtasks. This
1043 * method then waits for all threads to finish. If interrupted while waiting then it
1044 * will continue to wait until the threads finish, before completing with the interrupt
1045 * status set.
1046 *
1047 * <p> This method may only be invoked by the scope owner. If the scope
1048 * is already closed then the scope owner invoking this method has no effect.
1049 *
1050 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
1051 * manner</em>. If this method is called to close a scope before nested task
1052 * scopes are closed then it closes the underlying construct of each nested scope
1053 * (in the reverse order that they were created in), closes this scope, and then
1054 * throws {@link StructureViolationException}.
1055 * Similarly, if this method is called to close a scope while executing with
1056 * {@linkplain ScopedValue scoped value} bindings, and the scope was created
1057 * before the scoped values were bound, then {@code StructureViolationException} is
1058 * thrown after closing the scope.
1059 * If a thread terminates without first closing scopes that it owns then
1060 * termination will cause the underlying construct of each of its open task scopes to
1061 * be closed. Closing is performed in the reverse order that the scopes were
1062 * created in. Thread termination may therefore be delayed when the scope owner
1063 * has to wait for threads forked in these scopes to finish.
1064 *
1065 * @throws IllegalStateException thrown after closing the scope if the scope
1066 * owner did not attempt to join after forking
1067 * @throws WrongThreadException if the current thread is not the scope owner
1068 * @throws StructureViolationException if a structure violation was detected
1069 */
1070 @Override
1071 void close();
1072 }
|
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.time.Duration;
28 import java.util.function.Predicate;
29 import java.util.function.Supplier;
30 import java.util.function.UnaryOperator;
31 import java.util.stream.Stream;
32 import jdk.internal.javac.PreviewFeature;
33
34 /**
35 * An API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports cases
36 * where execution of a <em>task</em> (a unit of work) splits into several concurrent
37 * subtasks, and where the subtasks must complete before the task continues. A {@code
38 * StructuredTaskScope} can be used to ensure that the lifetime of a concurrent operation
39 * is confined by a <em>syntax block</em>, similar to that of a sequential operation in
40 * structured programming.
41 *
42 * <p> {@code StructuredTaskScope} defines the static method {@link #open() open} to open
43 * a new {@code StructuredTaskScope} and the {@link #close() close} method to close it.
44 * The API is designed to be used with the {@code try}-with-resources statement where
45 * the {@code StructuredTaskScope} is opened as a resource and then closed automatically.
46 * The code inside the block uses the {@link #fork(Callable) fork} method to fork subtasks.
47 * After forking, it uses the {@link #join() join} method to wait for all subtasks to
48 * finish (or some other outcome) as a single operation. Forking a subtask starts a new
49 * {@link Thread} to run the subtask. The thread executing the task does not continue
50 * beyond the {@code close} method until all threads started to execute subtasks have finished.
51 * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may
52 * only be invoked by the <em>owner thread</em> (the thread that opened the {@code
53 * StructuredTaskScope}), the {@code fork} method may not be called after {@code join},
54 * the {@code join} method may only be invoked once to get the outcome, and the
55 * {@code close} method throws an exception after closing if the owner did not invoke the
56 * {@code join} method after forking subtasks.
57 *
58 * <p> As a first example, consider a task that splits into two subtasks to concurrently
59 * fetch resources from two URL locations "left" and "right". Both subtasks may complete
60 * successfully, one subtask may succeed and the other may fail, or both subtasks may
61 * fail. The task in this example is interested in the successful result from both
62 * subtasks. It waits in the {@link #join() join} method for both subtasks to complete
63 * successfully or for either subtask to fail.
64 * {@snippet lang=java :
65 * // @link substring="open" target="#open()" :
66 * try (var scope = StructuredTaskScope.open()) {
67 *
68 * // @link substring="fork" target="#fork(Callable)" :
69 * Subtask<String> subtask1 = scope.fork(() -> query(left));
70 * Subtask<Integer> subtask2 = scope.fork(() -> query(right));
71 *
72 * // throws if either subtask fails
73 * scope.join(); // @link substring="join" target="#join()"
74 *
75 * // both subtasks completed successfully
76 * // @link substring="get" target="Subtask#get()" :
181 * } catch (StructuredTaskScope.FailedException e) {
182 *
183 * Throwable cause = e.getCause();
184 * switch (cause) {
185 * case IOException ioe -> ..
186 * default -> ..
187 * }
188 *
189 * }
190 * }
191 * In other cases it may not be useful to catch {@code FailedException} but instead leave
192 * it to propagate to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught
193 * exception handler} for logging purposes.
194 *
195 * <p> Where a specific exception triggers the use of a default result, it may be
196 * more appropriate to handle this in the subtask itself rather than having the subtask
197 * fail and the scope owner handle the exception.
198 *
199 * <h2>Configuration</h2>
200 *
201 * A {@code StructuredTaskScope} is opened with {@linkplain Configuration configuration}
202 * that consists of a {@link ThreadFactory} to create threads, an optional name for the
203 * scope, and an optional timeout. The name is intended for monitoring and management
204 * purposes.
205 *
206 * <p> The {@link #open()} and {@link #open(Joiner)} methods create a {@code StructuredTaskScope}
207 * with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default
208 * configuration has a {@code ThreadFactory} that creates unnamed {@linkplain
209 * Thread##virtual-threads virtual threads}, does not name the scope, and has no timeout.
210 *
211 * <p> The 2-arg {@link #open(Joiner, UnaryOperator) open} method can be used to create a
212 * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, is named for
213 * monitoring and management purposes, or has a timeout that cancels the scope if the
214 * timeout expires before or while waiting for subtasks to complete. The {@code open}
215 * method is called with a {@linkplain UnaryOperator operator} that is applied to the
216 * default configuration and returns a {@link Configuration Configuration} for the
217 * {@code StructuredTaskScope} under construction.
218 *
219 * <p> The following example opens a new {@code StructuredTaskScope} with a {@code
220 * ThreadFactory} that creates virtual threads {@linkplain Thread#setName(String) named}
221 * "duke-0", "duke-1" ...
222 * {@snippet lang = java:
223 * // @link substring="name" target="Thread.Builder#name(String, long)" :
224 * ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory();
225 *
226 * // @link substring="withThreadFactory" target="Configuration#withThreadFactory(ThreadFactory)" :
227 * try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
228 *
229 * scope.fork( .. ); // runs in a virtual thread with name "duke-0"
230 * scope.fork( .. ); // runs in a virtual thread with name "duke-1"
231 *
232 * scope.join();
233 *
234 * }
235 *}
236 *
237 * <p> A second example sets a timeout, represented by a {@link Duration}. The timeout
238 * starts when the new scope is opened. If the timeout expires before the {@code join}
239 * method has completed then the scope is {@linkplain ##Cancallation cancelled} (this
240 * interrupts the threads executing the two subtasks), and the {@code join} method
241 * throws {@link TimeoutException TimeoutException}.
242 * {@snippet lang=java :
243 * Duration timeout = Duration.ofSeconds(10);
244 *
245 * // @link substring="allSuccessfulOrThrow" target="Joiner#allSuccessfulOrThrow()" :
246 * try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
247 * // @link substring="withTimeout" target="Configuration#withTimeout(Duration)" :
248 * cf -> cf.withTimeout(timeout))) {
249 *
250 * scope.fork(callable1);
251 * scope.fork(callable2);
252 *
253 * List<String> result = scope.join()
254 * .map(Subtask::get)
255 * .toList();
256 *
257 * }
258 * }
259 *
260 * <h2>Inheritance of scoped value bindings</h2>
261 *
296 * USERNAME, then value "duke" will be returned.
297 * {@snippet lang=java :
298 * // @link substring="newInstance" target="ScopedValue#newInstance()" :
299 * private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
300 *
301 * // @link substring="where" target="ScopedValue#where(ScopedValue, Object)" :
302 * MyResult result = ScopedValue.where(USERNAME, "duke").call(() -> {
303 *
304 * try (var scope = StructuredTaskScope.open()) {
305 *
306 * Subtask<String> subtask1 = scope.fork( .. ); // inherits binding
307 * Subtask<Integer> subtask2 = scope.fork( .. ); // inherits binding
308 *
309 * scope.join();
310 * return new MyResult(subtask1.get(), subtask2.get());
311 * }
312 *
313 * });
314 * }
315 *
316 * <p> A scoped value inherited into a subtask may be {@linkplain ScopedValue##rebind
317 * rebound} to a new value in the subtask for the bounded execution of some method executed
318 * in the subtask. When the method completes, the value of the {@code ScopedValue} reverts
319 * to its previous value, the value inherited from the thread executing the task.
320 *
321 * <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}.
322 * A task executing in thread T1 opens a {@code StructuredTaskScope} and forks a
323 * subtask that runs in thread T2. The scoped value bindings captured when T1 opens the
324 * scope are inherited into T2. The subtask (in thread T2) executes code that opens a
325 * new {@code StructuredTaskScope} and forks a subtask that runs in thread T3. The scoped
326 * value bindings captured when T2 opens the scope are inherited into T3. These
327 * include (or may be the same as) the bindings that were inherited from T1. In effect,
328 * scoped values are inherited into a tree of subtasks, not just one level of subtask.
329 *
330 * <h2>Memory consistency effects</h2>
331 *
332 * <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to {@linkplain
333 * #fork forking} of a subtask {@linkplain java.util.concurrent##MemoryVisibility
334 * <i>happen-before</i>} any actions taken by that subtask, which in turn
335 * <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}.
336 *
337 * <h2>General exceptions</h2>
338 *
339 * <p> Unless otherwise specified, passing a {@code null} argument to a method in this
340 * class will cause a {@link NullPointerException} to be thrown.
341 *
342 * @param <T> the result type of subtasks executed in the scope
343 * @param <R> the result type of the scope
344 *
345 * @jls 17.4.5 Happens-before Order
346 * @since 21
347 */
348 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
349 public sealed interface StructuredTaskScope<T, R>
350 extends AutoCloseable
351 permits StructuredTaskScopeImpl {
352
353 /**
354 * Represents a subtask forked with {@link #fork(Callable)} or {@link #fork(Runnable)}.
385 /**
386 * The subtask failed with an exception. The {@link Subtask#exception()
387 * Subtask.exception()} method can be used to get the exception. This is a
388 * terminal state.
389 */
390 FAILED,
391 }
392
393 /**
394 * {@return the subtask state}
395 */
396 State state();
397
398 /**
399 * Returns the result of this subtask if it completed successfully. If the subtask
400 * was forked with {@link #fork(Callable) fork(Callable)} then the result from the
401 * {@link Callable#call() call} method is returned. If the subtask was forked with
402 * {@link #fork(Runnable) fork(Runnable)} then {@code null} is returned.
403 *
404 * <p> Code executing in the scope owner thread can use this method to get the
405 * result of a successful subtask after it has {@linkplain #join() joined}.
406 *
407 * <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask)
408 * onComplete} method should test that the {@linkplain #state() subtask state} is
409 * {@link State#SUCCESS SUCCESS} before using this method to get the result.
410 *
411 * <p> This method may be invoked by any thread after the scope owner has joined.
412 * The only case where this method can be used to get the result before the scope
413 * owner has joined is when called from the {@code onComplete(Subtask)} method.
414 *
415 * @return the possibly-null result
416 * @throws IllegalStateException if the subtask has not completed or did not
417 * complete successfully, or this method is invoked outside the context of the
418 * {@code onComplete(Subtask)} method before the owner thread has joined
419 * @see State#SUCCESS
420 */
421 T get();
422
423 /**
424 * {@return the exception or error thrown by this subtask if it failed}
425 * If the subtask was forked with {@link #fork(Callable) fork(Callable)} then the
426 * exception or error thrown by the {@link Callable#call() call} method is returned.
427 * If the subtask was forked with {@link #fork(Runnable) fork(Runnable)} then the
428 * exception or error thrown by the {@link Runnable#run() run} method is returned.
429 *
430 * <p> Code executing in the scope owner thread can use this method to get the
431 * exception thrown by a failed subtask after it has {@linkplain #join() joined}.
432 *
433 * <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask)
434 * onComplete} method should test that the {@linkplain #state() subtask state} is
435 * {@link State#FAILED FAILED} before using this method to get the exception.
436 *
437 * <p> This method may be invoked by any thread after the scope owner has joined.
438 * The only case where this method can be used to get the exception before the scope
439 * owner has joined is when called from the {@code onComplete(Subtask)} method.
440 *
441 * @throws IllegalStateException if the subtask has not completed or completed
442 * with a result, or this method is invoked outside the context of the {@code
443 * onComplete(Subtask)} method before the owner thread has joined
444 * @see State#FAILED
445 */
446 Throwable exception();
447 }
448
449 /**
450 * An object used with a {@link StructuredTaskScope} to handle subtask completion and
451 * produce the result for the scope owner waiting in the {@link #join() join} method
452 * for subtasks to complete.
453 *
454 * <p> Joiner defines static methods to create {@code Joiner} objects for common cases:
455 * <ul>
456 * <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner}
457 * that yields a stream of the completed subtasks for {@code join} to return when
458 * all subtasks complete successfully. It cancels the scope and causes {@code join}
459 * to throw if any subtask fails.
460 * <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a
461 * {@code Joiner} that yields the result of the first subtask to succeed for {@code
462 * join} to return. It causes {@code join} to throw if all subtasks fail.
463 * <li> {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a
464 * {@code Joiner} that waits for all subtasks to complete successfully. It cancels the scope and
465 * causes {@code join} to throw if any subtask fails.
466 * <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all
467 * subtasks to complete. It does not cancel the scope or cause {@code join} to throw.
468 * </ul>
469 *
470 * <p> In addition to the methods to create {@code Joiner} objects for common cases,
471 * the {@link #allUntil(Predicate) allUntil(Predicate)} method is defined to create a
472 * {@code Joiner} that yields a stream of all subtasks. It is created with a {@link
473 * Predicate Predicate} that determines if the scope should continue or be cancelled.
474 * This {@code Joiner} can be built upon to create custom policies that cancel the
475 * scope based on some condition.
476 *
477 * <p> More advanced policies can be developed by implementing the {@code Joiner}
478 * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked.
479 * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a
480 * result or exception. These methods return a {@code boolean} to indicate if the scope
481 * should be cancelled. These methods can be used to collect subtasks, results, or
482 * exceptions, and control when to cancel the scope. The {@link #result()} method
483 * must be implemented to produce the result (or exception) for the {@code join}
484 * method.
485 *
486 * <p> If a {@code StructuredTaskScope} is opened with a {@linkplain
487 * Configuration#withTimeout(Duration) timeout}, and the timeout expires before or
488 * while waiting in {@link StructuredTaskScope#join() join()}, then the scope is
489 * {@linkplain StructuredTaskScope##Cancallation cancelled}, and the {@code Joiner}'s
490 * {@link #onTimeout()} method is invoked to notify the {@code Joiner} and optionally
491 * throw {@link TimeoutException TimeoutException}. If the {@code onTimeout()} method
492 * does not throw then the {@code join()} method will invoke the {@link #result()}
493 * method to produce a result. This result may be based on the outcome of subtasks
494 * that completed before the timeout expired.
495 *
496 * <p> Unless otherwise specified, passing a {@code null} argument to a method
497 * in this class will cause a {@link NullPointerException} to be thrown.
498 *
499 * @implSpec Implementations of this interface must be thread safe. The {@link
500 * #onComplete(Subtask)} method defined by this interface may be invoked by several
501 * threads concurrently. The {@link #onTimeout()} method may be invoked at around
502 * the same time that subtasks complete.
503 *
504 * @apiNote It is very important that a new {@code Joiner} object is created for each
505 * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with
506 * different scopes or re-used after a scope is closed.
507 *
508 * <p> Designing a {@code Joiner} should take into account the code at the use-site
509 * where the results from the {@link StructuredTaskScope#join() join} method are
510 * processed. It should be clear what the {@code Joiner} does vs. the application
511 * code at the use-site. In general, the {@code Joiner} implementation is not the
512 * place for "business logic". A {@code Joiner} should be designed to be as general
513 * purpose as possible.
514 *
515 * @param <T> the result type of subtasks executed in the scope
516 * @param <R> the result type of the scope
517 * @since 25
518 * @see #open(Joiner)
519 */
520 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
521 interface Joiner<T, R> {
522 /**
523 * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable)
524 * fork(Runnable)} when forking a subtask. The method is invoked before a thread
525 * is created to run the subtask.
526 *
527 * @implSpec The default implementation throws {@code NullPointerException} if the
528 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
529 * subtask is not in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state;
530 * otherwise it returns {@code false}.
531 *
532 * @apiNote This method is invoked by the {@code fork} methods. It should not be
533 * invoked directly.
534 *
535 * @param subtask the subtask
536 * @return {@code true} to cancel the scope, otherwise {@code false}
537 */
538 default boolean onFork(Subtask<? extends T> subtask) {
539 if (subtask.state() != Subtask.State.UNAVAILABLE) {
540 throw new IllegalArgumentException("Subtask not in UNAVAILABLE state");
541 }
542 return false;
543 }
544
545 /**
548 * subtask completes after the scope is cancelled.
549 *
550 * @implSpec The default implementation throws {@code NullPointerException} if the
551 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
552 * subtask is not in the {@link Subtask.State#SUCCESS SUCCESS} or {@link
553 * Subtask.State#FAILED FAILED} state; otherwise it returns {@code false}.
554 *
555 * @apiNote This method is invoked by subtasks when they complete. It should not
556 * be invoked directly.
557 *
558 * @param subtask the subtask
559 * @return {@code true} to cancel the scope, otherwise {@code false}
560 */
561 default boolean onComplete(Subtask<? extends T> subtask) {
562 if (subtask.state() == Subtask.State.UNAVAILABLE) {
563 throw new IllegalArgumentException("Subtask has not completed");
564 }
565 return false;
566 }
567
568 /**
569 * Invoked by the {@link #join() join()} method if the scope was opened with a
570 * timeout, and the timeout expires before or while waiting in the {@code join}
571 * method.
572 *
573 * @implSpec The default implementation throws {@link TimeoutException TimeoutException}.
574 *
575 * @apiNote This method is intended for {@code Joiner} implementations that do not
576 * throw {@link TimeoutException TimeoutException}, or require a notification when
577 * the timeout expires before or while waiting in {@code join}.
578 *
579 * <p> This method is invoked by the {@code join} method. It should not be
580 * invoked directly.
581 *
582 * @throws TimeoutException for {@code join} to throw
583 * @since 26
584 */
585 default void onTimeout() {
586 throw new TimeoutException();
587 }
588
589 /**
590 * Invoked by the {@link #join() join()} method to produce the result (or exception)
591 * after waiting for all subtasks to complete or the scope to be cancelled. The result
592 * from this method is returned by the {@code join} method. If this method throws,
593 * then {@code join} throws {@link FailedException} with the exception thrown by
594 * this method as the cause.
595 *
596 * <p> In normal usage, this method will be called at most once by the {@code join}
597 * method to produce the result (or exception). The behavior of this method when
598 * invoked directly, and invoked more than once, is undefined. Where possible, an
599 * implementation should return an equal result (or throw the same exception) on
600 * second or subsequent calls to produce the outcome.
601 *
602 * @apiNote This method is invoked by the {@code join} method. It should not be
603 * invoked directly.
604 *
605 * @return the result
606 * @throws Throwable the exception
607 */
608 R result() throws Throwable;
609
610 /**
611 * {@return a new Joiner object that yields a stream of all subtasks when all
612 * subtasks complete successfully}
613 * The {@code Joiner} {@linkplain StructuredTaskScope##Cancallation cancels}
614 * the scope and causes {@code join} to throw if any subtask fails.
615 *
616 * <p> If all subtasks complete successfully then the joiner's {@link
617 * Joiner#result()} method returns a stream of all subtasks, in the order that they
618 * were forked, for the {@link StructuredTaskScope#join() join()} to return. If
619 * the scope was opened with a {@linkplain Configuration#withTimeout(Duration)
620 * timeout}, and the timeout expires before or while waiting for all subtasks to
621 * complete, then the {@code join} method throws {@code TimeoutException}.
622 *
623 * @apiNote Joiners returned by this method are suited to cases where all subtasks
624 * return a result of the same type. Joiners returned by {@link
625 * #awaitAllSuccessfulOrThrow()} are suited to cases where the subtasks return
626 * results of different types.
627 *
628 * @param <T> the result type of subtasks
629 */
630 static <T> Joiner<T, Stream<Subtask<T>>> allSuccessfulOrThrow() {
631 return new Joiners.AllSuccessful<>();
632 }
633
634 /**
635 * {@return a new Joiner object that yields the result of any subtask that
636 * completed successfully}
637 * The {@code Joiner} causes {@code join} to throw if all subtasks fail.
638 *
639 * <p> The joiner's {@link Joiner#result()} method returns the result of a subtask,
640 * that completed successfully, for the {@link StructuredTaskScope#join() join()}
641 * to return. If all subtasks fail then the {@code result} method throws the
642 * exception from one of the failed subtasks. The {@code result} method throws
643 * {@code NoSuchElementException} if no subtasks were forked. If the scope was
644 * opened with a {@linkplain Configuration#withTimeout(Duration) timeout}, and
645 * the timeout expires before or while waiting for any subtask to complete
646 * successfully, then the {@code join} method throws {@code TimeoutException}.
647 *
648 * @param <T> the result type of subtasks
649 */
650 static <T> Joiner<T, T> anySuccessfulResultOrThrow() {
651 return new Joiners.AnySuccessful<>();
652 }
653
654 /**
655 * {@return a new Joiner object that waits for subtasks to complete successfully}
656 * The {@code Joiner} {@linkplain StructuredTaskScope##Cancallation cancels}
657 * the scope and causes {@code join} to throw if any subtask fails.
658 *
659 * <p> The joiner's {@link Joiner#result() result} method returns {@code null}
660 * if all subtasks complete successfully, or throws the exception from the first
661 * subtask to fail. If the scope was opened with a {@linkplain
662 * Configuration#withTimeout(Duration) timeout}, and the timeout expires before or
663 * while waiting for all subtasks to complete, then the {@code join} method throws
664 * {@code TimeoutException}.
665 *
666 * @apiNote Joiners returned by this method are suited to cases where subtasks
667 * return results of different types. Joiners returned by {@link #allSuccessfulOrThrow()}
668 * are suited to cases where the subtasks return a result of the same type.
669 *
670 * @param <T> the result type of subtasks
671 */
672 static <T> Joiner<T, Void> awaitAllSuccessfulOrThrow() {
673 return new Joiners.AwaitSuccessful<>();
674 }
675
676 /**
677 * {@return a new Joiner object that waits for all subtasks to complete}
678 * The {@code Joiner} does not cancel the scope if a subtask fails.
679 *
680 * <p> The joiner's {@link Joiner#result() result} method returns {@code null}.
681 * If the scope was opened with a {@linkplain Configuration#withTimeout(Duration)
682 * timeout}, and the timeout expires before or while waiting for all subtasks to
683 * complete, then the {@code join} method throws {@code TimeoutException}.
684 *
685 * @apiNote This Joiner is useful for cases where subtasks make use of
686 * <em>side-effects</em> rather than return results or fail with exceptions.
687 * The {@link #fork(Runnable) fork(Runnable)} method can be used to fork subtasks
688 * that do not return a result.
689 *
690 * <p> This Joiner can also be used for <em>fan-in</em> scenarios where subtasks
691 * are forked to handle incoming connections and the number of subtasks is unbounded.
692 * In this example, the thread executing the {@code acceptLoop} method will only
693 * stop when interrupted or the listener socket is closed asynchronously.
694 * {@snippet lang=java :
695 * void acceptLoop(ServerSocket listener) throws IOException, InterruptedException {
696 * try (var scope = StructuredTaskScope.open(Joiner.<Socket>awaitAll())) {
697 * while (true) {
698 * Socket socket = listener.accept();
699 * scope.fork(() -> handle(socket));
700 * }
701 * }
702 * }
703 * }
704 *
705 * @param <T> the result type of subtasks
706 */
707 static <T> Joiner<T, Void> awaitAll() {
708 // ensure that a new Joiner object is returned
709 return new Joiner<T, Void>() {
710 @Override
711 public Void result() {
712 return null;
713 }
714 };
715 }
716
717 /**
718 * {@return a new Joiner object that yields a stream of all subtasks when all
719 * subtasks complete or a predicate returns {@code true} to cancel the scope}
720 *
721 * <p> The joiner's {@link #onComplete(Subtask)} method invokes the predicate's
722 * {@link Predicate#test(Object) test} method with the subtask that completed
723 * successfully or failed with an exception. If the {@code test} method
724 * returns {@code true} then {@linkplain StructuredTaskScope##Cancallation
725 * the scope is cancelled}. The {@code test} method must be thread safe as it
726 * may be invoked concurrently from several threads. If the {@code test} method
727 * completes with an exception or error, then the thread that executed the subtask
728 * invokes the {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler}
729 * with the exception or error before the thread terminates.
730 *
731 * <p> The joiner's {@link #result()} method returns the stream of all subtasks,
732 * in fork order. The stream may contain subtasks that have completed
733 * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED}
734 * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state
735 * if the scope was cancelled before all subtasks were forked or completed.
736 *
737 * <p> The joiner's {@link #onTimeout()} method does nothing. If configured with
738 * a {@linkplain Configuration#withTimeout(Duration) timeout}, and the timeout
739 * expires before or while waiting in {@link StructuredTaskScope#join() join},
740 * then the {@link #result()} method returns the stream of all subtasks.
741 * Subtasks that did not complete before the timeout expired will be in the
742 * {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state.
743 *
744 * <p> The following example uses this method to create a {@code Joiner} that
745 * {@linkplain StructuredTaskScope##Cancallation cancels} the scope when two or
746 * more subtasks fail.
747 * {@snippet lang=java :
748 * class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
749 * private final AtomicInteger failedCount = new AtomicInteger();
750 * @Override
751 * public boolean test(Subtask<? extends T> subtask) {
752 * return subtask.state() == Subtask.State.FAILED
753 * && failedCount.incrementAndGet() >= 2;
754 * }
755 * }
756 *
757 * var joiner = Joiner.allUntil(new CancelAfterTwoFailures<String>());
758 * }
759 *
760 * <p> The following example uses {@code allUntil} to wait for all subtasks to
761 * complete without any cancellation. This is similar to {@link #awaitAll()}
762 * except that it yields a stream of the completed subtasks.
763 * {@snippet lang=java :
764 * <T> List<Subtask<T>> invokeAll(Collection<Callable<T>> tasks) throws InterruptedException {
765 * try (var scope = StructuredTaskScope.open(Joiner.<T>allUntil(_ -> false))) {
766 * tasks.forEach(scope::fork);
767 * return scope.join().toList();
768 * }
769 * }
770 * }
771 *
772 * <p> The following example uses {@code allUntil} to get the results of all
773 * subtasks that complete successfully within a timeout period.
774 * {@snippet lang=java :
775 * <T> List<T> invokeAll(Collection<Callable<T>> tasks, Duration timeout) throws InterruptedException {
776 * try (var scope = StructuredTaskScope.open(Joiner.<T>allUntil(_ -> false), cf -> cf.withTimeout(timeout))) {
777 * tasks.forEach(scope::fork);
778 * return scope.join()
779 * .filter(s -> s.state() == Subtask.State.SUCCESS)
780 * .map(Subtask::get)
781 * .toList();
782 * }
783 * }
784 * }
785 *
786 * @param isDone the predicate to evaluate completed subtasks
787 * @param <T> the result type of subtasks
788 */
789 static <T> Joiner<T, Stream<Subtask<T>>> allUntil(Predicate<Subtask<? extends T>> isDone) {
790 return new Joiners.AllSubtasks<>(isDone);
791 }
792 }
793
794 /**
795 * Represents the configuration for a {@code StructuredTaskScope}.
796 *
797 * <p> The configuration for a {@code StructuredTaskScope} consists of a {@link
798 * ThreadFactory} to create threads, an optional name for the scope, and an optional
799 * timeout. The name is intended for monitoring and management purposes.
800 *
801 * <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, UnaryOperator)
802 * open} method allows a different configuration to be used. The operator specified
803 * to the {@code open} method is applied to the default configuration and returns the
804 * configuration for the {@code StructuredTaskScope} under construction. The operator
805 * can use the {@code with-} prefixed methods defined here to specify the components
806 * of the configuration to use.
807 *
808 * <p> Unless otherwise specified, passing a {@code null} argument to a method
809 * in this class will cause a {@link NullPointerException} to be thrown.
810 *
811 * @since 25
812 */
813 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
814 sealed interface Configuration permits StructuredTaskScopeImpl.ConfigImpl {
815 /**
816 * {@return a new {@code Configuration} object with the given thread factory}
817 * The other components are the same as this object. The thread factory is used by
818 * a scope to create threads when {@linkplain #fork(Callable) forking} subtasks.
819 * @param threadFactory the thread factory
820 *
821 * @apiNote The thread factory will typically create {@linkplain Thread##virtual-threads
822 * virtual threads}, maybe with names for monitoring purposes, an {@linkplain
823 * Thread.UncaughtExceptionHandler uncaught exception handler}, or other properties
824 * configured.
825 *
826 * @see #fork(Callable)
827 */
828 Configuration withThreadFactory(ThreadFactory threadFactory);
829
830 /**
831 * {@return a new {@code Configuration} object with the given scope name}
832 * The other components are the same as this object. A scope is optionally
833 * named for the purposes of monitoring and management.
834 * @param name the name
835 */
836 Configuration withName(String name);
837
838 /**
839 * {@return a new {@code Configuration} object with the given timeout}
840 * The other components are the same as this object.
841 * @param timeout the timeout
842 *
843 * @apiNote Applications using deadlines, expressed as an {@link java.time.Instant},
844 * can use {@link Duration#between Duration.between(Instant.now(), deadline)} to
845 * compute the timeout for this method.
846 *
847 * @see #join()
848 * @see Joiner#onTimeout()
849 */
850 Configuration withTimeout(Duration timeout);
851 }
852
853 /**
854 * Exception thrown by {@link #join()} when the outcome is an exception rather than a
855 * result.
856 *
857 * @since 25
858 */
859 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
860 final class FailedException extends RuntimeException {
861 @java.io.Serial
862 static final long serialVersionUID = -1533055100078459923L;
863
864 /**
865 * Constructs a {@code FailedException} with the specified cause.
866 *
867 * @param cause the cause, can be {@code null}
868 */
869 FailedException(Throwable cause) {
870 super(cause);
871 }
872 }
873
874 /**
875 * Exception thrown by {@link #join()} if the scope was opened with a timeout,
876 * the timeout expired before or while waiting in {@code join}, and the {@link
877 * Joiner#onTimeout() Joiner.onTimeout} method throws this exception.
878 *
879 * @since 25
880 * @see Configuration#withTimeout(Duration)
881 * @see Joiner#onTimeout()
882 */
883 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
884 final class TimeoutException extends RuntimeException {
885 @java.io.Serial
886 static final long serialVersionUID = 705788143955048766L;
887
888 /**
889 * Constructs a {@code TimeoutException} with no detail message.
890 */
891 TimeoutException() { }
892 }
893
894 /**
895 * Opens a new {@code StructuredTaskScope} to use the given {@code Joiner} object and
896 * with configuration that is the result of applying the given operator to the
897 * {@linkplain ##DefaultConfiguration default configuration}.
898 *
899 * <p> The {@code configOperator} is called with the default configuration and returns
900 * the configuration for the new scope. The operator may, for example, set the
901 * {@linkplain Configuration#withThreadFactory(ThreadFactory) ThreadFactory} or set a
902 * {@linkplain Configuration#withTimeout(Duration) timeout}. If the operator completes
903 * with an exception or error then it is propagated by this method. If the operator
904 * returns {@code null} then {@code NullPointerException} is thrown.
905 *
906 * <p> If a {@code ThreadFactory} is set then its {@link ThreadFactory#newThread(Runnable)
907 * newThread} method will be called to create threads when {@linkplain #fork(Callable)
908 * forking} subtasks in this scope. If a {@code ThreadFactory} is not set then
909 * forking subtasks will create an unnamed virtual thread for each subtask.
910 *
911 * <p> If a {@linkplain Configuration#withTimeout(Duration) timeout} is set then it
912 * starts when the scope is opened. If the timeout expires before the scope has
913 * {@linkplain #join() joined} then the scope is {@linkplain ##Cancallation cancelled}
914 * and the {@code Joiner}'s {@link Joiner#onTimeout()} method is invoked to
915 * optionally throw {@link TimeoutException TimeoutException}.
916 *
917 * <p> The new scope is owned by the current thread. Only code executing in this
918 * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or
919 * {@linkplain #close close} the scope.
920 *
921 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
922 * value} bindings for inheritance by threads started in the scope.
923 *
924 * @param joiner the joiner
925 * @param configOperator the operator to produce the configuration
926 * @return a new scope
927 * @param <T> the result type of subtasks executed in the scope
928 * @param <R> the result type of the scope
929 * @since 26
930 */
931 static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
932 UnaryOperator<Configuration> configOperator) {
933 return StructuredTaskScopeImpl.open(joiner, configOperator);
934 }
935
936 /**
937 * Opens a new {@code StructuredTaskScope} to use the given {@code Joiner} object. The
938 * scope is created with the {@linkplain ##DefaultConfiguration default configuration}.
939 * The default configuration has a {@code ThreadFactory} that creates unnamed
940 * {@linkplain Thread##virtual-threads virtual threads}, does not name the scope, and
941 * has no timeout.
942 *
943 * @implSpec
944 * This factory method is equivalent to invoking the 2-arg open method with the given
945 * joiner and the {@linkplain UnaryOperator#identity() identity operator}.
946 *
947 * @param joiner the joiner
948 * @return a new scope
949 * @param <T> the result type of subtasks executed in the scope
950 * @param <R> the result type of the scope
951 * @since 25
952 */
953 static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner) {
954 return open(joiner, UnaryOperator.identity());
955 }
956
957 /**
958 * Opens a new {@code StructuredTaskScope} that can be used to fork subtasks that return
959 * results of any type. The scope's {@link #join()} method waits for all subtasks to
960 * succeed or any subtask to fail.
961 *
962 * <p> The {@code join} method returns {@code null} if all subtasks complete successfully.
963 * It throws {@link FailedException} if any subtask fails, with the exception from
964 * the first subtask to fail as the cause.
965 *
966 * <p> The scope is created with the {@linkplain ##DefaultConfiguration default
967 * configuration}. The default configuration has a {@code ThreadFactory} that creates
968 * unnamed {@linkplain Thread##virtual-threads virtual threads}, does not name the
969 * scope, and has no timeout.
970 *
971 * @implSpec
972 * This factory method is equivalent to invoking the 2-arg open method with a joiner
973 * created with {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()}
974 * and the {@linkplain UnaryOperator#identity() identity operator}.
975 *
976 * @param <T> the result type of subtasks
977 * @return a new scope
978 * @since 25
979 */
980 static <T> StructuredTaskScope<T, Void> open() {
981 return open(Joiner.awaitAllSuccessfulOrThrow(), UnaryOperator.identity());
982 }
983
984 /**
985 * Fork a subtask by starting a new thread in this scope to execute a value-returning
986 * method. The new thread executes the subtask concurrently with the current thread.
987 * The parameter to this method is a {@link Callable}; the new thread executes its
988 * {@link Callable#call() call()} method.
989 *
990 * <p> This method first creates a {@link Subtask Subtask} object to represent the
991 * <em>forked subtask</em>. It invokes the joiner's {@link Joiner#onFork(Subtask) onFork}
992 * method with the subtask in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state.
993 * If the {@code onFork} completes with an exception or error then it is propagated by
994 * the {@code fork} method without creating a thread. If the scope is already
995 * {@linkplain ##Cancallation cancelled}, or {@code onFork} returns {@code true} to
996 * cancel the scope, then this method returns the {@code Subtask}, in the
997 * {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, without creating a thread to
998 * execute the subtask.
999 *
1000 * <p> If the scope is not cancelled, and the {@code onFork} method returns {@code false},
1001 * then a thread is created with the {@link ThreadFactory} configured when the scope
1002 * was opened, and the thread is started. Forking a subtask inherits the current thread's
1003 * {@linkplain ScopedValue scoped value} bindings. The bindings must match the bindings
1004 * captured when the scope was opened. If the subtask completes (successfully or with
1005 * an exception) before the scope is cancelled, then the thread invokes the joiner's
1006 * {@link Joiner#onComplete(Subtask) onComplete} method with the subtask in the
1007 * {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} state.
1008 * If the {@code onComplete} method completes with an exception or error, then the
1009 * {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler} is invoked
1010 * with the exception or error before the thread terminates.
1011 *
1012 * <p> This method returns the {@link Subtask Subtask} object. In some usages, this
1013 * object may be used to get its result. In other cases it may be used for correlation
1014 * or be discarded. To ensure correct usage, the {@link Subtask#get() Subtask.get()}
1015 * method may only be called by the scope owner to get the result after it has
1042 * parameter to this method is a {@link Runnable}; the new thread executes its
1043 * {@link Runnable#run() run} method, and {@link Subtask#get() Subtask.get()} returns
1044 * {@code null} if the subtask completes successfully.
1045 *
1046 * @param task the task for the thread to execute
1047 * @param <U> the result type
1048 * @return the subtask
1049 * @throws WrongThreadException if the current thread is not the scope owner
1050 * @throws IllegalStateException if the owner has already {@linkplain #join() joined}
1051 * or the scope is closed
1052 * @throws StructureViolationException if the current scoped value bindings are not
1053 * the same as when the scope was created
1054 * @throws RejectedExecutionException if the thread factory rejected creating a
1055 * thread to run the subtask
1056 * @since 25
1057 */
1058 <U extends T> Subtask<U> fork(Runnable task);
1059
1060 /**
1061 * Returns the result, or throws, after waiting for all subtasks to complete or
1062 * the scope to be {@linkplain ##Cancallation cancelled}.
1063 *
1064 * <p> This method waits for all subtasks started in this scope to complete or the
1065 * scope to be cancelled. Once finished waiting, the {@code Joiner}'s {@link
1066 * Joiner#result() result()} method is invoked to get the result or throw an exception.
1067 * If the {@code result()} method throws then {@code join()} throws
1068 * {@code FailedException} with the exception from the {@code Joiner} as the cause.
1069 *
1070 * <p> If a {@linkplain Configuration#withTimeout(Duration) timeout} is configured,
1071 * and the timeout expires before or while waiting, then the scope is cancelled and
1072 * the {@code Joiner}'s {@link Joiner#onTimeout() onTimeout()} method is invoked
1073 * before calling the {@code Joiner}'s {@code result()} method. If the {@code onTimeout()}
1074 * method throws {@link TimeoutException TimeoutException} (or throws any exception
1075 * or error), then it is propagated by this method. If the {@code onTimeout()} method
1076 * does not throw then the {@code Joiner}'s {@code result()} method is invoked to
1077 * get the result or throw.
1078 *
1079 * <p> This method may only be invoked by the scope owner. Once the result or
1080 * exception outcome is obtained, this method may not be invoked again. The only
1081 * case where the method may be called again is where {@code InterruptedException}
1082 * is thrown while waiting.
1083 *
1084 * @return the result
1085 * @throws WrongThreadException if the current thread is not the scope owner
1086 * @throws IllegalStateException if already joined or this scope is closed
1087 * @throws FailedException if the <i>outcome</i> is an exception, thrown with the
1088 * exception from {@link Joiner#result() Joiner.result()} as the cause
1089 * @throws TimeoutException if a timeout is set, the timeout expires before or while
1090 * waiting, and {@link Joiner#onTimeout() Joiner.onTimeout()} throws this exception
1091 * @throws InterruptedException if interrupted while waiting
1092 * @since 25
1093 */
1094 R join() throws InterruptedException;
1095
1096 /**
1097 * {@return {@code true} if this scope is {@linkplain ##Cancallation cancelled} or in
1098 * the process of being cancelled, otherwise {@code false}}
1099 *
1100 * <p> Cancelling the scope prevents new threads from starting in the scope and
1101 * {@linkplain Thread#interrupt() interrupts} threads executing unfinished subtasks.
1102 * It may take some time before the interrupted threads finish execution; this
1103 * method may return {@code true} before all threads have been interrupted or before
1104 * all threads have finished.
1105 *
1106 * @apiNote A task with a lengthy "forking phase" (the code that executes before
1107 * it invokes {@link #join() join}) may use this method to avoid doing work in cases
1108 * where the scope is cancelled by the completion of a previously forked subtask or by a timeout.
1109 *
1110 * @since 25
1111 */
1112 boolean isCancelled();
1113
1114 /**
1115 * Closes this scope.
1116 *
1117 * <p> This method first {@linkplain ##Cancallation cancels} the scope, if not
1118 * already cancelled. This interrupts the threads executing unfinished subtasks. This
1119 * method then waits for all threads to finish. If interrupted while waiting then it
1120 * will continue to wait until the threads finish, before completing with the interrupt
1121 * status set.
1122 *
1123 * <p> This method may only be invoked by the scope owner. If the scope
1124 * is already closed then the scope owner invoking this method has no effect.
1125 *
1126 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
1127 * manner</em>. If this method is called to close a scope before nested task
1128 * scopes are closed then it closes the underlying construct of each nested scope
1129 * (in the reverse order that they were created in), closes this scope, and then
1130 * throws {@link StructureViolationException}.
1131 * Similarly, if this method is called to close a scope while executing with
1132 * {@linkplain ScopedValue scoped value} bindings, and the scope was created
1133 * before the scoped values were bound, then {@code StructureViolationException} is
1134 * thrown after closing the scope.
1135 * If a thread terminates without first closing scopes that it owns then
1136 * termination will cause the underlying construct of each of its open task scopes to
1137 * be closed. Closing is performed in the reverse order that the scopes were
1138 * created in. Thread termination may therefore be delayed when the scope owner
1139 * has to wait for threads forked in these scopes to finish.
1140 *
1141 * @throws IllegalStateException thrown after closing the scope if the scope
1142 * owner did not attempt to join after forking
1143 * @throws WrongThreadException if the current thread is not the scope owner
1144 * @throws StructureViolationException if a structure violation was detected
1145 */
1146 @Override
1147 void close();
1148 }