7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.time.Duration;
30 import java.time.Instant;
31 import java.util.Objects;
32 import java.util.Optional;
33 import java.util.concurrent.locks.ReentrantLock;
34 import java.util.function.Function;
35 import java.util.function.Supplier;
36 import jdk.internal.javac.PreviewFeature;
37 import jdk.internal.misc.ThreadFlock;
38 import jdk.internal.invoke.MhUtil;
39
40 /**
41 * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports
42 * cases where a task splits into several concurrent subtasks, and where the subtasks must
43 * complete before the main task continues. A {@code StructuredTaskScope} can be used to
44 * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
45 * just like that of a sequential operation in structured programming.
46 *
47 * <h2>Basic operation</h2>
48 *
49 * A {@code StructuredTaskScope} is created with one of its public constructors. It defines
50 * the {@link #fork(Callable) fork} method to start a thread to execute a subtask, the {@link
51 * #join() join} method to wait for all subtasks to finish, and the {@link #close() close}
52 * method to close the task scope. The API is intended to be used with the {@code
53 * try-with-resources} statement. The intention is that code in the try <em>block</em>
54 * uses the {@code fork} method to fork threads to execute the subtasks, wait for the
55 * subtasks to finish with the {@code join} method, and then <em>process the results</em>.
56 * A call to the {@code fork} method returns a {@link Subtask Subtask} to representing
57 * the <em>forked subtask</em>. Once {@code join} is called, the {@code Subtask} can be
58 * used to get the result completed successfully, or the exception if the subtask failed.
59 * {@snippet lang=java :
60 * Callable<String> task1 = ...
61 * Callable<Integer> task2 = ...
62 *
63 * try (var scope = new StructuredTaskScope<Object>()) {
64 *
65 * Subtask<String> subtask1 = scope.fork(task1); // @highlight substring="fork"
66 * Subtask<Integer> subtask2 = scope.fork(task2); // @highlight substring="fork"
67 *
68 * scope.join(); // @highlight substring="join"
69 *
70 * ... process results/exceptions ...
71 *
72 * } // close // @highlight substring="close"
73 * }
74 * <p> The following example forks a collection of homogeneous subtasks, waits for all of
75 * them to complete with the {@code join} method, and uses the {@link Subtask.State
76 * Subtask.State} to partition the subtasks into a set of the subtasks that completed
77 * successfully and another for the subtasks that failed.
78 * {@snippet lang=java :
79 * List<Callable<String>> callables = ...
80 *
81 * try (var scope = new StructuredTaskScope<String>()) {
82 *
83 * List<Subtask<String>> subtasks = callables.stream().map(scope::fork).toList();
84 *
85 * scope.join();
86 *
87 * Map<Boolean, Set<Subtask<String>>> map = subtasks.stream()
88 * .collect(Collectors.partitioningBy(h -> h.state() == Subtask.State.SUCCESS,
89 * Collectors.toSet()));
90 *
91 * } // close
92 * }
93 *
94 * <p> To ensure correct usage, the {@code join} and {@code close} methods may only be
95 * invoked by the <em>owner</em> (the thread that opened/created the task scope), and the
96 * {@code close} method throws an exception after closing if the owner did not invoke the
97 * {@code join} method after forking.
98 *
99 * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut
100 * down a task scope without closing it. The {@code shutdown()} method <em>cancels</em> all
101 * unfinished subtasks by {@linkplain Thread#interrupt() interrupting} the threads. It
102 * prevents new threads from starting in the task scope. If the owner is waiting in the
103 * {@code join} method then it will wakeup.
104 *
105 * <p> Shutdown is used for <em>short-circuiting</em> and allow subclasses to implement
106 * <em>policy</em> that does not require all subtasks to finish.
107 *
108 * <h2>Subclasses with policies for common cases</h2>
109 *
110 * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for
111 * common cases:
112 * <ol>
113 * <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the result of the first
114 * subtask to complete successfully. Once captured, it shuts down the task scope to
115 * interrupt unfinished threads and wakeup the owner. This class is intended for cases
116 * where the result of any subtask will do ("invoke any") and where there is no need to
117 * wait for results of other unfinished subtasks. It defines methods to get the first
118 * result or throw an exception if all subtasks fail.
119 * <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the exception of the first
120 * subtask to fail. Once captured, it shuts down the task scope to interrupt unfinished
121 * threads and wakeup the owner. This class is intended for cases where the results of all
122 * subtasks are required ("invoke all"); if any subtask fails then the results of other
123 * unfinished subtasks are no longer needed. If defines methods to throw an exception if
124 * any of the subtasks fail.
125 * </ol>
126 *
127 * <p> The following are two examples that use the two classes. In both cases, a pair of
128 * subtasks are forked to fetch resources from two URL locations "left" and "right". The
129 * first example creates a ShutdownOnSuccess object to capture the result of the first
130 * subtask to complete successfully, cancelling the other by way of shutting down the task
131 * scope. The main task waits in {@code join} until either subtask completes with a result
132 * or both subtasks fail. It invokes {@link ShutdownOnSuccess#result(Function)
133 * result(Function)} method to get the captured result. If both subtasks fail then this
134 * method throws a {@code WebApplicationException} with the exception from one of the
135 * subtasks as the cause.
136 * {@snippet lang=java :
137 * try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
138 *
139 * scope.fork(() -> fetch(left));
140 * scope.fork(() -> fetch(right));
141 *
142 * scope.join();
143 *
144 * // @link regex="result(?=\()" target="ShutdownOnSuccess#result" :
145 * String result = scope.result(e -> new WebApplicationException(e));
146 *
147 * ...
148 * }
149 * }
150 * The second example creates a ShutdownOnFailure object to capture the exception of the
151 * first subtask to fail, cancelling the other by way of shutting down the task scope. The
152 * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a
153 * result, either fails, or a deadline is reached. It invokes {@link
154 * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception
155 * if either subtask fails. This method is a no-op if both subtasks complete successfully.
156 * The example uses {@link Supplier#get()} to get the result of each subtask. Using
157 * {@code Supplier} instead of {@code Subtask} is preferred for common cases where the
158 * object returned by fork is only used to get the result of a subtask that completed
159 * successfully.
160 * {@snippet lang=java :
161 * Instant deadline = ...
162 *
163 * try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
164 *
165 * Supplier<String> supplier1 = scope.fork(() -> query(left));
166 * Supplier<String> supplier2 = scope.fork(() -> query(right));
167 *
168 * scope.joinUntil(deadline);
169 *
170 * // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" :
171 * scope.throwIfFailed(e -> new WebApplicationException(e));
172 *
173 * // both subtasks completed successfully
174 * String result = Stream.of(supplier1, supplier2)
175 * .map(Supplier::get)
176 * .collect(Collectors.joining(", ", "{ ", " }"));
177 *
178 * ...
179 * }
180 * }
181 *
182 * <h2>Extending StructuredTaskScope</h2>
183 *
184 * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Subtask)
185 * handleComplete} method overridden, to implement policies other than those implemented
186 * by {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. A subclass may, for example,
187 * collect the results of subtasks that complete successfully and ignore subtasks that
188 * fail. It may collect exceptions when subtasks fail. It may invoke the {@link #shutdown()
189 * shutdown} method to shut down and cause {@link #join() join} to wakeup when some
190 * condition arises.
191 *
192 * <p> A subclass will typically define methods to make available results, state, or other
193 * outcome to code that executes after the {@code join} method. A subclass that collects
194 * results and ignores subtasks that fail may define a method that returns the results.
195 * A subclass that implements a policy to shut down when a subtask fails may define a
196 * method to get the exception of the first subtask to fail.
197 *
198 * <p> The following is an example of a simple {@code StructuredTaskScope} implementation
199 * that collects homogenous subtasks that complete successfully. It defines the method
200 * "{@code completedSuccessfully()}" that the main task can invoke after it joins.
201 * {@snippet lang=java :
202 * class CollectingScope<T> extends StructuredTaskScope<T> {
203 * private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>();
204 *
205 * @Override
206 * protected void handleComplete(Subtask<? extends T> subtask) {
207 * if (subtask.state() == Subtask.State.SUCCESS) {
208 * subtasks.add(subtask);
209 * }
210 * }
211 *
212 * @Override
213 * public CollectingScope<T> join() throws InterruptedException {
214 * super.join();
215 * return this;
216 * }
217 *
218 * public Stream<Subtask<? extends T>> completedSuccessfully() {
219 * // @link substring="ensureOwnerAndJoined" target="ensureOwnerAndJoined" :
220 * super.ensureOwnerAndJoined();
221 * return subtasks.stream();
222 * }
223 * }
224 * }
225 * <p> The implementations of the {@code completedSuccessfully()} method in the example
226 * invokes {@link #ensureOwnerAndJoined()} to ensure that the method can only be invoked
227 * by the owner thread and only after it has joined.
228 *
229 * <h2><a id="TreeStructure">Tree structure</a></h2>
230 *
231 * Task scopes form a tree where parent-child relations are established implicitly when
232 * opening a new task scope:
233 * <ul>
234 * <li> A parent-child relation is established when a thread started in a task scope
235 * opens its own task scope. A thread started in task scope "A" that opens task scope
236 * "B" establishes a parent-child relation where task scope "A" is the parent of task
237 * scope "B".
238 * <li> A parent-child relation is established with nesting. If a thread opens task
239 * scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task
240 * scope "B" is the parent of the nested task scope "C".
241 * </ul>
242 *
243 * The <i>descendants</i> of a task scope are the child task scopes that it is a parent
244 * of, plus the descendants of the child task scopes, recursively.
245 *
246 * <p> The tree structure supports:
247 * <ul>
248 * <li> Inheritance of {@linkplain ScopedValue scoped values} across threads.
249 * <li> Confinement checks. The phrase "threads contained in the task scope" in method
250 * descriptions means threads started in the task scope or descendant scopes.
251 * </ul>
252 *
253 * <p> The following example demonstrates the inheritance of a scoped value. A scoped
254 * value {@code USERNAME} is bound to the value "{@code duke}". A {@code StructuredTaskScope}
255 * is created and its {@code fork} method invoked to start a thread to execute {@code
256 * childTask}. The thread inherits the scoped value <em>bindings</em> captured when
257 * creating the task scope. The code in {@code childTask} uses the value of the scoped
258 * value and so reads the value "{@code duke}".
259 * {@snippet lang=java :
260 * private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
261 *
262 * // @link substring="run" target="ScopedValue.Carrier#run(Runnable)" :
263 * ScopedValue.where(USERNAME, "duke").run(() -> {
264 * try (var scope = new StructuredTaskScope<String>()) {
265 *
266 * scope.fork(() -> childTask()); // @highlight substring="fork"
267 * ...
268 * }
269 * });
270 *
271 * ...
272 *
273 * String childTask() {
274 * // @link substring="get" target="ScopedValue#get()" :
275 * String name = USERNAME.get(); // "duke"
276 * ...
277 * }
278 * }
279 *
280 * <p> {@code StructuredTaskScope} does not define APIs that exposes the tree structure
281 * at this time.
282 *
283 * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
284 * or method in this class will cause a {@link NullPointerException} to be thrown.
285 *
286 * <h2>Memory consistency effects</h2>
287 *
288 * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to
289 * {@linkplain #fork forking} of a subtask
290 * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
291 * <i>happen-before</i></a> any actions taken by that subtask, which in turn <i>happen-before</i>
292 * the subtask result is {@linkplain Subtask#get() retrieved} or <i>happen-before</i> any
293 * actions taken in a thread after {@linkplain #join() joining} of the task scope.
294 *
295 * @jls 17.4.5 Happens-before Order
296 *
297 * @param <T> the result type of tasks executed in the task scope
298 * @since 21
299 */
300 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
301 public class StructuredTaskScope<T> implements AutoCloseable {
302 private final ThreadFactory factory;
303 private final ThreadFlock flock;
304 private final ReentrantLock shutdownLock = new ReentrantLock();
305
306 // states: OPEN -> SHUTDOWN -> CLOSED
307 private static final int OPEN = 0; // initial state
308 private static final int SHUTDOWN = 1;
309 private static final int CLOSED = 2;
310
311 // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
312 private volatile int state;
313
314 // Counters to support checking that the task scope owner joins before processing
315 // results and attempts join before closing the task scope. These counters are
316 // accessed only by the owner thread.
317 private int forkRound; // incremented when the first subtask is forked after join
318 private int lastJoinAttempted; // set to the current fork round when join is attempted
319 private int lastJoinCompleted; // set to the current fork round when join completes
320
321 /**
322 * Represents a subtask forked with {@link #fork(Callable)}.
323 * @param <T> the result type
324 * @since 21
325 */
326 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
327 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
328 /**
329 * {@return the value returning task provided to the {@code fork} method}
330 *
331 * @apiNote Task objects with unique identity may be used for correlation by
332 * implementations of {@link #handleComplete(Subtask) handleComplete}.
333 */
334 Callable<? extends T> task();
335
336 /**
337 * Represents the state of a subtask.
338 * @see Subtask#state()
339 * @since 21
340 */
341 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
342 enum State {
343 /**
344 * The subtask result or exception is not available. This state indicates that
345 * the subtask was forked but has not completed, it completed after the task
346 * scope was {@linkplain #shutdown() shut down}, or it was forked after the
347 * task scope was shut down.
348 */
349 UNAVAILABLE,
350 /**
351 * The subtask completed successfully with a result. The {@link Subtask#get()
352 * Subtask.get()} method can be used to obtain the result. This is a terminal
353 * state.
354 */
355 SUCCESS,
356 /**
357 * The subtask failed with an exception. The {@link Subtask#exception()
358 * Subtask.exception()} method can be used to obtain the exception. This is a
359 * terminal state.
360 */
361 FAILED,
362 }
363
364 /**
365 * {@return the state of the subtask}
366 */
367 State state();
368
369 /**
370 * Returns the result of the subtask.
371 *
372 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
373 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
374 * joinUntil}) before it can obtain the result of the subtask.
375 *
376 * @return the possibly-null result
377 * @throws IllegalStateException if the subtask has not completed, did not complete
378 * successfully, or the current thread is the task scope owner and did not join
379 * after forking
380 * @see State#SUCCESS
381 */
382 T get();
383
384 /**
385 * {@return the exception thrown by the subtask}
386 *
387 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
388 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
389 * joinUntil}) before it can obtain the exception thrown by the subtask.
390 *
391 * @throws IllegalStateException if the subtask has not completed, completed with
392 * a result, or the current thread is the task scope owner and did not join after
393 * forking
394 * @see State#FAILED
395 */
396 Throwable exception();
397 }
398
399 /**
400 * Creates a structured task scope with the given name and thread factory. The task
401 * scope is optionally named for the purposes of monitoring and management. The thread
402 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
403 * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
404 * current thread.
405 *
406 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
407 * bindings for inheritance by threads started in the task scope. The
408 * <a href="#TreeStructure">Tree Structure</a> section in the class description details
409 * how parent-child relations are established implicitly for the purpose of inheritance
410 * of scoped value bindings.
411 *
412 * @param name the name of the task scope, can be null
413 * @param factory the thread factory
414 */
415 @SuppressWarnings("this-escape")
416 public StructuredTaskScope(String name, ThreadFactory factory) {
417 this.factory = Objects.requireNonNull(factory, "'factory' is null");
418 if (name == null)
419 name = Objects.toIdentityString(this);
420 this.flock = ThreadFlock.open(name);
421 }
422
423 /**
424 * Creates an unnamed structured task scope that creates virtual threads. The task
425 * scope is owned by the current thread.
426 *
427 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with a
428 * name of {@code null} and a thread factory that creates virtual threads.
429 */
430 public StructuredTaskScope() {
431 this(null, Thread.ofVirtual().factory());
432 }
433
434 private IllegalStateException newIllegalStateExceptionScopeClosed() {
435 return new IllegalStateException("Task scope is closed");
436 }
437
438 private IllegalStateException newIllegalStateExceptionNoJoin() {
439 return new IllegalStateException("Owner did not join after forking subtasks");
440 }
441
442 /**
443 * Throws IllegalStateException if the scope is closed, returning the state if not
444 * closed.
445 */
446 private int ensureOpen() {
447 int s = state;
448 if (s == CLOSED)
449 throw newIllegalStateExceptionScopeClosed();
450 return s;
451 }
452
453 /**
454 * Throws WrongThreadException if the current thread is not the owner.
455 */
456 private void ensureOwner() {
457 if (Thread.currentThread() != flock.owner())
458 throw new WrongThreadException("Current thread not owner");
459 }
460
461 /**
462 * Throws WrongThreadException if the current thread is not the owner
463 * or a thread contained in the tree.
464 */
465 private void ensureOwnerOrContainsThread() {
466 Thread currentThread = Thread.currentThread();
467 if (currentThread != flock.owner() && !flock.containsThread(currentThread))
468 throw new WrongThreadException("Current thread not owner or thread in the tree");
469 }
470
471 /**
472 * Throws IllegalStateException if the current thread is the owner, and the owner did
473 * not join after forking a subtask in the given fork round.
474 */
475 private void ensureJoinedIfOwner(int round) {
476 if (Thread.currentThread() == flock.owner() && (round > lastJoinCompleted)) {
477 throw newIllegalStateExceptionNoJoin();
478 }
479 }
480
481 /**
482 * Ensures that the current thread is the owner of this task scope and that it joined
483 * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
484 * forking} subtasks.
485 *
486 * @apiNote This method can be used by subclasses that define methods to make available
487 * results, state, or other outcome to code intended to execute after the join method.
488 *
489 * @throws WrongThreadException if the current thread is not the task scope owner
490 * @throws IllegalStateException if the task scope is open and task scope owner did
491 * not join after forking
492 */
493 protected final void ensureOwnerAndJoined() {
494 ensureOwner();
495 if (forkRound > lastJoinCompleted) {
496 throw newIllegalStateExceptionNoJoin();
497 }
498 }
499
500 /**
501 * Invoked by a subtask when it completes successfully or fails in this task scope.
502 * This method is not invoked if a subtask completes after the task scope is
503 * {@linkplain #shutdown() shut down}.
504 *
505 * @implSpec The default implementation throws {@code NullPointerException} if the
506 * subtask is {@code null}. It throws {@link IllegalArgumentException} if the subtask
507 * has not completed.
508 *
509 * @apiNote The {@code handleComplete} method should be thread safe. It may be
510 * invoked by several threads concurrently.
511 *
512 * @param subtask the subtask
513 *
514 * @throws IllegalArgumentException if called with a subtask that has not completed
515 */
516 protected void handleComplete(Subtask<? extends T> subtask) {
517 if (subtask.state() == Subtask.State.UNAVAILABLE)
518 throw new IllegalArgumentException();
519 }
520
521 /**
522 * Starts a new thread in this task scope to execute a value-returning task, thus
523 * creating a <em>subtask</em> of this task scope.
524 *
525 * <p> The value-returning task is provided to this method as a {@link Callable}, the
526 * thread executes the task's {@link Callable#call() call} method. The thread is
527 * created with the task scope's {@link ThreadFactory}. It inherits the current thread's
528 * {@linkplain ScopedValue scoped value} bindings. The bindings must match the bindings
529 * captured when the task scope was created.
530 *
531 * <p> This method returns a {@link Subtask Subtask} to represent the <em>forked
532 * subtask</em>. The {@code Subtask} object can be used to obtain the result when
533 * the subtask completes successfully, or the exception when the subtask fails. To
534 * ensure correct usage, the {@link Subtask#get() get()} and {@link Subtask#exception()
535 * exception()} methods may only be called by the task scope owner after it has waited
536 * for all threads to finish with the {@link #join() join} or {@link #joinUntil(Instant)}
537 * methods. When the subtask completes, the thread invokes the {@link
538 * #handleComplete(Subtask) handleComplete} method to consume the completed subtask.
539 * If the task scope is {@linkplain #shutdown() shut down} before the subtask completes
540 * then the {@code handleComplete} method will not be invoked.
541 *
542 * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process of
543 * shutting down) then the subtask will not run and the {@code handleComplete} method
544 * will not be invoked.
545 *
546 * <p> This method may only be invoked by the task scope owner or threads contained
547 * in the task scope.
548 *
549 * @implSpec This method may be overridden for customization purposes, wrapping tasks
550 * for example. If overridden, the subclass must invoke {@code super.fork} to start a
551 * new thread in this task scope.
552 *
553 * @param task the value-returning task for the thread to execute
554 * @param <U> the result type
555 * @return the subtask
556 * @throws IllegalStateException if this task scope is closed
557 * @throws WrongThreadException if the current thread is not the task scope owner or a
558 * thread contained in the task scope
559 * @throws StructureViolationException if the current scoped value bindings are not
560 * the same as when the task scope was created
561 * @throws RejectedExecutionException if the thread factory rejected creating a
562 * thread to run the subtask
563 */
564 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
565 Objects.requireNonNull(task, "'task' is null");
566 int s = ensureOpen(); // throws ISE if closed
567
568 // when forked by the owner, the subtask is forked in the current or next round
569 int round = -1;
570 if (Thread.currentThread() == flock.owner()) {
571 round = forkRound;
572 if (forkRound == lastJoinCompleted) {
573 // new round if first fork after join
574 round++;
575 }
576 }
577
578 SubtaskImpl<U> subtask = new SubtaskImpl<>(this, task, round);
579 if (s < SHUTDOWN) {
580 // create thread to run task
581 Thread thread = factory.newThread(subtask);
582 if (thread == null) {
583 throw new RejectedExecutionException("Rejected by thread factory");
584 }
585
586 // attempt to start the thread
587 try {
588 flock.start(thread);
589 } catch (IllegalStateException e) {
590 // shutdown by another thread, or underlying flock is shutdown due
591 // to unstructured use
592 }
593 }
594
595 // force owner to join if this is the first fork in the round
596 if (Thread.currentThread() == flock.owner() && round > forkRound) {
597 forkRound = round;
598 }
599
600 // return forked subtask or a subtask that did not run
601 return subtask;
602 }
603
604 /**
605 * Wait for all threads to finish or the task scope to shut down.
606 */
607 private void implJoin(Duration timeout)
608 throws InterruptedException, TimeoutException
609 {
610 ensureOwner();
611 lastJoinAttempted = forkRound;
612 int s = ensureOpen(); // throws ISE if closed
613 if (s == OPEN) {
614 // wait for all threads, wakeup, interrupt, or timeout
615 if (timeout != null) {
616 flock.awaitAll(timeout);
617 } else {
618 flock.awaitAll();
619 }
620 }
621 lastJoinCompleted = forkRound;
622 }
623
624 /**
625 * Wait for all subtasks started in this task scope to finish or the task scope to
626 * shut down.
627 *
628 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
629 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
630 * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, or
631 * the current thread is {@linkplain Thread#interrupt() interrupted}.
632 *
633 * <p> This method may only be invoked by the task scope owner.
634 *
635 * @implSpec This method may be overridden for customization purposes or to return a
636 * more specific return type. If overridden, the subclass must invoke {@code
637 * super.join} to ensure that the method waits for threads in this task scope to
638 * finish.
639 *
640 * @return this task scope
641 * @throws IllegalStateException if this task scope is closed
642 * @throws WrongThreadException if the current thread is not the task scope owner
643 * @throws InterruptedException if interrupted while waiting
644 */
645 public StructuredTaskScope<T> join() throws InterruptedException {
646 try {
647 implJoin(null);
648 } catch (TimeoutException e) {
649 throw new InternalError();
650 }
651 return this;
652 }
653
654 /**
655 * Wait for all subtasks started in this task scope to finish or the task scope to
656 * shut down, up to the given deadline.
657 *
658 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
659 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
660 * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, the
661 * deadline is reached, or the current thread is {@linkplain Thread#interrupt()
662 * interrupted}.
663 *
664 * <p> This method may only be invoked by the task scope owner.
665 *
666 * @implSpec This method may be overridden for customization purposes or to return a
667 * more specific return type. If overridden, the subclass must invoke {@code
668 * super.joinUntil} to ensure that the method waits for threads in this task scope to
669 * finish.
670 *
671 * @param deadline the deadline
672 * @return this task scope
673 * @throws IllegalStateException if this task scope is closed
674 * @throws WrongThreadException if the current thread is not the task scope owner
675 * @throws InterruptedException if interrupted while waiting
676 * @throws TimeoutException if the deadline is reached while waiting
677 */
678 public StructuredTaskScope<T> joinUntil(Instant deadline)
679 throws InterruptedException, TimeoutException
680 {
681 Duration timeout = Duration.between(Instant.now(), deadline);
682 implJoin(timeout);
683 return this;
684 }
685
686 /**
687 * Interrupt all unfinished threads.
688 */
689 private void interruptAll() {
690 flock.threads()
691 .filter(t -> t != Thread.currentThread())
692 .forEach(t -> {
693 try {
694 t.interrupt();
695 } catch (Throwable ignore) { }
696 });
697 }
698
699 /**
700 * Shutdown the task scope if not already shutdown. Return true if this method
701 * shutdowns the task scope, false if already shutdown.
702 */
703 private boolean implShutdown() {
704 shutdownLock.lock();
705 try {
706 if (state < SHUTDOWN) {
707 // prevent new threads from starting
708 flock.shutdown();
709
710 // set status before interrupting tasks
711 state = SHUTDOWN;
712
713 // interrupt all unfinished threads
714 interruptAll();
715
716 return true;
717 } else {
718 // already shutdown
719 return false;
720 }
721 } finally {
722 shutdownLock.unlock();
723 }
724 }
725
726 /**
727 * Shut down this task scope without closing it. Shutting down a task scope prevents
728 * new threads from starting, interrupts all unfinished threads, and causes the
729 * {@link #join() join} method to wakeup. Shutdown is useful for cases where the
730 * results of unfinished subtasks are no longer needed. It will typically be called
731 * by the {@link #handleComplete(Subtask)} implementation of a subclass that
732 * implements a policy to discard unfinished tasks once some outcome is reached.
733 *
734 * <p> More specifically, this method:
735 * <ul>
736 * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the
737 * task scope (except the current thread).
738 * <li> Wakes up the task scope owner if it is waiting in {@link #join()} or {@link
739 * #joinUntil(Instant)}. If the task scope owner is not waiting then its next call to
740 * {@code join} or {@code joinUntil} will return immediately.
741 * </ul>
742 *
743 * <p> The {@linkplain Subtask.State state} of unfinished subtasks that complete at
744 * around the time that the task scope is shutdown is not defined. A subtask that
745 * completes successfully with a result, or fails with an exception, at around
746 * the time that the task scope is shutdown may or may not <i>transition</i> to a
747 * terminal state.
748 *
749 * <p> This method may only be invoked by the task scope owner or threads contained
750 * in the task scope.
751 *
752 * @implSpec This method may be overridden for customization purposes. If overridden,
753 * the subclass must invoke {@code super.shutdown} to ensure that the method shuts
754 * down the task scope.
755 *
756 * @apiNote
757 * There may be threads that have not finished because they are executing code that
758 * did not respond (or respond promptly) to thread interrupt. This method does not wait
759 * for these threads. When the owner invokes the {@link #close() close} method
760 * to close the task scope then it will wait for the remaining threads to finish.
761 *
762 * @throws IllegalStateException if this task scope is closed
763 * @throws WrongThreadException if the current thread is not the task scope owner or
764 * a thread contained in the task scope
765 * @see #isShutdown()
766 */
767 public void shutdown() {
768 ensureOwnerOrContainsThread();
769 int s = ensureOpen(); // throws ISE if closed
770 if (s < SHUTDOWN && implShutdown())
771 flock.wakeup();
772 }
773
774 /**
775 * {@return true if this task scope is shutdown, otherwise false}
776 * @see #shutdown()
777 */
778 public final boolean isShutdown() {
779 return state >= SHUTDOWN;
780 }
781
782 /**
783 * Closes this task scope.
784 *
785 * <p> This method first shuts down the task scope (as if by invoking the {@link
786 * #shutdown() shutdown} method). It then waits for the threads executing any
787 * unfinished tasks to finish. If interrupted, this method will continue to wait for
788 * the threads to finish before completing with the interrupt status set.
789 *
790 * <p> This method may only be invoked by the task scope owner. If the task scope
791 * is already closed then the task scope owner invoking this method has no effect.
792 *
793 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
794 * manner</em>. If this method is called to close a task scope before nested task
795 * scopes are closed then it closes the underlying construct of each nested task scope
796 * (in the reverse order that they were created in), closes this task scope, and then
797 * throws {@link StructureViolationException}.
798 * Similarly, if this method is called to close a task scope while executing with
799 * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
800 * before the scoped values were bound, then {@code StructureViolationException} is
801 * thrown after closing the task scope.
802 * If a thread terminates without first closing task scopes that it owns then
803 * termination will cause the underlying construct of each of its open tasks scopes to
804 * be closed. Closing is performed in the reverse order that the task scopes were
805 * created in. Thread termination may therefore be delayed when the task scope owner
806 * has to wait for threads forked in these task scopes to finish.
807 *
808 * @implSpec This method may be overridden for customization purposes. If overridden,
809 * the subclass must invoke {@code super.close} to close the task scope.
810 *
811 * @throws IllegalStateException thrown after closing the task scope if the task scope
812 * owner did not attempt to join after forking
813 * @throws WrongThreadException if the current thread is not the task scope owner
814 * @throws StructureViolationException if a structure violation was detected
815 */
816 @Override
817 public void close() {
818 ensureOwner();
819 int s = state;
820 if (s == CLOSED)
821 return;
822
823 try {
824 if (s < SHUTDOWN)
825 implShutdown();
826 flock.close();
827 } finally {
828 state = CLOSED;
829 }
830
831 // throw ISE if the owner didn't attempt to join after forking
832 if (forkRound > lastJoinAttempted) {
833 lastJoinCompleted = forkRound;
834 throw newIllegalStateExceptionNoJoin();
835 }
836 }
837
838 @Override
839 public String toString() {
840 String name = flock.name();
841 return switch (state) {
842 case OPEN -> name;
843 case SHUTDOWN -> name + "/shutdown";
844 case CLOSED -> name + "/closed";
845 default -> throw new InternalError();
846 };
847 }
848
849 /**
850 * Subtask implementation, runs the task specified to the fork method.
851 */
852 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
853 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
854
855 private record AltResult(Subtask.State state, Throwable exception) {
856 AltResult(Subtask.State state) {
857 this(state, null);
858 }
859 }
860
861 private final StructuredTaskScope<? super T> scope;
862 private final Callable<? extends T> task;
863 private final int round;
864 private volatile Object result;
865
866 SubtaskImpl(StructuredTaskScope<? super T> scope,
867 Callable<? extends T> task,
868 int round) {
869 this.scope = scope;
870 this.task = task;
871 this.round = round;
872 }
873
874 @Override
875 public void run() {
876 T result = null;
877 Throwable ex = null;
878 try {
879 result = task.call();
880 } catch (Throwable e) {
881 ex = e;
882 }
883
884 // nothing to do if task scope is shutdown
885 if (scope.isShutdown())
886 return;
887
888 // capture result or exception, invoke handleComplete
889 if (ex == null) {
890 this.result = (result != null) ? result : RESULT_NULL;
891 } else {
892 this.result = new AltResult(State.FAILED, ex);
893 }
894 scope.handleComplete(this);
895 }
896
897 @Override
898 public Callable<? extends T> task() {
899 return task;
900 }
901
902 @Override
903 public Subtask.State state() {
904 Object result = this.result;
905 if (result == null) {
906 return State.UNAVAILABLE;
907 } else if (result instanceof AltResult alt) {
908 // null or failed
909 return alt.state();
910 } else {
911 return State.SUCCESS;
912 }
913 }
914
915 @Override
916 public T get() {
917 scope.ensureJoinedIfOwner(round);
918 Object result = this.result;
919 if (result instanceof AltResult) {
920 if (result == RESULT_NULL) return null;
921 } else if (result != null) {
922 @SuppressWarnings("unchecked")
923 T r = (T) result;
924 return r;
925 }
926 throw new IllegalStateException(
927 "Result is unavailable or subtask did not complete successfully");
928 }
929
930 @Override
931 public Throwable exception() {
932 scope.ensureJoinedIfOwner(round);
933 Object result = this.result;
934 if (result instanceof AltResult alt && alt.state() == State.FAILED) {
935 return alt.exception();
936 }
937 throw new IllegalStateException(
938 "Exception is unavailable or subtask did not complete with exception");
939 }
940
941 @Override
942 public String toString() {
943 String stateAsString = switch (state()) {
944 case UNAVAILABLE -> "[Unavailable]";
945 case SUCCESS -> "[Completed successfully]";
946 case FAILED -> {
947 Throwable ex = ((AltResult) result).exception();
948 yield "[Failed: " + ex + "]";
949 }
950 };
951 return Objects.toIdentityString(this) + stateAsString;
952 }
953 }
954
955 /**
956 * A {@code StructuredTaskScope} that captures the result of the first subtask to
957 * complete {@linkplain Subtask.State#SUCCESS successfully}. Once captured, it
958 * {@linkplain #shutdown() shuts down} the task scope to interrupt unfinished threads
959 * and wakeup the task scope owner. The policy implemented by this class is intended
960 * for cases where the result of any subtask will do ("invoke any") and where the
961 * results of other unfinished subtasks are no longer needed.
962 *
963 * <p> Unless otherwise specified, passing a {@code null} argument to a method
964 * in this class will cause a {@link NullPointerException} to be thrown.
965 *
966 * @apiNote This class implements a policy to shut down the task scope when a subtask
967 * completes successfully. There shouldn't be any need to directly shut down the task
968 * scope with the {@link #shutdown() shutdown} method.
969 *
970 * @param <T> the result type
971 * @since 21
972 */
973 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
974 public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
975 private static final Object RESULT_NULL = new Object();
976 private static final VarHandle FIRST_RESULT;
977 private static final VarHandle FIRST_EXCEPTION;
978 static {
979 MethodHandles.Lookup l = MethodHandles.lookup();
980 FIRST_RESULT = MhUtil.findVarHandle(l, "firstResult", Object.class);
981 FIRST_EXCEPTION = MhUtil.findVarHandle(l, "firstException", Throwable.class);
982 }
983 private volatile Object firstResult;
984 private volatile Throwable firstException;
985
986 /**
987 * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
988 * The task scope is optionally named for the purposes of monitoring and management.
989 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
990 * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope
991 * is owned by the current thread.
992 *
993 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
994 * value} bindings for inheritance by threads started in the task scope. The
995 * {@linkplain StructuredTaskScope##TreeStructure Tree Structure} section
996 * in the class description details how parent-child relations are established
997 * implicitly for the purpose of inheritance of scoped value bindings.
998 *
999 * @param name the name of the task scope, can be null
1000 * @param factory the thread factory
1001 */
1002 public ShutdownOnSuccess(String name, ThreadFactory factory) {
1003 super(name, factory);
1004 }
1005
1006 /**
1007 * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
1008 *
1009 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with
1010 * a name of {@code null} and a thread factory that creates virtual threads.
1011 */
1012 public ShutdownOnSuccess() {
1013 this(null, Thread.ofVirtual().factory());
1014 }
1015
1016 @Override
1017 protected void handleComplete(Subtask<? extends T> subtask) {
1018 if (firstResult != null) {
1019 // already captured a result
1020 return;
1021 }
1022
1023 if (subtask.state() == Subtask.State.SUCCESS) {
1024 // task succeeded
1025 T result = subtask.get();
1026 Object r = (result != null) ? result : RESULT_NULL;
1027 if (FIRST_RESULT.compareAndSet(this, null, r)) {
1028 super.shutdown();
1029 }
1030 } else if (firstException == null) {
1031 // capture the exception thrown by the first subtask that failed
1032 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1033 }
1034 }
1035
1036 /**
1037 * Wait for a subtask started in this task scope to complete {@linkplain
1038 * Subtask.State#SUCCESS successfully} or all subtasks to complete.
1039 *
1040 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1041 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1042 * when all threads finish, a subtask completes successfully, or the current
1043 * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting
1044 * if the {@link #shutdown() shutdown} method is invoked directly to shut down
1045 * this task scope.
1046 *
1047 * <p> This method may only be invoked by the task scope owner.
1048 *
1049 * @throws IllegalStateException {@inheritDoc}
1050 * @throws WrongThreadException {@inheritDoc}
1051 */
1052 @Override
1053 public ShutdownOnSuccess<T> join() throws InterruptedException {
1054 super.join();
1055 return this;
1056 }
1057
1058 /**
1059 * Wait for a subtask started in this task scope to complete {@linkplain
1060 * Subtask.State#SUCCESS successfully} or all subtasks to complete, up to the
1061 * given deadline.
1062 *
1063 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1064 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1065 * when all threads finish, a subtask completes successfully, the deadline is
1066 * reached, or the current thread is {@linkplain Thread#interrupt() interrupted}.
1067 * It also stops waiting if the {@link #shutdown() shutdown} method is invoked
1068 * directly to shut down this task scope.
1069 *
1070 * <p> This method may only be invoked by the task scope owner.
1071 *
1072 * @throws IllegalStateException {@inheritDoc}
1073 * @throws WrongThreadException {@inheritDoc}
1074 */
1075 @Override
1076 public ShutdownOnSuccess<T> joinUntil(Instant deadline)
1077 throws InterruptedException, TimeoutException
1078 {
1079 super.joinUntil(deadline);
1080 return this;
1081 }
1082
1083 /**
1084 * {@return the result of the first subtask that completed {@linkplain
1085 * Subtask.State#SUCCESS successfully}}
1086 *
1087 * <p> When no subtask completed successfully, but a subtask {@linkplain
1088 * Subtask.State#FAILED failed} then {@code ExecutionException} is thrown with
1089 * the subtask's exception as the {@linkplain Throwable#getCause() cause}.
1090 *
1091 * @throws ExecutionException if no subtasks completed successfully but at least
1092 * one subtask failed
1093 * @throws IllegalStateException if no subtasks completed or the task scope owner
1094 * did not join after forking
1095 * @throws WrongThreadException if the current thread is not the task scope owner
1096 */
1097 public T result() throws ExecutionException {
1098 return result(ExecutionException::new);
1099 }
1100
1101 /**
1102 * Returns the result of the first subtask that completed {@linkplain
1103 * Subtask.State#SUCCESS successfully}, otherwise throws an exception produced
1104 * by the given exception supplying function.
1105 *
1106 * <p> When no subtask completed successfully, but a subtask {@linkplain
1107 * Subtask.State#FAILED failed}, then the exception supplying function is invoked
1108 * with subtask's exception.
1109 *
1110 * @param esf the exception supplying function
1111 * @param <X> type of the exception to be thrown
1112 * @return the result of the first subtask that completed with a result
1113 *
1114 * @throws X if no subtasks completed successfully but at least one subtask failed
1115 * @throws IllegalStateException if no subtasks completed or the task scope owner
1116 * did not join after forking
1117 * @throws WrongThreadException if the current thread is not the task scope owner
1118 */
1119 public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X {
1120 Objects.requireNonNull(esf);
1121 ensureOwnerAndJoined();
1122
1123 Object result = firstResult;
1124 if (result == RESULT_NULL) {
1125 return null;
1126 } else if (result != null) {
1127 @SuppressWarnings("unchecked")
1128 T r = (T) result;
1129 return r;
1130 }
1131
1132 Throwable exception = firstException;
1133 if (exception != null) {
1134 X ex = esf.apply(exception);
1135 Objects.requireNonNull(ex, "esf returned null");
1136 throw ex;
1137 }
1138
1139 throw new IllegalStateException("No completed subtasks");
1140 }
1141 }
1142
1143 /**
1144 * A {@code StructuredTaskScope} that captures the exception of the first subtask to
1145 * {@linkplain Subtask.State#FAILED fail}. Once captured, it {@linkplain #shutdown()
1146 * shuts down} the task scope to interrupt unfinished threads and wakeup the task
1147 * scope owner. The policy implemented by this class is intended for cases where the
1148 * results for all subtasks are required ("invoke all"); if any subtask fails then the
1149 * results of other unfinished subtasks are no longer needed.
1150 *
1151 * <p> Unless otherwise specified, passing a {@code null} argument to a method
1152 * in this class will cause a {@link NullPointerException} to be thrown.
1153 *
1154 * @apiNote This class implements a policy to shut down the task scope when a subtask
1155 * fails. There shouldn't be any need to directly shut down the task scope with the
1156 * {@link #shutdown() shutdown} method.
1157 *
1158 * @since 21
1159 */
1160 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
1161 public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
1162 private static final VarHandle FIRST_EXCEPTION =
1163 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
1164 private volatile Throwable firstException;
1165
1166 /**
1167 * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
1168 * The task scope is optionally named for the purposes of monitoring and management.
1169 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1170 * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope
1171 * is owned by the current thread.
1172 *
1173 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
1174 * value} bindings for inheritance by threads started in the task scope. The
1175 * {@linkplain StructuredTaskScope##TreeStructure Tree Structure} section in the class description
1176 * details how parent-child relations are established implicitly for the purpose
1177 * of inheritance of scoped value bindings.
1178 *
1179 * @param name the name of the task scope, can be null
1180 * @param factory the thread factory
1181 */
1182 public ShutdownOnFailure(String name, ThreadFactory factory) {
1183 super(name, factory);
1184 }
1185
1186 /**
1187 * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
1188 *
1189 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with
1190 * a name of {@code null} and a thread factory that creates virtual threads.
1191 */
1192 public ShutdownOnFailure() {
1193 this(null, Thread.ofVirtual().factory());
1194 }
1195
1196 @Override
1197 protected void handleComplete(Subtask<?> subtask) {
1198 if (subtask.state() == Subtask.State.FAILED
1199 && firstException == null
1200 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception())) {
1201 super.shutdown();
1202 }
1203 }
1204
1205 /**
1206 * Wait for all subtasks started in this task scope to complete or for a subtask
1207 * to {@linkplain Subtask.State#FAILED fail}.
1208 *
1209 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1210 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1211 * when all threads finish, a subtask fails, or the current thread is {@linkplain
1212 * Thread#interrupt() interrupted}. It also stops waiting if the {@link #shutdown()
1213 * shutdown} method is invoked directly to shut down this task scope.
1214 *
1215 * <p> This method may only be invoked by the task scope owner.
1216 *
1217 * @throws IllegalStateException {@inheritDoc}
1218 * @throws WrongThreadException {@inheritDoc}
1219 */
1220 @Override
1221 public ShutdownOnFailure join() throws InterruptedException {
1222 super.join();
1223 return this;
1224 }
1225
1226 /**
1227 * Wait for all subtasks started in this task scope to complete or for a subtask
1228 * to {@linkplain Subtask.State#FAILED fail}, up to the given deadline.
1229 *
1230 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1231 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1232 * when all threads finish, a subtask fails, the deadline is reached, or the current
1233 * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting
1234 * if the {@link #shutdown() shutdown} method is invoked directly to shut down
1235 * this task scope.
1236 *
1237 * <p> This method may only be invoked by the task scope owner.
1238 *
1239 * @throws IllegalStateException {@inheritDoc}
1240 * @throws WrongThreadException {@inheritDoc}
1241 */
1242 @Override
1243 public ShutdownOnFailure joinUntil(Instant deadline)
1244 throws InterruptedException, TimeoutException
1245 {
1246 super.joinUntil(deadline);
1247 return this;
1248 }
1249
1250 /**
1251 * Returns the exception of the first subtask that {@linkplain Subtask.State#FAILED
1252 * failed}. If no subtasks failed then an empty {@code Optional} is returned.
1253 *
1254 * @return the exception for the first subtask to fail or an empty optional if no
1255 * subtasks failed
1256 *
1257 * @throws WrongThreadException if the current thread is not the task scope owner
1258 * @throws IllegalStateException if the task scope owner did not join after forking
1259 */
1260 public Optional<Throwable> exception() {
1261 ensureOwnerAndJoined();
1262 return Optional.ofNullable(firstException);
1263 }
1264
1265 /**
1266 * Throws if a subtask {@linkplain Subtask.State#FAILED failed}.
1267 * If any subtask failed with an exception then {@code ExecutionException} is
1268 * thrown with the exception of the first subtask to fail as the {@linkplain
1269 * Throwable#getCause() cause}. This method does nothing if no subtasks failed.
1270 *
1271 * @throws ExecutionException if a subtask failed
1272 * @throws WrongThreadException if the current thread is not the task scope owner
1273 * @throws IllegalStateException if the task scope owner did not join after forking
1274 */
1275 public void throwIfFailed() throws ExecutionException {
1276 throwIfFailed(ExecutionException::new);
1277 }
1278
1279 /**
1280 * Throws the exception produced by the given exception supplying function if a
1281 * subtask {@linkplain Subtask.State#FAILED failed}. If any subtask failed with
1282 * an exception then the function is invoked with the exception of the first
1283 * subtask to fail. The exception returned by the function is thrown. This method
1284 * does nothing if no subtasks failed.
1285 *
1286 * @param esf the exception supplying function
1287 * @param <X> type of the exception to be thrown
1288 *
1289 * @throws X produced by the exception supplying function
1290 * @throws WrongThreadException if the current thread is not the task scope owner
1291 * @throws IllegalStateException if the task scope owner did not join after forking
1292 */
1293 public <X extends Throwable>
1294 void throwIfFailed(Function<Throwable, ? extends X> esf) throws X {
1295 ensureOwnerAndJoined();
1296 Objects.requireNonNull(esf);
1297 Throwable exception = firstException;
1298 if (exception != null) {
1299 X ex = esf.apply(exception);
1300 Objects.requireNonNull(ex, "esf returned null");
1301 throw ex;
1302 }
1303 }
1304 }
1305 }
|
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.time.Duration;
28 import java.util.function.Function;
29 import java.util.function.Predicate;
30 import java.util.function.Supplier;
31 import java.util.stream.Stream;
32 import jdk.internal.javac.PreviewFeature;
33
34 /**
35 * An API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports cases
36 * where execution of a <em>task</em> (a unit of work) splits into several concurrent
37 * subtasks, and where the subtasks must complete before the task continues. A {@code
38 * StructuredTaskScope} can be used to ensure that the lifetime of a concurrent operation
39 * is confined by a <em>syntax block</em>, similar to that of a sequential operation in
40 * structured programming.
41 *
42 * <p> {@code StructuredTaskScope} defines the static method {@link #open() open} to open
43 * a new {@code StructuredTaskScope} and the {@link #close() close} method to close it.
44 * The API is designed to be used with the {@code try}-with-resources statement where
45 * the {@code StructuredTaskScope} is opened as a resource and then closed automatically.
46 * The code inside the block uses the {@link #fork(Callable) fork} method to fork subtasks.
47 * After forking, it uses the {@link #join() join} method to wait for all subtasks to
48 * finish (or some other outcome) as a single operation. Forking a subtask starts a new
49 * {@link Thread} to run the subtask. The thread executing the task does not continue
50 * beyond the {@code close} method until all threads started to execute subtasks have finished.
51 * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may
52 * only be invoked by the <em>owner thread</em> (the thread that opened the {@code
53 * StructuredTaskScope}), the {@code fork} method may not be called after {@code join},
54 * the {@code join} method may only be invoked once, and the {@code close} method throws
55 * an exception after closing if the owner did not invoke the {@code join} method after
56 * forking subtasks.
57 *
58 * <p> As a first example, consider a task that splits into two subtasks to concurrently
59 * fetch resources from two URL locations "left" and "right". Both subtasks may complete
60 * successfully, one subtask may succeed and the other may fail, or both subtasks may
61 * fail. The task in this example is interested in the successful result from both
62 * subtasks. It waits in the {@link #join() join} method for both subtasks to complete
63 * successfully or for either subtask to fail.
64 * {@snippet lang=java :
65 * // @link substring="open" target="#open()" :
66 * try (var scope = StructuredTaskScope.open()) {
67 *
68 * // @link substring="fork" target="#fork(Callable)" :
69 * Subtask<String> subtask1 = scope.fork(() -> query(left));
70 * Subtask<Integer> subtask2 = scope.fork(() -> query(right));
71 *
72 * // throws if either subtask fails
73 * scope.join(); // @link substring="join" target="#join()"
74 *
75 * // both subtasks completed successfully
76 * // @link substring="get" target="Subtask#get()" :
77 * return new MyResult(subtask1.get(), subtask2.get());
78 *
79 * // @link substring="close" target="#close()" :
80 * } // close
81 * }
82 *
83 * <p> If both subtasks complete successfully then the {@code join} method completes
84 * normally and the task uses the {@link Subtask#get() Subtask.get()} method to get
85 * the result of each subtask. If one of the subtasks fails then the other subtask is
86 * cancelled (this will {@linkplain Thread#interrupt() interrupt} the thread executing the
87 * other subtask) and the {@code join} method throws {@link FailedException} with the
88 * exception from the failed subtask as the {@linkplain Throwable#getCause() cause}.
89 *
90 * <p> To allow for cancellation, subtasks must be coded so that they finish as soon as
91 * possible when interrupted. Subtasks that do not respond to interrupt, e.g. block on
92 * methods that are not interruptible, may delay the closing of a scope indefinitely. The
93 * {@link #close() close} method always waits for threads executing subtasks to finish,
94 * even if the scope is cancelled, so execution cannot continue beyond the {@code close}
95 * method until the interrupted threads finish.
96 *
97 * <p> In the example, the subtasks produce results of different types ({@code String} and
98 * {@code Integer}). In other cases the subtasks may all produce results of the same type.
99 * If the example had used {@code StructuredTaskScope.<String>open()} then it could
100 * only be used to fork subtasks that return a {@code String} result.
101 *
102 * <h2>Joiners</h2>
103 *
104 * <p> In the example above, the task fails if any subtask fails. If all subtasks
105 * succeed then the {@code join} method completes normally. Other policy and outcome is
106 * supported by creating a {@code StructuredTaskScope} with a {@link Joiner} that
107 * implements the desired policy. A {@code Joiner} handles subtask completion and produces
108 * the outcome for the {@link #join() join} method. In the example above, {@code join}
109 * returns {@code null}. Depending on the {@code Joiner}, {@code join} may return a
110 * result, a stream of elements, or some other object. The {@code Joiner} interface defines
111 * factory methods to create {@code Joiner}s for some common cases.
112 *
113 * <p> A {@code Joiner} may <a id="Cancallation">cancel</a> the scope (sometimes called
114 * "short-circuiting") when some condition is reached that does not require the result of
115 * subtasks that are still executing. Cancelling the scope prevents new threads from being
116 * started to execute further subtasks, {@linkplain Thread#interrupt() interrupts} the
117 * threads executing subtasks that have not completed, and causes the {@code join} method
118 * to wakeup with the outcome (result or exception). In the above example, the outcome is
119 * that {@code join} completes with a result of {@code null} when all subtasks succeed.
120 * The scope is cancelled if any of the subtasks fail and {@code join} throws {@code
121 * FailedException} with the exception from the failed subtask as the cause. Other {@code
122 * Joiner} implementations may cancel the scope for other reasons.
123 *
124 * <p> Now consider another example that splits into two subtasks. In this example,
125 * each subtask produces a {@code String} result and the task is only interested in
126 * the result from the first subtask to complete successfully. The example uses {@link
127 * Joiner#anySuccessfulResultOrThrow() Joiner.anySuccessfulResultOrThrow()} to
128 * create a {@code Joiner} that makes available the result of the first subtask to
129 * complete successfully. The type parameter in the example is "{@code String}" so that
130 * only subtasks that return a {@code String} can be forked.
131 * {@snippet lang=java :
132 * // @link substring="open" target="#open(Joiner)" :
133 * try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {
134 *
135 * scope.fork(callable1);
136 * scope.fork(callable2);
137 *
138 * // throws if both subtasks fail
139 * String firstResult = scope.join();
140 *
141 * }
142 * }
143 *
144 * <p> In the example, the task forks the two subtasks, then waits in the {@code
145 * join} method for either subtask to complete successfully or for both subtasks to fail.
146 * If one of the subtasks completes successfully then the {@code Joiner} causes the other
147 * subtask to be cancelled (this will interrupt the thread executing the subtask), and
148 * the {@code join} method returns the result from the successful subtask. Cancelling the
149 * other subtask avoids the task waiting for a result that it doesn't care about. If
150 * both subtasks fail then the {@code join} method throws {@code FailedException} with the
151 * exception from one of the subtasks as the {@linkplain Throwable#getCause() cause}.
152 *
153 * <p> Whether code uses the {@code Subtask} returned from {@code fork} will depend on
154 * the {@code Joiner} and usage. Some {@code Joiner} implementations are suited to subtasks
155 * that return results of the same type and where the {@code join} method returns a result
156 * for the task to use. Code that forks subtasks that return results of different
157 * types, and uses a {@code Joiner} such as {@code Joiner.awaitAllSuccessfulOrThrow()} that
158 * does not return a result, will use {@link Subtask#get() Subtask.get()} after joining.
159 *
160 * <h2>Exception handling</h2>
161 *
162 * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner Joiner} that
163 * handles subtask completion and produces the outcome for the {@link #join() join} method.
164 * In some cases, the outcome will be a result, in other cases it will be an exception.
165 * If the outcome is an exception then the {@code join} method throws {@link
166 * FailedException} with the exception as the {@linkplain Throwable#getCause()
167 * cause}. For many {@code Joiner} implementations, the exception will be an exception
168 * thrown by a subtask that failed. In the case of {@link Joiner#allSuccessfulOrThrow()
169 * allSuccessfulOrThrow} and {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow}
170 * for example, the exception is from the first subtask to fail.
171 *
172 * <p> Many of the details for how exceptions are handled will depend on usage. In some
173 * cases it may be useful to add a {@code catch} block to the {@code try}-with-resources
174 * statement to catch {@code FailedException}. The exception handling may use {@code
175 * instanceof} with pattern matching to handle specific causes.
176 * {@snippet lang=java :
177 * try (var scope = StructuredTaskScope.open()) {
178 *
179 * ..
180 *
181 * } catch (StructuredTaskScope.FailedException e) {
182 *
183 * Throwable cause = e.getCause();
184 * switch (cause) {
185 * case IOException ioe -> ..
186 * default -> ..
187 * }
188 *
189 * }
190 * }
191 * In other cases it may not be useful to catch {@code FailedException} but instead leave
192 * it to propagate to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught
193 * exception handler} for logging purposes.
194 *
195 * <p> For cases where a specific exception triggers the use of a default result then it
196 * may be more appropriate to handle this in the subtask itself rather than the subtask
197 * failing and the scope owner handling the exception.
198 *
199 * <h2>Configuration</h2>
200 *
201 * A {@code StructuredTaskScope} is opened with {@linkplain Config configuration} that
202 * consists of a {@link ThreadFactory} to create threads, an optional name for monitoring
203 * and management purposes, and an optional timeout.
204 *
205 * <p> The {@link #open()} and {@link #open(Joiner)} methods create a {@code StructuredTaskScope}
206 * with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default
207 * configuration has a {@code ThreadFactory} that creates unnamed
208 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
209 * is unnamed for monitoring and management purposes, and has no timeout.
210 *
211 * <p> The 2-arg {@link #open(Joiner, Function) open} method can be used to create a
212 * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for
213 * the purposes of monitoring and management, or has a timeout that cancels the scope if
214 * the timeout expires before or while waiting for subtasks to complete. The {@code open}
215 * method is called with a {@linkplain Function function} that is applied to the default
216 * configuration and returns a {@link Config Config} for the {@code StructuredTaskScope}
217 * under construction.
218 *
219 * <p> The following example opens a new {@code StructuredTaskScope} with a {@code
220 * ThreadFactory} that creates virtual threads {@linkplain Thread#setName(String) named}
221 * "duke-0", "duke-1" ...
222 * {@snippet lang = java:
223 * // @link substring="name" target="Thread.Builder#name(String, long)" :
224 * ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory();
225 *
226 * // @link substring="withThreadFactory" target="Config#withThreadFactory(ThreadFactory)" :
227 * try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
228 *
229 * scope.fork( .. ); // runs in a virtual thread with name "duke-0"
230 * scope.fork( .. ); // runs in a virtual thread with name "duke-1"
231 *
232 * scope.join();
233 *
234 * }
235 *}
236 *
237 * <p> A second example sets a timeout, represented by a {@link Duration}. The timeout
238 * starts when the new scope is opened. If the timeout expires before the {@code join}
239 * method has completed then the scope is <a href="#Cancallation">cancelled</a>. This
240 * interrupts the threads executing the two subtasks and causes the {@link #join() join}
241 * method to throw {@link TimeoutException}.
242 * {@snippet lang=java :
243 * Duration timeout = Duration.ofSeconds(10);
244 *
245 * // @link substring="allSuccessfulOrThrow" target="Joiner#allSuccessfulOrThrow()" :
246 * try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
247 * // @link substring="withTimeout" target="Config#withTimeout(Duration)" :
248 * cf -> cf.withTimeout(timeout))) {
249 *
250 * scope.fork(callable1);
251 * scope.fork(callable2);
252 *
253 * List<String> result = scope.join()
254 * .map(Subtask::get)
255 * .toList();
256 *
257 * }
258 * }
259 *
260 * <h2>Inheritance of scoped value bindings</h2>
261 *
262 * {@link ScopedValue} supports the execution of a method with a {@code ScopedValue} bound
263 * to a value for the bounded period of execution of the method by the <em>current thread</em>.
264 * It allows a value to be safely and efficiently shared to methods without using method
265 * parameters.
266 *
267 * <p> When used in conjunction with a {@code StructuredTaskScope}, a {@code ScopedValue}
268 * can also safely and efficiently share a value to methods executed by subtasks forked
269 * in the scope. When a {@code ScopedValue} object is bound to a value in the thread
270 * executing the task then that binding is inherited by the threads created to
271 * execute the subtasks. The thread executing the task does not continue beyond the
272 * {@link #close() close} method until all threads executing the subtasks have finished.
273 * This ensures that the {@code ScopedValue} is not reverted to being {@linkplain
274 * ScopedValue#isBound() unbound} (or its previous value) while subtasks are executing.
275 * In addition to providing a safe and efficient means to inherit a value into subtasks,
276 * the inheritance allows sequential code using {@code ScopedValue} be refactored to use
277 * structured concurrency.
278 *
279 * <p> To ensure correctness, opening a new {@code StructuredTaskScope} captures the
280 * current thread's scoped value bindings. These are the scoped values bindings that are
281 * inherited by the threads created to execute subtasks in the scope. Forking a
282 * subtask checks that the bindings in effect at the time that the subtask is forked
283 * match the bindings when the {@code StructuredTaskScope} was created. This check ensures
284 * that a subtask does not inherit a binding that is reverted in the task before the
285 * subtask has completed.
286 *
287 * <p> A {@code ScopedValue} that is shared across threads requires that the value be an
288 * immutable object or for all access to the value to be appropriately synchronized.
289 *
290 * <p> The following example demonstrates the inheritance of scoped value bindings. The
291 * scoped value USERNAME is bound to the value "duke" for the bounded period of a lambda
292 * expression by the thread executing it. The code in the block opens a {@code
293 * StructuredTaskScope} and forks two subtasks, it then waits in the {@code join} method
294 * and aggregates the results from both subtasks. If code executed by the threads
295 * running subtask1 and subtask2 uses {@link ScopedValue#get()}, to get the value of
296 * USERNAME, then value "duke" will be returned.
297 * {@snippet lang=java :
298 * // @link substring="newInstance" target="ScopedValue#newInstance()" :
299 * private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
300 *
301 * // @link substring="callWhere" target="ScopedValue#where" :
302 * MyResult result = ScopedValue.where(USERNAME, "duke").call(() -> {
303 *
304 * try (var scope = StructuredTaskScope.open()) {
305 *
306 * Subtask<String> subtask1 = scope.fork( .. ); // inherits binding
307 * Subtask<Integer> subtask2 = scope.fork( .. ); // inherits binding
308 *
309 * scope.join();
310 * return new MyResult(subtask1.get(), subtask2.get());
311 * }
312 *
313 * });
314 * }
315 *
316 * <p> A scoped value inherited into a subtask may be
317 * <a href="{@docRoot}/java.base/java/lang/ScopedValues.html#rebind">rebound</a> to a new
318 * value in the subtask for the bounded execution of some method executed in the subtask.
319 * When the method completes, the value of the {@code ScopedValue} reverts to its previous
320 * value, the value inherited from the thread executing the task.
321 *
322 * <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}.
323 * A task executing in thread T1 opens a {@code StructuredTaskScope} and forks a
324 * subtask that runs in thread T2. The scoped value bindings captured when T1 opens the
325 * scope are inherited into T2. The subtask (in thread T2) executes code that opens a
326 * new {@code StructuredTaskScope} and forks a subtask that runs in thread T3. The scoped
327 * value bindings captured when T2 opens the scope are inherited into T3. These
328 * include (or may be the same) as the bindings that were inherited from T1. In effect,
329 * scoped values are inherited into a tree of subtasks, not just one level of subtask.
330 *
331 * <h2>Memory consistency effects</h2>
332 *
333 * <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to
334 * {@linkplain #fork forking} of a subtask
335 * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
336 * <i>happen-before</i></a> any actions taken by that subtask, which in turn
337 * <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}.
338 *
339 * <h2>General exceptions</h2>
340 *
341 * <p> Unless otherwise specified, passing a {@code null} argument to a method in this
342 * class will cause a {@link NullPointerException} to be thrown.
343 *
344 * @param <T> the result type of subtasks executed in the scope
345 * @param <R> the result type of the scope
346 *
347 * @jls 17.4.5 Happens-before Order
348 * @since 21
349 */
350 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
351 public sealed interface StructuredTaskScope<T, R>
352 extends AutoCloseable
353 permits StructuredTaskScopeImpl {
354
355 /**
356 * Represents a subtask forked with {@link #fork(Callable)} or {@link #fork(Runnable)}.
357 *
358 * <p> Code that forks subtasks can use the {@link #get() get()} method after {@linkplain
359 * #join() joining} to obtain the result of a subtask that completed successfully. It
360 * can use the {@link #exception()} method to obtain the exception thrown by a subtask
361 * that failed.
362 *
363 * @param <T> the result type
364 * @since 21
365 */
366 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
367 sealed interface Subtask<T> extends Supplier<T> permits StructuredTaskScopeImpl.SubtaskImpl {
368 /**
369 * Represents the state of a subtask.
370 * @see Subtask#state()
371 * @since 21
372 */
373 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
374 enum State {
375 /**
376 * The subtask result or exception is not available. This state indicates that
377 * the subtask was forked but has not completed, it completed after the scope
378 * was cancelled, or it was forked after the scoped was cancelled (in which
379 * case a thread was not created to execute the subtask).
380 */
381 UNAVAILABLE,
382 /**
383 * The subtask completed successfully. The {@link Subtask#get() Subtask.get()}
384 * method can be used to get the result. This is a terminal state.
385 */
386 SUCCESS,
387 /**
388 * The subtask failed with an exception. The {@link Subtask#exception()
389 * Subtask.exception()} method can be used to get the exception. This is a
390 * terminal state.
391 */
392 FAILED,
393 }
394
395 /**
396 * {@return the subtask state}
397 */
398 State state();
399
400 /**
401 * Returns the result of this subtask if it completed successfully. If the subtask
402 * was forked with {@link #fork(Callable) fork(Callable)} then the result from the
403 * {@link Callable#call() call} method is returned. If the subtask was forked with
404 * {@link #fork(Runnable) fork(Runnable)} then {@code null} is returned.
405 *
406 * <p> Code executing in the scope owner thread can use this method to get the
407 * result of a successful subtask only after it has {@linkplain #join() joined}.
408 *
409 * <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask)
410 * onComplete} method should test that the {@linkplain #state() subtask state} is
411 * {@link State#SUCCESS SUCCESS} before using this method to get the result.
412 *
413 * @return the possibly-null result
414 * @throws IllegalStateException if the subtask has not completed, did not complete
415 * successfully, or the current thread is the scope owner invoking this
416 * method before {@linkplain #join() joining}
417 * @see State#SUCCESS
418 */
419 T get();
420
421 /**
422 * {@return the exception or error thrown by this subtask if it failed}
423 * If the subtask was forked with {@link #fork(Callable) fork(Callable)} then the
424 * exception or error thrown by the {@link Callable#call() call} method is returned.
425 * If the subtask was forked with {@link #fork(Runnable) fork(Runnable)} then the
426 * exception or error thrown by the {@link Runnable#run() run} method is returned.
427 *
428 * <p> Code executing in the scope owner thread can use this method to get the
429 * exception thrown by a failed subtask only after it has {@linkplain #join() joined}.
430 *
431 * <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask)
432 * onComplete} method should test that the {@linkplain #state() subtask state} is
433 * {@link State#FAILED FAILED} before using this method to get the exception.
434 *
435 * @throws IllegalStateException if the subtask has not completed, completed with
436 * a result, or the current thread is the scope owner invoking this method
437 * before {@linkplain #join() joining}
438 * @see State#FAILED
439 */
440 Throwable exception();
441 }
442
443 /**
444 * An object used with a {@link StructuredTaskScope} to handle subtask completion and
445 * produce the result for the scope owner waiting in the {@link #join() join} method
446 * for subtasks to complete.
447 *
448 * <p> Joiner defines static methods to create {@code Joiner} objects for common cases:
449 * <ul>
450 * <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner}
451 * that yields a stream of the completed subtasks for {@code join} to return when
452 * all subtasks complete successfully. It cancels the scope and causes {@code join}
453 * to throw if any subtask fails.
454 * <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a
455 * {@code Joiner} that yields the result of the first subtask to succeed for {@code
456 * join} to return. It causes {@code join} to throw if all subtasks fail.
457 * <li> {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a
458 * {@code Joiner} that waits for all successful subtasks. It cancels the scope and
459 * causes {@code join} to throw if any subtask fails.
460 * <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all
461 * subtasks. It does not cancel the scope or cause {@code join} to throw.
462 * </ul>
463 *
464 * <p> In addition to the methods to create {@code Joiner} objects for common cases,
465 * the {@link #allUntil(Predicate) allUntil(Predicate)} method is defined to create a
466 * {@code Joiner} that yields a stream of all subtasks. It is created with a {@link
467 * Predicate Predicate} that determines if the scope should continue or be cancelled.
468 * This {@code Joiner} can be built upon to create custom policies that cancel the
469 * scope based on some condition.
470 *
471 * <p> More advanced policies can be developed by implementing the {@code Joiner}
472 * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked.
473 * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a
474 * result or exception. These methods return a {@code boolean} to indicate if scope
475 * should be cancelled. These methods can be used to collect subtasks, results, or
476 * exceptions, and control when to cancel the scope. The {@link #result()} method
477 * must be implemented to produce the result (or exception) for the {@code join}
478 * method.
479 *
480 * <p> Unless otherwise specified, passing a {@code null} argument to a method
481 * in this class will cause a {@link NullPointerException} to be thrown.
482 *
483 * @implSpec Implementations of this interface must be thread safe. The {@link
484 * #onComplete(Subtask)} method defined by this interface may be invoked by several
485 * threads concurrently.
486 *
487 * @apiNote It is very important that a new {@code Joiner} object is created for each
488 * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with
489 * different scopes or re-used after a task is closed.
490 *
491 * <p> Designing a {@code Joiner} should take into account the code at the use-site
492 * where the results from the {@link StructuredTaskScope#join() join} method are
493 * processed. It should be clear what the {@code Joiner} does vs. the application
494 * code at the use-site. In general, the {@code Joiner} implementation is not the
495 * place to code "business logic". A {@code Joiner} should be designed to be as
496 * general purpose as possible.
497 *
498 * @param <T> the result type of subtasks executed in the scope
499 * @param <R> the result type of the scope
500 * @since 24
501 * @see #open(Joiner)
502 */
503 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
504 @FunctionalInterface
505 interface Joiner<T, R> {
506 /**
507 * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable)
508 * fork(Runnable)} when forking a subtask. The method is invoked from the task
509 * owner thread. The method is invoked before a thread is created to run the
510 * subtask.
511 *
512 * @implSpec The default implementation throws {@code NullPointerException} if the
513 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
514 * subtask is not in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, it
515 * otherwise returns {@code false}.
516 *
517 * @apiNote This method is invoked by the {@code fork} methods. It should not be
518 * invoked directly.
519 *
520 * @param subtask the subtask
521 * @return {@code true} to cancel the scope, otherwise {@code false}
522 */
523 default boolean onFork(Subtask<? extends T> subtask) {
524 if (subtask.state() != Subtask.State.UNAVAILABLE) {
525 throw new IllegalArgumentException();
526 }
527 return false;
528 }
529
530 /**
531 * Invoked by the thread started to execute a subtask after the subtask completes
532 * successfully or fails with an exception. This method is not invoked if a
533 * subtask completes after the scope is cancelled.
534 *
535 * @implSpec The default implementation throws {@code NullPointerException} if the
536 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
537 * subtask is not in the {@link Subtask.State#SUCCESS SUCCESS} or {@link
538 * Subtask.State#FAILED FAILED} state, it otherwise returns {@code false}.
539 *
540 * @apiNote This method is invoked by subtasks when they complete. It should not
541 * be invoked directly.
542 *
543 * @param subtask the subtask
544 * @return {@code true} to cancel the scope, otherwise {@code false}
545 */
546 default boolean onComplete(Subtask<? extends T> subtask) {
547 if (subtask.state() == Subtask.State.UNAVAILABLE) {
548 throw new IllegalArgumentException();
549 }
550 return false;
551 }
552
553 /**
554 * Invoked by the {@link #join() join()} method to produce the result (or exception)
555 * after waiting for all subtasks to complete or the scope cancelled. The result
556 * from this method is returned by the {@code join} method. If this method throws,
557 * then {@code join} throws {@link FailedException} with the exception thrown by
558 * this method as the cause.
559 *
560 * <p> In normal usage, this method will be called at most once by the {@code join}
561 * method to produce the result (or exception). The behavior of this method when
562 * invoked directly, and invoked more than once, is not specified. Where possible,
563 * an implementation should return an equal result (or throw the same exception)
564 * on second or subsequent calls to produce the outcome.
565 *
566 * @apiNote This method is invoked by the {@code join} method. It should not be
567 * invoked directly.
568 *
569 * @return the result
570 * @throws Throwable the exception
571 */
572 R result() throws Throwable;
573
574 /**
575 * {@return a new Joiner object that yields a stream of all subtasks when all
576 * subtasks complete successfully}
577 * The {@code Joiner} <a href="StructuredTaskScope.html#Cancallation">cancels</a>
578 * the scope and causes {@code join} to throw if any subtask fails.
579 *
580 * <p> If all subtasks complete successfully, the joiner's {@link Joiner#result()}
581 * method returns a stream of all subtasks in the order that they were forked.
582 * If any subtask failed then the {@code result} method throws the exception from
583 * the first subtask to fail.
584 *
585 * @apiNote Joiners returned by this method are suited to cases where all subtasks
586 * return a result of the same type. Joiners returned by {@link
587 * #awaitAllSuccessfulOrThrow()} are suited to cases where the subtasks return
588 * results of different types.
589 *
590 * @param <T> the result type of subtasks
591 */
592 static <T> Joiner<T, Stream<Subtask<T>>> allSuccessfulOrThrow() {
593 return new Joiners.AllSuccessful<>();
594 }
595
596 /**
597 * {@return a new Joiner object that yields the result of any subtask that
598 * completed successfully}
599 * The {@code Joiner} causes {@code join} to throw if all subtasks fail.
600 *
601 * <p> The joiner's {@link Joiner#result()} method returns the result of a subtask
602 * that completed successfully. If all subtasks fail then the {@code result} method
603 * throws the exception from one of the failed subtasks. The {@code result} method
604 * throws {@code NoSuchElementException} if no subtasks were forked.
605 *
606 * @param <T> the result type of subtasks
607 */
608 static <T> Joiner<T, T> anySuccessfulResultOrThrow() {
609 return new Joiners.AnySuccessful<>();
610 }
611
612 /**
613 * {@return a new Joiner object that waits for subtasks to complete successfully}
614 * The {@code Joiner} <a href="StructuredTaskScope.html#Cancallation">cancels</a>
615 * the scope and causes {@code join} to throw if any subtask fails.
616 *
617 * <p> The joiner's {@link Joiner#result() result} method returns {@code null}
618 * if all subtasks complete successfully, or throws the exception from the first
619 * subtask to fail.
620 *
621 * @apiNote Joiners returned by this method are suited to cases where subtasks
622 * return results of different types. Joiners returned by {@link #allSuccessfulOrThrow()}
623 * are suited to cases where the subtasks return a result of the same type.
624 *
625 * @param <T> the result type of subtasks
626 */
627 static <T> Joiner<T, Void> awaitAllSuccessfulOrThrow() {
628 return new Joiners.AwaitSuccessful<>();
629 }
630
631 /**
632 * {@return a new Joiner object that waits for all subtasks to complete}
633 * The {@code Joiner} does not cancel the scope if a subtask fails.
634 *
635 * <p> The joiner's {@link Joiner#result() result} method returns {@code null}.
636 *
637 * @apiNote This Joiner is useful for cases where subtasks make use of
638 * <em>side-effects</em> rather than return results or fail with exceptions.
639 * The {@link #fork(Runnable) fork(Runnable)} method can be used to fork subtasks
640 * that do not return a result.
641 *
642 * <p> This Joiner can also be used for <em>fan-in</em> scenarios where subtasks
643 * are forked to handle incoming connections and the number of subtasks is unbounded.
644 * In this example, the thread executing the {@code acceptLoop} method will only
645 * stop when interrupted or the listener socket is closed asynchronously.
646 * {@snippet lang=java :
647 * void acceptLoop(ServerSocket listener) throws IOException, InterruptedException {
648 * try (var scope = StructuredTaskScope.open(Joiner.<Socket>awaitAll())) {
649 * while (true) {
650 * Socket socket = listener.accept();
651 * scope.fork(() -> handle(socket));
652 * }
653 * }
654 * }
655 * }
656 *
657 * @param <T> the result type of subtasks
658 */
659 static <T> Joiner<T, Void> awaitAll() {
660 // ensure that new Joiner object is returned
661 return new Joiner<T, Void>() {
662 @Override
663 public Void result() {
664 return null;
665 }
666 };
667 }
668
669 /**
670 * {@return a new Joiner object that yields a stream of all subtasks when all
671 * subtasks complete or a predicate returns {@code true} to cancel the scope}
672 *
673 * <p> The joiner's {@link Joiner#onComplete(Subtask)} method invokes the
674 * predicate's {@link Predicate#test(Object) test} method with the subtask that
675 * completed successfully or failed with an exception. If the {@code test} method
676 * returns {@code true} then <a href="StructuredTaskScope.html#Cancallation">
677 * the scope is cancelled</a>. The {@code test} method must be thread safe as it
678 * may be invoked concurrently from several threads. If the {@code test} method
679 * completes with an exception or error, then the thread that executed the subtask
680 * invokes the {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler}
681 * with the exception or error before the thread terminates.
682 *
683 * <p> The joiner's {@link #result()} method returns the stream of all subtasks,
684 * in fork order. The stream may contain subtasks that have completed
685 * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED}
686 * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state
687 * if the scope was cancelled before all subtasks were forked or completed.
688 *
689 * <p> The following example uses this method to create a {@code Joiner} that
690 * <a href="StructuredTaskScope.html#Cancallation">cancels</a> the scope when
691 * two or more subtasks fail.
692 * {@snippet lang=java :
693 * class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
694 * private final AtomicInteger failedCount = new AtomicInteger();
695 * @Override
696 * public boolean test(Subtask<? extends T> subtask) {
697 * return subtask.state() == Subtask.State.FAILED
698 * && failedCount.incrementAndGet() >= 2;
699 * }
700 * }
701 *
702 * var joiner = Joiner.all(new CancelAfterTwoFailures<String>());
703 * }
704 *
705 * <p> The following example uses {@code allUntil} to wait for all subtasks to
706 * complete without any cancellation. This is similar to {@link #awaitAll()}
707 * except that it yields a stream of the completed subtasks.
708 * {@snippet lang=java :
709 * <T> List<Subtask<T>> invokeAll(Collection<Callable<T>> tasks) throws InterruptedException {
710 * try (var scope = StructuredTaskScope.open(Joiner.<T>allUntil(_ -> false))) {
711 * tasks.forEach(scope::fork);
712 * return scope.join().toList();
713 * }
714 * }
715 * }
716 *
717 * @param isDone the predicate to evaluate completed subtasks
718 * @param <T> the result type of subtasks
719 */
720 static <T> Joiner<T, Stream<Subtask<T>>> allUntil(Predicate<Subtask<? extends T>> isDone) {
721 return new Joiners.AllSubtasks<>(isDone);
722 }
723 }
724
725 /**
726 * Represents the configuration for a {@code StructuredTaskScope}.
727 *
728 * <p> The configuration for a {@code StructuredTaskScope} consists of a {@link
729 * ThreadFactory} to create threads, an optional name for the purposes of monitoring
730 * and management, and an optional timeout.
731 *
732 * <p> Creating a {@code StructuredTaskScope} with {@link #open()} or {@link #open(Joiner)}
733 * uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default
734 * configuration</a>. The default configuration consists of a thread factory that
735 * creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">
736 * virtual threads</a>, no name for monitoring and management purposes, and no timeout.
737 *
738 * <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, Function)
739 * open} method allows a different configuration to be used. The function specified
740 * to the {@code open} method is applied to the default configuration and returns the
741 * configuration for the {@code StructuredTaskScope} under construction. The function
742 * can use the {@code with-} prefixed methods defined here to specify the components
743 * of the configuration to use.
744 *
745 * <p> Unless otherwise specified, passing a {@code null} argument to a method
746 * in this class will cause a {@link NullPointerException} to be thrown.
747 *
748 * @since 24
749 */
750 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
751 sealed interface Config permits StructuredTaskScopeImpl.ConfigImpl {
752 /**
753 * {@return a new {@code Config} object with the given thread factory}
754 * The other components are the same as this object. The thread factory is used by
755 * a scope to create threads when {@linkplain #fork(Callable) forking} subtasks.
756 * @param threadFactory the thread factory
757 *
758 * @apiNote The thread factory will typically create
759 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
760 * maybe with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler
761 * uncaught exception handler}, or other properties configured.
762 *
763 * @see #fork(Callable)
764 */
765 Config withThreadFactory(ThreadFactory threadFactory);
766
767 /**
768 * {@return a new {@code Config} object with the given name}
769 * The other components are the same as this object. A scope is optionally
770 * named for the purposes of monitoring and management.
771 * @param name the name
772 */
773 Config withName(String name);
774
775 /**
776 * {@return a new {@code Config} object with the given timeout}
777 * The other components are the same as this object.
778 * @param timeout the timeout
779 *
780 * @apiNote Applications using deadlines, expressed as an {@link java.time.Instant},
781 * can use {@link Duration#between Duration.between(Instant.now(), deadline)} to
782 * compute the timeout for this method.
783 *
784 * @see #join()
785 */
786 Config withTimeout(Duration timeout);
787 }
788
789 /**
790 * Exception thrown by {@link #join()} when the outcome is an exception rather than a
791 * result.
792 *
793 * @since 24
794 */
795 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
796 final class FailedException extends RuntimeException {
797 @java.io.Serial
798 static final long serialVersionUID = -1533055100078459923L;
799
800 /**
801 * Constructs a {@code FailedException} with the specified cause.
802 *
803 * @param cause the cause, can be {@code null}
804 */
805 public FailedException(Throwable cause) {
806 super(cause);
807 }
808 }
809
810 /**
811 * Exception thrown by {@link #join()} if the scope was created with a timeout and
812 * the timeout expired before or while waiting in {@code join}.
813 *
814 * @since 24
815 * @see Config#withTimeout(Duration)
816 */
817 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
818 final class TimeoutException extends RuntimeException {
819 @java.io.Serial
820 static final long serialVersionUID = 705788143955048766L;
821
822 /**
823 * Constructs a {@code TimeoutException} with no detail message.
824 */
825 public TimeoutException() { }
826 }
827
828 /**
829 * Opens a new {@code StructuredTaskScope} to use the given {@code Joiner} object and
830 * with configuration that is the result of applying the given function to the
831 * <a href="#DefaultConfiguration">default configuration</a>.
832 *
833 * <p> The {@code configFunction} is called with the default configuration and returns
834 * the configuration for the new scope. The function may, for example, set the
835 * {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} or set a
836 * {@linkplain Config#withTimeout(Duration) timeout}. If the function completes with
837 * an exception or error then it is propagated by this method. If the function returns
838 * {@code null} then {@code NullPointerException} is thrown.
839 *
840 * <p> If a {@code ThreadFactory} is set then its {@link ThreadFactory#newThread(Runnable)
841 * newThread} method will be called to create threads when {@linkplain #fork(Callable)
842 * forking} subtasks in this scope. If a {@code ThreadFactory} is not set then
843 * forking subtasks will create an unnamed virtual thread for each subtask.
844 *
845 * <p> If a {@linkplain Config#withTimeout(Duration) timeout} is set then it starts
846 * when the scope is opened. If the timeout expires before the scope has
847 * {@linkplain #join() joined} then the scope is <a href="#Cancallation">cancelled</a>
848 * and the {@code join} method throws {@link TimeoutException}.
849 *
850 * <p> The new scope is owned by the current thread. Only code executing in this
851 * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or
852 * {@linkplain #close close} the scope.
853 *
854 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
855 * value} bindings for inheritance by threads started in the scope.
856 *
857 * @param joiner the joiner
858 * @param configFunction a function to produce the configuration
859 * @return a new scope
860 * @param <T> the result type of subtasks executed in the scope
861 * @param <R> the result type of the scope
862 * @since 24
863 */
864 static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
865 Function<Config, Config> configFunction) {
866 return StructuredTaskScopeImpl.open(joiner, configFunction);
867 }
868
869 /**
870 * Opens a new {@code StructuredTaskScope}to use the given {@code Joiner} object. The
871 * scope is created with the <a href="#DefaultConfiguration">default configuration</a>.
872 * The default configuration has a {@code ThreadFactory} that creates unnamed
873 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
874 * is unnamed for monitoring and management purposes, and has no timeout.
875 *
876 * @implSpec
877 * This factory method is equivalent to invoking the 2-arg open method with the given
878 * joiner and the {@linkplain Function#identity() identity function}.
879 *
880 * @param joiner the joiner
881 * @return a new scope
882 * @param <T> the result type of subtasks executed in the scope
883 * @param <R> the result type of the scope
884 * @since 24
885 */
886 static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner) {
887 return open(joiner, Function.identity());
888 }
889
890 /**
891 * Opens a new {@code StructuredTaskScope} that can be used to fork subtasks that return
892 * results of any type. The scope's {@link #join()} method waits for all subtasks to
893 * succeed or any subtask to fail.
894 *
895 * <p> The {@code join} method returns {@code null} if all subtasks complete successfully.
896 * It throws {@link FailedException} if any subtask fails, with the exception from
897 * the first subtask to fail as the cause.
898 *
899 * <p> The scope is created with the <a href="#DefaultConfiguration">default
900 * configuration</a>. The default configuration has a {@code ThreadFactory} that creates
901 * unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual
902 * threads</a>, is unnamed for monitoring and management purposes, and has no timeout.
903 *
904 * @implSpec
905 * This factory method is equivalent to invoking the 2-arg open method with a joiner
906 * created with {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()}
907 * and the {@linkplain Function#identity() identity function}.
908 *
909 * @param <T> the result type of subtasks
910 * @return a new scope
911 * @since 24
912 */
913 static <T> StructuredTaskScope<T, Void> open() {
914 return open(Joiner.awaitAllSuccessfulOrThrow(), Function.identity());
915 }
916
917 /**
918 * Fork a subtask by starting a new thread in this scope to execute a value-returning
919 * method. The new thread executes the subtask concurrently with the current thread.
920 * The parameter to this method is a {@link Callable}, the new thread executes its
921 * {@link Callable#call() call()} method.
922 *
923 * <p> This method first creates a {@link Subtask Subtask} object to represent the
924 * <em>forked subtask</em>. It invokes the joiner's {@link Joiner#onFork(Subtask) onFork}
925 * method with the subtask in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state.
926 * If the {@code onFork} completes with an exception or error then it is propagated by
927 * the {@code fork} method without creating a thread. If the scope is already
928 * <a href="#Cancallation">cancelled</a>, or {@code onFork} returns {@code true} to
929 * cancel the scope, then this method returns the {@code Subtask}, in the
930 * {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, without creating a thread to
931 * execute the subtask.
932 *
933 * <p> If the scope is not cancelled, and the {@code onFork} method returns {@code false},
934 * then a thread is created with the {@link ThreadFactory} configured when the scope
935 * was opened, and the thread is started. Forking a subtask inherits the current thread's
936 * {@linkplain ScopedValue scoped value} bindings. The bindings must match the bindings
937 * captured when the scope was opened. If the subtask completes (successfully or with
938 * an exception) before the scope is cancelled, then the thread invokes the joiner's
939 * {@link Joiner#onComplete(Subtask) onComplete} method with the subtask in the
940 * {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} state.
941 * If the {@code onComplete} method completes with an exception or error, then the
942 * {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler} is invoked
943 * with the exception or error before the thread terminates.
944 *
945 * <p> This method returns the {@link Subtask Subtask} object. In some usages, this
946 * object may be used to get its result. In other cases it may be used for correlation
947 * or just discarded. To ensure correct usage, the {@link Subtask#get() Subtask.get()}
948 * method may only be called by the scope owner to get the result after it has
949 * waited for subtasks to complete with the {@link #join() join} method and the subtask
950 * completed successfully. Similarly, the {@link Subtask#exception() Subtask.exception()}
951 * method may only be called by the scope owner after it has joined and the subtask
952 * failed. If the scope was cancelled before the subtask was forked, or before it
953 * completes, then neither method can be used to obtain the outcome.
954 *
955 * <p> This method may only be invoked by the scope owner.
956 *
957 * @param task the value-returning task for the thread to execute
958 * @param <U> the result type
959 * @return the subtask
960 * @throws WrongThreadException if the current thread is not the scope owner
961 * @throws IllegalStateException if the owner has already {@linkplain #join() joined}
962 * or the scope is closed
963 * @throws StructureViolationException if the current scoped value bindings are not
964 * the same as when the scope was created
965 * @throws RejectedExecutionException if the thread factory rejected creating a
966 * thread to run the subtask
967 */
968 <U extends T> Subtask<U> fork(Callable<? extends U> task);
969
970 /**
971 * Fork a subtask by starting a new thread in this scope to execute a method that
972 * does not return a result.
973 *
974 * <p> This method works exactly the same as {@link #fork(Callable)} except that the
975 * parameter to this method is a {@link Runnable}, the new thread executes its
976 * {@link Runnable#run() run} method, and {@link Subtask#get() Subtask.get()} returns
977 * {@code null} if the subtask completes successfully.
978 *
979 * @param task the task for the thread to execute
980 * @param <U> the result type
981 * @return the subtask
982 * @throws WrongThreadException if the current thread is not the scope owner
983 * @throws IllegalStateException if the owner has already {@linkplain #join() joined}
984 * or the scope is closed
985 * @throws StructureViolationException if the current scoped value bindings are not
986 * the same as when the scope was created
987 * @throws RejectedExecutionException if the thread factory rejected creating a
988 * thread to run the subtask
989 * @since 24
990 */
991 <U extends T> Subtask<U> fork(Runnable task);
992
993 /**
994 * Returns the result, or throws, after waiting for all subtasks to complete or
995 * the scope to be <a href="#Cancallation">cancelled</a>.
996 *
997 * <p> This method waits for all subtasks started in this scope to complete or the
998 * scope to be cancelled. If a {@linkplain Config#withTimeout(Duration) timeout} is
999 * configured and the timeout expires before or while waiting, then the scope is
1000 * cancelled and {@link TimeoutException TimeoutException} is thrown. Once finished
1001 * waiting, the {@code Joiner}'s {@link Joiner#result() result()} method is invoked
1002 * to get the result or throw an exception. If the {@code result()} method throws
1003 * then this method throws {@code FailedException} with the exception as the cause.
1004 *
1005 * <p> This method may only be invoked by the scope owner, and only once.
1006 *
1007 * @return the result
1008 * @throws WrongThreadException if the current thread is not the scope owner
1009 * @throws IllegalStateException if already joined or this scope is closed
1010 * @throws FailedException if the <i>outcome</i> is an exception, thrown with the
1011 * exception from {@link Joiner#result() Joiner.result()} as the cause
1012 * @throws TimeoutException if a timeout is set and the timeout expires before or
1013 * while waiting
1014 * @throws InterruptedException if interrupted while waiting
1015 * @since 24
1016 */
1017 R join() throws InterruptedException;
1018
1019 /**
1020 * {@return {@code true} if this scope is <a href="#Cancallation">cancelled</a> or in
1021 * the process of being cancelled, otherwise {@code false}}
1022 *
1023 * <p> Cancelling the scope prevents new threads from starting in the scope and
1024 * {@linkplain Thread#interrupt() interrupts} threads executing unfinished subtasks.
1025 * It may take some time before the interrupted threads finish execution; this
1026 * method may return {@code true} before all threads have been interrupted or before
1027 * all threads have finished.
1028 *
1029 * @apiNote A task with a lengthy "forking phase" (the code that executes before
1030 * it invokes {@link #join() join}) may use this method to avoid doing work in cases
1031 * where scope is cancelled by the completion of a previously forked subtask or timeout.
1032 *
1033 * @since 24
1034 */
1035 boolean isCancelled();
1036
1037 /**
1038 * Closes this scope.
1039 *
1040 * <p> This method first <a href="#Cancallation">cancels</a> the scope, if not
1041 * already cancelled. This interrupts the threads executing unfinished subtasks. This
1042 * method then waits for all threads to finish. If interrupted while waiting then it
1043 * will continue to wait until the threads finish, before completing with the interrupt
1044 * status set.
1045 *
1046 * <p> This method may only be invoked by the scope owner. If the scope
1047 * is already closed then the scope owner invoking this method has no effect.
1048 *
1049 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
1050 * manner</em>. If this method is called to close a scope before nested task
1051 * scopes are closed then it closes the underlying construct of each nested scope
1052 * (in the reverse order that they were created in), closes this scope, and then
1053 * throws {@link StructureViolationException}.
1054 * Similarly, if this method is called to close a scope while executing with
1055 * {@linkplain ScopedValue scoped value} bindings, and the scope was created
1056 * before the scoped values were bound, then {@code StructureViolationException} is
1057 * thrown after closing the scope.
1058 * If a thread terminates without first closing scopes that it owns then
1059 * termination will cause the underlying construct of each of its open tasks scopes to
1060 * be closed. Closing is performed in the reverse order that the scopes were
1061 * created in. Thread termination may therefore be delayed when the scope owner
1062 * has to wait for threads forked in these scopes to finish.
1063 *
1064 * @throws IllegalStateException thrown after closing the scope if the scope
1065 * owner did not attempt to join after forking
1066 * @throws WrongThreadException if the current thread is not the scope owner
1067 * @throws StructureViolationException if a structure violation was detected
1068 */
1069 @Override
1070 void close();
1071 }
|