1 /*
2 * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.security.AccessController;
30 import java.security.PrivilegedAction;
31 import java.time.Duration;
32 import java.time.Instant;
33 import java.util.Objects;
34 import java.util.Optional;
35 import java.util.concurrent.locks.ReentrantLock;
36 import java.util.function.Function;
37 import java.util.function.Supplier;
38 import jdk.internal.javac.PreviewFeature;
39 import jdk.internal.misc.ThreadFlock;
40
41 /**
42 * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports
43 * cases where a task splits into several concurrent subtasks, and where the subtasks must
44 * complete before the main task continues. A {@code StructuredTaskScope} can be used to
45 * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
46 * just like that of a sequential operation in structured programming.
47 *
48 * <h2>Basic operation</h2>
49 *
50 * A {@code StructuredTaskScope} is created with one of its public constructors. It defines
51 * the {@link #fork(Callable) fork} method to start a thread to execute a subtask, the {@link
52 * #join() join} method to wait for all subtasks to finish, and the {@link #close() close}
53 * method to close the task scope. The API is intended to be used with the {@code
54 * try-with-resources} statement. The intention is that code in the try <em>block</em>
55 * uses the {@code fork} method to fork threads to execute the subtasks, wait for the
56 * subtasks to finish with the {@code join} method, and then <em>process the results</em>.
 * A call to the {@code fork} method returns a {@link Subtask Subtask} representing
 * the <em>forked subtask</em>. Once {@code join} is called, the {@code Subtask} can be
 * used to get the result if the subtask completed successfully, or the exception if the
 * subtask failed.
60 * {@snippet lang=java :
61 * Callable<String> task1 = ...
62 * Callable<Integer> task2 = ...
63 *
64 * try (var scope = new StructuredTaskScope<Object>()) {
65 *
66 * Subtask<String> subtask1 = scope.fork(task1); // @highlight substring="fork"
67 * Subtask<Integer> subtask2 = scope.fork(task2); // @highlight substring="fork"
68 *
69 * scope.join(); // @highlight substring="join"
70 *
71 * ... process results/exceptions ...
72 *
73 * } // close // @highlight substring="close"
74 * }
75 * <p> The following example forks a collection of homogeneous subtasks, waits for all of
76 * them to complete with the {@code join} method, and uses the {@link Subtask.State
77 * Subtask.State} to partition the subtasks into a set of the subtasks that completed
78 * successfully and another for the subtasks that failed.
79 * {@snippet lang=java :
80 * List<Callable<String>> callables = ...
81 *
82 * try (var scope = new StructuredTaskScope<String>()) {
83 *
84 * List<Subtask<String>> subtasks = callables.stream().map(scope::fork).toList();
85 *
86 * scope.join();
87 *
88 * Map<Boolean, Set<Subtask<String>>> map = subtasks.stream()
89 * .collect(Collectors.partitioningBy(h -> h.state() == Subtask.State.SUCCESS,
90 * Collectors.toSet()));
91 *
92 * } // close
93 * }
94 *
95 * <p> To ensure correct usage, the {@code join} and {@code close} methods may only be
96 * invoked by the <em>owner</em> (the thread that opened/created the task scope), and the
97 * {@code close} method throws an exception after closing if the owner did not invoke the
98 * {@code join} method after forking.
99 *
100 * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut
101 * down a task scope without closing it. The {@code shutdown()} method <em>cancels</em> all
102 * unfinished subtasks by {@linkplain Thread#interrupt() interrupting} the threads. It
103 * prevents new threads from starting in the task scope. If the owner is waiting in the
104 * {@code join} method then it will wakeup.
105 *
 * <p> Shutdown is used for <em>short-circuiting</em> and allows subclasses to implement
 * <em>policy</em> that does not require all subtasks to finish.
108 *
109 * <h2>Subclasses with policies for common cases</h2>
110 *
111 * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for
112 * common cases:
113 * <ol>
114 * <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the result of the first
115 * subtask to complete successfully. Once captured, it shuts down the task scope to
116 * interrupt unfinished threads and wakeup the owner. This class is intended for cases
117 * where the result of any subtask will do ("invoke any") and where there is no need to
118 * wait for results of other unfinished subtasks. It defines methods to get the first
119 * result or throw an exception if all subtasks fail.
120 * <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the exception of the first
121 * subtask to fail. Once captured, it shuts down the task scope to interrupt unfinished
122 * threads and wakeup the owner. This class is intended for cases where the results of all
123 * subtasks are required ("invoke all"); if any subtask fails then the results of other
 * unfinished subtasks are no longer needed. It defines methods to throw an exception if
 * any of the subtasks fail.
126 * </ol>
127 *
128 * <p> The following are two examples that use the two classes. In both cases, a pair of
129 * subtasks are forked to fetch resources from two URL locations "left" and "right". The
130 * first example creates a ShutdownOnSuccess object to capture the result of the first
131 * subtask to complete successfully, cancelling the other by way of shutting down the task
132 * scope. The main task waits in {@code join} until either subtask completes with a result
133 * or both subtasks fail. It invokes {@link ShutdownOnSuccess#result(Function)
134 * result(Function)} method to get the captured result. If both subtasks fail then this
135 * method throws a {@code WebApplicationException} with the exception from one of the
136 * subtasks as the cause.
137 * {@snippet lang=java :
138 * try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
139 *
140 * scope.fork(() -> fetch(left));
141 * scope.fork(() -> fetch(right));
142 *
143 * scope.join();
144 *
145 * // @link regex="result(?=\()" target="ShutdownOnSuccess#result" :
146 * String result = scope.result(e -> new WebApplicationException(e));
147 *
148 * ...
149 * }
150 * }
151 * The second example creates a ShutdownOnFailure object to capture the exception of the
152 * first subtask to fail, cancelling the other by way of shutting down the task scope. The
153 * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a
154 * result, either fails, or a deadline is reached. It invokes {@link
155 * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception
156 * if either subtask fails. This method is a no-op if both subtasks complete successfully.
157 * The example uses {@link Supplier#get()} to get the result of each subtask. Using
158 * {@code Supplier} instead of {@code Subtask} is preferred for common cases where the
159 * object returned by fork is only used to get the result of a subtask that completed
160 * successfully.
161 * {@snippet lang=java :
162 * Instant deadline = ...
163 *
164 * try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
165 *
166 * Supplier<String> supplier1 = scope.fork(() -> query(left));
167 * Supplier<String> supplier2 = scope.fork(() -> query(right));
168 *
169 * scope.joinUntil(deadline);
170 *
171 * // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" :
172 * scope.throwIfFailed(e -> new WebApplicationException(e));
173 *
174 * // both subtasks completed successfully
175 * String result = Stream.of(supplier1, supplier2)
176 * .map(Supplier::get)
177 * .collect(Collectors.joining(", ", "{ ", " }"));
178 *
179 * ...
180 * }
181 * }
182 *
183 * <h2>Extending StructuredTaskScope</h2>
184 *
185 * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Subtask)
186 * handleComplete} method overridden, to implement policies other than those implemented
187 * by {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. A subclass may, for example,
188 * collect the results of subtasks that complete successfully and ignore subtasks that
189 * fail. It may collect exceptions when subtasks fail. It may invoke the {@link #shutdown()
190 * shutdown} method to shut down and cause {@link #join() join} to wakeup when some
191 * condition arises.
192 *
193 * <p> A subclass will typically define methods to make available results, state, or other
194 * outcome to code that executes after the {@code join} method. A subclass that collects
195 * results and ignores subtasks that fail may define a method that returns the results.
196 * A subclass that implements a policy to shut down when a subtask fails may define a
197 * method to get the exception of the first subtask to fail.
198 *
199 * <p> The following is an example of a simple {@code StructuredTaskScope} implementation
 * that collects homogeneous subtasks that complete successfully. It defines the method
201 * "{@code completedSuccessfully()}" that the main task can invoke after it joins.
202 * {@snippet lang=java :
203 * class CollectingScope<T> extends StructuredTaskScope<T> {
204 * private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>();
205 *
206 * @Override
207 * protected void handleComplete(Subtask<? extends T> subtask) {
208 * if (subtask.state() == Subtask.State.SUCCESS) {
209 * subtasks.add(subtask);
210 * }
211 * }
212 *
213 * @Override
214 * public CollectingScope<T> join() throws InterruptedException {
215 * super.join();
216 * return this;
217 * }
218 *
219 * public Stream<Subtask<? extends T>> completedSuccessfully() {
220 * // @link substring="ensureOwnerAndJoined" target="ensureOwnerAndJoined" :
221 * super.ensureOwnerAndJoined();
222 * return subtasks.stream();
223 * }
224 * }
225 * }
 * <p> The implementation of the {@code completedSuccessfully()} method in the example
 * invokes {@link #ensureOwnerAndJoined()} to ensure that the method can only be invoked
 * by the owner thread and only after it has joined.
229 *
230 * <h2><a id="TreeStructure">Tree structure</a></h2>
231 *
232 * Task scopes form a tree where parent-child relations are established implicitly when
233 * opening a new task scope:
234 * <ul>
235 * <li> A parent-child relation is established when a thread started in a task scope
236 * opens its own task scope. A thread started in task scope "A" that opens task scope
237 * "B" establishes a parent-child relation where task scope "A" is the parent of task
238 * scope "B".
239 * <li> A parent-child relation is established with nesting. If a thread opens task
240 * scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task
241 * scope "B" is the parent of the nested task scope "C".
242 * </ul>
243 *
244 * The <i>descendants</i> of a task scope are the child task scopes that it is a parent
245 * of, plus the descendants of the child task scopes, recursively.
246 *
247 * <p> The tree structure supports:
248 * <ul>
249 * <li> Inheritance of {@linkplain ScopedValue scoped values} across threads.
250 * <li> Confinement checks. The phrase "threads contained in the task scope" in method
251 * descriptions means threads started in the task scope or descendant scopes.
252 * </ul>
253 *
254 * <p> The following example demonstrates the inheritance of a scoped value. A scoped
255 * value {@code USERNAME} is bound to the value "{@code duke}". A {@code StructuredTaskScope}
256 * is created and its {@code fork} method invoked to start a thread to execute {@code
257 * childTask}. The thread inherits the scoped value <em>bindings</em> captured when
258 * creating the task scope. The code in {@code childTask} uses the value of the scoped
259 * value and so reads the value "{@code duke}".
260 * {@snippet lang=java :
261 * private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
262 *
263 * // @link substring="runWhere" target="ScopedValue#runWhere(ScopedValue, Object, Runnable)" :
264 * ScopedValue.runWhere(USERNAME, "duke", () -> {
265 * try (var scope = new StructuredTaskScope<String>()) {
266 *
267 * scope.fork(() -> childTask()); // @highlight substring="fork"
268 * ...
269 * }
270 * });
271 *
272 * ...
273 *
274 * String childTask() {
275 * // @link substring="get" target="ScopedValue#get()" :
276 * String name = USERNAME.get(); // "duke"
277 * ...
278 * }
279 * }
280 *
 * <p> {@code StructuredTaskScope} does not define APIs that expose the tree structure
 * at this time.
283 *
284 * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
285 * or method in this class will cause a {@link NullPointerException} to be thrown.
286 *
287 * <h2>Memory consistency effects</h2>
288 *
289 * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to
290 * {@linkplain #fork forking} of a subtask
291 * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
292 * <i>happen-before</i></a> any actions taken by that subtask, which in turn <i>happen-before</i>
293 * the subtask result is {@linkplain Subtask#get() retrieved} or <i>happen-before</i> any
294 * actions taken in a thread after {@linkplain #join() joining} of the task scope.
295 *
296 * @jls 17.4.5 Happens-before Order
297 *
298 * @param <T> the result type of tasks executed in the task scope
299 * @since 21
300 */
301 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
302 public class StructuredTaskScope<T> implements AutoCloseable {
303 private final ThreadFactory factory;
304 private final ThreadFlock flock;
305 private final ReentrantLock shutdownLock = new ReentrantLock();
306
307 // states: OPEN -> SHUTDOWN -> CLOSED
308 private static final int OPEN = 0; // initial state
309 private static final int SHUTDOWN = 1;
310 private static final int CLOSED = 2;
311
312 // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
313 private volatile int state;
314
315 // Counters to support checking that the task scope owner joins before processing
316 // results and attempts join before closing the task scope. These counters are
317 // accessed only by the owner thread.
318 private int forkRound; // incremented when the first subtask is forked after join
319 private int lastJoinAttempted; // set to the current fork round when join is attempted
320 private int lastJoinCompleted; // set to the current fork round when join completes
321
322 /**
323 * Represents a subtask forked with {@link #fork(Callable)}.
324 * @param <T> the result type
325 * @since 21
326 */
327 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
328 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
329 /**
330 * {@return the value returning task provided to the {@code fork} method}
331 *
332 * @apiNote Task objects with unique identity may be used for correlation by
333 * implementations of {@link #handleComplete(Subtask) handleComplete}.
334 */
335 Callable<? extends T> task();
336
337 /**
338 * Represents the state of a subtask.
339 * @see Subtask#state()
340 * @since 21
341 */
342 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
343 enum State {
344 /**
345 * The subtask result or exception is not available. This state indicates that
346 * the subtask was forked but has not completed, it completed after the task
347 * scope was {@linkplain #shutdown() shut down}, or it was forked after the
348 * task scope was shut down.
349 */
350 UNAVAILABLE,
351 /**
352 * The subtask completed successfully with a result. The {@link Subtask#get()
353 * Subtask.get()} method can be used to obtain the result. This is a terminal
354 * state.
355 */
356 SUCCESS,
357 /**
358 * The subtask failed with an exception. The {@link Subtask#exception()
359 * Subtask.exception()} method can be used to obtain the exception. This is a
360 * terminal state.
361 */
362 FAILED,
363 }
364
365 /**
366 * {@return the state of the subtask}
367 */
368 State state();
369
370 /**
371 * Returns the result of the subtask.
372 *
373 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
374 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
375 * joinUntil}) before it can obtain the result of the subtask.
376 *
377 * @return the possibly-null result
378 * @throws IllegalStateException if the subtask has not completed, did not complete
379 * successfully, or the current thread is the task scope owner and did not join
380 * after forking
381 * @see State#SUCCESS
382 */
383 T get();
384
385 /**
386 * {@return the exception thrown by the subtask}
387 *
388 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
389 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
390 * joinUntil}) before it can obtain the exception thrown by the subtask.
391 *
392 * @throws IllegalStateException if the subtask has not completed, completed with
393 * a result, or the current thread is the task scope owner and did not join after
394 * forking
395 * @see State#FAILED
396 */
397 Throwable exception();
398 }
399
400 /**
401 * Creates a structured task scope with the given name and thread factory. The task
402 * scope is optionally named for the purposes of monitoring and management. The thread
403 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
404 * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
405 * current thread.
406 *
407 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
408 * bindings for inheritance by threads started in the task scope. The
409 * <a href="#TreeStructure">Tree Structure</a> section in the class description details
410 * how parent-child relations are established implicitly for the purpose of inheritance
411 * of scoped value bindings.
412 *
413 * @param name the name of the task scope, can be null
414 * @param factory the thread factory
415 */
416 @SuppressWarnings("this-escape")
417 public StructuredTaskScope(String name, ThreadFactory factory) {
418 this.factory = Objects.requireNonNull(factory, "'factory' is null");
419 if (name == null)
420 name = Objects.toIdentityString(this);
421 this.flock = ThreadFlock.open(name);
422 }
423
/**
 * Creates an unnamed structured task scope whose subtasks run in virtual threads.
 * The current thread becomes the owner of the task scope.
 *
 * @implSpec This constructor behaves as if the 2-arg constructor were invoked with
 * a {@code null} name and a thread factory that creates virtual threads.
 */
public StructuredTaskScope() {
    this(null, Thread.ofVirtual().factory());
}
434
435 private IllegalStateException newIllegalStateExceptionScopeClosed() {
436 return new IllegalStateException("Task scope is closed");
437 }
438
439 private IllegalStateException newIllegalStateExceptionNoJoin() {
440 return new IllegalStateException("Owner did not join after forking subtasks");
441 }
442
443 /**
444 * Throws IllegalStateException if the scope is closed, returning the state if not
445 * closed.
446 */
447 private int ensureOpen() {
448 int s = state;
449 if (s == CLOSED)
450 throw newIllegalStateExceptionScopeClosed();
451 return s;
452 }
453
454 /**
455 * Throws WrongThreadException if the current thread is not the owner.
456 */
457 private void ensureOwner() {
458 if (Thread.currentThread() != flock.owner())
459 throw new WrongThreadException("Current thread not owner");
460 }
461
462 /**
463 * Throws WrongThreadException if the current thread is not the owner
464 * or a thread contained in the tree.
465 */
466 private void ensureOwnerOrContainsThread() {
467 Thread currentThread = Thread.currentThread();
468 if (currentThread != flock.owner() && !flock.containsThread(currentThread))
469 throw new WrongThreadException("Current thread not owner or thread in the tree");
470 }
471
472 /**
473 * Throws IllegalStateException if the current thread is the owner, and the owner did
474 * not join after forking a subtask in the given fork round.
475 */
476 private void ensureJoinedIfOwner(int round) {
477 if (Thread.currentThread() == flock.owner() && (round > lastJoinCompleted)) {
478 throw newIllegalStateExceptionNoJoin();
479 }
480 }
481
482 /**
483 * Ensures that the current thread is the owner of this task scope and that it joined
484 * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
485 * forking} subtasks.
486 *
487 * @apiNote This method can be used by subclasses that define methods to make available
488 * results, state, or other outcome to code intended to execute after the join method.
489 *
490 * @throws WrongThreadException if the current thread is not the task scope owner
491 * @throws IllegalStateException if the task scope is open and task scope owner did
492 * not join after forking
493 */
494 protected final void ensureOwnerAndJoined() {
495 ensureOwner();
496 if (forkRound > lastJoinCompleted) {
497 throw newIllegalStateExceptionNoJoin();
498 }
499 }
500
501 /**
502 * Invoked by a subtask when it completes successfully or fails in this task scope.
503 * This method is not invoked if a subtask completes after the task scope is
504 * {@linkplain #shutdown() shut down}.
505 *
506 * @implSpec The default implementation throws {@code NullPointerException} if the
507 * subtask is {@code null}. It throws {@link IllegalArgumentException} if the subtask
508 * has not completed.
509 *
510 * @apiNote The {@code handleComplete} method should be thread safe. It may be
511 * invoked by several threads concurrently.
512 *
513 * @param subtask the subtask
514 *
515 * @throws IllegalArgumentException if called with a subtask that has not completed
516 */
517 protected void handleComplete(Subtask<? extends T> subtask) {
518 if (subtask.state() == Subtask.State.UNAVAILABLE)
519 throw new IllegalArgumentException();
520 }
521
522 /**
523 * Starts a new thread in this task scope to execute a value-returning task, thus
524 * creating a <em>subtask</em> of this task scope.
525 *
526 * <p> The value-returning task is provided to this method as a {@link Callable}, the
527 * thread executes the task's {@link Callable#call() call} method. The thread is
528 * created with the task scope's {@link ThreadFactory}. It inherits the current thread's
529 * {@linkplain ScopedValue scoped value} bindings. The bindings must match the bindings
530 * captured when the task scope was created.
531 *
532 * <p> This method returns a {@link Subtask Subtask} to represent the <em>forked
533 * subtask</em>. The {@code Subtask} object can be used to obtain the result when
534 * the subtask completes successfully, or the exception when the subtask fails. To
535 * ensure correct usage, the {@link Subtask#get() get()} and {@link Subtask#exception()
536 * exception()} methods may only be called by the task scope owner after it has waited
537 * for all threads to finish with the {@link #join() join} or {@link #joinUntil(Instant)}
538 * methods. When the subtask completes, the thread invokes the {@link
539 * #handleComplete(Subtask) handleComplete} method to consume the completed subtask.
540 * If the task scope is {@linkplain #shutdown() shut down} before the subtask completes
541 * then the {@code handleComplete} method will not be invoked.
542 *
543 * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process of
544 * shutting down) then the subtask will not run and the {@code handleComplete} method
545 * will not be invoked.
546 *
547 * <p> This method may only be invoked by the task scope owner or threads contained
548 * in the task scope.
549 *
550 * @implSpec This method may be overridden for customization purposes, wrapping tasks
551 * for example. If overridden, the subclass must invoke {@code super.fork} to start a
552 * new thread in this task scope.
553 *
554 * @param task the value-returning task for the thread to execute
555 * @param <U> the result type
556 * @return the subtask
557 * @throws IllegalStateException if this task scope is closed
558 * @throws WrongThreadException if the current thread is not the task scope owner or a
559 * thread contained in the task scope
560 * @throws StructureViolationException if the current scoped value bindings are not
561 * the same as when the task scope was created
562 * @throws RejectedExecutionException if the thread factory rejected creating a
563 * thread to run the subtask
564 */
565 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
566 Objects.requireNonNull(task, "'task' is null");
567 int s = ensureOpen(); // throws ISE if closed
568
569 // when forked by the owner, the subtask is forked in the current or next round
570 int round = -1;
571 if (Thread.currentThread() == flock.owner()) {
572 round = forkRound;
573 if (forkRound == lastJoinCompleted) {
574 // new round if first fork after join
575 round++;
576 }
577 }
578
579 SubtaskImpl<U> subtask = new SubtaskImpl<>(this, task, round);
580 if (s < SHUTDOWN) {
581 // create thread to run task
582 Thread thread = factory.newThread(subtask);
583 if (thread == null) {
584 throw new RejectedExecutionException("Rejected by thread factory");
585 }
586
587 // attempt to start the thread
588 try {
589 flock.start(thread);
590 } catch (IllegalStateException e) {
591 // shutdown by another thread, or underlying flock is shutdown due
592 // to unstructured use
593 }
594 }
595
596 // force owner to join if this is the first fork in the round
597 if (Thread.currentThread() == flock.owner() && round > forkRound) {
598 forkRound = round;
599 }
600
601 // return forked subtask or a subtask that did not run
602 return subtask;
603 }
604
605 /**
606 * Wait for all threads to finish or the task scope to shut down.
607 */
608 private void implJoin(Duration timeout)
609 throws InterruptedException, TimeoutException
610 {
611 ensureOwner();
612 lastJoinAttempted = forkRound;
613 int s = ensureOpen(); // throws ISE if closed
614 if (s == OPEN) {
615 // wait for all threads, wakeup, interrupt, or timeout
616 if (timeout != null) {
617 flock.awaitAll(timeout);
618 } else {
619 flock.awaitAll();
620 }
621 }
622 lastJoinCompleted = forkRound;
623 }
624
625 /**
626 * Wait for all subtasks started in this task scope to finish or the task scope to
627 * shut down.
628 *
629 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
630 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
631 * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, or
632 * the current thread is {@linkplain Thread#interrupt() interrupted}.
633 *
634 * <p> This method may only be invoked by the task scope owner.
635 *
636 * @implSpec This method may be overridden for customization purposes or to return a
637 * more specific return type. If overridden, the subclass must invoke {@code
638 * super.join} to ensure that the method waits for threads in this task scope to
639 * finish.
640 *
641 * @return this task scope
642 * @throws IllegalStateException if this task scope is closed
643 * @throws WrongThreadException if the current thread is not the task scope owner
644 * @throws InterruptedException if interrupted while waiting
645 */
646 public StructuredTaskScope<T> join() throws InterruptedException {
647 try {
648 implJoin(null);
649 } catch (TimeoutException e) {
650 throw new InternalError();
651 }
652 return this;
653 }
654
655 /**
656 * Wait for all subtasks started in this task scope to finish or the task scope to
657 * shut down, up to the given deadline.
658 *
659 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
660 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
661 * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, the
662 * deadline is reached, or the current thread is {@linkplain Thread#interrupt()
663 * interrupted}.
664 *
665 * <p> This method may only be invoked by the task scope owner.
666 *
667 * @implSpec This method may be overridden for customization purposes or to return a
668 * more specific return type. If overridden, the subclass must invoke {@code
669 * super.joinUntil} to ensure that the method waits for threads in this task scope to
670 * finish.
671 *
672 * @param deadline the deadline
673 * @return this task scope
674 * @throws IllegalStateException if this task scope is closed
675 * @throws WrongThreadException if the current thread is not the task scope owner
676 * @throws InterruptedException if interrupted while waiting
677 * @throws TimeoutException if the deadline is reached while waiting
678 */
679 public StructuredTaskScope<T> joinUntil(Instant deadline)
680 throws InterruptedException, TimeoutException
681 {
682 Duration timeout = Duration.between(Instant.now(), deadline);
683 implJoin(timeout);
684 return this;
685 }
686
687 /**
688 * Interrupt all unfinished threads.
689 */
690 private void implInterruptAll() {
691 flock.threads()
692 .filter(t -> t != Thread.currentThread())
693 .forEach(t -> {
694 try {
695 t.interrupt();
696 } catch (Throwable ignore) { }
697 });
698 }
699
700 @SuppressWarnings("removal")
701 private void interruptAll() {
702 if (System.getSecurityManager() == null) {
703 implInterruptAll();
704 } else {
705 PrivilegedAction<Void> pa = () -> {
706 implInterruptAll();
707 return null;
708 };
709 AccessController.doPrivileged(pa);
710 }
711 }
712
713 /**
714 * Shutdown the task scope if not already shutdown. Return true if this method
715 * shutdowns the task scope, false if already shutdown.
716 */
717 private boolean implShutdown() {
718 shutdownLock.lock();
719 try {
720 if (state < SHUTDOWN) {
721 // prevent new threads from starting
722 flock.shutdown();
723
724 // set status before interrupting tasks
725 state = SHUTDOWN;
726
727 // interrupt all unfinished threads
728 interruptAll();
729
730 return true;
731 } else {
732 // already shutdown
733 return false;
734 }
735 } finally {
736 shutdownLock.unlock();
737 }
738 }
739
740 /**
741 * Shut down this task scope without closing it. Shutting down a task scope prevents
742 * new threads from starting, interrupts all unfinished threads, and causes the
743 * {@link #join() join} method to wakeup. Shutdown is useful for cases where the
744 * results of unfinished subtasks are no longer needed. It will typically be called
745 * by the {@link #handleComplete(Subtask)} implementation of a subclass that
746 * implements a policy to discard unfinished tasks once some outcome is reached.
747 *
748 * <p> More specifically, this method:
749 * <ul>
750 * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the
751 * task scope (except the current thread).
752 * <li> Wakes up the task scope owner if it is waiting in {@link #join()} or {@link
753 * #joinUntil(Instant)}. If the task scope owner is not waiting then its next call to
754 * {@code join} or {@code joinUntil} will return immediately.
755 * </ul>
756 *
757 * <p> The {@linkplain Subtask.State state} of unfinished subtasks that complete at
758 * around the time that the task scope is shutdown is not defined. A subtask that
759 * completes successfully with a result, or fails with an exception, at around
760 * the time that the task scope is shutdown may or may not <i>transition</i> to a
761 * terminal state.
762 *
763 * <p> This method may only be invoked by the task scope owner or threads contained
764 * in the task scope.
765 *
766 * @implSpec This method may be overridden for customization purposes. If overridden,
767 * the subclass must invoke {@code super.shutdown} to ensure that the method shuts
768 * down the task scope.
769 *
770 * @apiNote
771 * There may be threads that have not finished because they are executing code that
772 * did not respond (or respond promptly) to thread interrupt. This method does not wait
773 * for these threads. When the owner invokes the {@link #close() close} method
774 * to close the task scope then it will wait for the remaining threads to finish.
775 *
776 * @throws IllegalStateException if this task scope is closed
777 * @throws WrongThreadException if the current thread is not the task scope owner or
778 * a thread contained in the task scope
779 * @see #isShutdown()
780 */
781 public void shutdown() {
782 ensureOwnerOrContainsThread();
783 int s = ensureOpen(); // throws ISE if closed
784 if (s < SHUTDOWN && implShutdown())
785 flock.wakeup();
786 }
787
788 /**
789 * {@return true if this task scope is shutdown, otherwise false}
790 * @see #shutdown()
791 */
792 public final boolean isShutdown() {
793 return state >= SHUTDOWN;
794 }
795
796 /**
797 * Closes this task scope.
798 *
799 * <p> This method first shuts down the task scope (as if by invoking the {@link
800 * #shutdown() shutdown} method). It then waits for the threads executing any
801 * unfinished tasks to finish. If interrupted, this method will continue to wait for
802 * the threads to finish before completing with the interrupt status set.
803 *
804 * <p> This method may only be invoked by the task scope owner. If the task scope
805 * is already closed then the task scope owner invoking this method has no effect.
806 *
807 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
808 * manner</em>. If this method is called to close a task scope before nested task
809 * scopes are closed then it closes the underlying construct of each nested task scope
810 * (in the reverse order that they were created in), closes this task scope, and then
811 * throws {@link StructureViolationException}.
812 * Similarly, if this method is called to close a task scope while executing with
813 * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
814 * before the scoped values were bound, then {@code StructureViolationException} is
815 * thrown after closing the task scope.
816 * If a thread terminates without first closing task scopes that it owns then
817 * termination will cause the underlying construct of each of its open tasks scopes to
818 * be closed. Closing is performed in the reverse order that the task scopes were
819 * created in. Thread termination may therefore be delayed when the task scope owner
820 * has to wait for threads forked in these task scopes to finish.
821 *
822 * @implSpec This method may be overridden for customization purposes. If overridden,
823 * the subclass must invoke {@code super.close} to close the task scope.
824 *
825 * @throws IllegalStateException thrown after closing the task scope if the task scope
826 * owner did not attempt to join after forking
827 * @throws WrongThreadException if the current thread is not the task scope owner
828 * @throws StructureViolationException if a structure violation was detected
829 */
830 @Override
831 public void close() {
832 ensureOwner();
833 int s = state;
834 if (s == CLOSED)
835 return;
836
837 try {
838 if (s < SHUTDOWN)
839 implShutdown();
840 flock.close();
841 } finally {
842 state = CLOSED;
843 }
844
845 // throw ISE if the owner didn't attempt to join after forking
846 if (forkRound > lastJoinAttempted) {
847 lastJoinCompleted = forkRound;
848 throw newIllegalStateExceptionNoJoin();
849 }
850 }
851
852 @Override
853 public String toString() {
854 String name = flock.name();
855 return switch (state) {
856 case OPEN -> name;
857 case SHUTDOWN -> name + "/shutdown";
858 case CLOSED -> name + "/closed";
859 default -> throw new InternalError();
860 };
861 }
862
863 /**
864 * Subtask implementation, runs the task specified to the fork method.
865 */
866 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
867 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
868
869 private record AltResult(Subtask.State state, Throwable exception) {
870 AltResult(Subtask.State state) {
871 this(state, null);
872 }
873 }
874
875 private final StructuredTaskScope<? super T> scope;
876 private final Callable<? extends T> task;
877 private final int round;
878 private volatile Object result;
879
880 SubtaskImpl(StructuredTaskScope<? super T> scope,
881 Callable<? extends T> task,
882 int round) {
883 this.scope = scope;
884 this.task = task;
885 this.round = round;
886 }
887
888 @Override
889 public void run() {
890 T result = null;
891 Throwable ex = null;
892 try {
893 result = task.call();
894 } catch (Throwable e) {
895 ex = e;
896 }
897
898 // nothing to do if task scope is shutdown
899 if (scope.isShutdown())
900 return;
901
902 // capture result or exception, invoke handleComplete
903 if (ex == null) {
904 this.result = (result != null) ? result : RESULT_NULL;
905 } else {
906 this.result = new AltResult(State.FAILED, ex);
907 }
908 scope.handleComplete(this);
909 }
910
911 @Override
912 public Callable<? extends T> task() {
913 return task;
914 }
915
916 @Override
917 public Subtask.State state() {
918 Object result = this.result;
919 if (result == null) {
920 return State.UNAVAILABLE;
921 } else if (result instanceof AltResult alt) {
922 // null or failed
923 return alt.state();
924 } else {
925 return State.SUCCESS;
926 }
927 }
928
929 @Override
930 public T get() {
931 scope.ensureJoinedIfOwner(round);
932 Object result = this.result;
933 if (result instanceof AltResult) {
934 if (result == RESULT_NULL) return null;
935 } else if (result != null) {
936 @SuppressWarnings("unchecked")
937 T r = (T) result;
938 return r;
939 }
940 throw new IllegalStateException(
941 "Result is unavailable or subtask did not complete successfully");
942 }
943
944 @Override
945 public Throwable exception() {
946 scope.ensureJoinedIfOwner(round);
947 Object result = this.result;
948 if (result instanceof AltResult alt && alt.state() == State.FAILED) {
949 return alt.exception();
950 }
951 throw new IllegalStateException(
952 "Exception is unavailable or subtask did not complete with exception");
953 }
954
955 @Override
956 public String toString() {
957 String stateAsString = switch (state()) {
958 case UNAVAILABLE -> "[Unavailable]";
959 case SUCCESS -> "[Completed successfully]";
960 case FAILED -> {
961 Throwable ex = ((AltResult) result).exception();
962 yield "[Failed: " + ex + "]";
963 }
964 };
965 return Objects.toIdentityString(this) + stateAsString;
966 }
967 }
968
969 /**
970 * A {@code StructuredTaskScope} that captures the result of the first subtask to
971 * complete {@linkplain Subtask.State#SUCCESS successfully}. Once captured, it
972 * {@linkplain #shutdown() shuts down} the task scope to interrupt unfinished threads
973 * and wakeup the task scope owner. The policy implemented by this class is intended
974 * for cases where the result of any subtask will do ("invoke any") and where the
975 * results of other unfinished subtasks are no longer needed.
976 *
977 * <p> Unless otherwise specified, passing a {@code null} argument to a method
978 * in this class will cause a {@link NullPointerException} to be thrown.
979 *
980 * @apiNote This class implements a policy to shut down the task scope when a subtask
981 * completes successfully. There shouldn't be any need to directly shut down the task
982 * scope with the {@link #shutdown() shutdown} method.
983 *
984 * @param <T> the result type
985 * @since 21
986 */
987 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
988 public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
989 private static final Object RESULT_NULL = new Object();
990 private static final VarHandle FIRST_RESULT;
991 private static final VarHandle FIRST_EXCEPTION;
992 static {
993 try {
994 MethodHandles.Lookup l = MethodHandles.lookup();
995 FIRST_RESULT = l.findVarHandle(ShutdownOnSuccess.class, "firstResult", Object.class);
996 FIRST_EXCEPTION = l.findVarHandle(ShutdownOnSuccess.class, "firstException", Throwable.class);
997 } catch (Exception e) {
998 throw new ExceptionInInitializerError(e);
999 }
1000 }
1001 private volatile Object firstResult;
1002 private volatile Throwable firstException;
1003
1004 /**
1005 * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
1006 * The task scope is optionally named for the purposes of monitoring and management.
1007 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1008 * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope
1009 * is owned by the current thread.
1010 *
1011 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
1012 * value} bindings for inheritance by threads started in the task scope. The
1013 * <a href="#TreeStructure">Tree Structure</a> section in the class description
1014 * details how parent-child relations are established implicitly for the purpose
1015 * of inheritance of scoped value bindings.
1016 *
1017 * @param name the name of the task scope, can be null
1018 * @param factory the thread factory
1019 */
1020 public ShutdownOnSuccess(String name, ThreadFactory factory) {
1021 super(name, factory);
1022 }
1023
1024 /**
1025 * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
1026 *
1027 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with
1028 * a name of {@code null} and a thread factory that creates virtual threads.
1029 */
1030 public ShutdownOnSuccess() {
1031 this(null, Thread.ofVirtual().factory());
1032 }
1033
1034 @Override
1035 protected void handleComplete(Subtask<? extends T> subtask) {
1036 if (firstResult != null) {
1037 // already captured a result
1038 return;
1039 }
1040
1041 if (subtask.state() == Subtask.State.SUCCESS) {
1042 // task succeeded
1043 T result = subtask.get();
1044 Object r = (result != null) ? result : RESULT_NULL;
1045 if (FIRST_RESULT.compareAndSet(this, null, r)) {
1046 super.shutdown();
1047 }
1048 } else if (firstException == null) {
1049 // capture the exception thrown by the first subtask that failed
1050 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1051 }
1052 }
1053
1054 /**
1055 * Wait for a subtask started in this task scope to complete {@linkplain
1056 * Subtask.State#SUCCESS successfully} or all subtasks to complete.
1057 *
1058 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1059 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1060 * when all threads finish, a subtask completes successfully, or the current
1061 * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting
1062 * if the {@link #shutdown() shutdown} method is invoked directly to shut down
1063 * this task scope.
1064 *
1065 * <p> This method may only be invoked by the task scope owner.
1066 *
1067 * @throws IllegalStateException {@inheritDoc}
1068 * @throws WrongThreadException {@inheritDoc}
1069 */
1070 @Override
1071 public ShutdownOnSuccess<T> join() throws InterruptedException {
1072 super.join();
1073 return this;
1074 }
1075
1076 /**
1077 * Wait for a subtask started in this task scope to complete {@linkplain
1078 * Subtask.State#SUCCESS successfully} or all subtasks to complete, up to the
1079 * given deadline.
1080 *
1081 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1082 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1083 * when all threads finish, a subtask completes successfully, the deadline is
1084 * reached, or the current thread is {@linkplain Thread#interrupt() interrupted}.
1085 * It also stops waiting if the {@link #shutdown() shutdown} method is invoked
1086 * directly to shut down this task scope.
1087 *
1088 * <p> This method may only be invoked by the task scope owner.
1089 *
1090 * @throws IllegalStateException {@inheritDoc}
1091 * @throws WrongThreadException {@inheritDoc}
1092 */
1093 @Override
1094 public ShutdownOnSuccess<T> joinUntil(Instant deadline)
1095 throws InterruptedException, TimeoutException
1096 {
1097 super.joinUntil(deadline);
1098 return this;
1099 }
1100
1101 /**
1102 * {@return the result of the first subtask that completed {@linkplain
1103 * Subtask.State#SUCCESS successfully}}
1104 *
1105 * <p> When no subtask completed successfully, but a subtask {@linkplain
1106 * Subtask.State#FAILED failed} then {@code ExecutionException} is thrown with
1107 * the subtask's exception as the {@linkplain Throwable#getCause() cause}.
1108 *
1109 * @throws ExecutionException if no subtasks completed successfully but at least
1110 * one subtask failed
1111 * @throws IllegalStateException if no subtasks completed or the task scope owner
1112 * did not join after forking
1113 * @throws WrongThreadException if the current thread is not the task scope owner
1114 */
1115 public T result() throws ExecutionException {
1116 return result(ExecutionException::new);
1117 }
1118
1119 /**
1120 * Returns the result of the first subtask that completed {@linkplain
1121 * Subtask.State#SUCCESS successfully}, otherwise throws an exception produced
1122 * by the given exception supplying function.
1123 *
1124 * <p> When no subtask completed successfully, but a subtask {@linkplain
1125 * Subtask.State#FAILED failed}, then the exception supplying function is invoked
1126 * with subtask's exception.
1127 *
1128 * @param esf the exception supplying function
1129 * @param <X> type of the exception to be thrown
1130 * @return the result of the first subtask that completed with a result
1131 *
1132 * @throws X if no subtasks completed successfully but at least one subtask failed
1133 * @throws IllegalStateException if no subtasks completed or the task scope owner
1134 * did not join after forking
1135 * @throws WrongThreadException if the current thread is not the task scope owner
1136 */
1137 public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X {
1138 Objects.requireNonNull(esf);
1139 ensureOwnerAndJoined();
1140
1141 Object result = firstResult;
1142 if (result == RESULT_NULL) {
1143 return null;
1144 } else if (result != null) {
1145 @SuppressWarnings("unchecked")
1146 T r = (T) result;
1147 return r;
1148 }
1149
1150 Throwable exception = firstException;
1151 if (exception != null) {
1152 X ex = esf.apply(exception);
1153 Objects.requireNonNull(ex, "esf returned null");
1154 throw ex;
1155 }
1156
1157 throw new IllegalStateException("No completed subtasks");
1158 }
1159 }
1160
1161 /**
1162 * A {@code StructuredTaskScope} that captures the exception of the first subtask to
1163 * {@linkplain Subtask.State#FAILED fail}. Once captured, it {@linkplain #shutdown()
1164 * shuts down} the task scope to interrupt unfinished threads and wakeup the task
1165 * scope owner. The policy implemented by this class is intended for cases where the
1166 * results for all subtasks are required ("invoke all"); if any subtask fails then the
1167 * results of other unfinished subtasks are no longer needed.
1168 *
1169 * <p> Unless otherwise specified, passing a {@code null} argument to a method
1170 * in this class will cause a {@link NullPointerException} to be thrown.
1171 *
1172 * @apiNote This class implements a policy to shut down the task scope when a subtask
1173 * fails. There shouldn't be any need to directly shut down the task scope with the
1174 * {@link #shutdown() shutdown} method.
1175 *
1176 * @since 21
1177 */
1178 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
1179 public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
1180 private static final VarHandle FIRST_EXCEPTION;
1181 static {
1182 try {
1183 MethodHandles.Lookup l = MethodHandles.lookup();
1184 FIRST_EXCEPTION = l.findVarHandle(ShutdownOnFailure.class, "firstException", Throwable.class);
1185 } catch (Exception e) {
1186 throw new ExceptionInInitializerError(e);
1187 }
1188 }
1189 private volatile Throwable firstException;
1190
1191 /**
1192 * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
1193 * The task scope is optionally named for the purposes of monitoring and management.
1194 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1195 * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope
1196 * is owned by the current thread.
1197 *
1198 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
1199 * value} bindings for inheritance by threads started in the task scope. The
1200 * <a href="#TreeStructure">Tree Structure</a> section in the class description
1201 * details how parent-child relations are established implicitly for the purpose
1202 * of inheritance of scoped value bindings.
1203 *
1204 * @param name the name of the task scope, can be null
1205 * @param factory the thread factory
1206 */
1207 public ShutdownOnFailure(String name, ThreadFactory factory) {
1208 super(name, factory);
1209 }
1210
1211 /**
1212 * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
1213 *
1214 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with
1215 * a name of {@code null} and a thread factory that creates virtual threads.
1216 */
1217 public ShutdownOnFailure() {
1218 this(null, Thread.ofVirtual().factory());
1219 }
1220
1221 @Override
1222 protected void handleComplete(Subtask<?> subtask) {
1223 if (subtask.state() == Subtask.State.FAILED
1224 && firstException == null
1225 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception())) {
1226 super.shutdown();
1227 }
1228 }
1229
1230 /**
1231 * Wait for all subtasks started in this task scope to complete or for a subtask
1232 * to {@linkplain Subtask.State#FAILED fail}.
1233 *
1234 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1235 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1236 * when all threads finish, a subtask fails, or the current thread is {@linkplain
1237 * Thread#interrupt() interrupted}. It also stops waiting if the {@link #shutdown()
1238 * shutdown} method is invoked directly to shut down this task scope.
1239 *
1240 * <p> This method may only be invoked by the task scope owner.
1241 *
1242 * @throws IllegalStateException {@inheritDoc}
1243 * @throws WrongThreadException {@inheritDoc}
1244 */
1245 @Override
1246 public ShutdownOnFailure join() throws InterruptedException {
1247 super.join();
1248 return this;
1249 }
1250
1251 /**
1252 * Wait for all subtasks started in this task scope to complete or for a subtask
1253 * to {@linkplain Subtask.State#FAILED fail}, up to the given deadline.
1254 *
1255 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1256 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1257 * when all threads finish, a subtask fails, the deadline is reached, or the current
1258 * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting
1259 * if the {@link #shutdown() shutdown} method is invoked directly to shut down
1260 * this task scope.
1261 *
1262 * <p> This method may only be invoked by the task scope owner.
1263 *
1264 * @throws IllegalStateException {@inheritDoc}
1265 * @throws WrongThreadException {@inheritDoc}
1266 */
1267 @Override
1268 public ShutdownOnFailure joinUntil(Instant deadline)
1269 throws InterruptedException, TimeoutException
1270 {
1271 super.joinUntil(deadline);
1272 return this;
1273 }
1274
1275 /**
1276 * Returns the exception of the first subtask that {@linkplain Subtask.State#FAILED
1277 * failed}. If no subtasks failed then an empty {@code Optional} is returned.
1278 *
1279 * @return the exception for the first subtask to fail or an empty optional if no
1280 * subtasks failed
1281 *
1282 * @throws WrongThreadException if the current thread is not the task scope owner
1283 * @throws IllegalStateException if the task scope owner did not join after forking
1284 */
1285 public Optional<Throwable> exception() {
1286 ensureOwnerAndJoined();
1287 return Optional.ofNullable(firstException);
1288 }
1289
1290 /**
1291 * Throws if a subtask {@linkplain Subtask.State#FAILED failed}.
1292 * If any subtask failed with an exception then {@code ExecutionException} is
1293 * thrown with the exception of the first subtask to fail as the {@linkplain
1294 * Throwable#getCause() cause}. This method does nothing if no subtasks failed.
1295 *
1296 * @throws ExecutionException if a subtask failed
1297 * @throws WrongThreadException if the current thread is not the task scope owner
1298 * @throws IllegalStateException if the task scope owner did not join after forking
1299 */
1300 public void throwIfFailed() throws ExecutionException {
1301 throwIfFailed(ExecutionException::new);
1302 }
1303
1304 /**
1305 * Throws the exception produced by the given exception supplying function if a
1306 * subtask {@linkplain Subtask.State#FAILED failed}. If any subtask failed with
1307 * an exception then the function is invoked with the exception of the first
1308 * subtask to fail. The exception returned by the function is thrown. This method
1309 * does nothing if no subtasks failed.
1310 *
1311 * @param esf the exception supplying function
1312 * @param <X> type of the exception to be thrown
1313 *
1314 * @throws X produced by the exception supplying function
1315 * @throws WrongThreadException if the current thread is not the task scope owner
1316 * @throws IllegalStateException if the task scope owner did not join after forking
1317 */
1318 public <X extends Throwable>
1319 void throwIfFailed(Function<Throwable, ? extends X> esf) throws X {
1320 ensureOwnerAndJoined();
1321 Objects.requireNonNull(esf);
1322 Throwable exception = firstException;
1323 if (exception != null) {
1324 X ex = esf.apply(exception);
1325 Objects.requireNonNull(ex, "esf returned null");
1326 throw ex;
1327 }
1328 }
1329 }
1330 }
|
1 /*
2 * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.security.AccessController;
30 import java.security.PrivilegedAction;
31 import java.time.Duration;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.NoSuchElementException;
35 import java.util.Objects;
36 import java.util.function.Function;
37 import java.util.function.Predicate;
38 import java.util.function.Supplier;
39 import java.util.stream.Stream;
40 import jdk.internal.javac.PreviewFeature;
41 import jdk.internal.misc.InnocuousThread;
42 import jdk.internal.misc.ThreadFlock;
43
44 /**
45 * An API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports cases
46 * where a main task splits into several concurrent subtasks, and where the subtasks must
47 * complete before the main task continues. A {@code StructuredTaskScope} can be used to
48 * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
49 * just like that of a sequential operation in structured programming.
50 *
51 * <p> {@code StructuredTaskScope} defines the static method {@link #open(Joiner) open}
52 * to open a new {@code StructuredTaskScope} and the {@link #close() close} method to close
53 * it. The API is designed to be used with the {@code try-with-resources} statement where
54 * the {@code StructuredTaskScope} is opened as a resource and then closed automatically.
55 * The code in the block uses the {@link #fork(Callable) fork} method to fork subtasks.
56 * After forking, it uses the {@link #join() join} method to wait for all subtasks to
57 * finish (or some other outcome) as a single operation. Forking a subtask starts a new
58 * {@link Thread} to run the subtask. The thread executing the main task does not continue
59 * beyond the {@code close} method until all threads started to execute subtasks have finished.
60 * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may
61 * only be invoked by the <em>owner thread</em> (the thread that opened the {@code
62 * StructuredTaskScope}), and the {@code close} method throws an exception after closing
63 * if the owner did not invoke the {@code join} method.
64 *
65 * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner} that handles subtask
66 * completion and produces the outcome (the result or an exception) for the {@link #join()
67 * join} method. The {@code Joiner} interface defines static methods to create a
68 * {@code Joiner} for common cases.
69 *
70 * <p> A {@code Joiner} may <a id="CancelExecution"><em>cancel execution</em></a>
71 * (sometimes called "short-circuiting") when some condition is reached that does not
72 * require the result of subtasks that are still executing. Cancelling execution prevents
73 * new threads from being started to execute further subtasks, {@linkplain Thread#interrupt()
74 * interrupts} the threads executing subtasks that have not completed, and causes the
75 * {@code join} method to wakeup with a result (or exception). The {@link #close() close}
76 * method always waits for threads executing subtasks to finish, even if execution is
77 * cancelled, so it cannot continue beyond the {@code close} method until the interrupted
78 * threads finish. Subtasks should be coded so that they finish as soon as possible when
79 * interrupted. Subtasks that block on methods that are not interruptible may delay the
80 * closing of a task scope.
81 *
82 * <p> Consider the example of a main task that splits into two subtasks to concurrently
83 * fetch resources from two URL locations "left" and "right". Both subtasks may complete
84 * successfully, one subtask may succeed and the other may fail, or both subtasks may
85 * fail. The main task in this example is interested in the successful result from both
86 * subtasks. It uses {@link Joiner#awaitAllSuccessfulOrThrow()
87 * Joiner.awaitAllSuccessfulOrThrow()} to create a {@code Joiner} that waits for both
88 * subtasks to complete successfully or for either subtask to fail.
89 * {@snippet lang=java :
90 * try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
91 *
92 * // @link substring="fork" target="#fork(Callable)" :
93 * Subtask<String> subtask1 = scope.fork(() -> query(left));
94 * Subtask<Integer> subtask2 = scope.fork(() -> query(right));
95 *
96 * // throws if either subtask fails
97 * scope.join(); // @link substring="join" target="#join()"
98 *
99 * // both subtasks completed successfully
100 * return new MyResult(subtask1.get(), subtask2.get()); // @link substring="get" target="Subtask#get()"
101 *
102 * }
103 * }
104 *
105 * <p> In this example, the main task forks the two subtasks. The {@code fork} method
106 * returns a {@link Subtask Subtask} that is a handle to the forked subtask. The main task
107 * waits in the {@code join} method for both subtasks to complete successfully or for either
108 * subtask to fail. If both subtasks complete successfully then the {@code join} method
109 * completes and the main task uses the {@link Subtask#get() Subtask.get()} method to get
110 * the result of each subtask. If either subtask fails then the {@code Joiner} causes the
111 * other subtask to be cancelled (this will interrupt the thread executing the subtask)
112 * and the {@code join} method throws {@link ExecutionException} with the exception from
113 * the failed subtask as the {@linkplain Throwable#getCause() cause}.
114 *
115 * <p> Now consider another example that also splits into two subtasks to concurrently
116 * fetch resources. In this example, the code in the main task is only interested in the
117 * result from the first subtask to complete successfully. The example uses {@link
118 * Joiner#anySuccessfulResultOrThrow() Joiner.anySuccessfulResultOrThrow()} to
119 * create a {@code Joiner} that makes available the result of the first subtask to
120 * complete successfully. The type parameter in the example is "{@code String}" so that
121 * only subtasks that return a {@code String} can be forked.
122 * {@snippet lang=java :
123 * // @link substring="open" target="#open(Joiner)" :
124 * try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {
125 *
126 * scope.fork(() -> query(left)); // @link substring="fork" target="#fork(Callable)"
127 * scope.fork(() -> query(right));
128 *
129 * // throws if both subtasks fail
130 * String firstResult = scope.join();
131 *
132 * // @link substring="close" target="#close()" :
133 * } // close
134 * }
135 *
136 * <p> In the example, the main task forks the two subtasks, then waits in the {@code
137 * join} method for either subtask to complete successfully or for both subtasks to fail.
138 * If one of the subtasks completes successfully then the {@code Joiner} causes the other
139 * subtask to be cancelled (this will interrupt the thread executing the subtask), and
140 * the {@code join} method returns the result from the first subtask. Cancelling the other
141 * subtask avoids the main task waiting for a result that it doesn't care about. If both
142 * subtasks fail then the {@code join} method throws {@link ExecutionException} with the
143 * exception from one of the subtasks as the {@linkplain Throwable#getCause() cause}.
144 *
145 * <p> Whether code uses the {@code Subtask} returned from {@code fork} will depend on
146 * the {@code Joiner} and usage. Some {@code Joiner} implementations are suited to subtasks
147 * that return results of the same type and where the {@code join} method returns a result
148 * for the main task to use. Code that forks subtasks that return results of different
149 * types, and uses a {@code Joiner} such as {@code Joiner.awaitAllSuccessfulOrThrow()} that
150 * does not return a result, will use {@link Subtask#get() Subtask.get()} after joining.
151 *
152 * <h2>Exception handling</h2>
153 *
154 * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner Joiner} that
155 * handles subtask completion and produces the outcome for the {@link #join() join} method.
156 * In some cases, the outcome will be a result, in other cases it will be an exception.
157 * If the outcome is an exception then the {@code join} method throws {@link
158 * ExecutionException} with the exception as the {@linkplain Throwable#getCause()
159 * cause}. For many {@code Joiner} implementations, the exception will be an exception
160 * thrown by a subtask that failed. In the case of {@link Joiner#allSuccessfulOrThrow()
161 * allSuccessfulOrThrow} and {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow}
162 * for example, the exception is from the first subtask to fail.
163 *
164 * <p> Many of the details for how exceptions are handled will depend on usage. In some
165 * cases, the {@code join} method will be called in a {@code try-catch} block to catch
166 * {@code ExecutionException} and handle the cause. The exception handling may use
167 * {@code instanceof} with pattern matching to handle specific causes. In some cases it
168 * may not be useful to catch {@code ExecutionException} but instead leave it to propagate
169 * to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler}
170 * for logging purposes.
171 *
172 * <p> For cases where a specific exception triggers the use of a default result then it
173 * may be more appropriate to handle this in the subtask itself rather than the subtask
174 * failing and code in the main task handling the exception.
175 *
176 * <h2>Configuration</h2>
177 *
178 * A {@code StructuredTaskScope} is opened with {@linkplain Config configuration} that
179 * consists of a {@link ThreadFactory} to create threads, an optional name for monitoring
180 * and management purposes, and an optional timeout.
181 *
182 * <p> The 1-arg {@link #open(Joiner) open} method creates a {@code StructuredTaskScope}
183 * with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default
184 * configuration has a {@code ThreadFactory} that creates unnamed
185 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
186 * is unnamed for monitoring and management purposes, and has no timeout.
187 *
188 * <p> The 2-arg {@link #open(Joiner, Function) open} method can be used to create a
189 * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for
190 * the purposes of monitoring and management, or has a timeout that cancels execution if
191 * the timeout expires before or while waiting for subtasks to complete. The {@code open}
192 * method is called with a {@linkplain Function function} that is applied to the default
193 * configuration and returns a {@link Config Config} for the {@code StructuredTaskScope}
194 * under construction.
195 *
196 * <p> The following example opens a new {@code StructuredTaskScope} with a {@code
197 * ThreadFactory} that creates virtual threads {@linkplain Thread#setName(String) named}
198 * "duke-0", "duke-1" ...
199 * {@snippet lang = java:
200 * // @link substring="name" target="Thread.Builder#name(String, long)" :
201 * ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory();
202 *
203 * // @link substring="withThreadFactory" target="Config#withThreadFactory(ThreadFactory)" :
204 * try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
205 *
206 * scope.fork( .. ); // runs in a virtual thread with name "duke-0"
207 * scope.fork( .. ); // runs in a virtual thread with name "duke-1"
208 *
209 * scope.join();
210 *
211 * }
212 *}
213 *
214 * <p> A second example sets a timeout, represented by a {@link Duration}. The timeout
215 * starts when the new task scope is opened. If the timeout expires before the {@code join}
216 * method has completed then <a href="#CancelExecution">execution is cancelled</a>. This
217 * interrupts the threads executing the two subtasks and causes the {@link #join() join}
218 * method to throw {@link ExecutionException} with {@link TimeoutException} as the cause.
219 * {@snippet lang=java :
220 * Duration timeout = Duration.ofSeconds(10);
221 *
222 * // @link substring="allSuccessfulOrThrow" target="Joiner#allSuccessfulOrThrow()" :
223 * try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
224 * // @link substring="withTimeout" target="Config#withTimeout(Duration)" :
225 * cf -> cf.withTimeout(timeout))) {
226 *
227 * scope.fork(() -> query(left));
228 * scope.fork(() -> query(right));
229 *
230 * List<String> result = scope.join()
231 * .map(Subtask::get)
232 * .toList();
233 *
234 * }
235 * }
236 *
237 * <h2>Inheritance of scoped value bindings</h2>
238 *
239 * {@link ScopedValue} supports the execution of a method with a {@code ScopedValue} bound
240 * to a value for the bounded period of execution of the method by the <em>current thread</em>.
241 * It allows a value to be safely and efficiently shared to methods without using method
242 * parameters.
243 *
244 * <p> When used in conjunction with a {@code StructuredTaskScope}, a {@code ScopedValue}
245 * can also safely and efficiently share a value to methods executed by subtasks forked
246 * in the task scope. When a {@code ScopedValue} object is bound to a value in the thread
247 * executing the main task then that binding is inherited by the threads created to
248 * execute the subtasks. The thread executing the main task does not continue beyond the
249 * {@link #close() close} method until all threads executing the subtasks have finished.
250 * This ensures that the {@code ScopedValue} is not reverted to being {@linkplain
251 * ScopedValue#isBound() unbound} (or its previous value) while subtasks are executing.
252 * In addition to providing a safe and efficient means to inherit a value into subtasks,
253 * the inheritance allows sequential code using {@code ScopedValue} to be refactored to use
254 * structured concurrency.
255 *
256 * <p> To ensure correctness, opening a new {@code StructuredTaskScope} captures the
257 * current thread's scoped value bindings. These are the scoped value bindings that are
258 * inherited by the threads created to execute subtasks in the task scope. Forking a
259 * subtask checks that the bindings in effect at the time that the subtask is forked
260 * match the bindings when the {@code StructuredTaskScope} was created. This check ensures
261 * that a subtask does not inherit a binding that is reverted in the main task before the
262 * subtask has completed.
263 *
264 * <p> A {@code ScopedValue} that is shared across threads requires that the value be an
265 * immutable object or for all access to the value to be appropriately synchronized.
266 *
267 * <p> The following example demonstrates the inheritance of scoped value bindings. The
268 * scoped value USERNAME is bound to the value "duke" for the bounded period of a lambda
269 * expression by the thread executing it. The code in the block opens a {@code
270 * StructuredTaskScope} and forks two subtasks, it then waits in the {@code join} method
271 * and aggregates the results from both subtasks. If code executed by the threads
272 * running subtask1 and subtask2 uses {@link ScopedValue#get()} to get the value of
273 * USERNAME, then value "duke" will be returned.
274 * {@snippet lang=java :
275 * // @link substring="newInstance" target="ScopedValue#newInstance()" :
276 * private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
277 *
278 * // @link substring="callWhere" target="ScopedValue#callWhere" :
279 * Result result = ScopedValue.callWhere(USERNAME, "duke", () -> {
280 *
281 * try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
282 *
283 * Subtask<String> subtask1 = scope.fork( .. ); // inherits binding
284 * Subtask<Integer> subtask2 = scope.fork( .. ); // inherits binding
285 *
286 * scope.join();
287 * return new MyResult(subtask1.get(), subtask2.get());
288 * }
289 *
290 * });
291 * }
292 *
293 * <p> A scoped value inherited into a subtask may be
294 * <a href="{@docRoot}/java.base/java/lang/ScopedValue.html#rebind">rebound</a> to a new
295 * value in the subtask for the bounded execution of some method executed in the subtask.
296 * When the method completes, the value of the {@code ScopedValue} reverts to its previous
297 * value, the value inherited from the thread executing the main task.
298 *
299 * <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}.
300 * A main task executing in thread T1 opens a {@code StructuredTaskScope} and forks a
301 * subtask that runs in thread T2. The scoped value bindings captured when T1 opens the
302 * task scope are inherited into T2. The subtask (in thread T2) executes code that opens a
303 * new {@code StructuredTaskScope} and forks a subtask that runs in thread T3. The scoped
304 * value bindings captured when T2 opens the task scope are inherited into T3. These
305 * include (or may be the same as) the bindings that were inherited from T1. In effect,
306 * scoped values are inherited into a tree of subtasks, not just one level of subtask.
307 *
308 * <h2>Memory consistency effects</h2>
309 *
310 * <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to
311 * {@linkplain #fork forking} of a subtask
312 * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
313 * <i>happen-before</i></a> any actions taken by that subtask, which in turn
314 * <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}.
315 *
316 * <h2>General exceptions</h2>
317 *
318 * <p> Unless otherwise specified, passing a {@code null} argument to a method in this
319 * class will cause a {@link NullPointerException} to be thrown.
320 *
321 * @param <T> the result type of tasks executed in the task scope
322 * @param <R> the type of the result returned by the join method
323 *
324 * @jls 17.4.5 Happens-before Order
325 * @since 21
326 */
327 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
328 public class StructuredTaskScope<T, R> implements AutoCloseable {
// VarHandle giving atomic access to the instance field named "cancelled".
private static final VarHandle CANCELLED;
static {
    VarHandle handle;
    try {
        handle = MethodHandles.lookup()
                .findVarHandle(StructuredTaskScope.class, "cancelled", boolean.class);
    } catch (Exception ex) {
        // a failed lookup is reported as a class initialization failure
        throw new ExceptionInInitializerError(ex);
    }
    CANCELLED = handle;
}
338
339 private final Joiner<? super T, ? extends R> joiner; // consulted by onComplete to decide whether to cancel execution
340 private final ThreadFactory threadFactory; // supplied to the constructor
341 private final ThreadFlock flock; // opened in the constructor; source of owner(), threads(), shutdown() and wakeup()
342
343 // fields that are only accessed by owner thread
344 private boolean needToJoin; // set by fork to indicate that owner must join
345 private boolean joined; // set to true when owner joins
346 private boolean closed; // when true, ensureOpen() throws IllegalStateException
347 private Future<?> timerTask; // timeout task: assigned by scheduleTimeout, cancelled by cancelTimeout
348
349 // set or read by any thread
350 private volatile boolean cancelled; // set to true by cancelExecution through the CANCELLED VarHandle
351
352 // set by the timer thread, read by the owner thread
353 private volatile boolean timeoutExpired; // set by the timeout task just before it cancels execution
354
355 /**
356 * Throws IllegalStateException if the task scope is closed.
357 */
358 private void ensureOpen() {
359 assert Thread.currentThread() == flock.owner();
360 if (closed) {
361 throw new IllegalStateException("Task scope is closed");
362 }
363 }
364
365 /**
366 * Throws WrongThreadException if the current thread is not the owner thread.
367 */
368 private void ensureOwner() {
369 if (Thread.currentThread() != flock.owner()) {
370 throw new WrongThreadException("Current thread not owner");
371 }
372 }
373
374 /**
375 * Throws IllegalStateException if invoked by the owner thread and the owner thread
376 * has not joined.
377 */
378 private void ensureJoinedIfOwner() {
379 if (Thread.currentThread() == flock.owner() && !joined) {
380 String msg = needToJoin ? "Owner did not join" : "join did not complete";
381 throw new IllegalStateException(msg);
382 }
383 }
384
385 /**
386 * Interrupts all threads in this task scope, except the current thread.
387 */
388 private void implInterruptAll() {
389 flock.threads()
390 .filter(t -> t != Thread.currentThread())
391 .forEach(t -> {
392 try {
393 t.interrupt();
394 } catch (Throwable ignore) { }
395 });
396 }
397
398 @SuppressWarnings("removal")
399 private void interruptAll() {
400 if (System.getSecurityManager() == null) {
401 implInterruptAll();
402 } else {
403 PrivilegedAction<Void> pa = () -> {
404 implInterruptAll();
405 return null;
406 };
407 AccessController.doPrivileged(pa);
408 }
409 }
410
411 /**
412 * Cancel exception if not already cancelled.
413 */
414 private void cancelExecution() {
415 if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {
416 // prevent new threads from starting
417 flock.shutdown();
418
419 // interrupt all unfinished threads
420 interruptAll();
421
422 // wakeup join
423 flock.wakeup();
424 }
425 }
426
427 /**
428 * Schedules a task to cancel execution on timeout.
429 */
430 private void scheduleTimeout(Duration timeout) {
431 assert Thread.currentThread() == flock.owner() && timerTask == null;
432 timerTask = TimerSupport.schedule(timeout, () -> {
433 if (!cancelled) {
434 timeoutExpired = true;
435 cancelExecution();
436 }
437 });
438 }
439
440 /**
441 * Cancels the timer task if set.
442 */
443 private void cancelTimeout() {
444 assert Thread.currentThread() == flock.owner();
445 if (timerTask != null) {
446 timerTask.cancel(false);
447 }
448 }
449
450 /**
451 * Invoked by the thread for a subtask when the subtask completes before execution
452 * was cancelled.
453 */
454 private void onComplete(SubtaskImpl<? extends T> subtask) {
455 assert subtask.state() != Subtask.State.UNAVAILABLE;
456 if (joiner.onComplete(subtask)) {
457 cancelExecution();
458 }
459 }
460
461 /**
462 * Initialize a new StructuredTaskScope.
463 */
464 @SuppressWarnings("this-escape")
465 private StructuredTaskScope(Joiner<? super T, ? extends R> joiner,
466 ThreadFactory threadFactory,
467 String name) {
468 this.joiner = joiner;
469 this.threadFactory = threadFactory;
470
471 if (name == null)
472 name = Objects.toIdentityString(this);
473 this.flock = ThreadFlock.open(name);
474 }
475
476 /**
477 * Represents a subtask forked with {@link #fork(Callable)} or {@link #fork(Runnable)}.
478 *
479 * <p> Code that forks subtasks can use the {@link #get() get()} method after {@linkplain
480 * #join() joining} to obtain the result of a subtask that completed successfully. It
481 * can use the {@link #exception()} method to obtain the exception thrown by a subtask
482 * that failed.
483 *
484 * @param <T> the result type
485 * @since 21
486 */
487 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
488 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl { // sealed: SubtaskImpl is the only permitted implementation
489 /**
490 * Represents the state of a subtask. {@link #SUCCESS} and {@link #FAILED} are
 * terminal states; {@link #UNAVAILABLE} is not.
491 * @see Subtask#state()
492 * @since 21
493 */
494 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
495 enum State {
496 /**
497 * The subtask result or exception is not available. This state indicates that
498 * the subtask was forked but has not completed, it completed after execution
499 * was cancelled, or it was forked after execution was cancelled (in which
500 * case a thread was not created to execute the subtask).
501 */
502 UNAVAILABLE,
503 /**
504 * The subtask completed successfully. The {@link Subtask#get() Subtask.get()}
505 * method can be used to get the result. This is a terminal state.
506 */
507 SUCCESS,
508 /**
509 * The subtask failed with an exception. The {@link Subtask#exception()
510 * Subtask.exception()} method can be used to get the exception. This is a
511 * terminal state.
512 */
513 FAILED,
514 }
515
516 /**
517 * {@return the subtask state}
518 */
519 State state();
520
521 /**
522 * Returns the result of this subtask if it completed successfully. If
523 * {@linkplain #fork(Callable) forked} to execute a value-returning task then the
524 * result from the {@link Callable#call() call} method is returned. If
525 * {@linkplain #fork(Runnable) forked} to execute a task that does not return a
526 * result then {@code null} is returned.
527 *
528 * <p> Code executing in the scope owner thread can use this method to get the
529 * result of a successful subtask only after it has {@linkplain #join() joined}.
530 *
531 * <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask)
532 * onComplete} method should test that the {@linkplain #state() subtask state} is
533 * {@link State#SUCCESS SUCCESS} before using this method to get the result.
534 *
535 * @return the possibly-null result
536 * @throws IllegalStateException if the subtask has not completed, did not complete
537 * successfully, or the current thread is the task scope owner and it has not joined
538 * @see State#SUCCESS
539 */
540 T get(); // also serves as the Supplier<T>.get() implementation point
541
542 /**
543 * {@return the exception thrown by this subtask if it failed} If
544 * {@linkplain #fork(Callable) forked} to execute a value-returning task then
545 * the exception thrown by the {@link Callable#call() call} method is returned.
546 * If {@linkplain #fork(Runnable) forked} to execute a task that does not return
547 * a result then the exception thrown by the {@link Runnable#run() run} method is
548 * returned.
549 *
550 * <p> Code executing in the scope owner thread can use this method to get the
551 * exception thrown by a failed subtask only after it has {@linkplain #join() joined}.
552 *
553 * <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask)
554 * onComplete} method should test that the {@linkplain #state() subtask state} is
555 * {@link State#FAILED FAILED} before using this method to get the exception.
556 *
557 * @throws IllegalStateException if the subtask has not completed, completed with
558 * a result, or the current thread is the task scope owner and it has not joined
559 * @see State#FAILED
560 */
561 Throwable exception();
562 }
563
564 /**
565 * An object used with a {@link StructuredTaskScope} to handle subtask completion
566 * and produce the result for a main task waiting in the {@link #join() join} method
567 * for subtasks to complete.
568 *
569 * <p> Joiner defines static methods to create {@code Joiner} objects for common cases:
570 * <ul>
571 * <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner}
572 * that yields a stream of the completed subtasks for {@code join} to return when
573 * all subtasks complete successfully. It cancels execution and causes {@code join}
574 * to throw if any subtask fails.
575 * <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a
576 * {@code Joiner} that yields the result of the first subtask to succeed. It cancels
577 * execution and causes {@code join} to throw if all subtasks fail.
578 * <li> {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a
579 * {@code Joiner} that waits for all successful subtasks. It cancels execution and
580 * causes {@code join} to throw if any subtask fails.
581 * <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all
582 * subtasks. If does not cancel execution.
583 * </ul>
584 *
585 * <p> In addition to the methods to create {@code Joiner} objects for common cases,
586 * the {@link #all(Predicate) all(Predicate)} method is defined to create a {@code
587 * Joiner} that yields a stream of all subtasks. It is created with a {@link
588 * Predicate Predicate} that determines if execution should continue or be cancelled.
589 * This {@code Joiner} can be built upon to create custom policies that cancel
590 * execution based on some condition.
591 *
592 * <p> More advanced policies can be developed by implementing the {@code Joiner}
593 * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked.
594 * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a
595 * result or exception. These methods return a {@code boolean} to indicate if execution
596 * should be cancelled. These methods can be used to collect subtasks, results, or
597 * exceptions, and control when to cancel execution. The {@link #result()} method
598 * must be implemented to produce the result (or exception) for the {@code join}
599 * method.
600 *
601 * <p> Unless otherwise specified, passing a {@code null} argument to a method
602 * in this class will cause a {@link NullPointerException} to be thrown.
603 *
604 * @implSpec Implementations of this interface must be thread safe. The {@link
605 * #onComplete(Subtask)} method defined by this interface may be invoked by several
606 * threads concurrently.
607 *
608 * @apiNote It is very important that a new {@code Joiner} object is created for each
609 * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with
610 * different task scopes or re-used after a task is closed.
611 *
612 * <p> Designing a {@code Joiner} should take into account the code at the use-site
613 * where the results from the {@link StructuredTaskScope#join() join} method are
614 * processed. It should be clear what the {@code Joiner} does vs. the application
615 * code at the use-site. In general, the {@code Joiner} implementation is not the
616 * place to code "business logic". A {@code Joiner} should be designed to be as
617 * general purpose as possible.
618 *
619 * @param <T> the result type of tasks executed in the task scope
620 * @param <R> the type of results returned by the join method
621 * @since 24
622 * @see #open(Joiner)
623 */
624 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
625 @FunctionalInterface
626 public interface Joiner<T, R> {
627
628 /**
629 * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable)
630 * fork(Runnable)} when forking a subtask. The method is invoked from the task
631 * owner thread. The method is invoked before a thread is created to run the
632 * subtask.
633 *
634 * @implSpec The default implementation throws {@code NullPointerException} if the
635 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
636 * subtask is not in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, it
637 * otherwise returns {@code false}.
638 *
639 * @apiNote This method is invoked by the {@code fork} methods. It should not be
640 * invoked directly.
641 *
642 * @param subtask the subtask
643 * @return {@code true} to cancel execution
644 */
645 default boolean onFork(Subtask<? extends T> subtask) {
646 if (subtask.state() != Subtask.State.UNAVAILABLE) {
647 throw new IllegalArgumentException();
648 }
649 return false;
650 }
651
652 /**
653 * Invoked by the thread started to execute a subtask after the subtask completes
654 * successfully or fails with an exception. This method is not invoked if a
655 * subtask completes after execution has been cancelled.
656 *
657 * @implSpec The default implementation throws {@code NullPointerException} if the
658 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
659 * subtask is not in the {@link Subtask.State#SUCCESS SUCCESS} or {@link
660 * Subtask.State#FAILED FAILED} state, it otherwise returns {@code false}.
661 *
662 * @apiNote This method is invoked by subtasks when they complete. It should not
663 * be invoked directly.
664 *
665 * @param subtask the subtask
666 * @return {@code true} to cancel execution
667 */
668 default boolean onComplete(Subtask<? extends T> subtask) {
669 if (subtask.state() == Subtask.State.UNAVAILABLE) {
670 throw new IllegalArgumentException();
671 }
672 return false;
673 }
674
675 /**
676 * Invoked by {@link #join()} to produce the result (or exception) after waiting
677 * for all subtasks to complete or execution to be cancelled. The result from this
678 * method is returned by the {@code join} method. If this method throws, then
679 * {@code join} throws {@link ExecutionException} with the exception thrown by
680 * this method as the cause.
681 *
682 * <p> In normal usage, this method will be called at most once to produce the
683 * result (or exception). If the {@code join} method is called more than once
684 * then this method may be called more than once to produce the result. An
685 * implementation should return an equal result (or throw the same exception) on
686 * second or subsequent calls to produce the outcome.
687 *
688 * @apiNote This method is invoked by the {@code join} method. It should not be
689 * invoked directly.
690 *
691 * @return the result
692 * @throws Throwable the exception
693 */
694 R result() throws Throwable;
695
696 /**
697 * {@return a new Joiner object that yields a stream of all subtasks when all
698 * subtasks complete successfully, or throws if any subtask fails}
699 * If any subtask fails then execution is cancelled.
700 *
701 * <p> If all subtasks complete successfully, the joiner's {@link Joiner#result()}
702 * method returns a stream of all subtasks in the order that they were forked.
703 * If any subtask failed then the {@code result} method throws the exception from
704 * the first subtask to fail.
705 *
706 * @apiNote This joiner is intended for cases where the results for all subtasks
707 * are required ("invoke all"); if any subtask fails then the results of other
708 * unfinished subtasks are no longer needed. A typical usage will be when the
709 * subtasks return results of the same type, the returned stream of forked
710 * subtasks can be used to get the results.
711 *
712 * @param <T> the result type of subtasks
713 */
714 static <T> Joiner<T, Stream<Subtask<T>>> allSuccessfulOrThrow() {
715 return new AllSuccessful<>();
716 }
717
718 /**
719 * {@return a new Joiner object that yields the result of a subtask that completed
720 * successfully, or throws if all subtasks fail} If any subtask completes
721 * successfully then execution is cancelled.
722 *
723 * <p> The joiner's {@link Joiner#result()} method returns the result of a subtask
724 * that completed successfully. If all subtasks fail then the {@code result} method
725 * throws the exception from one of the failed subtasks. The {@code result} method
726 * throws {@code NoSuchElementException} if no subtasks were forked.
727 *
728 * @apiNote This joiner is intended for cases where the result of any subtask will
729 * do ("invoke any") and where the results of other unfinished subtasks are no
730 * longer needed.
731 *
732 * @param <T> the result type of subtasks
733 */
734 static <T> Joiner<T, T> anySuccessfulResultOrThrow() {
735 return new AnySuccessful<>();
736 }
737
738 /**
739 * {@return a new Joiner object that waits for all successful subtasks. It
740 * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> if
741 * any subtask fails}
742 *
743 * <p> The joiner's {@link Joiner#result() result} method returns {@code null}
744 * if all subtasks complete successfully, or throws the exception from the first
745 * subtask to fail.
746 *
747 * @apiNote This joiner is intended for cases where the results for all subtasks
748 * are required ("invoke all"), and where the code {@linkplain #fork(Callable)
749 * forking} subtasks keeps a reference to the {@linkplain Subtask Subtask} objects.
750 * A typical usage will be when subtasks return results of different types.
751 *
752 * @param <T> the result type of subtasks
753 */
754 static <T> Joiner<T, Void> awaitAllSuccessfulOrThrow() {
755 return new AwaitSuccessful<>();
756 }
757
758 /**
759 * {@return a new Joiner object that waits for all subtasks}
760 *
761 * <p> The joiner's {@link Joiner#result() result} method returns {@code null}.
762 *
763 * @apiNote This joiner is intended for cases where subtasks make use of
764 * <em>side-effects</em> rather than return results or fail with exceptions.
765 * The {@link #fork(Runnable) fork(Runnable)} method can be used to fork subtasks
766 * that do not return a result.
767 *
768 * @param <T> the result type of subtasks
769 */
770 static <T> Joiner<T, Void> awaitAll() {
771 // ensure that new Joiner object is returned
772 return new Joiner<T, Void>() {
773 @Override
774 public Void result() {
775 return null;
776 }
777 };
778 }
779
780 /**
781 * {@return a new Joiner object that yields a stream of all subtasks, cancelling
782 * execution when evaluating a completed subtask with the given predicate returns
783 * {@code true}}
784 *
785 * <p> The joiner's {@link Joiner#onComplete(Subtask)} method invokes the
786 * predicate's {@link Predicate#test(Object) test} method with the subtask that
787 * completed successfully or failed with an exception. If the {@code test} method
788 * returns {@code true} then <a href="StructuredTaskScope.html#CancelExecution">
789 * execution is cancelled</a>. The {@code test} method must be thread safe as it
790 * may be invoked concurrently from several threads.
791 *
792 * <p> The joiner's {@link #result()} method returns the stream of all subtasks,
793 * in fork order. The stream may contain subtasks that have completed
794 * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED}
795 * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state
796 * if execution was cancelled before all subtasks were forked or completed.
797 *
798 * <p> The following example uses this method to create a {@code Joiner} that
799 * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> when
800 * two or more subtasks fail.
801 * {@snippet lang=java :
802 * class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
803 * private final AtomicInteger failedCount = new AtomicInteger();
804 * @Override
805 * public boolean test(Subtask<? extends T> subtask) {
806 * return subtask.state() == Subtask.State.FAILED
807 * && failedCount.incrementAndGet() >= 2;
808 * }
809 * }
810 *
811 * var joinPolicy = Joiner.all(new CancelAfterTwoFailures<String>());
812 * }
813 *
814 * @param isDone the predicate to evaluate completed subtasks
815 * @param <T> the result type of subtasks
816 */
817 static <T> Joiner<T, Stream<Subtask<T>>> all(Predicate<Subtask<? extends T>> isDone) {
818 return new AllSubtasks<>(isDone);
819 }
820 }
821
822 /**
823 * Represents the configuration for a {@code StructuredTaskScope}.
824 *
825 * <p> The configuration for a {@code StructuredTaskScope} consists of a {@link
826 * ThreadFactory} to create threads, an optional name for the purposes of monitoring
827 * and management, and an optional timeout.
828 *
829 * <p> Creating a {@code StructuredTaskScope} with its 1-arg {@link #open(Joiner) open}
830 * method uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default
831 * configuration</a>. The default configuration consists of a thread factory that
832 * creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">
833 * virtual threads</a>, no name for monitoring and management purposes, and no timeout.
834 *
835 * <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, Function)
836 * open} method allows a different configuration to be used. The function specified
837 * to the {@code open} method is applied to the default configuration and returns the
838 * configuration for the {@code StructuredTaskScope} under construction. The function
839 * can use the {@code with-} prefixed methods defined here to specify the components
840 * of the configuration to use.
841 *
842 * <p> Unless otherwise specified, passing a {@code null} argument to a method
843 * in this class will cause a {@link NullPointerException} to be thrown.
844 *
845 * @since 24
846 */
847 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
848 public sealed interface Config permits ConfigImpl { // sealed: ConfigImpl is the only permitted implementation
849 /**
850 * {@return a new {@code Config} object with the given thread factory}
851 * The other components are the same as this object. The thread factory is used by
852 * a task scope to create threads when {@linkplain #fork(Callable) forking} subtasks.
853 * @param threadFactory the thread factory
854 *
855 * @apiNote The thread factory will typically create
856 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
857 * maybe with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler
858 * uncaught exception handler}, or other properties configured.
859 *
860 * @see #fork(Callable)
861 */
862 Config withThreadFactory(ThreadFactory threadFactory);
863
864 /**
865 * {@return a new {@code Config} object with the given name}
866 * The other components are the same as this object. A task scope is optionally
867 * named for the purposes of monitoring and management.
868 * @param name the name
869 * @see StructuredTaskScope#toString()
870 */
871 Config withName(String name);
872
873 /**
874 * {@return a new {@code Config} object with the given timeout}
875 * The other components are the same as this object.
876 * @param timeout the timeout
877 *
878 * @apiNote Applications using deadlines, expressed as an {@link java.time.Instant},
879 * can use {@link Duration#between Duration.between(Instant.now(), deadline)} to
880 * compute the timeout for this method.
881 *
882 * @see #join()
883 */
884 Config withTimeout(Duration timeout); // NOTE(review): handling of zero or negative durations is not stated here; confirm in ConfigImpl
885 }
886
887 /**
888 * Opens a new structured task scope to use the given {@code Joiner} object and with
889 * configuration that is the result of applying the given function to the
890 * <a href="#DefaultConfiguration">default configuration</a>.
891 *
892 * <p> The {@code configFunction} is called with the default configuration and returns
893 * the configuration for the new structured task scope. The function may, for example,
894 * set the {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} or set
895 * a {@linkplain Config#withTimeout(Duration) timeout}.
896 *
897 * <p> If a {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} is set
898 * then the {@code ThreadFactory}'s {@link ThreadFactory#newThread(Runnable) newThread}
899 * method will be used to create threads when forking subtasks in this task scope.
900 *
901 * <p> If a {@linkplain Config#withTimeout(Duration) timeout} is set then it starts
902 * when the task scope is opened. If the timeout expires before the task scope has
903 * {@linkplain #join() joined} then execution is cancelled and the {@code join} method
904 * throws {@link ExecutionException} with {@link TimeoutException} as the cause.
905 *
906 * <p> The new task scope is owned by the current thread. Only code executing in this
907 * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or
908 * {@linkplain #close close} the task scope.
909 *
910 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
911 * value} bindings for inheritance by threads started in the task scope.
912 *
913 * @param joiner the joiner
914 * @param configFunction a function to produce the configuration
915 * @return a new task scope
916 * @param <T> the result type of tasks executed in the task scope
917 * @param <R> the type of the result returned by the join method
918 * @since 24
919 */
920 public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
921 Function<Config, Config> configFunction) {
922 Objects.requireNonNull(joiner);
923
924 var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
925 var scope = new StructuredTaskScope<T, R>(joiner, config.threadFactory(), config.name());
926
927 // schedule timeout
928 Duration timeout = config.timeout();
929 if (timeout != null) {
930 boolean done = false;
931 try {
932 scope.scheduleTimeout(timeout);
933 done = true;
934 } finally {
935 if (!done) {
936 scope.close(); // pop if scheduling timeout failed
937 }
938 }
939 }
940
941 return scope;
942 }
943
944 /**
945 * Opens a new structured task scope to use the given {@code Joiner} object. The
946 * task scope is created with the <a href="#DefaultConfiguration">default configuration</a>.
947 * The default configuration has a {@code ThreadFactory} that creates unnamed
948 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
949 * is unnamed for monitoring and management purposes, and has no timeout.
950 *
951 * @implSpec
952 * This factory method is equivalent to invoking the 2-arg open method with the given
953 * joiner and the {@linkplain Function#identity() identity function}.
954 *
955 * @param joiner the joiner
956 * @return a new task scope
957 * @param <T> the result type of tasks executed in the task scope
958 * @param <R> the type of the result returned by the join method
959 * @since 24
960 */
961 public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner) {
962 return open(joiner, Function.identity());
963 }
964
965 /**
966 * Starts a new thread in this task scope to execute a value-returning task, thus
967 * creating a <em>subtask</em>. The value-returning task is provided to this method
968 * as a {@link Callable}, the thread executes the task's {@link Callable#call() call}
969 * method.
970 *
971 * <p> This method first creates a {@link Subtask Subtask} to represent the <em>forked
972 * subtask</em>. It invokes the joiner's {@link Joiner#onFork(Subtask) onFork} method
973 * with the {@code Subtask} object. If the {@code onFork} completes with an exception
974 * or error then it is propagated by the {@code fork} method. If execution is
975 * {@linkplain #isCancelled() cancelled}, or {@code onFork} returns {@code true} to
976 * cancel execution, then this method returns the {@code Subtask} (in the {@link
977 * Subtask.State#UNAVAILABLE UNAVAILABLE} state) without creating a thread to execute
978 * the subtask. If execution is not cancelled then a thread is created with the
979 * {@link ThreadFactory} configured when the task scope was created, and the thread is
980 * started. Forking a subtask inherits the current thread's {@linkplain ScopedValue
981 * scoped value} bindings. The bindings must match the bindings captured when the
982 * task scope was opened. If the subtask completes (successfully or with an exception)
983 * before execution is cancelled, then the thread invokes the joiner's
984 * {@link Joiner#onComplete(Subtask) onComplete} method with subtask in the
985 * {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} state.
986 *
987 * <p> This method returns the {@link Subtask Subtask} object. In some usages, this
988 * object may be used to get its result. In other cases it may be used for correlation
989 * or just discarded. To ensure correct usage, the {@link Subtask#get() Subtask.get()}
990 * method may only be called by the task scope owner to get the result after it has
991 * waited for subtasks to complete with the {@link #join() join} method and the subtask
992 * completed successfully. Similarly, the {@link Subtask#exception() Subtask.exception()}
993 * method may only be called by the task scope owner after it has joined and the subtask
994 * failed. If execution was cancelled before the subtask was forked, or before it
995 * completes, then neither method can be used to obtain the outcome.
996 *
997 * <p> This method may only be invoked by the task scope owner.
998 *
999 * @param task the value-returning task for the thread to execute
1000 * @param <U> the result type
1001 * @return the subtask
1002 * @throws IllegalStateException if this task scope is closed or the owner has already
1003 * joined
1004 * @throws WrongThreadException if the current thread is not the task scope owner
1005 * @throws StructureViolationException if the current scoped value bindings are not
1006 * the same as when the task scope was created
1007 * @throws RejectedExecutionException if the thread factory rejected creating a
1008 * thread to run the subtask
1009 */
1010 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
1011 Objects.requireNonNull(task);
1012 ensureOwner();
1013 ensureOpen();
1014 if (joined) {
1015 throw new IllegalStateException("Already joined");
1016 }
1017
1018 var subtask = new SubtaskImpl<U>(this, task);
1019
1020 // notify joiner, even if cancelled
1021 if (joiner.onFork(subtask)) {
1022 cancelExecution();
1023 }
1024
1025 if (!cancelled) {
1026 // create thread to run task
1027 Thread thread = threadFactory.newThread(subtask);
1028 if (thread == null) {
1029 throw new RejectedExecutionException("Rejected by thread factory");
1030 }
1031
1032 // attempt to start the thread
1033 try {
1034 flock.start(thread);
1035 } catch (IllegalStateException e) {
1036 // shutdown by another thread, or underlying flock is shutdown due
1037 // to unstructured use
1038 }
1039 }
1040
1041 needToJoin = true;
1042 return subtask;
1043 }
1044
1045 /**
1046 * Starts a new thread in this task scope to execute a task that does not return a
1047 * result, creating a <em>subtask</em>.
1048 *
1049 * <p> This method works exactly the same as {@link #fork(Callable)} except that
1050 * the task is provided to this method as a {@link Runnable}, the thread executes
1051 * the task's {@link Runnable#run() run} method, and its result is {@code null}.
1052 *
1053 * @param task the task for the thread to execute
1054 * @return the subtask
1055 * @throws IllegalStateException if this task scope is closed or the owner has already
1056 * joined
1057 * @throws WrongThreadException if the current thread is not the task scope owner
1058 * @throws StructureViolationException if the current scoped value bindings are not
1059 * the same as when the task scope was created
1060 * @throws RejectedExecutionException if the thread factory rejected creating a
1061 * thread to run the subtask
1062 * @since 24
1063 */
1064 public Subtask<? extends T> fork(Runnable task) {
1065 Objects.requireNonNull(task);
1066 return fork(() -> { task.run(); return null; });
1067 }
1068
1069 /**
1070 * Waits for all subtasks started in this task scope to complete or execution to be
1071 * cancelled. If a {@linkplain Config#withTimeout(Duration) timeout} has been set
1072 * then execution will be cancelled if the timeout expires before or while waiting.
1073 * Once finished waiting, the {@code Joiner}'s {@link Joiner#result() result}
1074 * method is invoked to get the result or throw an exception. If the {@code result}
1075 * method throws then this method throws {@code ExecutionException} with the
1076 * exception thrown by the {@code result()} method as the cause.
1077 *
1078 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1079 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1080 * when all threads finish, the {@code Joiner}'s {@link Joiner#onFork(Subtask)
1081 * onFork} or {@link Joiner#onComplete(Subtask) onComplete} returns {@code true}
1082 * to cancel execution, the timeout (if set) expires, or the current thread is
1083 * {@linkplain Thread#interrupt() interrupted}.
1084 *
1085 * <p> This method may only be invoked by the task scope owner.
1086 *
1087 * @return the {@link Joiner#result() result}
1088 * @throws IllegalStateException if this task scope is closed
1089 * @throws WrongThreadException if the current thread is not the task scope owner
1090 * @throws ExecutionException if the joiner's {@code result} method throws, or with
1091 * cause {@link TimeoutException} if a timeout is set and the timeout expires
1092 * @throws InterruptedException if interrupted while waiting
1093 * @since 24
1094 */
1095 public R join() throws ExecutionException, InterruptedException {
1096 ensureOwner();
1097 ensureOpen();
1098
1099 if (!joined) {
1100 // owner has attempted to join
1101 needToJoin = false;
1102
1103 // wait for all threads, execution to be cancelled, or interrupt
1104 flock.awaitAll();
1105
1106 // throw if timeout expired while waiting
1107 if (timeoutExpired) {
1108 throw new ExecutionException(new TimeoutException());
1109 }
1110
1111 // join completed successfully
1112 cancelTimeout();
1113 joined = true;
1114 }
1115
1116 // invoke joiner to get result
1117 try {
1118 return joiner.result();
1119 } catch (Throwable e) {
1120 throw new ExecutionException(e);
1121 }
1122 }
1123
/**
 * Cancels execution. This method allows the task scope owner to explicitly
 * <a href="#CancelExecution">cancel execution</a>. If not already cancelled, this
 * method {@linkplain Thread#interrupt() interrupts} the threads executing subtasks
 * that have not completed, and prevents new threads from being started in the task
 * scope.
 *
 * @apiNote This method is intended for cases where a task scope is created with a
 * {@link Joiner Joiner} that doesn't cancel execution or where the code in the main
 * task needs to cancel execution due to some exception or other condition in the main
 * task. The following example accepts network connections indefinitely, forking a
 * subtask to handle each connection. If the {@code accept()} method in the example
 * throws then the main task cancels execution before joining and closing the scope.
 * The {@link #close() close} method waits for the interrupted threads to finish.
 *
 * {@snippet lang=java :
 *     // @link substring="awaitAll" target="Joiner#awaitAll()" :
 *     try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
 *
 *         try {
 *             while (true) {
 *                 Socket peer = listener.accept();
 *                 // @link substring="fork" target="#fork(Runnable)" :
 *                 scope.fork(() -> handle(peer));
 *             }
 *         } finally {
 *             scope.cancel();
 *             scope.join();   // completes immediately
 *         }
 *
 *     }
 * }
 *
 * @throws IllegalStateException if this task scope is closed
 * @throws WrongThreadException if the current thread is not the task scope owner
 * @since 24
 */
public void cancel() {
    // validate the caller and the scope state before cancelling; these checks are
    // presumably the source of the exceptions documented above
    ensureOwner();
    ensureOpen();
    // same cancellation path that fork uses when the joiner's onFork returns true
    cancelExecution();
}
1166
/**
 * {@return {@code true} if <a href="#CancelExecution">execution is cancelled</a>,
 * or in the process of being cancelled, otherwise {@code false}}
 *
 * <p> Cancelling execution prevents new threads from starting in the task scope and
 * {@linkplain Thread#interrupt() interrupts} threads executing unfinished subtasks.
 * It may take some time before the interrupted threads finish execution; this
 * method may return {@code true} before all threads have been interrupted or before
 * all threads have finished.
 *
 * @apiNote A main task with a lengthy "forking phase" (the code that executes before
 * the main task invokes {@link #join() join}) may use this method to avoid doing work
 * in cases where execution was cancelled by the completion of a previously forked
 * subtask or timeout.
 *
 * @since 24
 */
public boolean isCancelled() {
    // no owner or open check is performed here; the subtask implementation also calls
    // this method from the threads that run subtasks
    return cancelled;
}
1187
1188 /**
1189 * Closes this task scope.
1190 *
1191 * <p> This method first <a href="#CancelExecution">cancels execution</a>, if not
1192 * already cancelled. This interrupts the threads executing unfinished subtasks. This
1193 * method then waits for all threads to finish. If interrupted while waiting then it
1194 * will continue to wait until the threads finish, before completing with the interrupt
1195 * status set.
1196 *
1197 * <p> This method may only be invoked by the task scope owner. If the task scope
1198 * is already closed then the task scope owner invoking this method has no effect.
1199 *
1200 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
1201 * manner</em>. If this method is called to close a task scope before nested task
1202 * scopes are closed then it closes the underlying construct of each nested task scope
1203 * (in the reverse order that they were created in), closes this task scope, and then
1204 * throws {@link StructureViolationException}.
1205 * Similarly, if this method is called to close a task scope while executing with
1206 * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
1207 * before the scoped values were bound, then {@code StructureViolationException} is
1208 * thrown after closing the task scope.
1209 * If a thread terminates without first closing task scopes that it owns then
1210 * termination will cause the underlying construct of each of its open tasks scopes to
1211 * be closed. Closing is performed in the reverse order that the task scopes were
1212 * created in. Thread termination may therefore be delayed when the task scope owner
1213 * has to wait for threads forked in these task scopes to finish.
1214 *
1215 * @throws IllegalStateException thrown after closing the task scope if the task scope
1216 * owner did not attempt to join after forking
1217 * @throws WrongThreadException if the current thread is not the task scope owner
1218 * @throws StructureViolationException if a structure violation was detected
1219 */
1220 @Override
1221 public void close() {
1222 ensureOwner();
1223 if (closed) {
1224 return;
1225 }
1226
1227 // cancel execution if not already joined
1228 if (!joined) {
1229 cancelExecution();
1230 cancelTimeout();
1231 }
1232
1233 // wait for stragglers
1234 try {
1235 flock.close();
1236 } finally {
1237 closed = true;
1238 }
1239
1240 // throw ISE if the owner didn't join after forking
1241 if (needToJoin) {
1242 needToJoin = false;
1243 throw new IllegalStateException("Owner did not join");
1244 }
1245 }
1246
/**
 * {@inheritDoc} If a {@link Config#withName(String) name} for monitoring and
 * management purposes has been set then the string representation includes the name.
 */
@Override
public String toString() {
    // the string representation is the name reported by the underlying flock
    return flock.name();
}
1255
1256 /**
1257 * Subtask implementation, runs the task specified to the fork method.
1258 */
1259 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
1260 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
1261
1262 private record AltResult(Subtask.State state, Throwable exception) {
1263 AltResult(Subtask.State state) {
1264 this(state, null);
1265 }
1266 }
1267
1268 private final StructuredTaskScope<? super T, ?> scope;
1269 private final Callable<? extends T> task;
1270 private volatile Object result;
1271
1272 SubtaskImpl(StructuredTaskScope<? super T, ?> scope, Callable<? extends T> task) {
1273 this.scope = scope;
1274 this.task = task;
1275 }
1276
1277 @Override
1278 public void run() {
1279 T result = null;
1280 Throwable ex = null;
1281 try {
1282 result = task.call();
1283 } catch (Throwable e) {
1284 ex = e;
1285 }
1286
1287 // nothing to do if task scope is cancelled
1288 if (scope.isCancelled())
1289 return;
1290
1291 // set result/exception and invoke onComplete
1292 if (ex == null) {
1293 this.result = (result != null) ? result : RESULT_NULL;
1294 } else {
1295 this.result = new AltResult(State.FAILED, ex);
1296 }
1297 scope.onComplete(this);
1298 }
1299
1300 @Override
1301 public Subtask.State state() {
1302 Object result = this.result;
1303 if (result == null) {
1304 return State.UNAVAILABLE;
1305 } else if (result instanceof AltResult alt) {
1306 // null or failed
1307 return alt.state();
1308 } else {
1309 return State.SUCCESS;
1310 }
1311 }
1312
1313
1314 @Override
1315 public T get() {
1316 scope.ensureJoinedIfOwner();
1317 Object result = this.result;
1318 if (result instanceof AltResult) {
1319 if (result == RESULT_NULL) return null;
1320 } else if (result != null) {
1321 @SuppressWarnings("unchecked")
1322 T r = (T) result;
1323 return r;
1324 }
1325 throw new IllegalStateException(
1326 "Result is unavailable or subtask did not complete successfully");
1327 }
1328
1329 @Override
1330 public Throwable exception() {
1331 scope.ensureJoinedIfOwner();
1332 Object result = this.result;
1333 if (result instanceof AltResult alt && alt.state() == State.FAILED) {
1334 return alt.exception();
1335 }
1336 throw new IllegalStateException(
1337 "Exception is unavailable or subtask did not complete with exception");
1338 }
1339
1340 @Override
1341 public String toString() {
1342 String stateAsString = switch (state()) {
1343 case UNAVAILABLE -> "[Unavailable]";
1344 case SUCCESS -> "[Completed successfully]";
1345 case FAILED -> {
1346 Throwable ex = ((AltResult) result).exception();
1347 yield "[Failed: " + ex + "]";
1348 }
1349 };
1350 return Objects.toIdentityString(this) + stateAsString;
1351 }
1352 }
1353
1354 /**
1355 * A joiner that returns a stream of all subtasks when all subtasks complete
1356 * successfully. If any subtask fails then execution is cancelled.
1357 */
1358 private static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {
1359 private static final VarHandle FIRST_EXCEPTION;
1360 static {
1361 try {
1362 MethodHandles.Lookup l = MethodHandles.lookup();
1363 FIRST_EXCEPTION = l.findVarHandle(AllSuccessful.class, "firstException", Throwable.class);
1364 } catch (Exception e) {
1365 throw new ExceptionInInitializerError(e);
1366 }
1367 }
1368 // list of forked subtasks, only accessed by owner thread
1369 private final List<Subtask<T>> subtasks = new ArrayList<>();
1370 private volatile Throwable firstException;
1371
1372 @Override
1373 public boolean onFork(Subtask<? extends T> subtask) {
1374 @SuppressWarnings("unchecked")
1375 var tmp = (Subtask<T>) subtask;
1376 subtasks.add(tmp);
1377 return false;
1378 }
1379
1380 @Override
1381 public boolean onComplete(Subtask<? extends T> subtask) {
1382 return (subtask.state() == Subtask.State.FAILED)
1383 && (firstException == null)
1384 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1385 }
1386
1387 @Override
1388 public Stream<Subtask<T>> result() throws Throwable {
1389 Throwable ex = firstException;
1390 if (ex != null) {
1391 throw ex;
1392 } else {
1393 return subtasks.stream();
1394 }
1395 }
1396 }
1397
1398 /**
1399 * A joiner that returns the result of the first subtask to complete successfully.
1400 * If any subtask completes successfully then execution is cancelled.
1401 */
1402 private static final class AnySuccessful<T> implements Joiner<T, T> {
1403 private static final VarHandle FIRST_SUCCESS;
1404 private static final VarHandle FIRST_EXCEPTION;
1405 static {
1406 try {
1407 MethodHandles.Lookup l = MethodHandles.lookup();
1408 FIRST_SUCCESS = l.findVarHandle(AnySuccessful.class, "firstSuccess", Subtask.class);
1409 FIRST_EXCEPTION = l.findVarHandle(AnySuccessful.class, "firstException", Throwable.class);
1410 } catch (Exception e) {
1411 throw new ExceptionInInitializerError(e);
1412 }
1413 }
1414 private volatile Subtask<T> firstSuccess;
1415 private volatile Throwable firstException;
1416
1417 @Override
1418 public boolean onComplete(Subtask<? extends T> subtask) {
1419 if (firstSuccess == null) {
1420 if (subtask.state() == Subtask.State.SUCCESS) {
1421 // capture the first subtask that completes successfully
1422 return FIRST_SUCCESS.compareAndSet(this, null, subtask);
1423 } else if (firstException == null) {
1424 // capture the exception thrown by the first task to fail
1425 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1426 }
1427 }
1428 return false;
1429 }
1430
1431 @Override
1432 public T result() throws Throwable {
1433 Subtask<T> firstSuccess = this.firstSuccess;
1434 if (firstSuccess != null) {
1435 return firstSuccess.get();
1436 }
1437 Throwable firstException = this.firstException;
1438 if (firstException != null) {
1439 throw firstException;
1440 } else {
1441 throw new NoSuchElementException("No subtasks completed");
1442 }
1443 }
1444 }
1445
1446 /**
1447 * A joiner that that waits for all successful subtasks. If any subtask fails the
1448 * execution is cancelled.
1449 */
1450 private static final class AwaitSuccessful<T> implements Joiner<T, Void> {
1451 private static final VarHandle FIRST_EXCEPTION;
1452 static {
1453 try {
1454 MethodHandles.Lookup l = MethodHandles.lookup();
1455 FIRST_EXCEPTION = l.findVarHandle(AwaitSuccessful.class, "firstException", Throwable.class);
1456 } catch (Exception e) {
1457 throw new ExceptionInInitializerError(e);
1458 }
1459 }
1460 private volatile Throwable firstException;
1461
1462 @Override
1463 public boolean onComplete(Subtask<? extends T> subtask) {
1464 return (subtask.state() == Subtask.State.FAILED)
1465 && (firstException == null)
1466 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1467 }
1468
1469 @Override
1470 public Void result() throws Throwable {
1471 Throwable ex = firstException;
1472 if (ex != null) {
1473 throw ex;
1474 } else {
1475 return null;
1476 }
1477 }
1478 }
1479
1480 /**
1481 * A joiner that returns a stream of all subtasks.
1482 */
1483 private static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
1484 private final Predicate<Subtask<? extends T>> isDone;
1485 // list of forked subtasks, only accessed by owner thread
1486 private final List<Subtask<T>> subtasks = new ArrayList<>();
1487
1488 AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
1489 this.isDone = Objects.requireNonNull(isDone);
1490 }
1491
1492 @Override
1493 public boolean onFork(Subtask<? extends T> subtask) {
1494 @SuppressWarnings("unchecked")
1495 var tmp = (Subtask<T>) subtask;
1496 subtasks.add(tmp);
1497 return false;
1498 }
1499
1500 @Override
1501 public boolean onComplete(Subtask<? extends T> subtask) {
1502 return isDone.test(subtask);
1503 }
1504
1505 @Override
1506 public Stream<Subtask<T>> result() {
1507 return subtasks.stream();
1508 }
1509 }
1510
1511 /**
1512 * Implementation of Config.
1513 */
1514 private record ConfigImpl(ThreadFactory threadFactory,
1515 String name,
1516 Duration timeout) implements Config {
1517 static Config defaultConfig() {
1518 return new ConfigImpl(Thread.ofVirtual().factory(), null, null);
1519 }
1520
1521 @Override
1522 public Config withThreadFactory(ThreadFactory threadFactory) {
1523 return new ConfigImpl(Objects.requireNonNull(threadFactory), name, timeout);
1524 }
1525
1526 @Override
1527 public Config withName(String name) {
1528 return new ConfigImpl(threadFactory, Objects.requireNonNull(name), timeout);
1529 }
1530
1531 @Override
1532 public Config withTimeout(Duration timeout) {
1533 return new ConfigImpl(threadFactory, name, Objects.requireNonNull(timeout));
1534 }
1535 }
1536
1537 /**
1538 * Used to schedule a task to cancel exception when a timeout expires.
1539 */
1540 private static class TimerSupport {
1541 private static final ScheduledExecutorService DELAYED_TASK_SCHEDULER;
1542 static {
1543 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1544 Executors.newScheduledThreadPool(1, task -> {
1545 Thread t = InnocuousThread.newThread("StructuredTaskScope-Timer", task);
1546 t.setDaemon(true);
1547 return t;
1548 });
1549 stpe.setRemoveOnCancelPolicy(true);
1550 DELAYED_TASK_SCHEDULER = stpe;
1551 }
1552
1553 static Future<?> schedule(Duration timeout, Runnable task) {
1554 long nanos = TimeUnit.NANOSECONDS.convert(timeout);
1555 return DELAYED_TASK_SCHEDULER.schedule(task, nanos, TimeUnit.NANOSECONDS);
1556 }
1557 }
1558 }
|