295 *
296 * @jls 17.4.5 Happens-before Order
297 *
298 * @param <T> the result type of tasks executed in the task scope
299 * @since 21
300 */
301 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
302 public class StructuredTaskScope<T> implements AutoCloseable {
303 private final ThreadFactory factory;
304 private final ThreadFlock flock;
305 private final ReentrantLock shutdownLock = new ReentrantLock();
306
307 // states: OPEN -> SHUTDOWN -> CLOSED
308 private static final int OPEN = 0; // initial state
309 private static final int SHUTDOWN = 1;
310 private static final int CLOSED = 2;
311
312 // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
313 private volatile int state;
314
315 // Counters to support checking that the task scope owner joins before processing
316 // results and attempts join before closing the task scope. These counters are
317 // accessed only by the owner thread.
318 private int forkRound; // incremented when the first subtask is forked after join
319 private int lastJoinAttempted; // set to the current fork round when join is attempted
320 private int lastJoinCompleted; // set to the current fork round when join completes
321
322 /**
323 * Represents a subtask forked with {@link #fork(Callable)}.
324 * @param <T> the result type
325 * @since 21
326 */
327 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
328 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
329 /**
330 * {@return the value returning task provided to the {@code fork} method}
331 *
332 * @apiNote Task objects with unique identity may be used for correlation by
333 * implementations of {@link #handleComplete(Subtask) handleComplete}.
334 */
335 Callable<? extends T> task();
336
337 /**
353 * Subtask.get()} method can be used to obtain the result. This is a terminal
354 * state.
355 */
356 SUCCESS,
357 /**
358 * The subtask failed with an exception. The {@link Subtask#exception()
359 * Subtask.exception()} method can be used to obtain the exception. This is a
360 * terminal state.
361 */
362 FAILED,
363 }
364
365 /**
366 * {@return the state of the subtask}
367 */
368 State state();
369
370 /**
371 * Returns the result of the subtask.
372 *
373 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
374 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
375 * joinUntil}) before it can obtain the result of the subtask.
376 *
377 * @return the possibly-null result
378 * @throws IllegalStateException if the subtask has not completed, did not complete
379 * successfully, or the current thread is the task scope owner and did not join
380 * after forking
381 * @see State#SUCCESS
382 */
383 T get();
384
385 /**
386 * {@return the exception thrown by the subtask}
387 *
388 * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
389 * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
390 * joinUntil}) before it can obtain the exception thrown by the subtask.
391 *
392 * @throws IllegalStateException if the subtask has not completed, completed with
393 * a result, or the current thread is the task scope owner and did not join after
394 * forking
395 * @see State#FAILED
396 */
397 Throwable exception();
398 }
399
400 /**
401 * Creates a structured task scope with the given name and thread factory. The task
402 * scope is optionally named for the purposes of monitoring and management. The thread
403 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
404 * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
405 * current thread.
406 *
407 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
408 * bindings for inheritance by threads started in the task scope. The
409 * <a href="#TreeStructure">Tree Structure</a> section in the class description details
410 * how parent-child relations are established implicitly for the purpose of inheritance
411 * of scoped value bindings.
412 *
413 * @param name the name of the task scope, can be null
414 * @param factory the thread factory
451 }
452
453 /**
454 * Throws WrongThreadException if the current thread is not the owner.
455 */
456 private void ensureOwner() {
457 if (Thread.currentThread() != flock.owner())
458 throw new WrongThreadException("Current thread not owner");
459 }
460
461 /**
462 * Throws WrongThreadException if the current thread is not the owner
463 * or a thread contained in the tree.
464 */
465 private void ensureOwnerOrContainsThread() {
466 Thread currentThread = Thread.currentThread();
467 if (currentThread != flock.owner() && !flock.containsThread(currentThread))
468 throw new WrongThreadException("Current thread not owner or thread in the tree");
469 }
470
471 /**
472 * Throws IllegalStateException if the current thread is the owner, and the owner did
473 * not join after forking a subtask in the given fork round.
474 */
475 private void ensureJoinedIfOwner(int round) {
476 if (Thread.currentThread() == flock.owner() && (round > lastJoinCompleted)) {
477 throw newIllegalStateExceptionNoJoin();
478 }
479 }
480
481 /**
482 * Ensures that the current thread is the owner of this task scope and that it joined
483 * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
484 * forking} subtasks.
485 *
486 * @apiNote This method can be used by subclasses that define methods to make available
487 * results, state, or other outcome to code intended to execute after the join method.
488 *
489 * @throws WrongThreadException if the current thread is not the task scope owner
490 * @throws IllegalStateException if the task scope is open and task scope owner did
491 * not join after forking
492 */
493 protected final void ensureOwnerAndJoined() {
494 ensureOwner();
495 if (forkRound > lastJoinCompleted) {
496 throw newIllegalStateExceptionNoJoin();
497 }
498 }
499
500 /**
558 * thread contained in the task scope
559 * @throws StructureViolationException if the current scoped value bindings are not
560 * the same as when the task scope was created
561 * @throws RejectedExecutionException if the thread factory rejected creating a
562 * thread to run the subtask
563 */
564 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
565 Objects.requireNonNull(task, "'task' is null");
566 int s = ensureOpen(); // throws ISE if closed
567
568 // when forked by the owner, the subtask is forked in the current or next round
569 int round = -1;
570 if (Thread.currentThread() == flock.owner()) {
571 round = forkRound;
572 if (forkRound == lastJoinCompleted) {
573 // new round if first fork after join
574 round++;
575 }
576 }
577
578 SubtaskImpl<U> subtask = new SubtaskImpl<>(this, task, round);
579 if (s < SHUTDOWN) {
580 // create thread to run task
581 Thread thread = factory.newThread(subtask);
582 if (thread == null) {
583 throw new RejectedExecutionException("Rejected by thread factory");
584 }
585
586 // attempt to start the thread
587 try {
588 flock.start(thread);
589 } catch (IllegalStateException e) {
590 // shutdown by another thread, or underlying flock is shutdown due
591 // to unstructured use
592 }
593 }
594
595 // force owner to join if this is the first fork in the round
596 if (Thread.currentThread() == flock.owner() && round > forkRound) {
597 forkRound = round;
598 }
826 * @throws WrongThreadException if the current thread is not the task scope owner
827 * @throws StructureViolationException if a structure violation was detected
828 */
829 @Override
830 public void close() {
831 ensureOwner();
832 int s = state;
833 if (s == CLOSED)
834 return;
835
836 try {
837 if (s < SHUTDOWN)
838 implShutdown();
839 flock.close();
840 } finally {
841 state = CLOSED;
842 }
843
844 // throw ISE if the owner didn't attempt to join after forking
845 if (forkRound > lastJoinAttempted) {
846 lastJoinCompleted = forkRound;
847 throw newIllegalStateExceptionNoJoin();
848 }
849 }
850
851 @Override
852 public String toString() {
853 String name = flock.name();
854 return switch (state) {
855 case OPEN -> name;
856 case SHUTDOWN -> name + "/shutdown";
857 case CLOSED -> name + "/closed";
858 default -> throw new InternalError();
859 };
860 }
861
862 /**
863 * Subtask implementation, runs the task specified to the fork method.
864 */
865 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
866 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
867
868 private record AltResult(Subtask.State state, Throwable exception) {
869 AltResult(Subtask.State state) {
870 this(state, null);
871 }
872 }
873
874 private final StructuredTaskScope<? super T> scope;
875 private final Callable<? extends T> task;
876 private final int round;
877 private volatile Object result;
878
/**
 * Creates a subtask for the given task scope.
 *
 * @param scope the task scope that the subtask is forked in
 * @param task  the task to run
 * @param round the fork round, later passed to the scope's join check
 */
SubtaskImpl(StructuredTaskScope<? super T> scope,
            Callable<? extends T> task,
            int round) {
    this.round = round;
    this.task = task;
    this.scope = scope;
}
886
887 @Override
888 public void run() {
889 T result = null;
890 Throwable ex = null;
891 try {
892 result = task.call();
893 } catch (Throwable e) {
894 ex = e;
895 }
896
897 // nothing to do if task scope is shutdown
898 if (scope.isShutdown())
899 return;
900
901 // capture result or exception, invoke handleComplete
902 if (ex == null) {
903 this.result = (result != null) ? result : RESULT_NULL;
904 } else {
910 @Override
911 public Callable<? extends T> task() {
912 return task;
913 }
914
915 @Override
916 public Subtask.State state() {
917 Object result = this.result;
918 if (result == null) {
919 return State.UNAVAILABLE;
920 } else if (result instanceof AltResult alt) {
921 // null or failed
922 return alt.state();
923 } else {
924 return State.SUCCESS;
925 }
926 }
927
928 @Override
929 public T get() {
930 scope.ensureJoinedIfOwner(round);
931 Object result = this.result;
932 if (result instanceof AltResult) {
933 if (result == RESULT_NULL) return null;
934 } else if (result != null) {
935 @SuppressWarnings("unchecked")
936 T r = (T) result;
937 return r;
938 }
939 throw new IllegalStateException(
940 "Result is unavailable or subtask did not complete successfully");
941 }
942
943 @Override
944 public Throwable exception() {
945 scope.ensureJoinedIfOwner(round);
946 Object result = this.result;
947 if (result instanceof AltResult alt && alt.state() == State.FAILED) {
948 return alt.exception();
949 }
950 throw new IllegalStateException(
951 "Exception is unavailable or subtask did not complete with exception");
952 }
953
954 @Override
955 public String toString() {
956 String stateAsString = switch (state()) {
957 case UNAVAILABLE -> "[Unavailable]";
958 case SUCCESS -> "[Completed successfully]";
959 case FAILED -> {
960 Throwable ex = ((AltResult) result).exception();
961 yield "[Failed: " + ex + "]";
962 }
963 };
964 return Objects.toIdentityString(this) + stateAsString;
965 }
|
295 *
296 * @jls 17.4.5 Happens-before Order
297 *
298 * @param <T> the result type of tasks executed in the task scope
299 * @since 21
300 */
301 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
302 public class StructuredTaskScope<T> implements AutoCloseable {
303 private final ThreadFactory factory;
304 private final ThreadFlock flock;
305 private final ReentrantLock shutdownLock = new ReentrantLock();
306
307 // states: OPEN -> SHUTDOWN -> CLOSED
308 private static final int OPEN = 0; // initial state
309 private static final int SHUTDOWN = 1;
310 private static final int CLOSED = 2;
311
312 // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
313 private volatile int state;
314
315 // Counters to support checking that the task scope owner joins before closing the task
316 // scope. These counters are accessed only by the owner thread.
317 private int forkRound; // incremented when the first subtask is forked after join
318 private int lastJoinAttempted; // set to the current fork round when join is attempted
319 private int lastJoinCompleted; // set to the current fork round when join completes
320
321 /**
322 * Represents a subtask forked with {@link #fork(Callable)}.
323 * @param <T> the result type
324 * @since 21
325 */
326 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
327 public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
328 /**
329 * {@return the value returning task provided to the {@code fork} method}
330 *
331 * @apiNote Task objects with unique identity may be used for correlation by
332 * implementations of {@link #handleComplete(Subtask) handleComplete}.
333 */
334 Callable<? extends T> task();
335
336 /**
352 * Subtask.get()} method can be used to obtain the result. This is a terminal
353 * state.
354 */
355 SUCCESS,
356 /**
357 * The subtask failed with an exception. The {@link Subtask#exception()
358 * Subtask.exception()} method can be used to obtain the exception. This is a
359 * terminal state.
360 */
361 FAILED,
362 }
363
364 /**
365 * {@return the state of the subtask}
366 */
367 State state();
368
369 /**
370 * Returns the result of the subtask.
371 *
372 * @return the possibly-null result
373 * @throws IllegalStateException if the subtask has not completed or did not
374 * complete successfully
375 * @see State#SUCCESS
376 */
377 T get();
378
379 /**
380 * {@return the exception thrown by the subtask}
381 *
382 * @throws IllegalStateException if the subtask has not completed or completed
383 * with a result rather than an exception
384 * @see State#FAILED
385 */
386 Throwable exception();
387 }
388
389 /**
390 * Creates a structured task scope with the given name and thread factory. The task
391 * scope is optionally named for the purposes of monitoring and management. The thread
392 * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
393 * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
394 * current thread.
395 *
396 * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
397 * bindings for inheritance by threads started in the task scope. The
398 * <a href="#TreeStructure">Tree Structure</a> section in the class description details
399 * how parent-child relations are established implicitly for the purpose of inheritance
400 * of scoped value bindings.
401 *
402 * @param name the name of the task scope, can be null
403 * @param factory the thread factory
440 }
441
442 /**
443 * Throws WrongThreadException if the current thread is not the owner.
444 */
445 private void ensureOwner() {
446 if (Thread.currentThread() != flock.owner())
447 throw new WrongThreadException("Current thread not owner");
448 }
449
450 /**
451 * Throws WrongThreadException if the current thread is not the owner
452 * or a thread contained in the tree.
453 */
454 private void ensureOwnerOrContainsThread() {
455 Thread currentThread = Thread.currentThread();
456 if (currentThread != flock.owner() && !flock.containsThread(currentThread))
457 throw new WrongThreadException("Current thread not owner or thread in the tree");
458 }
459
460 /**
461 * Ensures that the current thread is the owner of this task scope and that it joined
462 * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
463 * forking} subtasks.
464 *
465 * @apiNote This method can be used by subclasses that define methods to make available
466 * results, state, or other outcome to code intended to execute after the join method.
467 *
468 * @throws WrongThreadException if the current thread is not the task scope owner
469 * @throws IllegalStateException if the task scope is open and task scope owner did
470 * not join after forking
471 */
472 protected final void ensureOwnerAndJoined() {
473 ensureOwner();
474 if (forkRound > lastJoinCompleted) {
475 throw newIllegalStateExceptionNoJoin();
476 }
477 }
478
479 /**
537 * thread contained in the task scope
538 * @throws StructureViolationException if the current scoped value bindings are not
539 * the same as when the task scope was created
540 * @throws RejectedExecutionException if the thread factory rejected creating a
541 * thread to run the subtask
542 */
543 public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
544 Objects.requireNonNull(task, "'task' is null");
545 int s = ensureOpen(); // throws ISE if closed
546
547 // when forked by the owner, the subtask is forked in the current or next round
548 int round = -1;
549 if (Thread.currentThread() == flock.owner()) {
550 round = forkRound;
551 if (forkRound == lastJoinCompleted) {
552 // new round if first fork after join
553 round++;
554 }
555 }
556
557 var subtask = new SubtaskImpl<U>(this, task);
558 if (s < SHUTDOWN) {
559 // create thread to run task
560 Thread thread = factory.newThread(subtask);
561 if (thread == null) {
562 throw new RejectedExecutionException("Rejected by thread factory");
563 }
564
565 // attempt to start the thread
566 try {
567 flock.start(thread);
568 } catch (IllegalStateException e) {
569 // shutdown by another thread, or underlying flock is shutdown due
570 // to unstructured use
571 }
572 }
573
574 // force owner to join if this is the first fork in the round
575 if (Thread.currentThread() == flock.owner() && round > forkRound) {
576 forkRound = round;
577 }
805 * @throws WrongThreadException if the current thread is not the task scope owner
806 * @throws StructureViolationException if a structure violation was detected
807 */
808 @Override
809 public void close() {
810 ensureOwner();
811 int s = state;
812 if (s == CLOSED)
813 return;
814
815 try {
816 if (s < SHUTDOWN)
817 implShutdown();
818 flock.close();
819 } finally {
820 state = CLOSED;
821 }
822
823 // throw ISE if the owner didn't attempt to join after forking
824 if (forkRound > lastJoinAttempted) {
825 lastJoinCompleted = forkRound; // ensureOwnerAndJoined is a no-op after close
826 throw newIllegalStateExceptionNoJoin();
827 }
828 }
829
830 @Override
831 public String toString() {
832 String name = flock.name();
833 return switch (state) {
834 case OPEN -> name;
835 case SHUTDOWN -> name + "/shutdown";
836 case CLOSED -> name + "/closed";
837 default -> throw new InternalError();
838 };
839 }
840
841 /**
842 * Subtask implementation, runs the task specified to the fork method.
843 */
844 private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
845 private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
846
847 private record AltResult(Subtask.State state, Throwable exception) {
848 AltResult(Subtask.State state) {
849 this(state, null);
850 }
851 }
852
853 private final StructuredTaskScope<? super T> scope;
854 private final Callable<? extends T> task;
855 private volatile Object result;
856
/**
 * Creates a subtask for the given task scope.
 *
 * @param scope the task scope that the subtask is forked in
 * @param task  the task to run
 */
SubtaskImpl(StructuredTaskScope<? super T> scope, Callable<? extends T> task) {
    this.task = task;
    this.scope = scope;
}
861
862 @Override
863 public void run() {
864 T result = null;
865 Throwable ex = null;
866 try {
867 result = task.call();
868 } catch (Throwable e) {
869 ex = e;
870 }
871
872 // nothing to do if task scope is shutdown
873 if (scope.isShutdown())
874 return;
875
876 // capture result or exception, invoke handleComplete
877 if (ex == null) {
878 this.result = (result != null) ? result : RESULT_NULL;
879 } else {
885 @Override
886 public Callable<? extends T> task() {
887 return task;
888 }
889
890 @Override
891 public Subtask.State state() {
892 Object result = this.result;
893 if (result == null) {
894 return State.UNAVAILABLE;
895 } else if (result instanceof AltResult alt) {
896 // null or failed
897 return alt.state();
898 } else {
899 return State.SUCCESS;
900 }
901 }
902
903 @Override
904 public T get() {
905 Object result = this.result;
906 if (result instanceof AltResult) {
907 if (result == RESULT_NULL) return null;
908 } else if (result != null) {
909 @SuppressWarnings("unchecked")
910 T r = (T) result;
911 return r;
912 }
913 throw new IllegalStateException(
914 "Result is unavailable or subtask did not complete successfully");
915 }
916
917 @Override
918 public Throwable exception() {
919 Object result = this.result;
920 if (result instanceof AltResult alt && alt.state() == State.FAILED) {
921 return alt.exception();
922 }
923 throw new IllegalStateException(
924 "Exception is unavailable or subtask did not complete with exception");
925 }
926
927 @Override
928 public String toString() {
929 String stateAsString = switch (state()) {
930 case UNAVAILABLE -> "[Unavailable]";
931 case SUCCESS -> "[Completed successfully]";
932 case FAILED -> {
933 Throwable ex = ((AltResult) result).exception();
934 yield "[Failed: " + ex + "]";
935 }
936 };
937 return Objects.toIdentityString(this) + stateAsString;
938 }
|