295 *
296 * @jls 17.4.5 Happens-before Order
297 *
298 * @param <T> the result type of tasks executed in the task scope
299 * @since 21
300 */
301 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
302 public class StructuredTaskScope<T> implements AutoCloseable {
303 private final ThreadFactory factory;
304 private final ThreadFlock flock;
305 private final ReentrantLock shutdownLock = new ReentrantLock();
306
307 // states: OPEN -> SHUTDOWN -> CLOSED
308 private static final int OPEN = 0; // initial state
309 private static final int SHUTDOWN = 1;
310 private static final int CLOSED = 2;
311
312 // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
313 private volatile int state;
314
315 // Counters to support checking that the task scope owner joins before processing
316 // results and attempts join before closing the task scope. These counters are
317 // accessed only by the owner thread.
318 private int forkRound; // incremented when the first subtask is forked after join
319 private int lastJoinAttempted; // set to the current fork round when join is attempted
320 private int lastJoinCompleted; // set to the current fork round when join completes
321
322 /**
323 * Represents a subtask forked with {@link #fork(Callable)}.
324 * @param <T> the result type
325 * @since 21
326 */
327 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
328 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
329 /**
330 * {@return the value returning task provided to the {@code fork} method}
331 *
332 * @apiNote Task objects with unique identity may be used for correlation by
333 * implementations of {@link #handleComplete(Subtask) handleComplete}.
334 */
335 Callable<? extends T> task();
336
337 /**
353 * Subtask.get()} method can be used to obtain the result. This is a terminal
354 * state.
355 */
356 SUCCESS,
357 /**
358 * The subtask failed with an exception. The {@link Subtask#exception()
359 * Subtask.exception()} method can be used to obtain the exception. This is a
360 * terminal state.
361 */
362 FAILED,
363 }
364
365 /**
366 * {@return the state of the subtask}
367 */
368 State state();
369
370 /**
371 * Returns the result of the subtask.
372 *
373 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
374 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
375 * joinUntil}) before it can obtain the result of the subtask.
376 *
377 * @return the possibly-null result
378 * @throws IllegalStateException if the subtask has not completed, did not complete
379 * successfully, or the current thread is the task scope owner and did not join
380 * after forking
381 * @see State#SUCCESS
382 */
383 T get();
384
385 /**
386 * {@return the exception thrown by the subtask}
387 *
388 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
389 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
390 * joinUntil}) before it can obtain the exception thrown by the subtask.
391 *
392 * @throws IllegalStateException if the subtask has not completed, completed with
393 * a result, or the current thread is the task scope owner and did not join after
394 * forking
395 * @see State#FAILED
396 */
397 Throwable exception();
398 }
399
400 /**
401 * Creates a structured task scope with the given name and thread factory. The task
402 * scope is optionally named for the purposes of monitoring and management. The thread
403 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
404 * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
405 * current thread.
406 *
407 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
408 * bindings for inheritance by threads started in the task scope. The
409 * <a href="#TreeStructure">Tree Structure</a> section in the class description details
410 * how parent-child relations are established implicitly for the purpose of inheritance
411 * of scoped value bindings.
412 *
413 * @param name the name of the task scope, can be null
414 * @param factory the thread factory
452 }
453
454 /**
455 * Throws WrongThreadException if the current thread is not the owner.
456 */
457 private void ensureOwner() {
458 if (Thread.currentThread() != flock.owner())
459 throw new WrongThreadException("Current thread not owner");
460 }
461
462 /**
463 * Throws WrongThreadException if the current thread is not the owner
464 * or a thread contained in the tree.
465 */
466 private void ensureOwnerOrContainsThread() {
467 Thread currentThread = Thread.currentThread();
468 if (currentThread != flock.owner() && !flock.containsThread(currentThread))
469 throw new WrongThreadException("Current thread not owner or thread in the tree");
470 }
471
472 /**
473 * Throws IllegalStateException if the current thread is the owner, and the owner did
474 * not join after forking a subtask in the given fork round.
475 */
476 private void ensureJoinedIfOwner(int round) {
477 if (Thread.currentThread() == flock.owner() && (round > lastJoinCompleted)) {
478 throw newIllegalStateExceptionNoJoin();
479 }
480 }
481
482 /**
483 * Ensures that the current thread is the owner of this task scope and that it joined
484 * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
485 * forking} subtasks.
486 *
487 * @apiNote This method can be used by subclasses that define methods to make available
488 * results, state, or other outcome to code intended to execute after the join method.
489 *
490 * @throws WrongThreadException if the current thread is not the task scope owner
491 * @throws IllegalStateException if the task scope is open and task scope owner did
492 * not join after forking
493 */
494 protected final void ensureOwnerAndJoined() {
495 ensureOwner();
496 if (forkRound > lastJoinCompleted) {
497 throw newIllegalStateExceptionNoJoin();
498 }
499 }
500
501 /**
559 * thread contained in the task scope
560 * @throws StructureViolationException if the current scoped value bindings are not
561 * the same as when the task scope was created
562 * @throws RejectedExecutionException if the thread factory rejected creating a
563 * thread to run the subtask
564 */
565 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
566 Objects.requireNonNull(task, "'task' is null");
567 int s = ensureOpen(); // throws ISE if closed
568
569 // when forked by the owner, the subtask is forked in the current or next round
570 int round = -1;
571 if (Thread.currentThread() == flock.owner()) {
572 round = forkRound;
573 if (forkRound == lastJoinCompleted) {
574 // new round if first fork after join
575 round++;
576 }
577 }
578
579 SubtaskImpl<U> subtask = new SubtaskImpl<>(this, task, round);
580 if (s < SHUTDOWN) {
581 // create thread to run task
582 Thread thread = factory.newThread(subtask);
583 if (thread == null) {
584 throw new RejectedExecutionException("Rejected by thread factory");
585 }
586
587 // attempt to start the thread
588 try {
589 flock.start(thread);
590 } catch (IllegalStateException e) {
591 // shutdown by another thread, or underlying flock is shutdown due
592 // to unstructured use
593 }
594 }
595
596 // force owner to join if this is the first fork in the round
597 if (Thread.currentThread() == flock.owner() && round > forkRound) {
598 forkRound = round;
599 }
827 * @throws WrongThreadException if the current thread is not the task scope owner
828 * @throws StructureViolationException if a structure violation was detected
829 */
830 @Override
831 public void close() {
832 ensureOwner();
833 int s = state;
834 if (s == CLOSED)
835 return;
836
837 try {
838 if (s < SHUTDOWN)
839 implShutdown();
840 flock.close();
841 } finally {
842 state = CLOSED;
843 }
844
845 // throw ISE if the owner didn't attempt to join after forking
846 if (forkRound > lastJoinAttempted) {
847 lastJoinCompleted = forkRound;
848 throw newIllegalStateExceptionNoJoin();
849 }
850 }
851
852 @Override
853 public String toString() {
854 String name = flock.name();
855 return switch (state) {
856 case OPEN -> name;
857 case SHUTDOWN -> name + "/shutdown";
858 case CLOSED -> name + "/closed";
859 default -> throw new InternalError();
860 };
861 }
862
863 /**
864 * Subtask implementation, runs the task specified to the fork method.
865 */
866 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
867 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
868
869 private record AltResult(Subtask.State state, Throwable exception) {
870 AltResult(Subtask.State state) {
871 this(state, null);
872 }
873 }
874
875 private final StructuredTaskScope<? super T> scope;
876 private final Callable<? extends T> task;
877 private final int round;
878 private volatile Object result;
879
880 SubtaskImpl(StructuredTaskScope<? super T> scope,
881 Callable<? extends T> task,
882 int round) {
883 this.scope = scope;
884 this.task = task;
885 this.round = round;
886 }
887
888 @Override
889 public void run() {
890 T result = null;
891 Throwable ex = null;
892 try {
893 result = task.call();
894 } catch (Throwable e) {
895 ex = e;
896 }
897
898 // nothing to do if task scope is shutdown
899 if (scope.isShutdown())
900 return;
901
902 // capture result or exception, invoke handleComplete
903 if (ex == null) {
904 this.result = (result != null) ? result : RESULT_NULL;
905 } else {
911 @Override
912 public Callable<? extends T> task() {
913 return task;
914 }
915
916 @Override
917 public Subtask.State state() {
918 Object result = this.result;
919 if (result == null) {
920 return State.UNAVAILABLE;
921 } else if (result instanceof AltResult alt) {
922 // null or failed
923 return alt.state();
924 } else {
925 return State.SUCCESS;
926 }
927 }
928
929 @Override
930 public T get() {
931 scope.ensureJoinedIfOwner(round);
932 Object result = this.result;
933 if (result instanceof AltResult) {
934 if (result == RESULT_NULL) return null;
935 } else if (result != null) {
936 @SuppressWarnings("unchecked")
937 T r = (T) result;
938 return r;
939 }
940 throw new IllegalStateException(
941 "Result is unavailable or subtask did not complete successfully");
942 }
943
944 @Override
945 public Throwable exception() {
946 scope.ensureJoinedIfOwner(round);
947 Object result = this.result;
948 if (result instanceof AltResult alt && alt.state() == State.FAILED) {
949 return alt.exception();
950 }
951 throw new IllegalStateException(
952 "Exception is unavailable or subtask did not complete with exception");
953 }
954
955 @Override
956 public String toString() {
957 String stateAsString = switch (state()) {
958 case UNAVAILABLE -> "[Unavailable]";
959 case SUCCESS -> "[Completed successfully]";
960 case FAILED -> {
961 Throwable ex = ((AltResult) result).exception();
962 yield "[Failed: " + ex + "]";
963 }
964 };
965 return Objects.toIdentityString(this) + stateAsString;
966 }
|
295 *
296 * @jls 17.4.5 Happens-before Order
297 *
298 * @param <T> the result type of tasks executed in the task scope
299 * @since 21
300 */
301 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
302 public class StructuredTaskScope<T> implements AutoCloseable {
303 private final ThreadFactory factory;
304 private final ThreadFlock flock;
305 private final ReentrantLock shutdownLock = new ReentrantLock();
306
307 // states: OPEN -> SHUTDOWN -> CLOSED
308 private static final int OPEN = 0; // initial state
309 private static final int SHUTDOWN = 1;
310 private static final int CLOSED = 2;
311
312 // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
313 private volatile int state;
314
315 // Counters to support checking that the task scope owner joins before closing the task
316 // scope. These counters are accessed only by the owner thread.
317 private int forkRound; // incremented when the first subtask is forked after join
318 private int lastJoinAttempted; // set to the current fork round when join is attempted
319 private int lastJoinCompleted; // set to the current fork round when join completes
320
321 /**
322 * Represents a subtask forked with {@link #fork(Callable)}.
323 * @param <T> the result type
324 * @since 21
325 */
326 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
327 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
328 /**
329 * {@return the value returning task provided to the {@code fork} method}
330 *
331 * @apiNote Task objects with unique identity may be used for correlation by
332 * implementations of {@link #handleComplete(Subtask) handleComplete}.
333 */
334 Callable<? extends T> task();
335
336 /**
352 * Subtask.get()} method can be used to obtain the result. This is a terminal
353 * state.
354 */
355 SUCCESS,
356 /**
357 * The subtask failed with an exception. The {@link Subtask#exception()
358 * Subtask.exception()} method can be used to obtain the exception. This is a
359 * terminal state.
360 */
361 FAILED,
362 }
363
364 /**
365 * {@return the state of the subtask}
366 */
367 State state();
368
369 /**
370 * Returns the result of the subtask.
371 *
372 * @return the possibly-null result
373 * @throws IllegalStateException if the subtask has not completed or did not
374 * complete successfully
375 * @see State#SUCCESS
376 */
377 T get();
378
379 /**
380 * {@return the exception thrown by the subtask}
381 *
382 * @throws IllegalStateException if the subtask has not completed or completed
383 * with a result rather than an exception
384 * @see State#FAILED
385 */
386 Throwable exception();
387 }
388
389 /**
390 * Creates a structured task scope with the given name and thread factory. The task
391 * scope is optionally named for the purposes of monitoring and management. The thread
392 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
393 * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
394 * current thread.
395 *
396 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
397 * bindings for inheritance by threads started in the task scope. The
398 * <a href="#TreeStructure">Tree Structure</a> section in the class description details
399 * how parent-child relations are established implicitly for the purpose of inheritance
400 * of scoped value bindings.
401 *
402 * @param name the name of the task scope, can be null
403 * @param factory the thread factory
441 }
442
443 /**
444 * Throws WrongThreadException if the current thread is not the owner.
445 */
446 private void ensureOwner() {
447 if (Thread.currentThread() != flock.owner())
448 throw new WrongThreadException("Current thread not owner");
449 }
450
451 /**
452 * Throws WrongThreadException if the current thread is not the owner
453 * or a thread contained in the tree.
454 */
455 private void ensureOwnerOrContainsThread() {
456 Thread currentThread = Thread.currentThread();
457 if (currentThread != flock.owner() && !flock.containsThread(currentThread))
458 throw new WrongThreadException("Current thread not owner or thread in the tree");
459 }
460
461 /**
462 * Ensures that the current thread is the owner of this task scope and that it joined
463 * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
464 * forking} subtasks.
465 *
466 * @apiNote This method can be used by subclasses that define methods to make available
467 * results, state, or other outcome to code intended to execute after the join method.
468 *
469 * @throws WrongThreadException if the current thread is not the task scope owner
470 * @throws IllegalStateException if the task scope is open and task scope owner did
471 * not join after forking
472 */
473 protected final void ensureOwnerAndJoined() {
474 ensureOwner();
475 if (forkRound > lastJoinCompleted) {
476 throw newIllegalStateExceptionNoJoin();
477 }
478 }
479
480 /**
538 * thread contained in the task scope
539 * @throws StructureViolationException if the current scoped value bindings are not
540 * the same as when the task scope was created
541 * @throws RejectedExecutionException if the thread factory rejected creating a
542 * thread to run the subtask
543 */
544 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
545 Objects.requireNonNull(task, "'task' is null");
546 int s = ensureOpen(); // throws ISE if closed
547
548 // when forked by the owner, the subtask is forked in the current or next round
549 int round = -1;
550 if (Thread.currentThread() == flock.owner()) {
551 round = forkRound;
552 if (forkRound == lastJoinCompleted) {
553 // new round if first fork after join
554 round++;
555 }
556 }
557
558 var subtask = new SubtaskImpl<U>(this, task);
559 if (s < SHUTDOWN) {
560 // create thread to run task
561 Thread thread = factory.newThread(subtask);
562 if (thread == null) {
563 throw new RejectedExecutionException("Rejected by thread factory");
564 }
565
566 // attempt to start the thread
567 try {
568 flock.start(thread);
569 } catch (IllegalStateException e) {
570 // shutdown by another thread, or underlying flock is shutdown due
571 // to unstructured use
572 }
573 }
574
575 // force owner to join if this is the first fork in the round
576 if (Thread.currentThread() == flock.owner() && round > forkRound) {
577 forkRound = round;
578 }
806 * @throws WrongThreadException if the current thread is not the task scope owner
807 * @throws StructureViolationException if a structure violation was detected
808 */
809 @Override
810 public void close() {
811 ensureOwner();
812 int s = state;
813 if (s == CLOSED)
814 return;
815
816 try {
817 if (s < SHUTDOWN)
818 implShutdown();
819 flock.close();
820 } finally {
821 state = CLOSED;
822 }
823
824 // throw ISE if the owner didn't attempt to join after forking
825 if (forkRound > lastJoinAttempted) {
826 lastJoinCompleted = forkRound; // ensureOwnerAndJoined is a no-op after close
827 throw newIllegalStateExceptionNoJoin();
828 }
829 }
830
831 @Override
832 public String toString() {
833 String name = flock.name();
834 return switch (state) {
835 case OPEN -> name;
836 case SHUTDOWN -> name + "/shutdown";
837 case CLOSED -> name + "/closed";
838 default -> throw new InternalError();
839 };
840 }
841
842 /**
843 * Subtask implementation, runs the task specified to the fork method.
844 */
845 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
846 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
847
848 private record AltResult(Subtask.State state, Throwable exception) {
849 AltResult(Subtask.State state) {
850 this(state, null);
851 }
852 }
853
854 private final StructuredTaskScope<? super T> scope;
855 private final Callable<? extends T> task;
856 private volatile Object result;
857
858 SubtaskImpl(StructuredTaskScope<? super T> scope, Callable<? extends T> task) {
859 this.scope = scope;
860 this.task = task;
861 }
862
863 @Override
864 public void run() {
865 T result = null;
866 Throwable ex = null;
867 try {
868 result = task.call();
869 } catch (Throwable e) {
870 ex = e;
871 }
872
873 // nothing to do if task scope is shutdown
874 if (scope.isShutdown())
875 return;
876
877 // capture result or exception, invoke handleComplete
878 if (ex == null) {
879 this.result = (result != null) ? result : RESULT_NULL;
880 } else {
886 @Override
887 public Callable<? extends T> task() {
888 return task;
889 }
890
891 @Override
892 public Subtask.State state() {
893 Object result = this.result;
894 if (result == null) {
895 return State.UNAVAILABLE;
896 } else if (result instanceof AltResult alt) {
897 // null or failed
898 return alt.state();
899 } else {
900 return State.SUCCESS;
901 }
902 }
903
904 @Override
905 public T get() {
906 Object result = this.result;
907 if (result instanceof AltResult) {
908 if (result == RESULT_NULL) return null;
909 } else if (result != null) {
910 @SuppressWarnings("unchecked")
911 T r = (T) result;
912 return r;
913 }
914 throw new IllegalStateException(
915 "Result is unavailable or subtask did not complete successfully");
916 }
917
918 @Override
919 public Throwable exception() {
920 Object result = this.result;
921 if (result instanceof AltResult alt && alt.state() == State.FAILED) {
922 return alt.exception();
923 }
924 throw new IllegalStateException(
925 "Exception is unavailable or subtask did not complete with exception");
926 }
927
928 @Override
929 public String toString() {
930 String stateAsString = switch (state()) {
931 case UNAVAILABLE -> "[Unavailable]";
932 case SUCCESS -> "[Completed successfully]";
933 case FAILED -> {
934 Throwable ex = ((AltResult) result).exception();
935 yield "[Failed: " + ex + "]";
936 }
937 };
938 return Objects.toIdentityString(this) + stateAsString;
939 }
|