12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.security.AccessController;
30 import java.security.PrivilegedAction;
31 import java.time.Duration;
32 import java.time.Instant;
33 import java.util.Objects;
34 import java.util.Optional;
35 import java.util.concurrent.locks.ReentrantLock;
36 import java.util.function.Function;
37 import java.util.function.Supplier;
38 import jdk.internal.javac.PreviewFeature;
39 import jdk.internal.misc.ThreadFlock;
40 import jdk.internal.invoke.MhUtil;
41
42 /**
43 * A basic API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports
44 * cases where a task splits into several concurrent subtasks, and where the subtasks must
45 * complete before the main task continues. A {@code StructuredTaskScope} can be used to
46 * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
47 * just like that of a sequential operation in structured programming.
48 *
49 * <h2>Basic operation</h2>
50 *
51 * A {@code StructuredTaskScope} is created with one of its public constructors. It defines
52 * the {@link #fork(Callable) fork} method to start a thread to execute a subtask, the {@link
53 * #join() join} method to wait for all subtasks to finish, and the {@link #close() close}
54 * method to close the task scope. The API is intended to be used with the {@code
55 * try-with-resources} statement. The intention is that code in the try <em>block</em>
56 * uses the {@code fork} method to fork threads to execute the subtasks, wait for the
57 * subtasks to finish with the {@code join} method, and then <em>process the results</em>.
58 * A call to the {@code fork} method returns a {@link Subtask Subtask} representing
59 * the <em>forked subtask</em>. Once {@code join} is called, the {@code Subtask} can be
60 * used to get the result if the subtask completed successfully, or the exception if it failed.
61 * {@snippet lang=java :
62 * Callable<String> task1 = ...
63 * Callable<Integer> task2 = ...
64 *
65 * try (var scope = new StructuredTaskScope<Object>()) {
66 *
67 * Subtask<String> subtask1 = scope.fork(task1); // @highlight substring="fork"
68 * Subtask<Integer> subtask2 = scope.fork(task2); // @highlight substring="fork"
69 *
70 * scope.join(); // @highlight substring="join"
71 *
72 * ... process results/exceptions ...
73 *
74 * } // close // @highlight substring="close"
75 * }
76 * <p> The following example forks a collection of homogeneous subtasks, waits for all of
77 * them to complete with the {@code join} method, and uses the {@link Subtask.State
78 * Subtask.State} to partition the subtasks into a set of the subtasks that completed
79 * successfully and another for the subtasks that failed.
80 * {@snippet lang=java :
81 * List<Callable<String>> callables = ...
82 *
83 * try (var scope = new StructuredTaskScope<String>()) {
84 *
85 * List<Subtask<String>> subtasks = callables.stream().map(scope::fork).toList();
86 *
87 * scope.join();
88 *
89 * Map<Boolean, Set<Subtask<String>>> map = subtasks.stream()
90 * .collect(Collectors.partitioningBy(h -> h.state() == Subtask.State.SUCCESS,
91 * Collectors.toSet()));
92 *
93 * } // close
94 * }
95 *
96 * <p> To ensure correct usage, the {@code join} and {@code close} methods may only be
97 * invoked by the <em>owner</em> (the thread that opened/created the task scope), and the
98 * {@code close} method throws an exception after closing if the owner did not invoke the
99 * {@code join} method after forking.
100 *
101 * <p> {@code StructuredTaskScope} defines the {@link #shutdown() shutdown} method to shut
102 * down a task scope without closing it. The {@code shutdown()} method <em>cancels</em> all
103 * unfinished subtasks by {@linkplain Thread#interrupt() interrupting} the threads. It
104 * prevents new threads from starting in the task scope. If the owner is waiting in the
105 * {@code join} method then it will wakeup.
106 *
107 * <p> Shutdown is used for <em>short-circuiting</em> and allows subclasses to implement
108 * <em>policy</em> that does not require all subtasks to finish.
109 *
110 * <h2>Subclasses with policies for common cases</h2>
111 *
112 * Two subclasses of {@code StructuredTaskScope} are defined to implement policy for
113 * common cases:
114 * <ol>
115 * <li> {@link ShutdownOnSuccess ShutdownOnSuccess} captures the result of the first
116 * subtask to complete successfully. Once captured, it shuts down the task scope to
117 * interrupt unfinished threads and wakeup the owner. This class is intended for cases
118 * where the result of any subtask will do ("invoke any") and where there is no need to
119 * wait for results of other unfinished subtasks. It defines methods to get the first
120 * result or throw an exception if all subtasks fail.
121 * <li> {@link ShutdownOnFailure ShutdownOnFailure} captures the exception of the first
122 * subtask to fail. Once captured, it shuts down the task scope to interrupt unfinished
123 * threads and wakeup the owner. This class is intended for cases where the results of all
124 * subtasks are required ("invoke all"); if any subtask fails then the results of other
125 * unfinished subtasks are no longer needed. It defines methods to throw an exception if
126 * any of the subtasks fail.
127 * </ol>
128 *
129 * <p> The following are two examples that use the two classes. In both cases, a pair of
130 * subtasks are forked to fetch resources from two URL locations "left" and "right". The
131 * first example creates a ShutdownOnSuccess object to capture the result of the first
132 * subtask to complete successfully, cancelling the other by way of shutting down the task
133 * scope. The main task waits in {@code join} until either subtask completes with a result
134 * or both subtasks fail. It invokes {@link ShutdownOnSuccess#result(Function)
135 * result(Function)} method to get the captured result. If both subtasks fail then this
136 * method throws a {@code WebApplicationException} with the exception from one of the
137 * subtasks as the cause.
138 * {@snippet lang=java :
139 * try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
140 *
141 * scope.fork(() -> fetch(left));
142 * scope.fork(() -> fetch(right));
143 *
144 * scope.join();
145 *
146 * // @link regex="result(?=\()" target="ShutdownOnSuccess#result" :
147 * String result = scope.result(e -> new WebApplicationException(e));
148 *
149 * ...
150 * }
151 * }
152 * The second example creates a ShutdownOnFailure object to capture the exception of the
153 * first subtask to fail, cancelling the other by way of shutting down the task scope. The
154 * main task waits in {@link #joinUntil(Instant)} until both subtasks complete with a
155 * result, either fails, or a deadline is reached. It invokes {@link
156 * ShutdownOnFailure#throwIfFailed(Function) throwIfFailed(Function)} to throw an exception
157 * if either subtask fails. This method is a no-op if both subtasks complete successfully.
158 * The example uses {@link Supplier#get()} to get the result of each subtask. Using
159 * {@code Supplier} instead of {@code Subtask} is preferred for common cases where the
160 * object returned by fork is only used to get the result of a subtask that completed
161 * successfully.
162 * {@snippet lang=java :
163 * Instant deadline = ...
164 *
165 * try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
166 *
167 * Supplier<String> supplier1 = scope.fork(() -> query(left));
168 * Supplier<String> supplier2 = scope.fork(() -> query(right));
169 *
170 * scope.joinUntil(deadline);
171 *
172 * // @link substring="throwIfFailed" target="ShutdownOnFailure#throwIfFailed" :
173 * scope.throwIfFailed(e -> new WebApplicationException(e));
174 *
175 * // both subtasks completed successfully
176 * String result = Stream.of(supplier1, supplier2)
177 * .map(Supplier::get)
178 * .collect(Collectors.joining(", ", "{ ", " }"));
179 *
180 * ...
181 * }
182 * }
183 *
184 * <h2>Extending StructuredTaskScope</h2>
185 *
186 * {@code StructuredTaskScope} can be extended, and the {@link #handleComplete(Subtask)
187 * handleComplete} method overridden, to implement policies other than those implemented
188 * by {@code ShutdownOnSuccess} and {@code ShutdownOnFailure}. A subclass may, for example,
189 * collect the results of subtasks that complete successfully and ignore subtasks that
190 * fail. It may collect exceptions when subtasks fail. It may invoke the {@link #shutdown()
191 * shutdown} method to shut down and cause {@link #join() join} to wakeup when some
192 * condition arises.
193 *
194 * <p> A subclass will typically define methods to make available results, state, or other
195 * outcome to code that executes after the {@code join} method. A subclass that collects
196 * results and ignores subtasks that fail may define a method that returns the results.
197 * A subclass that implements a policy to shut down when a subtask fails may define a
198 * method to get the exception of the first subtask to fail.
199 *
200 * <p> The following is an example of a simple {@code StructuredTaskScope} implementation
201 * that collects homogeneous subtasks that complete successfully. It defines the method
202 * "{@code completedSuccessfully()}" that the main task can invoke after it joins.
203 * {@snippet lang=java :
204 * class CollectingScope<T> extends StructuredTaskScope<T> {
205 * private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>();
206 *
207 * @Override
208 * protected void handleComplete(Subtask<? extends T> subtask) {
209 * if (subtask.state() == Subtask.State.SUCCESS) {
210 * subtasks.add(subtask);
211 * }
212 * }
213 *
214 * @Override
215 * public CollectingScope<T> join() throws InterruptedException {
216 * super.join();
217 * return this;
218 * }
219 *
220 * public Stream<Subtask<? extends T>> completedSuccessfully() {
221 * // @link substring="ensureOwnerAndJoined" target="ensureOwnerAndJoined" :
222 * super.ensureOwnerAndJoined();
223 * return subtasks.stream();
224 * }
225 * }
226 * }
227 * <p> The implementation of the {@code completedSuccessfully()} method in the example
228 * invokes {@link #ensureOwnerAndJoined()} to ensure that the method can only be invoked
229 * by the owner thread and only after it has joined.
230 *
231 * <h2><a id="TreeStructure">Tree structure</a></h2>
232 *
233 * Task scopes form a tree where parent-child relations are established implicitly when
234 * opening a new task scope:
235 * <ul>
236 * <li> A parent-child relation is established when a thread started in a task scope
237 * opens its own task scope. A thread started in task scope "A" that opens task scope
238 * "B" establishes a parent-child relation where task scope "A" is the parent of task
239 * scope "B".
240 * <li> A parent-child relation is established with nesting. If a thread opens task
241 * scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task
242 * scope "B" is the parent of the nested task scope "C".
243 * </ul>
244 *
245 * The <i>descendants</i> of a task scope are the child task scopes that it is a parent
246 * of, plus the descendants of the child task scopes, recursively.
247 *
248 * <p> The tree structure supports:
249 * <ul>
250 * <li> Inheritance of {@linkplain ScopedValue scoped values} across threads.
251 * <li> Confinement checks. The phrase "threads contained in the task scope" in method
252 * descriptions means threads started in the task scope or descendant scopes.
253 * </ul>
254 *
255 * <p> The following example demonstrates the inheritance of a scoped value. A scoped
256 * value {@code USERNAME} is bound to the value "{@code duke}". A {@code StructuredTaskScope}
257 * is created and its {@code fork} method invoked to start a thread to execute {@code
258 * childTask}. The thread inherits the scoped value <em>bindings</em> captured when
259 * creating the task scope. The code in {@code childTask} uses the value of the scoped
260 * value and so reads the value "{@code duke}".
261 * {@snippet lang=java :
262 * private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
263 *
264 * // @link substring="runWhere" target="ScopedValue#runWhere(ScopedValue, Object, Runnable)" :
265 * ScopedValue.runWhere(USERNAME, "duke", () -> {
266 * try (var scope = new StructuredTaskScope<String>()) {
267 *
268 * scope.fork(() -> childTask()); // @highlight substring="fork"
269 * ...
270 * }
271 * });
272 *
273 * ...
274 *
275 * String childTask() {
276 * // @link substring="get" target="ScopedValue#get()" :
277 * String name = USERNAME.get(); // "duke"
278 * ...
279 * }
280 * }
281 *
282 * <p> {@code StructuredTaskScope} does not define APIs that expose the tree structure
283 * at this time.
284 *
285 * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
286 * or method in this class will cause a {@link NullPointerException} to be thrown.
287 *
288 * <h2>Memory consistency effects</h2>
289 *
290 * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to
291 * {@linkplain #fork forking} of a subtask
292 * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
293 * <i>happen-before</i></a> any actions taken by that subtask, which in turn <i>happen-before</i>
294 * the subtask result is {@linkplain Subtask#get() retrieved} or <i>happen-before</i> any
295 * actions taken in a thread after {@linkplain #join() joining} of the task scope.
296 *
297 * @jls 17.4.5 Happens-before Order
298 *
299 * @param <T> the result type of tasks executed in the task scope
300 * @since 21
301 */
302 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
303 public class StructuredTaskScope<T> implements AutoCloseable {
304 private final ThreadFactory factory;
305 private final ThreadFlock flock;
306 private final ReentrantLock shutdownLock = new ReentrantLock();
307
308 // states: OPEN -> SHUTDOWN -> CLOSED
309 private static final int OPEN = 0; // initial state
310 private static final int SHUTDOWN = 1;
311 private static final int CLOSED = 2;
312
313 // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
314 private volatile int state;
315
316 // Counters to support checking that the task scope owner joins before processing
317 // results and attempts join before closing the task scope. These counters are
318 // accessed only by the owner thread.
319 private int forkRound; // incremented when the first subtask is forked after join
320 private int lastJoinAttempted; // set to the current fork round when join is attempted
321 private int lastJoinCompleted; // set to the current fork round when join completes
322
323 /**
324 * Represents a subtask forked with {@link #fork(Callable)}.
325 * @param <T> the result type
326 * @since 21
327 */
328 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
329 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
330 /**
331 * {@return the value returning task provided to the {@code fork} method}
332 *
333 * @apiNote Task objects with unique identity may be used for correlation by
334 * implementations of {@link #handleComplete(Subtask) handleComplete}.
335 */
336 Callable<? extends T> task();
337
338 /**
339 * Represents the state of a subtask.
340 * @see Subtask#state()
341 * @since 21
342 */
343 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
344 enum State {
345 /**
346 * The subtask result or exception is not available. This state indicates that
347 * the subtask was forked but has not completed, it completed after the task
348 * scope was {@linkplain #shutdown() shut down}, or it was forked after the
349 * task scope was shut down.
350 */
351 UNAVAILABLE,
352 /**
353 * The subtask completed successfully with a result. The {@link Subtask#get()
354 * Subtask.get()} method can be used to obtain the result. This is a terminal
355 * state.
356 */
357 SUCCESS,
358 /**
359 * The subtask failed with an exception. The {@link Subtask#exception()
360 * Subtask.exception()} method can be used to obtain the exception. This is a
361 * terminal state.
362 */
363 FAILED,
364 }
365
366 /**
367 * {@return the state of the subtask}
368 */
369 State state();
370
371 /**
372 * Returns the result of the subtask.
373 *
374 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
375 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
376 * joinUntil}) before it can obtain the result of the subtask.
377 *
378 * @return the possibly-null result
379 * @throws IllegalStateException if the subtask has not completed, did not complete
380 * successfully, or the current thread is the task scope owner and did not join
381 * after forking
382 * @see State#SUCCESS
383 */
384 T get();
385
386 /**
387 * {@return the exception thrown by the subtask}
388 *
389 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
390 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
391 * joinUntil}) before it can obtain the exception thrown by the subtask.
392 *
393 * @throws IllegalStateException if the subtask has not completed, completed with
394 * a result, or the current thread is the task scope owner and did not join after
395 * forking
396 * @see State#FAILED
397 */
398 Throwable exception();
399 }
400
401 /**
402 * Creates a structured task scope with the given name and thread factory. The task
403 * scope is optionally named for the purposes of monitoring and management. The thread
404 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
405 * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
406 * current thread.
407 *
408 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
409 * bindings for inheritance by threads started in the task scope. The
410 * <a href="#TreeStructure">Tree Structure</a> section in the class description details
411 * how parent-child relations are established implicitly for the purpose of inheritance
412 * of scoped value bindings.
413 *
414 * @param name the name of the task scope, can be null
415 * @param factory the thread factory
416 */
417 @SuppressWarnings("this-escape")
418 public StructuredTaskScope(String name, ThreadFactory factory) {
419 this.factory = Objects.requireNonNull(factory, "'factory' is null");
420 if (name == null)
421 name = Objects.toIdentityString(this);
422 this.flock = ThreadFlock.open(name);
423 }
424
/**
 * Creates an unnamed structured task scope whose subtasks run in virtual threads.
 * The current thread becomes the owner of the task scope.
 *
 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with a
 * name of {@code null} and a thread factory that creates virtual threads.
 */
public StructuredTaskScope() {
    this(/* name */ null,
         /* factory */ Thread.ofVirtual().factory());
}
435
436 private IllegalStateException newIllegalStateExceptionScopeClosed() {
437 return new IllegalStateException("Task scope is closed");
438 }
439
440 private IllegalStateException newIllegalStateExceptionNoJoin() {
441 return new IllegalStateException("Owner did not join after forking subtasks");
442 }
443
444 /**
445 * Throws IllegalStateException if the scope is closed, returning the state if not
446 * closed.
447 */
448 private int ensureOpen() {
449 int s = state;
450 if (s == CLOSED)
451 throw newIllegalStateExceptionScopeClosed();
452 return s;
453 }
454
455 /**
456 * Throws WrongThreadException if the current thread is not the owner.
457 */
458 private void ensureOwner() {
459 if (Thread.currentThread() != flock.owner())
460 throw new WrongThreadException("Current thread not owner");
461 }
462
463 /**
464 * Throws WrongThreadException if the current thread is not the owner
465 * or a thread contained in the tree.
466 */
467 private void ensureOwnerOrContainsThread() {
468 Thread currentThread = Thread.currentThread();
469 if (currentThread != flock.owner() && !flock.containsThread(currentThread))
470 throw new WrongThreadException("Current thread not owner or thread in the tree");
471 }
472
473 /**
474 * Throws IllegalStateException if the current thread is the owner, and the owner did
475 * not join after forking a subtask in the given fork round.
476 */
477 private void ensureJoinedIfOwner(int round) {
478 if (Thread.currentThread() == flock.owner() && (round > lastJoinCompleted)) {
479 throw newIllegalStateExceptionNoJoin();
480 }
481 }
482
483 /**
484 * Ensures that the current thread is the owner of this task scope and that it joined
485 * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
486 * forking} subtasks.
487 *
488 * @apiNote This method can be used by subclasses that define methods to make available
489 * results, state, or other outcome to code intended to execute after the join method.
490 *
491 * @throws WrongThreadException if the current thread is not the task scope owner
492 * @throws IllegalStateException if the task scope is open and task scope owner did
493 * not join after forking
494 */
495 protected final void ensureOwnerAndJoined() {
496 ensureOwner();
497 if (forkRound > lastJoinCompleted) {
498 throw newIllegalStateExceptionNoJoin();
499 }
500 }
501
502 /**
503 * Invoked by a subtask when it completes successfully or fails in this task scope.
504 * This method is not invoked if a subtask completes after the task scope is
505 * {@linkplain #shutdown() shut down}.
506 *
507 * @implSpec The default implementation throws {@code NullPointerException} if the
508 * subtask is {@code null}. It throws {@link IllegalArgumentException} if the subtask
509 * has not completed.
510 *
511 * @apiNote The {@code handleComplete} method should be thread safe. It may be
512 * invoked by several threads concurrently.
513 *
514 * @param subtask the subtask
515 *
516 * @throws IllegalArgumentException if called with a subtask that has not completed
517 */
518 protected void handleComplete(Subtask<? extends T> subtask) {
519 if (subtask.state() == Subtask.State.UNAVAILABLE)
520 throw new IllegalArgumentException();
521 }
522
523 /**
524 * Starts a new thread in this task scope to execute a value-returning task, thus
525 * creating a <em>subtask</em> of this task scope.
526 *
527 * <p> The value-returning task is provided to this method as a {@link Callable}, the
528 * thread executes the task's {@link Callable#call() call} method. The thread is
529 * created with the task scope's {@link ThreadFactory}. It inherits the current thread's
530 * {@linkplain ScopedValue scoped value} bindings. The bindings must match the bindings
531 * captured when the task scope was created.
532 *
533 * <p> This method returns a {@link Subtask Subtask} to represent the <em>forked
534 * subtask</em>. The {@code Subtask} object can be used to obtain the result when
535 * the subtask completes successfully, or the exception when the subtask fails. To
536 * ensure correct usage, the {@link Subtask#get() get()} and {@link Subtask#exception()
537 * exception()} methods may only be called by the task scope owner after it has waited
538 * for all threads to finish with the {@link #join() join} or {@link #joinUntil(Instant)}
539 * methods. When the subtask completes, the thread invokes the {@link
540 * #handleComplete(Subtask) handleComplete} method to consume the completed subtask.
541 * If the task scope is {@linkplain #shutdown() shut down} before the subtask completes
542 * then the {@code handleComplete} method will not be invoked.
543 *
544 * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process of
545 * shutting down) then the subtask will not run and the {@code handleComplete} method
546 * will not be invoked.
547 *
548 * <p> This method may only be invoked by the task scope owner or threads contained
549 * in the task scope.
550 *
551 * @implSpec This method may be overridden for customization purposes, wrapping tasks
552 * for example. If overridden, the subclass must invoke {@code super.fork} to start a
553 * new thread in this task scope.
554 *
555 * @param task the value-returning task for the thread to execute
556 * @param <U> the result type
557 * @return the subtask
558 * @throws IllegalStateException if this task scope is closed
559 * @throws WrongThreadException if the current thread is not the task scope owner or a
560 * thread contained in the task scope
561 * @throws StructureViolationException if the current scoped value bindings are not
562 * the same as when the task scope was created
563 * @throws RejectedExecutionException if the thread factory rejected creating a
564 * thread to run the subtask
565 */
566 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
567 Objects.requireNonNull(task, "'task' is null");
568 int s = ensureOpen(); // throws ISE if closed
569
570 // when forked by the owner, the subtask is forked in the current or next round
571 int round = -1;
572 if (Thread.currentThread() == flock.owner()) {
573 round = forkRound;
574 if (forkRound == lastJoinCompleted) {
575 // new round if first fork after join
576 round++;
577 }
578 }
579
580 SubtaskImpl<U> subtask = new SubtaskImpl<>(this, task, round);
581 if (s < SHUTDOWN) {
582 // create thread to run task
583 Thread thread = factory.newThread(subtask);
584 if (thread == null) {
585 throw new RejectedExecutionException("Rejected by thread factory");
586 }
587
588 // attempt to start the thread
589 try {
590 flock.start(thread);
591 } catch (IllegalStateException e) {
592 // shutdown by another thread, or underlying flock is shutdown due
593 // to unstructured use
594 }
595 }
596
597 // force owner to join if this is the first fork in the round
598 if (Thread.currentThread() == flock.owner() && round > forkRound) {
599 forkRound = round;
600 }
601
602 // return forked subtask or a subtask that did not run
603 return subtask;
604 }
605
606 /**
607 * Wait for all threads to finish or the task scope to shut down.
608 */
609 private void implJoin(Duration timeout)
610 throws InterruptedException, TimeoutException
611 {
612 ensureOwner();
613 lastJoinAttempted = forkRound;
614 int s = ensureOpen(); // throws ISE if closed
615 if (s == OPEN) {
616 // wait for all threads, wakeup, interrupt, or timeout
617 if (timeout != null) {
618 flock.awaitAll(timeout);
619 } else {
620 flock.awaitAll();
621 }
622 }
623 lastJoinCompleted = forkRound;
624 }
625
626 /**
627 * Wait for all subtasks started in this task scope to finish or the task scope to
628 * shut down.
629 *
630 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
631 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
632 * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, or
633 * the current thread is {@linkplain Thread#interrupt() interrupted}.
634 *
635 * <p> This method may only be invoked by the task scope owner.
636 *
637 * @implSpec This method may be overridden for customization purposes or to return a
638 * more specific return type. If overridden, the subclass must invoke {@code
639 * super.join} to ensure that the method waits for threads in this task scope to
640 * finish.
641 *
642 * @return this task scope
643 * @throws IllegalStateException if this task scope is closed
644 * @throws WrongThreadException if the current thread is not the task scope owner
645 * @throws InterruptedException if interrupted while waiting
646 */
647 public StructuredTaskScope<T> join() throws InterruptedException {
648 try {
649 implJoin(null);
650 } catch (TimeoutException e) {
651 throw new InternalError();
652 }
653 return this;
654 }
655
656 /**
657 * Wait for all subtasks started in this task scope to finish or the task scope to
658 * shut down, up to the given deadline.
659 *
660 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
661 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
662 * when all threads finish, the task scope is {@linkplain #shutdown() shut down}, the
663 * deadline is reached, or the current thread is {@linkplain Thread#interrupt()
664 * interrupted}.
665 *
666 * <p> This method may only be invoked by the task scope owner.
667 *
668 * @implSpec This method may be overridden for customization purposes or to return a
669 * more specific return type. If overridden, the subclass must invoke {@code
670 * super.joinUntil} to ensure that the method waits for threads in this task scope to
671 * finish.
672 *
673 * @param deadline the deadline
674 * @return this task scope
675 * @throws IllegalStateException if this task scope is closed
676 * @throws WrongThreadException if the current thread is not the task scope owner
677 * @throws InterruptedException if interrupted while waiting
678 * @throws TimeoutException if the deadline is reached while waiting
679 */
680 public StructuredTaskScope<T> joinUntil(Instant deadline)
681 throws InterruptedException, TimeoutException
682 {
683 Duration timeout = Duration.between(Instant.now(), deadline);
684 implJoin(timeout);
685 return this;
686 }
687
688 /**
689 * Interrupt all unfinished threads.
690 */
691 private void implInterruptAll() {
692 flock.threads()
693 .filter(t -> t != Thread.currentThread())
694 .forEach(t -> {
695 try {
696 t.interrupt();
697 } catch (Throwable ignore) { }
698 });
699 }
700
701 @SuppressWarnings("removal")
702 private void interruptAll() {
703 if (System.getSecurityManager() == null) {
704 implInterruptAll();
705 } else {
706 PrivilegedAction<Void> pa = () -> {
707 implInterruptAll();
708 return null;
709 };
710 AccessController.doPrivileged(pa);
711 }
712 }
713
714 /**
715 * Shutdown the task scope if not already shutdown. Return true if this method
716 * shutdowns the task scope, false if already shutdown.
717 */
718 private boolean implShutdown() {
719 shutdownLock.lock();
720 try {
721 if (state < SHUTDOWN) {
722 // prevent new threads from starting
723 flock.shutdown();
724
725 // set status before interrupting tasks
726 state = SHUTDOWN;
727
728 // interrupt all unfinished threads
729 interruptAll();
730
731 return true;
732 } else {
733 // already shutdown
734 return false;
735 }
736 } finally {
737 shutdownLock.unlock();
738 }
739 }
740
741 /**
742 * Shut down this task scope without closing it. Shutting down a task scope prevents
743 * new threads from starting, interrupts all unfinished threads, and causes the
744 * {@link #join() join} method to wakeup. Shutdown is useful for cases where the
745 * results of unfinished subtasks are no longer needed. It will typically be called
746 * by the {@link #handleComplete(Subtask)} implementation of a subclass that
747 * implements a policy to discard unfinished tasks once some outcome is reached.
748 *
749 * <p> More specifically, this method:
750 * <ul>
751 * <li> {@linkplain Thread#interrupt() Interrupts} all unfinished threads in the
752 * task scope (except the current thread).
753 * <li> Wakes up the task scope owner if it is waiting in {@link #join()} or {@link
754 * #joinUntil(Instant)}. If the task scope owner is not waiting then its next call to
755 * {@code join} or {@code joinUntil} will return immediately.
756 * </ul>
757 *
758 * <p> The {@linkplain Subtask.State state} of unfinished subtasks that complete at
759 * around the time that the task scope is shutdown is not defined. A subtask that
760 * completes successfully with a result, or fails with an exception, at around
761 * the time that the task scope is shutdown may or may not <i>transition</i> to a
762 * terminal state.
763 *
764 * <p> This method may only be invoked by the task scope owner or threads contained
765 * in the task scope.
766 *
767 * @implSpec This method may be overridden for customization purposes. If overridden,
768 * the subclass must invoke {@code super.shutdown} to ensure that the method shuts
769 * down the task scope.
770 *
771 * @apiNote
772 * There may be threads that have not finished because they are executing code that
773 * did not respond (or respond promptly) to thread interrupt. This method does not wait
774 * for these threads. When the owner invokes the {@link #close() close} method
775 * to close the task scope then it will wait for the remaining threads to finish.
776 *
777 * @throws IllegalStateException if this task scope is closed
778 * @throws WrongThreadException if the current thread is not the task scope owner or
779 * a thread contained in the task scope
780 * @see #isShutdown()
781 */
782 public void shutdown() {
783 ensureOwnerOrContainsThread();
784 int s = ensureOpen(); // throws ISE if closed
785 if (s < SHUTDOWN && implShutdown())
786 flock.wakeup();
787 }
788
789 /**
790 * {@return true if this task scope is shutdown, otherwise false}
791 * @see #shutdown()
792 */
793 public final boolean isShutdown() {
794 return state >= SHUTDOWN;
795 }
796
797 /**
798 * Closes this task scope.
799 *
800 * <p> This method first shuts down the task scope (as if by invoking the {@link
801 * #shutdown() shutdown} method). It then waits for the threads executing any
802 * unfinished tasks to finish. If interrupted, this method will continue to wait for
803 * the threads to finish before completing with the interrupt status set.
804 *
805 * <p> This method may only be invoked by the task scope owner. If the task scope
806 * is already closed then the task scope owner invoking this method has no effect.
807 *
808 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
809 * manner</em>. If this method is called to close a task scope before nested task
810 * scopes are closed then it closes the underlying construct of each nested task scope
811 * (in the reverse order that they were created in), closes this task scope, and then
812 * throws {@link StructureViolationException}.
813 * Similarly, if this method is called to close a task scope while executing with
814 * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
815 * before the scoped values were bound, then {@code StructureViolationException} is
816 * thrown after closing the task scope.
817 * If a thread terminates without first closing task scopes that it owns then
818 * termination will cause the underlying construct of each of its open tasks scopes to
819 * be closed. Closing is performed in the reverse order that the task scopes were
820 * created in. Thread termination may therefore be delayed when the task scope owner
821 * has to wait for threads forked in these task scopes to finish.
822 *
823 * @implSpec This method may be overridden for customization purposes. If overridden,
824 * the subclass must invoke {@code super.close} to close the task scope.
825 *
826 * @throws IllegalStateException thrown after closing the task scope if the task scope
827 * owner did not attempt to join after forking
828 * @throws WrongThreadException if the current thread is not the task scope owner
829 * @throws StructureViolationException if a structure violation was detected
830 */
831 @Override
832 public void close() {
833 ensureOwner();
834 int s = state;
835 if (s == CLOSED)
836 return;
837
838 try {
839 if (s < SHUTDOWN)
840 implShutdown();
841 flock.close();
842 } finally {
843 state = CLOSED;
844 }
845
846 // throw ISE if the owner didn't attempt to join after forking
847 if (forkRound > lastJoinAttempted) {
848 lastJoinCompleted = forkRound;
849 throw newIllegalStateExceptionNoJoin();
850 }
851 }
852
853 @Override
854 public String toString() {
855 String name = flock.name();
856 return switch (state) {
857 case OPEN -> name;
858 case SHUTDOWN -> name + "/shutdown";
859 case CLOSED -> name + "/closed";
860 default -> throw new InternalError();
861 };
862 }
863
864 /**
865 * Subtask implementation, runs the task specified to the fork method.
866 */
867 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
868 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
869
870 private record AltResult(Subtask.State state, Throwable exception) {
871 AltResult(Subtask.State state) {
872 this(state, null);
873 }
874 }
875
876 private final StructuredTaskScope<? super T> scope;
877 private final Callable<? extends T> task;
878 private final int round;
879 private volatile Object result;
880
881 SubtaskImpl(StructuredTaskScope<? super T> scope,
882 Callable<? extends T> task,
883 int round) {
884 this.scope = scope;
885 this.task = task;
886 this.round = round;
887 }
888
889 @Override
890 public void run() {
891 T result = null;
892 Throwable ex = null;
893 try {
894 result = task.call();
895 } catch (Throwable e) {
896 ex = e;
897 }
898
899 // nothing to do if task scope is shutdown
900 if (scope.isShutdown())
901 return;
902
903 // capture result or exception, invoke handleComplete
904 if (ex == null) {
905 this.result = (result != null) ? result : RESULT_NULL;
906 } else {
907 this.result = new AltResult(State.FAILED, ex);
908 }
909 scope.handleComplete(this);
910 }
911
912 @Override
913 public Callable<? extends T> task() {
914 return task;
915 }
916
917 @Override
918 public Subtask.State state() {
919 Object result = this.result;
920 if (result == null) {
921 return State.UNAVAILABLE;
922 } else if (result instanceof AltResult alt) {
923 // null or failed
924 return alt.state();
925 } else {
926 return State.SUCCESS;
927 }
928 }
929
930 @Override
931 public T get() {
932 scope.ensureJoinedIfOwner(round);
933 Object result = this.result;
934 if (result instanceof AltResult) {
935 if (result == RESULT_NULL) return null;
936 } else if (result != null) {
937 @SuppressWarnings("unchecked")
938 T r = (T) result;
939 return r;
940 }
941 throw new IllegalStateException(
942 "Result is unavailable or subtask did not complete successfully");
943 }
944
945 @Override
946 public Throwable exception() {
947 scope.ensureJoinedIfOwner(round);
948 Object result = this.result;
949 if (result instanceof AltResult alt && alt.state() == State.FAILED) {
950 return alt.exception();
951 }
952 throw new IllegalStateException(
953 "Exception is unavailable or subtask did not complete with exception");
954 }
955
956 @Override
957 public String toString() {
958 String stateAsString = switch (state()) {
959 case UNAVAILABLE -> "[Unavailable]";
960 case SUCCESS -> "[Completed successfully]";
961 case FAILED -> {
962 Throwable ex = ((AltResult) result).exception();
963 yield "[Failed: " + ex + "]";
964 }
965 };
966 return Objects.toIdentityString(this) + stateAsString;
967 }
968 }
969
970 /**
971 * A {@code StructuredTaskScope} that captures the result of the first subtask to
972 * complete {@linkplain Subtask.State#SUCCESS successfully}. Once captured, it
973 * {@linkplain #shutdown() shuts down} the task scope to interrupt unfinished threads
974 * and wakeup the task scope owner. The policy implemented by this class is intended
975 * for cases where the result of any subtask will do ("invoke any") and where the
976 * results of other unfinished subtasks are no longer needed.
977 *
978 * <p> Unless otherwise specified, passing a {@code null} argument to a method
979 * in this class will cause a {@link NullPointerException} to be thrown.
980 *
981 * @apiNote This class implements a policy to shut down the task scope when a subtask
982 * completes successfully. There shouldn't be any need to directly shut down the task
983 * scope with the {@link #shutdown() shutdown} method.
984 *
985 * @param <T> the result type
986 * @since 21
987 */
988 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
989 public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
990 private static final Object RESULT_NULL = new Object();
991 private static final VarHandle FIRST_RESULT;
992 private static final VarHandle FIRST_EXCEPTION;
993 static {
994 MethodHandles.Lookup l = MethodHandles.lookup();
995 FIRST_RESULT = MhUtil.findVarHandle(l, "firstResult", Object.class);
996 FIRST_EXCEPTION = MhUtil.findVarHandle(l, "firstException", Throwable.class);
997 }
998 private volatile Object firstResult;
999 private volatile Throwable firstException;
1000
1001 /**
1002 * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
1003 * The task scope is optionally named for the purposes of monitoring and management.
1004 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1005 * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope
1006 * is owned by the current thread.
1007 *
1008 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
1009 * value} bindings for inheritance by threads started in the task scope. The
1010 * <a href="#TreeStructure">Tree Structure</a> section in the class description
1011 * details how parent-child relations are established implicitly for the purpose
1012 * of inheritance of scoped value bindings.
1013 *
1014 * @param name the name of the task scope, can be null
1015 * @param factory the thread factory
1016 */
1017 public ShutdownOnSuccess(String name, ThreadFactory factory) {
1018 super(name, factory);
1019 }
1020
1021 /**
1022 * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
1023 *
1024 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with
1025 * a name of {@code null} and a thread factory that creates virtual threads.
1026 */
1027 public ShutdownOnSuccess() {
1028 this(null, Thread.ofVirtual().factory());
1029 }
1030
1031 @Override
1032 protected void handleComplete(Subtask<? extends T> subtask) {
1033 if (firstResult != null) {
1034 // already captured a result
1035 return;
1036 }
1037
1038 if (subtask.state() == Subtask.State.SUCCESS) {
1039 // task succeeded
1040 T result = subtask.get();
1041 Object r = (result != null) ? result : RESULT_NULL;
1042 if (FIRST_RESULT.compareAndSet(this, null, r)) {
1043 super.shutdown();
1044 }
1045 } else if (firstException == null) {
1046 // capture the exception thrown by the first subtask that failed
1047 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1048 }
1049 }
1050
1051 /**
1052 * Wait for a subtask started in this task scope to complete {@linkplain
1053 * Subtask.State#SUCCESS successfully} or all subtasks to complete.
1054 *
1055 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1056 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1057 * when all threads finish, a subtask completes successfully, or the current
1058 * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting
1059 * if the {@link #shutdown() shutdown} method is invoked directly to shut down
1060 * this task scope.
1061 *
1062 * <p> This method may only be invoked by the task scope owner.
1063 *
1064 * @throws IllegalStateException {@inheritDoc}
1065 * @throws WrongThreadException {@inheritDoc}
1066 */
1067 @Override
1068 public ShutdownOnSuccess<T> join() throws InterruptedException {
1069 super.join();
1070 return this;
1071 }
1072
1073 /**
1074 * Wait for a subtask started in this task scope to complete {@linkplain
1075 * Subtask.State#SUCCESS successfully} or all subtasks to complete, up to the
1076 * given deadline.
1077 *
1078 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1079 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1080 * when all threads finish, a subtask completes successfully, the deadline is
1081 * reached, or the current thread is {@linkplain Thread#interrupt() interrupted}.
1082 * It also stops waiting if the {@link #shutdown() shutdown} method is invoked
1083 * directly to shut down this task scope.
1084 *
1085 * <p> This method may only be invoked by the task scope owner.
1086 *
1087 * @throws IllegalStateException {@inheritDoc}
1088 * @throws WrongThreadException {@inheritDoc}
1089 */
1090 @Override
1091 public ShutdownOnSuccess<T> joinUntil(Instant deadline)
1092 throws InterruptedException, TimeoutException
1093 {
1094 super.joinUntil(deadline);
1095 return this;
1096 }
1097
1098 /**
1099 * {@return the result of the first subtask that completed {@linkplain
1100 * Subtask.State#SUCCESS successfully}}
1101 *
1102 * <p> When no subtask completed successfully, but a subtask {@linkplain
1103 * Subtask.State#FAILED failed} then {@code ExecutionException} is thrown with
1104 * the subtask's exception as the {@linkplain Throwable#getCause() cause}.
1105 *
1106 * @throws ExecutionException if no subtasks completed successfully but at least
1107 * one subtask failed
1108 * @throws IllegalStateException if no subtasks completed or the task scope owner
1109 * did not join after forking
1110 * @throws WrongThreadException if the current thread is not the task scope owner
1111 */
1112 public T result() throws ExecutionException {
1113 return result(ExecutionException::new);
1114 }
1115
1116 /**
1117 * Returns the result of the first subtask that completed {@linkplain
1118 * Subtask.State#SUCCESS successfully}, otherwise throws an exception produced
1119 * by the given exception supplying function.
1120 *
1121 * <p> When no subtask completed successfully, but a subtask {@linkplain
1122 * Subtask.State#FAILED failed}, then the exception supplying function is invoked
1123 * with subtask's exception.
1124 *
1125 * @param esf the exception supplying function
1126 * @param <X> type of the exception to be thrown
1127 * @return the result of the first subtask that completed with a result
1128 *
1129 * @throws X if no subtasks completed successfully but at least one subtask failed
1130 * @throws IllegalStateException if no subtasks completed or the task scope owner
1131 * did not join after forking
1132 * @throws WrongThreadException if the current thread is not the task scope owner
1133 */
1134 public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X {
1135 Objects.requireNonNull(esf);
1136 ensureOwnerAndJoined();
1137
1138 Object result = firstResult;
1139 if (result == RESULT_NULL) {
1140 return null;
1141 } else if (result != null) {
1142 @SuppressWarnings("unchecked")
1143 T r = (T) result;
1144 return r;
1145 }
1146
1147 Throwable exception = firstException;
1148 if (exception != null) {
1149 X ex = esf.apply(exception);
1150 Objects.requireNonNull(ex, "esf returned null");
1151 throw ex;
1152 }
1153
1154 throw new IllegalStateException("No completed subtasks");
1155 }
1156 }
1157
1158 /**
1159 * A {@code StructuredTaskScope} that captures the exception of the first subtask to
1160 * {@linkplain Subtask.State#FAILED fail}. Once captured, it {@linkplain #shutdown()
1161 * shuts down} the task scope to interrupt unfinished threads and wakeup the task
1162 * scope owner. The policy implemented by this class is intended for cases where the
1163 * results for all subtasks are required ("invoke all"); if any subtask fails then the
1164 * results of other unfinished subtasks are no longer needed.
1165 *
1166 * <p> Unless otherwise specified, passing a {@code null} argument to a method
1167 * in this class will cause a {@link NullPointerException} to be thrown.
1168 *
1169 * @apiNote This class implements a policy to shut down the task scope when a subtask
1170 * fails. There shouldn't be any need to directly shut down the task scope with the
1171 * {@link #shutdown() shutdown} method.
1172 *
1173 * @since 21
1174 */
1175 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
1176 public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
1177 private static final VarHandle FIRST_EXCEPTION =
1178 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
1179 private volatile Throwable firstException;
1180
1181 /**
1182 * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
1183 * The task scope is optionally named for the purposes of monitoring and management.
1184 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1185 * threads when subtasks are {@linkplain #fork(Callable) forked}. The task scope
1186 * is owned by the current thread.
1187 *
1188 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
1189 * value} bindings for inheritance by threads started in the task scope. The
1190 * <a href="#TreeStructure">Tree Structure</a> section in the class description
1191 * details how parent-child relations are established implicitly for the purpose
1192 * of inheritance of scoped value bindings.
1193 *
1194 * @param name the name of the task scope, can be null
1195 * @param factory the thread factory
1196 */
1197 public ShutdownOnFailure(String name, ThreadFactory factory) {
1198 super(name, factory);
1199 }
1200
1201 /**
1202 * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
1203 *
1204 * @implSpec This constructor is equivalent to invoking the 2-arg constructor with
1205 * a name of {@code null} and a thread factory that creates virtual threads.
1206 */
1207 public ShutdownOnFailure() {
1208 this(null, Thread.ofVirtual().factory());
1209 }
1210
1211 @Override
1212 protected void handleComplete(Subtask<?> subtask) {
1213 if (subtask.state() == Subtask.State.FAILED
1214 && firstException == null
1215 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception())) {
1216 super.shutdown();
1217 }
1218 }
1219
1220 /**
1221 * Wait for all subtasks started in this task scope to complete or for a subtask
1222 * to {@linkplain Subtask.State#FAILED fail}.
1223 *
1224 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1225 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1226 * when all threads finish, a subtask fails, or the current thread is {@linkplain
1227 * Thread#interrupt() interrupted}. It also stops waiting if the {@link #shutdown()
1228 * shutdown} method is invoked directly to shut down this task scope.
1229 *
1230 * <p> This method may only be invoked by the task scope owner.
1231 *
1232 * @throws IllegalStateException {@inheritDoc}
1233 * @throws WrongThreadException {@inheritDoc}
1234 */
1235 @Override
1236 public ShutdownOnFailure join() throws InterruptedException {
1237 super.join();
1238 return this;
1239 }
1240
1241 /**
1242 * Wait for all subtasks started in this task scope to complete or for a subtask
1243 * to {@linkplain Subtask.State#FAILED fail}, up to the given deadline.
1244 *
1245 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1246 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1247 * when all threads finish, a subtask fails, the deadline is reached, or the current
1248 * thread is {@linkplain Thread#interrupt() interrupted}. It also stops waiting
1249 * if the {@link #shutdown() shutdown} method is invoked directly to shut down
1250 * this task scope.
1251 *
1252 * <p> This method may only be invoked by the task scope owner.
1253 *
1254 * @throws IllegalStateException {@inheritDoc}
1255 * @throws WrongThreadException {@inheritDoc}
1256 */
1257 @Override
1258 public ShutdownOnFailure joinUntil(Instant deadline)
1259 throws InterruptedException, TimeoutException
1260 {
1261 super.joinUntil(deadline);
1262 return this;
1263 }
1264
1265 /**
1266 * Returns the exception of the first subtask that {@linkplain Subtask.State#FAILED
1267 * failed}. If no subtasks failed then an empty {@code Optional} is returned.
1268 *
1269 * @return the exception for the first subtask to fail or an empty optional if no
1270 * subtasks failed
1271 *
1272 * @throws WrongThreadException if the current thread is not the task scope owner
1273 * @throws IllegalStateException if the task scope owner did not join after forking
1274 */
1275 public Optional<Throwable> exception() {
1276 ensureOwnerAndJoined();
1277 return Optional.ofNullable(firstException);
1278 }
1279
1280 /**
1281 * Throws if a subtask {@linkplain Subtask.State#FAILED failed}.
1282 * If any subtask failed with an exception then {@code ExecutionException} is
1283 * thrown with the exception of the first subtask to fail as the {@linkplain
1284 * Throwable#getCause() cause}. This method does nothing if no subtasks failed.
1285 *
1286 * @throws ExecutionException if a subtask failed
1287 * @throws WrongThreadException if the current thread is not the task scope owner
1288 * @throws IllegalStateException if the task scope owner did not join after forking
1289 */
1290 public void throwIfFailed() throws ExecutionException {
1291 throwIfFailed(ExecutionException::new);
1292 }
1293
1294 /**
1295 * Throws the exception produced by the given exception supplying function if a
1296 * subtask {@linkplain Subtask.State#FAILED failed}. If any subtask failed with
1297 * an exception then the function is invoked with the exception of the first
1298 * subtask to fail. The exception returned by the function is thrown. This method
1299 * does nothing if no subtasks failed.
1300 *
1301 * @param esf the exception supplying function
1302 * @param <X> type of the exception to be thrown
1303 *
1304 * @throws X produced by the exception supplying function
1305 * @throws WrongThreadException if the current thread is not the task scope owner
1306 * @throws IllegalStateException if the task scope owner did not join after forking
1307 */
1308 public <X extends Throwable>
1309 void throwIfFailed(Function<Throwable, ? extends X> esf) throws X {
1310 ensureOwnerAndJoined();
1311 Objects.requireNonNull(esf);
1312 Throwable exception = firstException;
1313 if (exception != null) {
1314 X ex = esf.apply(exception);
1315 Objects.requireNonNull(ex, "esf returned null");
1316 throw ex;
1317 }
1318 }
1319 }
1320 }
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.util.concurrent;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.security.AccessController;
30 import java.security.PrivilegedAction;
31 import java.time.Duration;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.NoSuchElementException;
35 import java.util.Objects;
36 import java.util.function.Function;
37 import java.util.function.Predicate;
38 import java.util.function.Supplier;
39 import java.util.stream.Stream;
40 import jdk.internal.javac.PreviewFeature;
41 import jdk.internal.misc.InnocuousThread;
42 import jdk.internal.misc.ThreadFlock;
43 import jdk.internal.invoke.MhUtil;
44
45 /**
46 * An API for <em>structured concurrency</em>. {@code StructuredTaskScope} supports cases
47 * where a main task splits into several concurrent subtasks, and where the subtasks must
48 * complete before the main task continues. A {@code StructuredTaskScope} can be used to
49 * ensure that the lifetime of a concurrent operation is confined by a <em>syntax block</em>,
50 * just like that of a sequential operation in structured programming.
51 *
52 * <p> {@code StructuredTaskScope} defines the static method {@link #open() open} to open
53 * a new {@code StructuredTaskScope} and the {@link #close() close} method to close it.
54 * The API is designed to be used with the {@code try-with-resources} statement where
55 * the {@code StructuredTaskScope} is opened as a resource and then closed automatically.
56 * The code in the block uses the {@link #fork(Callable) fork} method to fork subtasks.
57 * After forking, it uses the {@link #join() join} method to wait for all subtasks to
58 * finish (or some other outcome) as a single operation. Forking a subtask starts a new
59 * {@link Thread} to run the subtask. The thread executing the main task does not continue
60 * beyond the {@code close} method until all threads started to execute subtasks have finished.
61 * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may
62 * only be invoked by the <em>owner thread</em> (the thread that opened the {@code
63 * StructuredTaskScope}), the {@code fork} method may not be called after {@code join},
64 * the {@code join} method may only be invoked once, and the {@code close} method throws
65 * an exception after closing if the owner did not invoke the {@code join} method after
66 * forking subtasks.
67 *
68 * <p> As a first example, consider a main task that splits into two subtasks to concurrently
69 * fetch resources from two URL locations "left" and "right". Both subtasks may complete
70 * successfully, one subtask may succeed and the other may fail, or both subtasks may
71 * fail. The main task in this example is interested in the successful result from both
72 * subtasks. It waits in the {@link #join() join} method for both subtasks to complete
73 * successfully or for either subtask to fail.
74 * {@snippet lang=java :
75 * // @link substring="open" target="#open()" :
76 * try (var scope = StructuredTaskScope.open()) {
77 *
78 * // @link substring="fork" target="#fork(Callable)" :
79 * Subtask<String> subtask1 = scope.fork(() -> query(left));
80 * Subtask<Integer> subtask2 = scope.fork(() -> query(right));
81 *
82 * // throws if either subtask fails
83 * scope.join(); // @link substring="join" target="#join()"
84 *
85 * // both subtasks completed successfully
86 * // @link substring="get" target="Subtask#get()" :
87 * return new MyResult(subtask1.get(), subtask2.get());
88 *
89 * // @link substring="close" target="#close()" :
90 * } // close
91 * }
92 *
93 * <p> If both subtasks complete successfully then the {@code join} method completes
94 * normally and the main task uses the {@link Subtask#get() Subtask.get()} method to get
95 * the result of each subtask. If one of the subtasks fails then the other subtask is
96 * cancelled (this will interrupt the thread executing the other subtask) and the {@code
97 * join} method throws {@link FailedException} with the exception from the failed subtask
98 * as the {@linkplain Throwable#getCause() cause}.
99 *
100 * <p> In the example, the subtasks produce results of different types ({@code String} and
101 * {@code Integer}). In other cases the subtasks may all produce results of the same type.
102 * If the example had used {@code StructuredTaskScope.<String>open()} then it could
103 * only be used to fork subtasks that return a {@code String} result.
104 *
105 * <h2>Joiners</h2>
106 *
107 * <p> In the example above, the main task fails if any subtask fails. If all subtasks
 * succeed then the {@code join} method completes normally. Other policies and outcomes
 * are supported by creating a {@code StructuredTaskScope} with a {@link Joiner} that
110 * implements the desired policy. A {@code Joiner} handles subtask completion and produces
111 * the outcome for the {@link #join() join} method. In the example above, {@code join}
112 * returns {@code null}. Depending on the {@code Joiner}, {@code join} may return a
113 * result, a stream of elements, or some other object. The {@code Joiner} interface defines
114 * factory methods to create {@code Joiner}s for some common cases.
115 *
116 * <p> A {@code Joiner} may <a id="CancelExecution"><em>cancel execution</em></a> (sometimes
117 * called "short-circuiting") when some condition is reached that does not require the
118 * result of subtasks that are still executing. Cancelling execution prevents new threads
119 * from being started to execute further subtasks, {@linkplain Thread#interrupt() interrupts}
120 * the threads executing subtasks that have not completed, and causes the {@code join}
121 * method to wakeup with the outcome (result or exception). In the above example, the
122 * outcome is that {@code join} completes with a result of {@code null} when all subtasks
123 * succeed. It cancels execution if any of the subtasks fail, throwing the exception from
124 * the first subtask that fails. Other {@code Joiner} implementations may return an object
125 * instead of {@code null} and may cancel execution or throw based on some other policy.
126 *
127 * <p> To allow for cancelling execution, subtasks must be coded so that they
128 * finish as soon as possible when interrupted. Subtasks that do not respond to interrupt,
129 * e.g. block on methods that are not interruptible, may delay the closing of a task scope
130 * indefinitely. The {@link #close() close} method always waits for threads executing
131 * subtasks to finish, even if execution is cancelled, so execution cannot continue beyond
132 * the {@code close} method until the interrupted threads finish.
133 *
134 * <p> Now consider another example that also splits into two subtasks. In this example,
135 * each subtask produces a {@code String} result and the main task is only interested in
136 * the result from the first subtask to complete successfully. The example uses {@link
137 * Joiner#anySuccessfulResultOrThrow() Joiner.anySuccessfulResultOrThrow()} to
138 * create a {@code Joiner} that makes available the result of the first subtask to
139 * complete successfully. The type parameter in the example is "{@code String}" so that
140 * only subtasks that return a {@code String} can be forked.
141 * {@snippet lang=java :
 *    // @link substring="open" target="#open(Joiner)" :
143 * try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {
144 *
145 * scope.fork(callable1);
146 * scope.fork(callable2);
147 *
148 * // throws if both subtasks fail
149 * String firstResult = scope.join();
150 *
151 * }
152 * }
153 *
154 * <p> In the example, the main task forks the two subtasks, then waits in the {@code
155 * join} method for either subtask to complete successfully or for both subtasks to fail.
156 * If one of the subtasks completes successfully then the {@code Joiner} causes the other
157 * subtask to be cancelled (this will interrupt the thread executing the subtask), and
158 * the {@code join} method returns the result from the successful subtask. Cancelling the
159 * other subtask avoids the main task waiting for a result that it doesn't care about. If
160 * both subtasks fail then the {@code join} method throws {@link FailedException} with the
161 * exception from one of the subtasks as the {@linkplain Throwable#getCause() cause}.
162 *
163 * <p> Whether code uses the {@code Subtask} returned from {@code fork} will depend on
164 * the {@code Joiner} and usage. Some {@code Joiner} implementations are suited to subtasks
165 * that return results of the same type and where the {@code join} method returns a result
166 * for the main task to use. Code that forks subtasks that return results of different
167 * types, and uses a {@code Joiner} such as {@code Joiner.awaitAllSuccessfulOrThrow()} that
168 * does not return a result, will use {@link Subtask#get() Subtask.get()} after joining.
169 *
170 * <h2>Exception handling</h2>
171 *
172 * <p> A {@code StructuredTaskScope} is opened with a {@link Joiner Joiner} that
173 * handles subtask completion and produces the outcome for the {@link #join() join} method.
174 * In some cases, the outcome will be a result, in other cases it will be an exception.
175 * If the outcome is an exception then the {@code join} method throws {@link
176 * FailedException} with the exception as the {@linkplain Throwable#getCause()
177 * cause}. For many {@code Joiner} implementations, the exception will be an exception
178 * thrown by a subtask that failed. In the case of {@link Joiner#allSuccessfulOrThrow()
179 * allSuccessfulOrThrow} and {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow}
180 * for example, the exception is from the first subtask to fail.
181 *
182 * <p> Many of the details for how exceptions are handled will depend on usage. In some
183 * cases it may be useful to add a {@code catch} block to catch {@code FailedException}.
184 * The exception handling may use {@code instanceof} with pattern matching to handle
185 * specific causes.
186 * {@snippet lang=java :
187 * try (var scope = StructuredTaskScope.open()) {
188 *
189 * ..
190 *
191 * } catch (StructuredTaskScope.FailedException e) {
192 *
193 * Throwable cause = e.getCause();
194 * switch (cause) {
195 * case IOException ioe -> ..
196 * default -> ..
197 * }
198 *
199 * }
200 * }
201 * In other cases it may not be useful to catch {@code FailedException} but instead leave
202 * it to propagate to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught
203 * exception handler} for logging purposes.
204 *
205 * <p> For cases where a specific exception triggers the use of a default result then it
206 * may be more appropriate to handle this in the subtask itself rather than the subtask
207 * failing and code in the main task handling the exception.
208 *
209 * <h2>Configuration</h2>
210 *
211 * A {@code StructuredTaskScope} is opened with {@linkplain Config configuration} that
212 * consists of a {@link ThreadFactory} to create threads, an optional name for monitoring
213 * and management purposes, and an optional timeout.
214 *
215 * <p> The {@link #open()} and {@link #open(Joiner)} methods create a {@code StructuredTaskScope}
216 * with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default
217 * configuration has a {@code ThreadFactory} that creates unnamed
218 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
219 * is unnamed for monitoring and management purposes, and has no timeout.
220 *
221 * <p> The 2-arg {@link #open(Joiner, Function) open} method can be used to create a
222 * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for
223 * the purposes of monitoring and management, or has a timeout that cancels execution if
224 * the timeout expires before or while waiting for subtasks to complete. The {@code open}
225 * method is called with a {@linkplain Function function} that is applied to the default
226 * configuration and returns a {@link Config Config} for the {@code StructuredTaskScope}
227 * under construction.
228 *
229 * <p> The following example opens a new {@code StructuredTaskScope} with a {@code
230 * ThreadFactory} that creates virtual threads {@linkplain Thread#setName(String) named}
231 * "duke-0", "duke-1" ...
232 * {@snippet lang = java:
233 * // @link substring="name" target="Thread.Builder#name(String, long)" :
234 * ThreadFactory factory = Thread.ofVirtual().name("duke-", 0).factory();
235 *
236 * // @link substring="withThreadFactory" target="Config#withThreadFactory(ThreadFactory)" :
237 * try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
238 *
239 * scope.fork( .. ); // runs in a virtual thread with name "duke-0"
240 * scope.fork( .. ); // runs in a virtual thread with name "duke-1"
241 *
242 * scope.join();
243 *
244 * }
245 *}
246 *
247 * <p> A second example sets a timeout, represented by a {@link Duration}. The timeout
248 * starts when the new task scope is opened. If the timeout expires before the {@code join}
249 * method has completed then <a href="#CancelExecution">execution is cancelled</a>. This
250 * interrupts the threads executing the two subtasks and causes the {@link #join() join}
251 * method to throw {@link TimeoutException}.
252 * {@snippet lang=java :
253 * Duration timeout = Duration.ofSeconds(10);
254 *
255 * // @link substring="allSuccessfulOrThrow" target="Joiner#allSuccessfulOrThrow()" :
256 * try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
257 * // @link substring="withTimeout" target="Config#withTimeout(Duration)" :
258 * cf -> cf.withTimeout(timeout))) {
259 *
260 * scope.fork(callable1);
261 * scope.fork(callable2);
262 *
263 * List<String> result = scope.join()
264 * .map(Subtask::get)
265 * .toList();
266 *
267 * }
268 * }
269 *
270 * <h2>Inheritance of scoped value bindings</h2>
271 *
272 * {@link ScopedValue} supports the execution of a method with a {@code ScopedValue} bound
273 * to a value for the bounded period of execution of the method by the <em>current thread</em>.
274 * It allows a value to be safely and efficiently shared to methods without using method
275 * parameters.
276 *
277 * <p> When used in conjunction with a {@code StructuredTaskScope}, a {@code ScopedValue}
278 * can also safely and efficiently share a value to methods executed by subtasks forked
279 * in the task scope. When a {@code ScopedValue} object is bound to a value in the thread
280 * executing the main task then that binding is inherited by the threads created to
281 * execute the subtasks. The thread executing the main task does not continue beyond the
282 * {@link #close() close} method until all threads executing the subtasks have finished.
283 * This ensures that the {@code ScopedValue} is not reverted to being {@linkplain
284 * ScopedValue#isBound() unbound} (or its previous value) while subtasks are executing.
285 * In addition to providing a safe and efficient means to inherit a value into subtasks,
286 * the inheritance allows sequential code using {@code ScopedValue} to be refactored to use
287 * structured concurrency.
288 *
289 * <p> To ensure correctness, opening a new {@code StructuredTaskScope} captures the
290 * current thread's scoped value bindings. These are the scoped value bindings that are
291 * inherited by the threads created to execute subtasks in the task scope. Forking a
292 * subtask checks that the bindings in effect at the time that the subtask is forked
293 * match the bindings when the {@code StructuredTaskScope} was created. This check ensures
294 * that a subtask does not inherit a binding that is reverted in the main task before the
295 * subtask has completed.
296 *
297 * <p> A {@code ScopedValue} that is shared across threads requires that the value be an
298 * immutable object or for all access to the value to be appropriately synchronized.
299 *
300 * <p> The following example demonstrates the inheritance of scoped value bindings. The
301 * scoped value USERNAME is bound to the value "duke" for the bounded period of a lambda
302 * expression by the thread executing it. The code in the block opens a {@code
303 * StructuredTaskScope} and forks two subtasks, it then waits in the {@code join} method
304 * and aggregates the results from both subtasks. If code executed by the threads
305 * running subtask1 and subtask2 uses {@link ScopedValue#get()} to get the value of
306 * USERNAME, then value "duke" will be returned.
307 * {@snippet lang=java :
308 * // @link substring="newInstance" target="ScopedValue#newInstance()" :
309 * private static final ScopedValue<String> USERNAME = ScopedValue.newInstance();
310 *
311 * // @link substring="where" target="ScopedValue#where" :
312 * MyResult result = ScopedValue.where(USERNAME, "duke").call(() -> {
313 *
314 * try (var scope = StructuredTaskScope.open()) {
315 *
316 * Subtask<String> subtask1 = scope.fork( .. ); // inherits binding
317 * Subtask<Integer> subtask2 = scope.fork( .. ); // inherits binding
318 *
319 * scope.join();
320 * return new MyResult(subtask1.get(), subtask2.get());
321 * }
322 *
323 * });
324 * }
325 *
326 * <p> A scoped value inherited into a subtask may be
327 * <a href="{@docRoot}/java.base/java/lang/ScopedValue.html#rebind">rebound</a> to a new
328 * value in the subtask for the bounded execution of some method executed in the subtask.
329 * When the method completes, the value of the {@code ScopedValue} reverts to its previous
330 * value, the value inherited from the thread executing the main task.
331 *
332 * <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}.
333 * A main task executing in thread T1 opens a {@code StructuredTaskScope} and forks a
334 * subtask that runs in thread T2. The scoped value bindings captured when T1 opens the
335 * task scope are inherited into T2. The subtask (in thread T2) executes code that opens a
336 * new {@code StructuredTaskScope} and forks a subtask that runs in thread T3. The scoped
337 * value bindings captured when T2 opens the task scope are inherited into T3. These
338 * include (or may be the same as) the bindings that were inherited from T1. In effect,
339 * scoped values are inherited into a tree of subtasks, not just one level of subtask.
340 *
341 * <h2>Memory consistency effects</h2>
342 *
343 * <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to
344 * {@linkplain #fork forking} of a subtask
345 * <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
346 * <i>happen-before</i></a> any actions taken by that subtask, which in turn
347 * <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}.
348 *
349 * <h2>General exceptions</h2>
350 *
351 * <p> Unless otherwise specified, passing a {@code null} argument to a method in this
352 * class will cause a {@link NullPointerException} to be thrown.
353 *
354 * @param <T> the result type of tasks executed in the task scope
355 * @param <R> the type of the result returned by the join method
356 *
357 * @jls 17.4.5 Happens-before Order
358 * @since 21
359 */
360 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
361 public class StructuredTaskScope<T, R> implements AutoCloseable {
362 private static final VarHandle CANCELLED;
363 static {
364 MethodHandles.Lookup l = MethodHandles.lookup();
365 CANCELLED = MhUtil.findVarHandle(l, "cancelled", boolean.class);
366 }
367
368 private final Joiner<? super T, ? extends R> joiner;
369 private final ThreadFactory threadFactory;
370 private final ThreadFlock flock;
371
372 // state, only accessed by owner thread
373 private static final int ST_NEW = 0;
374 private static final int ST_FORKED = 1; // subtasks forked, need to join
375 private static final int ST_JOIN_STARTED = 2; // join started, can no longer fork
376 private static final int ST_JOIN_COMPLETED = 3; // join completed
377 private static final int ST_CLOSED = 4; // closed
378 private int state;
379
380 // timer task, only accessed by owner thread
381 private Future<?> timerTask;
382
383 // set or read by any thread
384 private volatile boolean cancelled;
385
386 // set by the timer thread, read by the owner thread
387 private volatile boolean timeoutExpired;
388
389 /**
390 * Throws WrongThreadException if the current thread is not the owner thread.
391 */
392 private void ensureOwner() {
393 if (Thread.currentThread() != flock.owner()) {
394 throw new WrongThreadException("Current thread not owner");
395 }
396 }
397
398 /**
399 * Throws IllegalStateException if already joined or task scope is closed.
400 */
401 private void ensureNotJoined() {
402 assert Thread.currentThread() == flock.owner();
403 if (state > ST_FORKED) {
404 throw new IllegalStateException("Already joined or task scope is closed");
405 }
406 }
407
408 /**
409 * Throws IllegalStateException if invoked by the owner thread and the owner thread
410 * has not joined.
411 */
412 private void ensureJoinedIfOwner() {
413 if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) {
414 throw new IllegalStateException("join not called");
415 }
416 }
417
418 /**
419 * Interrupts all threads in this task scope, except the current thread.
420 */
421 private void implInterruptAll() {
422 flock.threads()
423 .filter(t -> t != Thread.currentThread())
424 .forEach(t -> {
425 try {
426 t.interrupt();
427 } catch (Throwable ignore) { }
428 });
429 }
430
431 @SuppressWarnings("removal")
432 private void interruptAll() {
433 if (System.getSecurityManager() == null) {
434 implInterruptAll();
435 } else {
436 PrivilegedAction<Void> pa = () -> {
437 implInterruptAll();
438 return null;
439 };
440 AccessController.doPrivileged(pa);
441 }
442 }
443
444 /**
445 * Cancel execution if not already cancelled.
446 */
447 private void cancelExecution() {
448 if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {
449 // prevent new threads from starting
450 flock.shutdown();
451
452 // interrupt all unfinished threads
453 interruptAll();
454
455 // wakeup join
456 flock.wakeup();
457 }
458 }
459
460 /**
461 * Schedules a task to cancel execution on timeout.
462 */
463 private void scheduleTimeout(Duration timeout) {
464 assert Thread.currentThread() == flock.owner() && timerTask == null;
465 timerTask = TimerSupport.schedule(timeout, () -> {
466 if (!cancelled) {
467 timeoutExpired = true;
468 cancelExecution();
469 }
470 });
471 }
472
473 /**
474 * Cancels the timer task if set.
475 */
476 private void cancelTimeout() {
477 assert Thread.currentThread() == flock.owner();
478 if (timerTask != null) {
479 timerTask.cancel(false);
480 }
481 }
482
483 /**
484 * Invoked by the thread for a subtask when the subtask completes before execution
485 * was cancelled.
486 */
487 private void onComplete(SubtaskImpl<? extends T> subtask) {
488 assert subtask.state() != Subtask.State.UNAVAILABLE;
489 if (joiner.onComplete(subtask)) {
490 cancelExecution();
491 }
492 }
493
494 /**
495 * Initialize a new StructuredTaskScope.
496 */
497 @SuppressWarnings("this-escape")
498 private StructuredTaskScope(Joiner<? super T, ? extends R> joiner,
499 ThreadFactory threadFactory,
500 String name) {
501 this.joiner = joiner;
502 this.threadFactory = threadFactory;
503
504 if (name == null)
505 name = Objects.toIdentityString(this);
506 this.flock = ThreadFlock.open(name);
507 }
508
509 /**
510 * Represents a subtask forked with {@link #fork(Callable)} or {@link #fork(Runnable)}.
511 *
512 * <p> Code that forks subtasks can use the {@link #get() get()} method after {@linkplain
513 * #join() joining} to obtain the result of a subtask that completed successfully. It
514 * can use the {@link #exception()} method to obtain the exception thrown by a subtask
515 * that failed.
516 *
517 * @param <T> the result type
518 * @since 21
519 */
520 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
521 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
522 /**
523 * Represents the state of a subtask.
524 * @see Subtask#state()
525 * @since 21
526 */
527 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
528 enum State {
529 /**
530 * The subtask result or exception is not available. This state indicates that
531 * the subtask was forked but has not completed, it completed after execution
532 * was cancelled, or it was forked after execution was cancelled (in which
533 * case a thread was not created to execute the subtask).
534 */
535 UNAVAILABLE,
536 /**
537 * The subtask completed successfully. The {@link Subtask#get() Subtask.get()}
538 * method can be used to get the result. This is a terminal state.
539 */
540 SUCCESS,
541 /**
542 * The subtask failed with an exception. The {@link Subtask#exception()
543 * Subtask.exception()} method can be used to get the exception. This is a
544 * terminal state.
545 */
546 FAILED,
547 }
548
549 /**
550 * {@return the subtask state}
551 */
552 State state();
553
554 /**
555 * Returns the result of this subtask if it completed successfully. If
556 * {@linkplain #fork(Callable) forked} to execute a value-returning task then the
557 * result from the {@link Callable#call() call} method is returned. If
558 * {@linkplain #fork(Runnable) forked} to execute a task that does not return a
559 * result then {@code null} is returned.
560 *
561 * <p> Code executing in the scope owner thread can use this method to get the
562 * result of a successful subtask only after it has {@linkplain #join() joined}.
563 *
564 * <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask)
565 * onComplete} method should test that the {@linkplain #state() subtask state} is
566 * {@link State#SUCCESS SUCCESS} before using this method to get the result.
567 *
568 * @return the possibly-null result
569 * @throws IllegalStateException if the subtask has not completed, did not complete
570 * successfully, or the current thread is the task scope owner invoking this
571 * method before {@linkplain #join() joining}
572 * @see State#SUCCESS
573 */
574 T get();
575
576 /**
577 * {@return the exception thrown by this subtask if it failed} If
578 * {@linkplain #fork(Callable) forked} to execute a value-returning task then
579 * the exception thrown by the {@link Callable#call() call} method is returned.
580 * If {@linkplain #fork(Runnable) forked} to execute a task that does not return
581 * a result then the exception thrown by the {@link Runnable#run() run} method is
582 * returned.
583 *
584 * <p> Code executing in the scope owner thread can use this method to get the
585 * exception thrown by a failed subtask only after it has {@linkplain #join() joined}.
586 *
587 * <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask)
588 * onComplete} method should test that the {@linkplain #state() subtask state} is
589 * {@link State#FAILED FAILED} before using this method to get the exception.
590 *
591 * @throws IllegalStateException if the subtask has not completed, completed with
592 * a result, or the current thread is the task scope owner invoking this method
593 * before {@linkplain #join() joining}
594 * @see State#FAILED
595 */
596 Throwable exception();
597 }
598
599 /**
600 * An object used with a {@link StructuredTaskScope} to handle subtask completion
601 * and produce the result for a main task waiting in the {@link #join() join} method
602 * for subtasks to complete.
603 *
604 * <p> Joiner defines static methods to create {@code Joiner} objects for common cases:
605 * <ul>
606 * <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner}
607 * that yields a stream of the completed subtasks for {@code join} to return when
608 * all subtasks complete successfully. It cancels execution and causes {@code join}
609 * to throw if any subtask fails.
610 * <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a
611 * {@code Joiner} that yields the result of the first subtask to succeed. It cancels
612 * execution and causes {@code join} to throw if all subtasks fail.
613 * <li> {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a
614 * {@code Joiner} that waits for all successful subtasks. It cancels execution and
615 * causes {@code join} to throw if any subtask fails.
616 * <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all
617 * subtasks. It does not cancel execution or cause {@code join} to throw.
618 * </ul>
619 *
620 * <p> In addition to the methods to create {@code Joiner} objects for common cases,
621 * the {@link #allUntil(Predicate) allUntil(Predicate)} method is defined to create a
622 * {@code Joiner} that yields a stream of all subtasks. It is created with a {@link
623 * Predicate Predicate} that determines if execution should continue or be cancelled.
624 * This {@code Joiner} can be built upon to create custom policies that cancel
625 * execution based on some condition.
626 *
627 * <p> More advanced policies can be developed by implementing the {@code Joiner}
628 * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked.
629 * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a
630 * result or exception. These methods return a {@code boolean} to indicate if execution
631 * should be cancelled. These methods can be used to collect subtasks, results, or
632 * exceptions, and control when to cancel execution. The {@link #result()} method
633 * must be implemented to produce the result (or exception) for the {@code join}
634 * method.
635 *
636 * <p> Unless otherwise specified, passing a {@code null} argument to a method
637 * in this class will cause a {@link NullPointerException} to be thrown.
638 *
639 * @implSpec Implementations of this interface must be thread safe. The {@link
640 * #onComplete(Subtask)} method defined by this interface may be invoked by several
641 * threads concurrently.
642 *
643 * @apiNote It is very important that a new {@code Joiner} object is created for each
644 * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with
645 * different task scopes or re-used after a task is closed.
646 *
647 * <p> Designing a {@code Joiner} should take into account the code at the use-site
648 * where the results from the {@link StructuredTaskScope#join() join} method are
649 * processed. It should be clear what the {@code Joiner} does vs. the application
650 * code at the use-site. In general, the {@code Joiner} implementation is not the
651 * place to code "business logic". A {@code Joiner} should be designed to be as
652 * general purpose as possible.
653 *
654 * @param <T> the result type of tasks executed in the task scope
655 * @param <R> the type of results returned by the join method
656 * @since 24
657 * @see #open(Joiner)
658 */
659 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
660 @FunctionalInterface
661 public interface Joiner<T, R> {
662
663 /**
664 * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable)
665 * fork(Runnable)} when forking a subtask. The method is invoked from the task
666 * owner thread. The method is invoked before a thread is created to run the
667 * subtask.
668 *
669 * @implSpec The default implementation throws {@code NullPointerException} if the
670 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
671 * subtask is not in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, it
672 * otherwise returns {@code false}.
673 *
674 * @apiNote This method is invoked by the {@code fork} methods. It should not be
675 * invoked directly.
676 *
677 * @param subtask the subtask
678 * @return {@code true} to cancel execution
679 */
680 default boolean onFork(Subtask<? extends T> subtask) {
681 if (subtask.state() != Subtask.State.UNAVAILABLE) {
682 throw new IllegalArgumentException();
683 }
684 return false;
685 }
686
687 /**
688 * Invoked by the thread started to execute a subtask after the subtask completes
689 * successfully or fails with an exception. This method is not invoked if a
690 * subtask completes after execution has been cancelled.
691 *
692 * @implSpec The default implementation throws {@code NullPointerException} if the
693 * subtask is {@code null}. It throws {@code IllegalArgumentException} if the
694 * subtask is not in the {@link Subtask.State#SUCCESS SUCCESS} or {@link
695 * Subtask.State#FAILED FAILED} state, it otherwise returns {@code false}.
696 *
697 * @apiNote This method is invoked by subtasks when they complete. It should not
698 * be invoked directly.
699 *
700 * @param subtask the subtask
701 * @return {@code true} to cancel execution
702 */
703 default boolean onComplete(Subtask<? extends T> subtask) {
704 if (subtask.state() == Subtask.State.UNAVAILABLE) {
705 throw new IllegalArgumentException();
706 }
707 return false;
708 }
709
710 /**
711 * Invoked by {@link #join()} to produce the result (or exception) after waiting
712 * for all subtasks to complete or execution to be cancelled. The result from this
713 * method is returned by the {@code join} method. If this method throws, then
714 * {@code join} throws {@link FailedException} with the exception thrown by
715 * this method as the cause.
716 *
717 * <p> In normal usage, this method will be called at most once by the {@code join}
718 * method to produce the result (or exception). The behavior of this method when
719 * invoked directly, and invoked more than once, is not specified. Where possible,
720 * an implementation should return an equal result (or throw the same exception)
721 * on second or subsequent calls to produce the outcome.
722 *
723 * @apiNote This method is invoked by the {@code join} method. It should not be
724 * invoked directly.
725 *
726 * @return the result
727 * @throws Throwable the exception
728 */
729 R result() throws Throwable;
730
731 /**
732 * {@return a new Joiner object that yields a stream of all subtasks when all
733 * subtasks complete successfully}
734 * This method throws, and <a href="StructuredTaskScope.html#CancelExecution">
735 * execution is cancelled</a>, if any subtask fails.
736 *
737 * <p> If all subtasks complete successfully, the joiner's {@link Joiner#result()}
738 * method returns a stream of all subtasks in the order that they were forked.
739 * If any subtask failed then the {@code result} method throws the exception from
740 * the first subtask to fail.
741 *
742 * @apiNote Joiners returned by this method are suited to cases where all subtasks
743 * return a result of the same type. Joiners returned by {@link
744 * #awaitAllSuccessfulOrThrow()} are suited to cases where the subtasks return
745 * results of different types.
746 *
747 * @param <T> the result type of subtasks
748 */
749 static <T> Joiner<T, Stream<Subtask<T>>> allSuccessfulOrThrow() {
750 return new AllSuccessful<>();
751 }
752
753 /**
754 * {@return a new Joiner object that yields the result of any subtask that
755 * completed successfully}
756 * This method throws, and <a href="StructuredTaskScope.html#CancelExecution">
757 * execution is cancelled</a>, if all subtasks fail.
758 *
759 * <p> The joiner's {@link Joiner#result()} method returns the result of a subtask
760 * that completed successfully. If all subtasks fail then the {@code result} method
761 * throws the exception from one of the failed subtasks. The {@code result} method
762 * throws {@code NoSuchElementException} if no subtasks were forked.
763 *
764 * @param <T> the result type of subtasks
765 */
766 static <T> Joiner<T, T> anySuccessfulResultOrThrow() {
767 return new AnySuccessful<>();
768 }
769
770 /**
771 * {@return a new Joiner object that waits for subtasks to complete successfully}
772 * This method throws, and <a href="StructuredTaskScope.html#CancelExecution">
773 * execution is cancelled</a>, if any subtask fails.
774 *
775 * <p> The joiner's {@link Joiner#result() result} method returns {@code null}
776 * if all subtasks complete successfully, or throws the exception from the first
777 * subtask to fail.
778 *
779 * @apiNote Joiners returned by this method are suited to cases where subtasks
780 * return results of different types. Joiners returned by {@link #allSuccessfulOrThrow()}
781 * are suited to cases where the subtasks return a result of the same type.
782 *
783 * @param <T> the result type of subtasks
784 */
785 static <T> Joiner<T, Void> awaitAllSuccessfulOrThrow() {
786 return new AwaitSuccessful<>();
787 }
788
789 /**
790 * {@return a new Joiner object that waits for all subtasks to complete}
791 * This method does not cancel execution if a subtask fails.
792 *
793 * <p> The joiner's {@link Joiner#result() result} method returns {@code null}.
794 *
795 * @apiNote This Joiner can be useful for cases where subtasks make use of
796 * <em>side-effects</em> rather than return results or fail with exceptions.
797 * The {@link #fork(Runnable) fork(Runnable)} method can be used to fork subtasks
798 * that do not return a result.
799 *
800 * @param <T> the result type of subtasks
801 */
802 static <T> Joiner<T, Void> awaitAll() {
803 // ensure that new Joiner object is returned
804 return new Joiner<T, Void>() {
805 @Override
806 public Void result() {
807 return null;
808 }
809 };
810 }
811
812 /**
813 * {@return a new Joiner object that yields a stream of all subtasks when all
814 * subtasks complete or <a href="StructuredTaskScope.html#CancelExecution">
815 * execution is cancelled</a> by a predicate}
816 *
817 * <p> The joiner's {@link Joiner#onComplete(Subtask)} method invokes the
818 * predicate's {@link Predicate#test(Object) test} method with the subtask that
819 * completed successfully or failed with an exception. If the {@code test} method
820 * returns {@code true} then <a href="StructuredTaskScope.html#CancelExecution">
821 * execution is cancelled</a>. The {@code test} method must be thread safe as it
822 * may be invoked concurrently from several threads.
823 *
824 * <p> The joiner's {@link #result()} method returns the stream of all subtasks,
825 * in fork order. The stream may contain subtasks that have completed
826 * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED}
827 * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state
828 * if execution was cancelled before all subtasks were forked or completed.
829 *
830 * <p> The following example uses this method to create a {@code Joiner} that
831 * <a href="StructuredTaskScope.html#CancelExecution">cancels execution</a> when
832 * two or more subtasks fail.
833 * {@snippet lang=java :
834 * class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
835 * private final AtomicInteger failedCount = new AtomicInteger();
836 * @Override
837 * public boolean test(Subtask<? extends T> subtask) {
838 * return subtask.state() == Subtask.State.FAILED
839 * && failedCount.incrementAndGet() >= 2;
840 * }
841 * }
842 *
843 * var joiner = Joiner.all(new CancelAfterTwoFailures<String>());
844 * }
845 *
846 * @param isDone the predicate to evaluate completed subtasks
847 * @param <T> the result type of subtasks
848 */
849 static <T> Joiner<T, Stream<Subtask<T>>> allUntil(Predicate<Subtask<? extends T>> isDone) {
850 return new AllSubtasks<>(isDone);
851 }
852 }
853
854 /**
855 * Represents the configuration for a {@code StructuredTaskScope}.
856 *
857 * <p> The configuration for a {@code StructuredTaskScope} consists of a {@link
858 * ThreadFactory} to create threads, an optional name for the purposes of monitoring
859 * and management, and an optional timeout.
860 *
861 * <p> Creating a {@code StructuredTaskScope} with {@link #open()} or {@link #open(Joiner)}
862 * uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default
863 * configuration</a>. The default configuration consists of a thread factory that
864 * creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">
865 * virtual threads</a>, no name for monitoring and management purposes, and no timeout.
866 *
867 * <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, Function)
868 * open} method allows a different configuration to be used. The function specified
869 * to the {@code open} method is applied to the default configuration and returns the
870 * configuration for the {@code StructuredTaskScope} under construction. The function
871 * can use the {@code with-} prefixed methods defined here to specify the components
872 * of the configuration to use.
873 *
874 * <p> Unless otherwise specified, passing a {@code null} argument to a method
875 * in this class will cause a {@link NullPointerException} to be thrown.
876 *
877 * @since 24
878 */
879 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
880 public sealed interface Config permits ConfigImpl {
881 /**
882 * {@return a new {@code Config} object with the given thread factory}
883 * The other components are the same as this object. The thread factory is used by
884 * a task scope to create threads when {@linkplain #fork(Callable) forking} subtasks.
885 * @param threadFactory the thread factory
886 *
887 * @apiNote The thread factory will typically create
888 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
889 * maybe with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler
890 * uncaught exception handler}, or other properties configured.
891 *
892 * @see #fork(Callable)
893 */
894 Config withThreadFactory(ThreadFactory threadFactory);
895
896 /**
897 * {@return a new {@code Config} object with the given name}
898 * The other components are the same as this object. A task scope is optionally
899 * named for the purposes of monitoring and management.
900 * @param name the name
901 * @see StructuredTaskScope#toString()
902 */
903 Config withName(String name);
904
905 /**
906 * {@return a new {@code Config} object with the given timeout}
907 * The other components are the same as this object.
908 * @param timeout the timeout
909 *
910 * @apiNote Applications using deadlines, expressed as an {@link java.time.Instant},
911 * can use {@link Duration#between Duration.between(Instant.now(), deadline)} to
912 * compute the timeout for this method.
913 *
914 * @see #join()
915 */
916 Config withTimeout(Duration timeout);
917 }
918
919 /**
920 * Exception thrown by {@link #join()} when the outcome is an exception rather than a
921 * result.
922 *
923 * @since 24
924 */
925 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
926 public static class FailedException extends RuntimeException {
927 @java.io.Serial
928 static final long serialVersionUID = -1533055100078459923L;
929
930 /**
931 * Constructs a {@code FailedException} with the specified cause.
932 *
933 * @param cause the cause, can be {@code null}
934 */
935 public FailedException(Throwable cause) {
936 super(cause);
937 }
938 }
939
940 /**
941 * Exception thrown by {@link #join()} if the task scope was created with a timeout and
942 * the timeout expired before or while waiting in {@code join}.
943 *
944 * @since 24
945 * @see Config#withTimeout(Duration)
946 */
947 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
948 public static class TimeoutException extends RuntimeException {
949 @java.io.Serial
950 static final long serialVersionUID = 705788143955048766L;
951
952 /**
953 * Constructs a {@code TimeoutException} with no detail message.
954 */
955 public TimeoutException() { }
956 }
957
958 /**
959 * Opens a new structured task scope to use the given {@code Joiner} object and with
960 * configuration that is the result of applying the given function to the
961 * <a href="#DefaultConfiguration">default configuration</a>.
962 *
963 * <p> The {@code configFunction} is called with the default configuration and returns
964 * the configuration for the new structured task scope. The function may, for example,
965 * set the {@linkplain Config#withThreadFactory(ThreadFactory) ThreadFactory} or set
966 * a {@linkplain Config#withTimeout(Duration) timeout}.
967 *
968 * <p> If a {@code ThreadFactory} is set then its {@link ThreadFactory#newThread(Runnable)
969 * newThread} method will be called to create threads when {@linkplain #fork(Callable)
970 * forking} subtasks in this task scope. If a {@code ThreadFactory} is not set then
971 * forking subtasks will create an unnamed virtual thread for each subtask.
972 *
973 * <p> If a {@linkplain Config#withTimeout(Duration) timeout} is set then it starts
974 * when the task scope is opened. If the timeout expires before the task scope has
975 * {@linkplain #join() joined} then execution is cancelled and the {@code join} method
976 * throws {@link TimeoutException}.
977 *
978 * <p> The new task scope is owned by the current thread. Only code executing in this
979 * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or
980 * {@linkplain #close close} the task scope.
981 *
982 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped
983 * value} bindings for inheritance by threads started in the task scope.
984 *
985 * @param joiner the joiner
986 * @param configFunction a function to produce the configuration
987 * @return a new task scope
988 * @param <T> the result type of tasks executed in the task scope
989 * @param <R> the type of the result returned by the join method
990 * @since 24
991 */
992 public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
993 Function<Config, Config> configFunction) {
994 Objects.requireNonNull(joiner);
995
996 var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
997 var scope = new StructuredTaskScope<T, R>(joiner, config.threadFactory(), config.name());
998
999 // schedule timeout
1000 Duration timeout = config.timeout();
1001 if (timeout != null) {
1002 boolean scheduled = false;
1003 try {
1004 scope.scheduleTimeout(timeout);
1005 scheduled = true;
1006 } finally {
1007 if (!scheduled) {
1008 scope.close(); // pop if scheduling timeout failed
1009 }
1010 }
1011 }
1012
1013 return scope;
1014 }
1015
1016 /**
1017 * Opens a new structured task scope to use the given {@code Joiner} object. The
1018 * task scope is created with the <a href="#DefaultConfiguration">default configuration</a>.
1019 * The default configuration has a {@code ThreadFactory} that creates unnamed
1020 * <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
1021 * is unnamed for monitoring and management purposes, and has no timeout.
1022 *
1023 * @implSpec
1024 * This factory method is equivalent to invoking the 2-arg open method with the given
1025 * joiner and the {@linkplain Function#identity() identity function}.
1026 *
1027 * @param joiner the joiner
1028 * @return a new task scope
1029 * @param <T> the result type of tasks executed in the task scope
1030 * @param <R> the type of the result returned by the join method
1031 * @since 24
1032 */
1033 public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner) {
1034 return open(joiner, Function.identity());
1035 }
1036
1037 /**
1038 * Opens a new structured task scope that can be used to fork subtasks that return
1039 * results of any type. The {@link #join()} method waits for all subtasks to succeed
1040 * or any subtask to fail.
1041 *
1042 * <p> The {@code join} method returns {@code null} if all subtasks complete successfully.
1043 * It throws {@link FailedException} if any subtask fails, with the exception from
1044 * the first subtask to fail as the cause.
1045 *
1046 * <p> The task scope is created with the <a href="#DefaultConfiguration">default
1047 * configuration</a>. The default configuration has a {@code ThreadFactory} that creates
1048 * unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual
1049 * threads</a>, is unnamed for monitoring and management purposes, and has no timeout.
1050 *
1051 * @implSpec
1052 * This factory method is equivalent to invoking the 2-arg open method with a joiner
1053 * created with {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()}
1054 * and the {@linkplain Function#identity() identity function}.
1055 *
1056 * @param <T> the result type of subtasks
1057 * @return a new task scope
1058 * @since 24
1059 */
1060 public static <T> StructuredTaskScope<T, Void> open() {
1061 return open(Joiner.awaitAllSuccessfulOrThrow(), Function.identity());
1062 }
1063
1064 /**
1065 * Starts a new thread in this task scope to execute a value-returning task, thus
1066 * creating a <em>subtask</em>. The value-returning task is provided to this method
1067 * as a {@link Callable}, the thread executes the task's {@link Callable#call() call}
1068 * method.
1069 *
1070 * <p> This method first creates a {@link Subtask Subtask} to represent the <em>forked
1071 * subtask</em>. It invokes the joiner's {@link Joiner#onFork(Subtask) onFork} method
1072 * with the {@code Subtask} object. If the {@code onFork} completes with an exception
1073 * or error then it is propagated by the {@code fork} method. If execution is
1074 * {@linkplain #isCancelled() cancelled}, or {@code onFork} returns {@code true} to
1075 * cancel execution, then this method returns the {@code Subtask} (in the {@link
1076 * Subtask.State#UNAVAILABLE UNAVAILABLE} state) without creating a thread to execute
1077 * the subtask. If execution is not cancelled then a thread is created with the
1078 * {@link ThreadFactory} configured when the task scope was created, and the thread is
1079 * started. Forking a subtask inherits the current thread's {@linkplain ScopedValue
1080 * scoped value} bindings. The bindings must match the bindings captured when the
1081 * task scope was opened. If the subtask completes (successfully or with an exception)
1082 * before execution is cancelled, then the thread invokes the joiner's
1083 * {@link Joiner#onComplete(Subtask) onComplete} method with subtask in the
1084 * {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} state.
1085 *
1086 * <p> This method returns the {@link Subtask Subtask} object. In some usages, this
1087 * object may be used to get its result. In other cases it may be used for correlation
1088 * or just discarded. To ensure correct usage, the {@link Subtask#get() Subtask.get()}
1089 * method may only be called by the task scope owner to get the result after it has
1090 * waited for subtasks to complete with the {@link #join() join} method and the subtask
1091 * completed successfully. Similarly, the {@link Subtask#exception() Subtask.exception()}
1092 * method may only be called by the task scope owner after it has joined and the subtask
1093 * failed. If execution was cancelled before the subtask was forked, or before it
1094 * completes, then neither method can be used to obtain the outcome.
1095 *
1096 * <p> This method may only be invoked by the task scope owner.
1097 *
1098 * @param task the value-returning task for the thread to execute
1099 * @param <U> the result type
1100 * @return the subtask
1101 * @throws WrongThreadException if the current thread is not the task scope owner
1102 * @throws IllegalStateException if the owner has already {@linkplain #join() joined}
1103 * or the task scope is closed
1104 * @throws StructureViolationException if the current scoped value bindings are not
1105 * the same as when the task scope was created
1106 * @throws RejectedExecutionException if the thread factory rejected creating a
1107 * thread to run the subtask
1108 */
1109 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
1110 Objects.requireNonNull(task);
1111 ensureOwner();
1112 ensureNotJoined();
1113
1114 var subtask = new SubtaskImpl<U>(this, task);
1115
1116 // notify joiner, even if cancelled
1117 if (joiner.onFork(subtask)) {
1118 cancelExecution();
1119 }
1120
1121 if (!cancelled) {
1122 // create thread to run task
1123 Thread thread = threadFactory.newThread(subtask);
1124 if (thread == null) {
1125 throw new RejectedExecutionException("Rejected by thread factory");
1126 }
1127
1128 // attempt to start the thread
1129 try {
1130 flock.start(thread);
1131 } catch (IllegalStateException e) {
1132 // shutdown by another thread, or underlying flock is shutdown due
1133 // to unstructured use
1134 }
1135 }
1136
1137 // force owner to join
1138 state = ST_FORKED;
1139 return subtask;
1140 }
1141
1142 /**
1143 * Starts a new thread in this task scope to execute a task that does not return a
1144 * result, creating a <em>subtask</em>.
1145 *
1146 * <p> This method works exactly the same as {@link #fork(Callable)} except that
1147 * the task is provided to this method as a {@link Runnable}, the thread executes
1148 * the task's {@link Runnable#run() run} method, and its result is {@code null}.
1149 *
1150 * @param task the task for the thread to execute
1151 * @return the subtask
1152 * @throws WrongThreadException if the current thread is not the task scope owner
1153 * @throws IllegalStateException if the owner has already {@linkplain #join() joined}
1154 * or the task scope is closed
1155 * @throws StructureViolationException if the current scoped value bindings are not
1156 * the same as when the task scope was created
1157 * @throws RejectedExecutionException if the thread factory rejected creating a
1158 * thread to run the subtask
1159 * @since 24
1160 */
1161 public Subtask<? extends T> fork(Runnable task) {
1162 Objects.requireNonNull(task);
1163 return fork(() -> { task.run(); return null; });
1164 }
1165
1166 /**
1167 * Waits for all subtasks started in this task scope to complete or execution to be
1168 * cancelled. If a {@linkplain Config#withTimeout(Duration) timeout} has been set
1169 * then execution will be cancelled if the timeout expires before or while waiting.
1170 * Once finished waiting, the {@code Joiner}'s {@link Joiner#result() result} method
1171 * is invoked to get the result or throw an exception. If the {@code result} method
1172 * throws then this method throws {@code FailedException} with the exception thrown
1173 * by the {@code result()} method as the cause.
1174 *
1175 * <p> This method waits for all subtasks by waiting for all threads {@linkplain
1176 * #fork(Callable) started} in this task scope to finish execution. It stops waiting
1177 * when all threads finish, the {@code Joiner}'s {@link Joiner#onFork(Subtask)
1178 * onFork} or {@link Joiner#onComplete(Subtask) onComplete} returns {@code true}
1179 * to cancel execution, the timeout (if set) expires, or the current thread is
1180 * {@linkplain Thread#interrupt() interrupted}.
1181 *
1182 * <p> This method may only be invoked by the task scope owner, and only once.
1183 *
1184 * @return the {@link Joiner#result() result}
1185 * @throws WrongThreadException if the current thread is not the task scope owner
1186 * @throws IllegalStateException if already joined or this task scope is closed
1187 * @throws FailedException if the <i>outcome</i> is an exception, thrown with the
1188 * exception from {@link Joiner#result() Joiner.result()} as the cause
1189 * @throws TimeoutException if a timeout is set and the timeout expires before or
1190 * while waiting
1191 * @throws InterruptedException if interrupted while waiting
1192 */
1193 public R join() throws InterruptedException {
1194 ensureOwner();
1195 ensureNotJoined();
1196
1197 // join started
1198 state = ST_JOIN_STARTED;
1199
1200 // wait for all subtasks, execution to be cancelled, or interrupt
1201 flock.awaitAll();
1202
1203 // throw if timeout expired
1204 if (timeoutExpired) {
1205 throw new TimeoutException();
1206 }
1207 cancelTimeout();
1208
1209 // all subtasks completed or cancelled
1210 state = ST_JOIN_COMPLETED;
1211
1212 // invoke joiner to get result
1213 try {
1214 return joiner.result();
1215 } catch (Throwable e) {
1216 throw new FailedException(e);
1217 }
1218 }
1219
1220 /**
1221 * {@return {@code true} if <a href="#CancelExecution">execution is cancelled</a>,
1222 * or in the process of being cancelled, otherwise {@code false}}
1223 *
1224 * <p> Cancelling execution prevents new threads from starting in the task scope and
1225 * {@linkplain Thread#interrupt() interrupts} threads executing unfinished subtasks.
1226 * It may take some time before the interrupted threads finish execution; this
1227 * method may return {@code true} before all threads have been interrupted or before
1228 * all threads have finished.
1229 *
1230 * @apiNote A main task with a lengthy "forking phase" (the code that executes before
1231 * the main task invokes {@link #join() join}) may use this method to avoid doing work
1232 * in cases where execution was cancelled by the completion of a previously forked
1233 * subtask or timeout.
1234 *
1235 * @since 24
1236 */
1237 public boolean isCancelled() {
1238 return cancelled;
1239 }
1240
1241 /**
1242 * Closes this task scope.
1243 *
1244 * <p> This method first <a href="#CancelExecution">cancels execution</a>, if not
1245 * already cancelled. This interrupts the threads executing unfinished subtasks. This
1246 * method then waits for all threads to finish. If interrupted while waiting then it
1247 * will continue to wait until the threads finish, before completing with the interrupt
1248 * status set.
1249 *
1250 * <p> This method may only be invoked by the task scope owner. If the task scope
1251 * is already closed then the task scope owner invoking this method has no effect.
1252 *
1253 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
1254 * manner</em>. If this method is called to close a task scope before nested task
1255 * scopes are closed then it closes the underlying construct of each nested task scope
1256 * (in the reverse order that they were created in), closes this task scope, and then
1257 * throws {@link StructureViolationException}.
1258 * Similarly, if this method is called to close a task scope while executing with
1259 * {@linkplain ScopedValue scoped value} bindings, and the task scope was created
1260 * before the scoped values were bound, then {@code StructureViolationException} is
1261 * thrown after closing the task scope.
1262 * If a thread terminates without first closing task scopes that it owns then
1263 * termination will cause the underlying construct of each of its open tasks scopes to
1264 * be closed. Closing is performed in the reverse order that the task scopes were
1265 * created in. Thread termination may therefore be delayed when the task scope owner
1266 * has to wait for threads forked in these task scopes to finish.
1267 *
1268 * @throws IllegalStateException thrown after closing the task scope if the task scope
1269 * owner did not attempt to join after forking
1270 * @throws WrongThreadException if the current thread is not the task scope owner
1271 * @throws StructureViolationException if a structure violation was detected
1272 */
1273 @Override
1274 public void close() {
1275 ensureOwner();
1276 int s = state;
1277 if (s == ST_CLOSED) {
1278 return;
1279 }
1280
1281 // cancel execution if join did not complete
1282 if (s < ST_JOIN_COMPLETED) {
1283 cancelExecution();
1284 cancelTimeout();
1285 }
1286
1287 // wait for stragglers
1288 try {
1289 flock.close();
1290 } finally {
1291 state = ST_CLOSED;
1292 }
1293
1294 // throw ISE if the owner didn't join after forking
1295 if (s == ST_FORKED) {
1296 throw new IllegalStateException("Owner did not join after forking");
1297 }
1298 }
1299
1300 /**
1301 * {@inheritDoc} If a {@link Config#withName(String) name} for monitoring and
1302 * management purposes has been set then the string representation includes the name.
1303 */
1304 @Override
1305 public String toString() {
1306 return flock.name();
1307 }
1308
1309 /**
1310 * Subtask implementation, runs the task specified to the fork method.
1311 */
1312 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
1313 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
1314
1315 private record AltResult(Subtask.State state, Throwable exception) {
1316 AltResult(Subtask.State state) {
1317 this(state, null);
1318 }
1319 }
1320
1321 private final StructuredTaskScope<? super T, ?> scope;
1322 private final Callable<? extends T> task;
1323 private volatile Object result;
1324
1325 SubtaskImpl(StructuredTaskScope<? super T, ?> scope, Callable<? extends T> task) {
1326 this.scope = scope;
1327 this.task = task;
1328 }
1329
1330 @Override
1331 public void run() {
1332 T result = null;
1333 Throwable ex = null;
1334 try {
1335 result = task.call();
1336 } catch (Throwable e) {
1337 ex = e;
1338 }
1339
1340 // nothing to do if task scope is cancelled
1341 if (scope.isCancelled())
1342 return;
1343
1344 // set result/exception and invoke onComplete
1345 if (ex == null) {
1346 this.result = (result != null) ? result : RESULT_NULL;
1347 } else {
1348 this.result = new AltResult(State.FAILED, ex);
1349 }
1350 scope.onComplete(this);
1351 }
1352
1353 @Override
1354 public Subtask.State state() {
1355 Object result = this.result;
1356 if (result == null) {
1357 return State.UNAVAILABLE;
1358 } else if (result instanceof AltResult alt) {
1359 // null or failed
1360 return alt.state();
1361 } else {
1362 return State.SUCCESS;
1363 }
1364 }
1365
1366
1367 @Override
1368 public T get() {
1369 scope.ensureJoinedIfOwner();
1370 Object result = this.result;
1371 if (result instanceof AltResult) {
1372 if (result == RESULT_NULL) return null;
1373 } else if (result != null) {
1374 @SuppressWarnings("unchecked")
1375 T r = (T) result;
1376 return r;
1377 }
1378 throw new IllegalStateException(
1379 "Result is unavailable or subtask did not complete successfully");
1380 }
1381
1382 @Override
1383 public Throwable exception() {
1384 scope.ensureJoinedIfOwner();
1385 Object result = this.result;
1386 if (result instanceof AltResult alt && alt.state() == State.FAILED) {
1387 return alt.exception();
1388 }
1389 throw new IllegalStateException(
1390 "Exception is unavailable or subtask did not complete with exception");
1391 }
1392
1393 @Override
1394 public String toString() {
1395 String stateAsString = switch (state()) {
1396 case UNAVAILABLE -> "[Unavailable]";
1397 case SUCCESS -> "[Completed successfully]";
1398 case FAILED -> {
1399 Throwable ex = ((AltResult) result).exception();
1400 yield "[Failed: " + ex + "]";
1401 }
1402 };
1403 return Objects.toIdentityString(this) + stateAsString;
1404 }
1405 }
1406
1407 /**
1408 * A joiner that returns a stream of all subtasks when all subtasks complete
1409 * successfully. If any subtask fails then execution is cancelled.
1410 */
1411 private static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {
1412 private static final VarHandle FIRST_EXCEPTION;
1413 static {
1414 MethodHandles.Lookup l = MethodHandles.lookup();
1415 FIRST_EXCEPTION = MhUtil.findVarHandle(l, "firstException", Throwable.class);
1416 }
1417 private volatile Throwable firstException;
1418
1419 // list of forked subtasks, only accessed by owner thread
1420 private final List<Subtask<T>> subtasks = new ArrayList<>();
1421
1422 @Override
1423 public boolean onFork(Subtask<? extends T> subtask) {
1424 @SuppressWarnings("unchecked")
1425 var tmp = (Subtask<T>) Objects.requireNonNull(subtask);
1426 subtasks.add(tmp);
1427 return false;
1428 }
1429
1430 @Override
1431 public boolean onComplete(Subtask<? extends T> subtask) {
1432 return (subtask.state() == Subtask.State.FAILED)
1433 && (firstException == null)
1434 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1435 }
1436
1437 @Override
1438 public Stream<Subtask<T>> result() throws Throwable {
1439 Throwable ex = firstException;
1440 if (ex != null) {
1441 throw ex;
1442 } else {
1443 return subtasks.stream();
1444 }
1445 }
1446 }
1447
1448 /**
1449 * A joiner that returns the result of the first subtask to complete successfully.
1450 * If any subtask completes successfully then execution is cancelled.
1451 */
1452 private static final class AnySuccessful<T> implements Joiner<T, T> {
1453 private static final VarHandle FIRST_SUCCESS;
1454 private static final VarHandle FIRST_EXCEPTION;
1455 static {
1456 MethodHandles.Lookup l = MethodHandles.lookup();
1457 FIRST_SUCCESS = MhUtil.findVarHandle(l, "firstSuccess", Subtask.class);
1458 FIRST_EXCEPTION = MhUtil.findVarHandle(l, "firstException", Throwable.class);
1459 }
1460 private volatile Subtask<T> firstSuccess;
1461 private volatile Throwable firstException;
1462
1463 @Override
1464 public boolean onComplete(Subtask<? extends T> subtask) {
1465 Objects.requireNonNull(subtask);
1466 if (firstSuccess == null) {
1467 if (subtask.state() == Subtask.State.SUCCESS) {
1468 // capture the first subtask that completes successfully
1469 return FIRST_SUCCESS.compareAndSet(this, null, subtask);
1470 } else if (firstException == null) {
1471 // capture the exception thrown by the first task to fail
1472 FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1473 }
1474 }
1475 return false;
1476 }
1477
1478 @Override
1479 public T result() throws Throwable {
1480 Subtask<T> firstSuccess = this.firstSuccess;
1481 if (firstSuccess != null) {
1482 return firstSuccess.get();
1483 }
1484 Throwable firstException = this.firstException;
1485 if (firstException != null) {
1486 throw firstException;
1487 } else {
1488 throw new NoSuchElementException("No subtasks completed");
1489 }
1490 }
1491 }
1492
1493 /**
1494 * A joiner that waits for all successful subtasks. If any subtask fails the
1495 * execution is cancelled.
1496 */
1497 private static final class AwaitSuccessful<T> implements Joiner<T, Void> {
1498 private static final VarHandle FIRST_EXCEPTION;
1499 static {
1500 MethodHandles.Lookup l = MethodHandles.lookup();
1501 FIRST_EXCEPTION = MhUtil.findVarHandle(l, "firstException", Throwable.class);
1502 }
1503 private volatile Throwable firstException;
1504
1505 @Override
1506 public boolean onComplete(Subtask<? extends T> subtask) {
1507 return (subtask.state() == Subtask.State.FAILED)
1508 && (firstException == null)
1509 && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
1510 }
1511
1512 @Override
1513 public Void result() throws Throwable {
1514 Throwable ex = firstException;
1515 if (ex != null) {
1516 throw ex;
1517 } else {
1518 return null;
1519 }
1520 }
1521 }
1522
1523 /**
1524 * A joiner that returns a stream of all subtasks.
1525 */
1526 private static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
1527 private final Predicate<Subtask<? extends T>> isDone;
1528 // list of forked subtasks, only accessed by owner thread
1529 private final List<Subtask<T>> subtasks = new ArrayList<>();
1530
1531 AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
1532 this.isDone = Objects.requireNonNull(isDone);
1533 }
1534
1535 @Override
1536 public boolean onFork(Subtask<? extends T> subtask) {
1537 @SuppressWarnings("unchecked")
1538 var tmp = (Subtask<T>) Objects.requireNonNull(subtask);
1539 subtasks.add(tmp);
1540 return false;
1541 }
1542
1543 @Override
1544 public boolean onComplete(Subtask<? extends T> subtask) {
1545 return isDone.test(Objects.requireNonNull(subtask));
1546 }
1547
1548 @Override
1549 public Stream<Subtask<T>> result() {
1550 return subtasks.stream();
1551 }
1552 }
1553
1554 /**
1555 * Implementation of Config.
1556 */
1557 private record ConfigImpl(ThreadFactory threadFactory,
1558 String name,
1559 Duration timeout) implements Config {
1560 static Config defaultConfig() {
1561 return new ConfigImpl(Thread.ofVirtual().factory(), null, null);
1562 }
1563
1564 @Override
1565 public Config withThreadFactory(ThreadFactory threadFactory) {
1566 return new ConfigImpl(Objects.requireNonNull(threadFactory), name, timeout);
1567 }
1568
1569 @Override
1570 public Config withName(String name) {
1571 return new ConfigImpl(threadFactory, Objects.requireNonNull(name), timeout);
1572 }
1573
1574 @Override
1575 public Config withTimeout(Duration timeout) {
1576 return new ConfigImpl(threadFactory, name, Objects.requireNonNull(timeout));
1577 }
1578 }
1579
1580 /**
1581 * Used to schedule a task to cancel execution when a timeout expires.
1582 */
1583 private static class TimerSupport {
1584 private static final ScheduledExecutorService DELAYED_TASK_SCHEDULER;
1585 static {
1586 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1587 Executors.newScheduledThreadPool(1, task -> {
1588 Thread t = InnocuousThread.newThread("StructuredTaskScope-Timer", task);
1589 t.setDaemon(true);
1590 return t;
1591 });
1592 stpe.setRemoveOnCancelPolicy(true);
1593 DELAYED_TASK_SCHEDULER = stpe;
1594 }
1595
1596 static Future<?> schedule(Duration timeout, Runnable task) {
1597 long nanos = TimeUnit.NANOSECONDS.convert(timeout);
1598 return DELAYED_TASK_SCHEDULER.schedule(task, nanos, TimeUnit.NANOSECONDS);
1599 }
1600 }
1601 }
|