205 * public Stream<T> results() { // @highlight substring="results"
206 * return results.stream();
207 * }
208 * }
209 * }
210 *
211 * <h2><a id="TreeStructure">Tree structure</a></h2>
212 *
213 * StructuredTaskScopes form a tree where parent-child relations are established
214 * implicitly when opening a new task scope:
215 * <ul>
216 * <li> A parent-child relation is established when a thread started in a task scope
217 * opens its own task scope. A thread started in task scope "A" that opens task scope
218 * "B" establishes a parent-child relation where task scope "A" is the parent of task
219 * scope "B".
220 * <li> A parent-child relation is established with nesting. If a thread opens task
221 * scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task
222 * scope "B" is the parent of the nested task scope "C".
223 * </ul>
224 *
225 * <p> The tree structure supports confinement checks. The phrase "threads contained in
226 * the task scope" in method descriptions means threads started in the task scope or
227 * descendant scopes. {@code StructuredTaskScope} does not define APIs that exposes the
228 * tree structure at this time.
229 *
230 * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
231 * or method in this class will cause a {@link NullPointerException} to be thrown.
232 *
233 * <h2>Memory consistency effects</h2>
234 *
235 * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to
236 * {@linkplain #fork forking} of a {@code Callable} task
237 * <a href="../../../../java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
238 * <i>happen-before</i></a> any actions taken by that task, which in turn <i>happen-before</i>
239 * the task result is retrieved via its {@code Future}, or <i>happen-before</i> any actions
240 * taken in a thread after {@linkplain #join() joining} of the task scope.
241 *
242 * @jls 17.4.5 Happens-before Order
243 *
244 * @param <T> the result type of tasks executed in the scope
245 * @since 19
246 */
247 public class StructuredTaskScope<T> implements AutoCloseable {
248 private static final VarHandle FUTURES;
263 private volatile Set<Future<?>> futures;
264
265 // set by owner when it forks, reset by owner when it joins
266 private boolean needJoin;
267
268 // states: OPEN -> SHUTDOWN -> CLOSED
269 private static final int OPEN = 0; // initial state
270 private static final int SHUTDOWN = 1;
271 private static final int CLOSED = 2;
272
273 // scope state, set by owner, read by any thread
274 private volatile int state;
275
276 /**
277 * Creates a structured task scope with the given name and thread factory. The task
278 * scope is optionally named for the purposes of monitoring and management. The thread
279 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
280 * tasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
281 * current thread.
282 *
283 * @param name the name of the task scope, can be null
284 * @param factory the thread factory
285 */
286 public StructuredTaskScope(String name, ThreadFactory factory) {
287 this.factory = Objects.requireNonNull(factory, "'factory' is null");
288 this.flock = ThreadFlock.open(name);
289 }
290
291 /**
292 * Creates an unnamed structured task scope that creates virtual threads. The task
293 * scope is owned by the current thread.
294 *
295 * <p> This constructor is equivalent to invoking the 2-arg constructor with a name
296 * of {@code null} and a thread factory that creates virtual threads.
297 *
298 * @throws UnsupportedOperationException if preview features are not enabled
299 */
300 public StructuredTaskScope() {
301 PreviewFeatures.ensureEnabled();
302 this.factory = Thread.ofVirtual().factory();
350 private void untrack(Future<?> future) {
351 assert futures != null;
352 futures.remove(future);
353 }
354
355 /**
356 * Invoked when a task completes before the scope is shut down.
357 *
358 * <p> The {@code handleComplete} method should be thread safe. It may be invoked by
359 * several threads concurrently.
360 *
361 * @implSpec The default implementation does nothing.
362 *
363 * @param future the completed task
364 */
365 protected void handleComplete(Future<T> future) { }
366
367 /**
368 * Starts a new thread to run the given task.
369 *
370 * <p> The new thread is created with the task scope's {@link ThreadFactory}.
371 *
372 * <p> If the task completes before the task scope is {@link #shutdown() shutdown}
373 * then the {@link #handleComplete(Future) handle} method is invoked to consume the
374 * completed task. The {@code handleComplete} method is run when the task completes
375 * with a result or exception. If the {@code Future} {@link Future#cancel(boolean)
376 * cancel} method is used the cancel a task before the task scope is shut down, then
377 * the {@code handleComplete} method is run by the thread that invokes {@code cancel}.
378 * If the task scope shuts down at or around the same time that the task completes or
379 * is cancelled then the {@code handleComplete} method may or may not be invoked.
380 *
381 * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process
382 * of shutting down) then {@code fork} returns a {@code Future} representing a {@link
383 * Future.State#CANCELLED cancelled} task that was not run.
384 *
385 * <p> This method may only be invoked by the task scope owner or threads contained
386 * in the task scope. The {@link Future#cancel(boolean) cancel} method of the returned
387 * {@code Future} object is also restricted to the task scope owner or threads contained
388 * in the task scope. The {@code cancel} method throws {@link WrongThreadException}
389 * if invoked from another thread. All other methods on the returned {@code Future}
390 * object, such as {@link Future#get() get}, are not restricted.
391 *
392 * @param task the task to run
393 * @param <U> the result type
394 * @return a future
395 * @throws IllegalStateException if this task scope is closed
396 * @throws WrongThreadException if the current thread is not the owner or a thread
397 * contained in the task scope
398 * @throws RejectedExecutionException if the thread factory rejected creating a
399 * thread to run the task
400 */
401 public <U extends T> Future<U> fork(Callable<? extends U> task) {
402 Objects.requireNonNull(task, "'task' is null");
403
404 // create future
405 var future = new FutureImpl<U>(this, task);
406
407 boolean shutdown = (state >= SHUTDOWN);
408
409 if (!shutdown) {
410 // create thread
411 Thread thread = factory.newThread(future);
412 if (thread == null) {
413 throw new RejectedExecutionException("Rejected by thread factory");
414 }
415
416 // attempt to start the thread
417 try {
611 if (implShutdown())
612 flock.wakeup();
613 }
614
615 /**
616 * Closes this task scope.
617 *
618 * <p> This method first shuts down the task scope (as if by invoking the {@link
619 * #shutdown() shutdown} method). It then waits for the threads executing any
620 * unfinished tasks to finish. If interrupted then this method will continue to
621 * wait for the threads to finish before completing with the interrupt status set.
622 *
623 * <p> This method may only be invoked by the task scope owner. If the task scope
624 * is already closed then the owner invoking this method has no effect.
625 *
626 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
627 * manner</em>. If this method is called to close a task scope before nested task
628 * scopes are closed then it closes the underlying construct of each nested task scope
629 * (in the reverse order that they were created in), closes this task scope, and then
630 * throws {@link StructureViolationException}.
631 * If a thread terminates without first closing task scopes that it owns then
632 * termination will cause the underlying construct of each of its open tasks scopes to
633 * be closed. Closing is performed in the reverse order that the task scopes were
634 * created in. Thread termination may therefore be delayed when the owner has to wait
635 * for threads forked in these task scopes to finish.
636 *
637 * @throws IllegalStateException thrown after closing the task scope if the owner
638 * did not invoke join after forking
639 * @throws WrongThreadException if the current thread is not the owner
640 * @throws StructureViolationException if a structure violation was detected
641 */
642 @Override
643 public void close() {
644 ensureOwner();
645 if (state == CLOSED)
646 return;
647
648 try {
649 implShutdown();
650 flock.close();
807 */
808 public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
809 private static final VarHandle FUTURE;
810 static {
811 try {
812 MethodHandles.Lookup l = MethodHandles.lookup();
813 FUTURE = l.findVarHandle(ShutdownOnSuccess.class, "future", Future.class);
814 } catch (Exception e) {
815 throw new InternalError(e);
816 }
817 }
818 private volatile Future<T> future;
819
820 /**
821 * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
822 * The task scope is optionally named for the purposes of monitoring and management.
823 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
824 * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope is
825 * owned by the current thread.
826 *
827 * @param name the name of the task scope, can be null
828 * @param factory the thread factory
829 */
830 public ShutdownOnSuccess(String name, ThreadFactory factory) {
831 super(name, factory);
832 }
833
834 /**
835 * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
836 *
837 * <p> This constructor is equivalent to invoking the 2-arg constructor with a
838 * name of {@code null} and a thread factory that creates virtual threads.
839 */
840 public ShutdownOnSuccess() {
841 super(null, Thread.ofVirtual().factory());
842 }
843
844 /**
845 * Shut down the given task scope when invoked for the first time with a {@code
846 * Future} for a task that completed with a result.
983 */
984 public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
985 private static final VarHandle FUTURE;
986 static {
987 try {
988 MethodHandles.Lookup l = MethodHandles.lookup();
989 FUTURE = l.findVarHandle(ShutdownOnFailure.class, "future", Future.class);
990 } catch (Exception e) {
991 throw new InternalError(e);
992 }
993 }
994 private volatile Future<Object> future;
995
996 /**
997 * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
998 * The task scope is optionally named for the purposes of monitoring and management.
999 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1000 * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope
1001 * is owned by the current thread.
1002 *
1003 * @param name the name of the task scope, can be null
1004 * @param factory the thread factory
1005 */
1006 public ShutdownOnFailure(String name, ThreadFactory factory) {
1007 super(name, factory);
1008 }
1009
1010 /**
1011 * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
1012 *
1013 * <p> This constructor is equivalent to invoking the 2-arg constructor with a
1014 * name of {@code null} and a thread factory that creates virtual threads.
1015 */
1016 public ShutdownOnFailure() {
1017 super(null, Thread.ofVirtual().factory());
1018 }
1019
1020 /**
1021 * Shut down the given task scope when invoked for the first time with a {@code
1022 * Future} for a task that completed abnormally (exception or cancelled).
|
205 * public Stream<T> results() { // @highlight substring="results"
206 * return results.stream();
207 * }
208 * }
209 * }
210 *
211 * <h2><a id="TreeStructure">Tree structure</a></h2>
212 *
213 * StructuredTaskScopes form a tree where parent-child relations are established
214 * implicitly when opening a new task scope:
215 * <ul>
216 * <li> A parent-child relation is established when a thread started in a task scope
217 * opens its own task scope. A thread started in task scope "A" that opens task scope
218 * "B" establishes a parent-child relation where task scope "A" is the parent of task
219 * scope "B".
220 * <li> A parent-child relation is established with nesting. If a thread opens task
221 * scope "B", then opens task scope "C" (before it closes "B"), then the enclosing task
222 * scope "B" is the parent of the nested task scope "C".
223 * </ul>
224 *
225 * <p> The tree structure supports:
226 * <ul>
227 * <li> Inheritance of {@linkplain ExtentLocal extent-local} variables across threads.
228 * <li> Confinement checks. The phrase "threads contained in the task scope" in method
229 * descriptions means threads started in the task scope or descendant scopes.
230 * </ul>
231 *
232 * <p> The following example demonstrates the inheritance of an extent-local variable. An
233 * extent local {@code NAME} is bound to the value "duke". A StructuredTaskScope is created
234 * and its {@code fork} method invoked to start a thread to execute {@code childTask}.
235 * The thread inherits the extent-local {@linkplain ExtentLocal.Carrier bindings} captured
236 * when creating the task scope. The code in {@code childTask} uses the value of the
237 * extent-local and so reads the value "duke".
238 * {@snippet lang=java :
239 * private static final ExtentLocal<String> NAME = ExtentLocal.newInstance();
240 *
241 * // @link substring="where" target="ExtentLocal#where" :
242 * ExtentLocal.where(NAME, "duke").run(() -> {
243 * try (var scope = new StructuredTaskScope<String>()) {
244 *
245 * scope.fork(() -> childTask()); // @highlight substring="fork"
246 * ...
247 * }
248 * });
249 *
250 * ...
251 *
252 * String childTask() {
253 * String name = NAME.get(); // "duke" // @highlight substring="get"
254 * ...
255 * }
256 * }
257 *
258 * <p> {@code StructuredTaskScope} does not define APIs that exposes the tree structure
259 * at this time.
260 *
261 * <p> Unless otherwise specified, passing a {@code null} argument to a constructor
262 * or method in this class will cause a {@link NullPointerException} to be thrown.
263 *
264 * <h2>Memory consistency effects</h2>
265 *
266 * <p> Actions in the owner thread of, or a thread contained in, the task scope prior to
267 * {@linkplain #fork forking} of a {@code Callable} task
268 * <a href="../../../../java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
269 * <i>happen-before</i></a> any actions taken by that task, which in turn <i>happen-before</i>
270 * the task result is retrieved via its {@code Future}, or <i>happen-before</i> any actions
271 * taken in a thread after {@linkplain #join() joining} of the task scope.
272 *
273 * @jls 17.4.5 Happens-before Order
274 *
275 * @param <T> the result type of tasks executed in the scope
276 * @since 19
277 */
278 public class StructuredTaskScope<T> implements AutoCloseable {
279 private static final VarHandle FUTURES;
294 private volatile Set<Future<?>> futures;
295
296 // set by owner when it forks, reset by owner when it joins
297 private boolean needJoin;
298
299 // states: OPEN -> SHUTDOWN -> CLOSED
300 private static final int OPEN = 0; // initial state
301 private static final int SHUTDOWN = 1;
302 private static final int CLOSED = 2;
303
304 // scope state, set by owner, read by any thread
305 private volatile int state;
306
307 /**
308 * Creates a structured task scope with the given name and thread factory. The task
309 * scope is optionally named for the purposes of monitoring and management. The thread
310 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
311 * tasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
312 * current thread.
313 *
314 * <p> This method captures the current thread's {@linkplain ExtentLocal extent-local}
315 * bindings for inheritance by threads created in the task scope. The
316 * <a href="#TreeStructure">Tree Structure</a> section in the class description
317 * details how parent-child relations are established implicitly for the purpose of
318 * inheritance of extent-local bindings.
319 *
320 * @param name the name of the task scope, can be null
321 * @param factory the thread factory
322 */
323 public StructuredTaskScope(String name, ThreadFactory factory) {
324 this.factory = Objects.requireNonNull(factory, "'factory' is null");
325 this.flock = ThreadFlock.open(name);
326 }
327
328 /**
329 * Creates an unnamed structured task scope that creates virtual threads. The task
330 * scope is owned by the current thread.
331 *
332 * <p> This constructor is equivalent to invoking the 2-arg constructor with a name
333 * of {@code null} and a thread factory that creates virtual threads.
334 *
335 * @throws UnsupportedOperationException if preview features are not enabled
336 */
337 public StructuredTaskScope() {
338 PreviewFeatures.ensureEnabled();
339 this.factory = Thread.ofVirtual().factory();
387 private void untrack(Future<?> future) {
388 assert futures != null;
389 futures.remove(future);
390 }
391
392 /**
393 * Invoked when a task completes before the scope is shut down.
394 *
395 * <p> The {@code handleComplete} method should be thread safe. It may be invoked by
396 * several threads concurrently.
397 *
398 * @implSpec The default implementation does nothing.
399 *
400 * @param future the completed task
401 */
402 protected void handleComplete(Future<T> future) { }
403
404 /**
405 * Starts a new thread to run the given task.
406 *
407 * <p> The new thread is created with the task scope's {@link ThreadFactory}. It
408 * inherits the current thread's {@linkplain ExtentLocal extent-local} bindings. The
409 * bindings must match the bindings captured when the task scope was created.
410 *
411 * <p> If the task completes before the task scope is {@link #shutdown() shutdown}
412 * then the {@link #handleComplete(Future) handle} method is invoked to consume the
413 * completed task. The {@code handleComplete} method is run when the task completes
414 * with a result or exception. If the {@code Future} {@link Future#cancel(boolean)
415 * cancel} method is used the cancel a task before the task scope is shut down, then
416 * the {@code handleComplete} method is run by the thread that invokes {@code cancel}.
417 * If the task scope shuts down at or around the same time that the task completes or
418 * is cancelled then the {@code handleComplete} method may or may not be invoked.
419 *
420 * <p> If this task scope is {@linkplain #shutdown() shutdown} (or in the process
421 * of shutting down) then {@code fork} returns a {@code Future} representing a {@link
422 * Future.State#CANCELLED cancelled} task that was not run.
423 *
424 * <p> This method may only be invoked by the task scope owner or threads contained
425 * in the task scope. The {@link Future#cancel(boolean) cancel} method of the returned
426 * {@code Future} object is also restricted to the task scope owner or threads contained
427 * in the task scope. The {@code cancel} method throws {@link WrongThreadException}
428 * if invoked from another thread. All other methods on the returned {@code Future}
429 * object, such as {@link Future#get() get}, are not restricted.
430 *
431 * @param task the task to run
432 * @param <U> the result type
433 * @return a future
434 * @throws IllegalStateException if this task scope is closed
435 * @throws WrongThreadException if the current thread is not the owner or a thread
436 * contained in the task scope
437 * @throws StructureViolationException if the current extent-local bindings are not
438 * the same as when the task scope was created
439 * @throws RejectedExecutionException if the thread factory rejected creating a
440 * thread to run the task
441 */
442 public <U extends T> Future<U> fork(Callable<? extends U> task) {
443 Objects.requireNonNull(task, "'task' is null");
444
445 // create future
446 var future = new FutureImpl<U>(this, task);
447
448 boolean shutdown = (state >= SHUTDOWN);
449
450 if (!shutdown) {
451 // create thread
452 Thread thread = factory.newThread(future);
453 if (thread == null) {
454 throw new RejectedExecutionException("Rejected by thread factory");
455 }
456
457 // attempt to start the thread
458 try {
652 if (implShutdown())
653 flock.wakeup();
654 }
655
656 /**
657 * Closes this task scope.
658 *
659 * <p> This method first shuts down the task scope (as if by invoking the {@link
660 * #shutdown() shutdown} method). It then waits for the threads executing any
661 * unfinished tasks to finish. If interrupted then this method will continue to
662 * wait for the threads to finish before completing with the interrupt status set.
663 *
664 * <p> This method may only be invoked by the task scope owner. If the task scope
665 * is already closed then the owner invoking this method has no effect.
666 *
667 * <p> A {@code StructuredTaskScope} is intended to be used in a <em>structured
668 * manner</em>. If this method is called to close a task scope before nested task
669 * scopes are closed then it closes the underlying construct of each nested task scope
670 * (in the reverse order that they were created in), closes this task scope, and then
671 * throws {@link StructureViolationException}.
672 *
673 * Similarly, if called to close a task scope that <em>encloses</em> {@linkplain
674 * ExtentLocal.Carrier#run(Runnable) operations} with extent-local bindings then
675 * it also throws {@code StructureViolationException} after closing the task scope.
676 *
677 * If a thread terminates without first closing task scopes that it owns then
678 * termination will cause the underlying construct of each of its open tasks scopes to
679 * be closed. Closing is performed in the reverse order that the task scopes were
680 * created in. Thread termination may therefore be delayed when the owner has to wait
681 * for threads forked in these task scopes to finish.
682 *
683 * @throws IllegalStateException thrown after closing the task scope if the owner
684 * did not invoke join after forking
685 * @throws WrongThreadException if the current thread is not the owner
686 * @throws StructureViolationException if a structure violation was detected
687 */
688 @Override
689 public void close() {
690 ensureOwner();
691 if (state == CLOSED)
692 return;
693
694 try {
695 implShutdown();
696 flock.close();
853 */
854 public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
855 private static final VarHandle FUTURE;
856 static {
857 try {
858 MethodHandles.Lookup l = MethodHandles.lookup();
859 FUTURE = l.findVarHandle(ShutdownOnSuccess.class, "future", Future.class);
860 } catch (Exception e) {
861 throw new InternalError(e);
862 }
863 }
864 private volatile Future<T> future;
865
866 /**
867 * Constructs a new {@code ShutdownOnSuccess} with the given name and thread factory.
868 * The task scope is optionally named for the purposes of monitoring and management.
869 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
870 * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope is
871 * owned by the current thread.
872 *
873 * <p> This method captures the current thread's {@linkplain ExtentLocal extent-local}
874 * bindings for inheritance by threads created in the task scope. The
875 * <a href="StructuredTaskScope.html#TreeStructure">Tree Structure</a> section in
876 * the class description details how parent-child relations are established
877 * implicitly for the purpose of inheritance of extent-local bindings.
878 *
879 * @param name the name of the task scope, can be null
880 * @param factory the thread factory
881 */
882 public ShutdownOnSuccess(String name, ThreadFactory factory) {
883 super(name, factory);
884 }
885
886 /**
887 * Constructs a new unnamed {@code ShutdownOnSuccess} that creates virtual threads.
888 *
889 * <p> This constructor is equivalent to invoking the 2-arg constructor with a
890 * name of {@code null} and a thread factory that creates virtual threads.
891 */
892 public ShutdownOnSuccess() {
893 super(null, Thread.ofVirtual().factory());
894 }
895
896 /**
897 * Shut down the given task scope when invoked for the first time with a {@code
898 * Future} for a task that completed with a result.
1035 */
1036 public static final class ShutdownOnFailure extends StructuredTaskScope<Object> {
1037 private static final VarHandle FUTURE;
1038 static {
1039 try {
1040 MethodHandles.Lookup l = MethodHandles.lookup();
1041 FUTURE = l.findVarHandle(ShutdownOnFailure.class, "future", Future.class);
1042 } catch (Exception e) {
1043 throw new InternalError(e);
1044 }
1045 }
1046 private volatile Future<Object> future;
1047
1048 /**
1049 * Constructs a new {@code ShutdownOnFailure} with the given name and thread factory.
1050 * The task scope is optionally named for the purposes of monitoring and management.
1051 * The thread factory is used to {@link ThreadFactory#newThread(Runnable) create}
1052 * threads when tasks are {@linkplain #fork(Callable) forked}. The task scope
1053 * is owned by the current thread.
1054 *
1055 * <p> This method captures the current thread's {@linkplain ExtentLocal extent-local}
1056 * bindings for inheritance by threads created in the task scope. The
1057 * <a href="StructuredTaskScope.html#TreeStructure">Tree Structure</a> section in
1058 * the class description details how parent-child relations are established
1059 * implicitly for the purpose of inheritance of extent-local bindings.
1060 *
1061 * @param name the name of the task scope, can be null
1062 * @param factory the thread factory
1063 */
1064 public ShutdownOnFailure(String name, ThreadFactory factory) {
1065 super(name, factory);
1066 }
1067
1068 /**
1069 * Constructs a new unnamed {@code ShutdownOnFailure} that creates virtual threads.
1070 *
1071 * <p> This constructor is equivalent to invoking the 2-arg constructor with a
1072 * name of {@code null} and a thread factory that creates virtual threads.
1073 */
1074 public ShutdownOnFailure() {
1075 super(null, Thread.ofVirtual().factory());
1076 }
1077
1078 /**
1079 * Shut down the given task scope when invoked for the first time with a {@code
1080 * Future} for a task that completed abnormally (exception or cancelled).
|