< prev index next >

src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java

Print this page

 295  *
 296  * @jls 17.4.5 Happens-before Order
 297  *
 298  * @param <T> the result type of tasks executed in the task scope
 299  * @since 21
 300  */
 301 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 302 public class StructuredTaskScope<T> implements AutoCloseable {
 303     private final ThreadFactory factory;
 304     private final ThreadFlock flock;
 305     private final ReentrantLock shutdownLock = new ReentrantLock();
 306 
 307     // states: OPEN -> SHUTDOWN -> CLOSED
 308     private static final int OPEN     = 0;   // initial state
 309     private static final int SHUTDOWN = 1;
 310     private static final int CLOSED   = 2;
 311 
 312     // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
 313     private volatile int state;
 314 
 315     // Counters to support checking that the task scope owner joins before processing
 316     // results and attempts join before closing the task scope. These counters are
 317     // accessed only by the owner thread.
 318     private int forkRound;         // incremented when the first subtask is forked after join
 319     private int lastJoinAttempted; // set to the current fork round when join is attempted
 320     private int lastJoinCompleted; // set to the current fork round when join completes
 321 
 322     /**
 323      * Represents a subtask forked with {@link #fork(Callable)}.
 324      * @param <T> the result type
 325      * @since 21
 326      */
 327     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 328     public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
 329         /**
 330          * {@return the value returning task provided to the {@code fork} method}
 331          *
 332          * @apiNote Task objects with unique identity may be used for correlation by
 333          * implementations of {@link #handleComplete(Subtask) handleComplete}.
 334          */
 335         Callable<? extends T> task();
 336 
 337         /**

 353              * Subtask.get()} method can be used to obtain the result. This is a terminal
 354              * state.
 355              */
 356             SUCCESS,
 357             /**
 358              * The subtask failed with an exception. The {@link Subtask#exception()
 359              * Subtask.exception()} method can be used to obtain the exception. This is a
 360              * terminal state.
 361              */
 362             FAILED,
 363         }
 364 
 365         /**
 366          * {@return the state of the subtask}
 367          */
 368         State state();
 369 
 370         /**
 371          * Returns the result of the subtask.
 372          *
 373          * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
 374          * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
 375          * joinUntil}) before it can obtain the result of the subtask.
 376          *
 377          * @return the possibly-null result
 378          * @throws IllegalStateException if the subtask has not completed, did not complete
 379          * successfully, or the current thread is the task scope owner and did not join
 380          * after forking
 381          * @see State#SUCCESS
 382          */
 383         T get();
 384 
 385         /**
 386          * {@return the exception thrown by the subtask}
 387          *
 388          * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
 389          * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
 390          * joinUntil}) before it can obtain the exception thrown by the subtask.
 391          *
 392          * @throws IllegalStateException if the subtask has not completed, completed with
 393          * a result, or the current thread is the task scope owner and did not join after
 394          * forking
 395          * @see State#FAILED
 396          */
 397         Throwable exception();
 398     }
 399 
 400     /**
 401      * Creates a structured task scope with the given name and thread factory. The task
 402      * scope is optionally named for the purposes of monitoring and management. The thread
 403      * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
 404      * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
 405      * current thread.
 406      *
 407      * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
 408      * bindings for inheritance by threads started in the task scope. The
 409      * <a href="#TreeStructure">Tree Structure</a> section in the class description details
 410      * how parent-child relations are established implicitly for the purpose of inheritance
 411      * of scoped value bindings.
 412      *
 413      * @param name the name of the task scope, can be null
 414      * @param factory the thread factory

 451     }
 452 
 453     /**
 454      * Throws WrongThreadException if the current thread is not the owner.
 455      */
 456     private void ensureOwner() {
 457         if (Thread.currentThread() != flock.owner())
 458             throw new WrongThreadException("Current thread not owner");
 459     }
 460 
 461     /**
 462      * Throws WrongThreadException if the current thread is not the owner
 463      * or a thread contained in the tree.
 464      */
 465     private void ensureOwnerOrContainsThread() {
 466         Thread currentThread = Thread.currentThread();
 467         if (currentThread != flock.owner() && !flock.containsThread(currentThread))
 468             throw new WrongThreadException("Current thread not owner or thread in the tree");
 469     }
 470 
 471     /**
 472      * Throws IllegalStateException if the current thread is the owner, and the owner did
 473      * not join after forking a subtask in the given fork round.
 474      */
 475     private void ensureJoinedIfOwner(int round) {
 476         if (Thread.currentThread() == flock.owner() && (round > lastJoinCompleted)) {
 477             throw newIllegalStateExceptionNoJoin();
 478         }
 479     }
 480 
 481     /**
 482      * Ensures that the current thread is the owner of this task scope and that it joined
 483      * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
 484      * forking} subtasks.
 485      *
 486      * @apiNote This method can be used by subclasses that define methods to make available
 487      * results, state, or other outcome to code intended to execute after the join method.
 488      *
 489      * @throws WrongThreadException if the current thread is not the task scope owner
 490      * @throws IllegalStateException if the task scope is open and task scope owner did
 491      * not join after forking
 492      */
 493     protected final void ensureOwnerAndJoined() {
 494         ensureOwner();
 495         if (forkRound > lastJoinCompleted) {
 496             throw newIllegalStateExceptionNoJoin();
 497         }
 498     }
 499 
 500     /**

 558      * thread contained in the task scope
 559      * @throws StructureViolationException if the current scoped value bindings are not
 560      * the same as when the task scope was created
 561      * @throws RejectedExecutionException if the thread factory rejected creating a
 562      * thread to run the subtask
 563      */
 564     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
 565         Objects.requireNonNull(task, "'task' is null");
 566         int s = ensureOpen();   // throws ISE if closed
 567 
 568         // when forked by the owner, the subtask is forked in the current or next round
 569         int round = -1;
 570         if (Thread.currentThread() == flock.owner()) {
 571             round = forkRound;
 572             if (forkRound == lastJoinCompleted) {
 573                 // new round if first fork after join
 574                 round++;
 575             }
 576         }
 577 
 578         SubtaskImpl<U> subtask = new SubtaskImpl<>(this, task, round);
 579         if (s < SHUTDOWN) {
 580             // create thread to run task
 581             Thread thread = factory.newThread(subtask);
 582             if (thread == null) {
 583                 throw new RejectedExecutionException("Rejected by thread factory");
 584             }
 585 
 586             // attempt to start the thread
 587             try {
 588                 flock.start(thread);
 589             } catch (IllegalStateException e) {
 590                 // shutdown by another thread, or underlying flock is shutdown due
 591                 // to unstructured use
 592             }
 593         }
 594 
 595         // force owner to join if this is the first fork in the round
 596         if (Thread.currentThread() == flock.owner() && round > forkRound) {
 597             forkRound = round;
 598         }

 826      * @throws WrongThreadException if the current thread is not the task scope owner
 827      * @throws StructureViolationException if a structure violation was detected
 828      */
 829     @Override
 830     public void close() {
 831         ensureOwner();
 832         int s = state;
 833         if (s == CLOSED)
 834             return;
 835 
 836         try {
 837             if (s < SHUTDOWN)
 838                 implShutdown();
 839             flock.close();
 840         } finally {
 841             state = CLOSED;
 842         }
 843 
 844         // throw ISE if the owner didn't attempt to join after forking
 845         if (forkRound > lastJoinAttempted) {
 846             lastJoinCompleted = forkRound;
 847             throw newIllegalStateExceptionNoJoin();
 848         }
 849     }
 850 
 851     @Override
 852     public String toString() {
 853         String name = flock.name();
 854         return switch (state) {
 855             case OPEN     -> name;
 856             case SHUTDOWN -> name + "/shutdown";
 857             case CLOSED   -> name + "/closed";
 858             default -> throw new InternalError();
 859         };
 860     }
 861 
 862     /**
 863      * Subtask implementation, runs the task specified to the fork method.
 864      */
 865     private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
 866         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
 867 
 868         private record AltResult(Subtask.State state, Throwable exception) {
 869             AltResult(Subtask.State state) {
 870                 this(state, null);
 871             }
 872         }
 873 
 874         private final StructuredTaskScope<? super T> scope;
 875         private final Callable<? extends T> task;
 876         private final int round;
 877         private volatile Object result;
 878 
 879         SubtaskImpl(StructuredTaskScope<? super T> scope,
 880                     Callable<? extends T> task,
 881                     int round) {
 882             this.scope = scope;
 883             this.task = task;
 884             this.round = round;
 885         }
 886 
 887         @Override
 888         public void run() {
 889             T result = null;
 890             Throwable ex = null;
 891             try {
 892                 result = task.call();
 893             } catch (Throwable e) {
 894                 ex = e;
 895             }
 896 
 897             // nothing to do if task scope is shutdown
 898             if (scope.isShutdown())
 899                 return;
 900 
 901             // capture result or exception, invoke handleComplete
 902             if (ex == null) {
 903                 this.result = (result != null) ? result : RESULT_NULL;
 904             } else {

 910         @Override
 911         public Callable<? extends T> task() {
 912             return task;
 913         }
 914 
 915         @Override
 916         public Subtask.State state() {
 917             Object result = this.result;
 918             if (result == null) {
 919                 return State.UNAVAILABLE;
 920             } else if (result instanceof AltResult alt) {
 921                 // null or failed
 922                 return alt.state();
 923             } else {
 924                 return State.SUCCESS;
 925             }
 926         }
 927 
 928         @Override
 929         public T get() {
 930             scope.ensureJoinedIfOwner(round);
 931             Object result = this.result;
 932             if (result instanceof AltResult) {
 933                 if (result == RESULT_NULL) return null;
 934             } else if (result != null) {
 935                 @SuppressWarnings("unchecked")
 936                 T r = (T) result;
 937                 return r;
 938             }
 939             throw new IllegalStateException(
 940                     "Result is unavailable or subtask did not complete successfully");
 941         }
 942 
 943         @Override
 944         public Throwable exception() {
 945             scope.ensureJoinedIfOwner(round);
 946             Object result = this.result;
 947             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
 948                 return alt.exception();
 949             }
 950             throw new IllegalStateException(
 951                     "Exception is unavailable or subtask did not complete with exception");
 952         }
 953 
 954         @Override
 955         public String toString() {
 956             String stateAsString = switch (state()) {
 957                 case UNAVAILABLE -> "[Unavailable]";
 958                 case SUCCESS     -> "[Completed successfully]";
 959                 case FAILED      -> {
 960                     Throwable ex = ((AltResult) result).exception();
 961                     yield "[Failed: " + ex + "]";
 962                 }
 963             };
 964             return Objects.toIdentityString(this) + stateAsString;
 965         }

 295  *
 296  * @jls 17.4.5 Happens-before Order
 297  *
 298  * @param <T> the result type of tasks executed in the task scope
 299  * @since 21
 300  */
 301 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 302 public class StructuredTaskScope<T> implements AutoCloseable {
 303     private final ThreadFactory factory;
 304     private final ThreadFlock flock;
 305     private final ReentrantLock shutdownLock = new ReentrantLock();
 306 
 307     // states: OPEN -> SHUTDOWN -> CLOSED
 308     private static final int OPEN     = 0;   // initial state
 309     private static final int SHUTDOWN = 1;
 310     private static final int CLOSED   = 2;
 311 
 312     // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
 313     private volatile int state;
 314 
 315     // Counters to support checking that the task scope owner joins before closing the task
 316     // scope. These counters are accessed only by the owner thread.

 317     private int forkRound;         // incremented when the first subtask is forked after join
 318     private int lastJoinAttempted; // set to the current fork round when join is attempted
 319     private int lastJoinCompleted; // set to the current fork round when join completes
 320 
 321     /**
 322      * Represents a subtask forked with {@link #fork(Callable)}.
 323      * @param <T> the result type
 324      * @since 21
 325      */
 326     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 327     public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
 328         /**
 329          * {@return the value returning task provided to the {@code fork} method}
 330          *
 331          * @apiNote Task objects with unique identity may be used for correlation by
 332          * implementations of {@link #handleComplete(Subtask) handleComplete}.
 333          */
 334         Callable<? extends T> task();
 335 
 336         /**

 352              * Subtask.get()} method can be used to obtain the result. This is a terminal
 353              * state.
 354              */
 355             SUCCESS,
 356             /**
 357              * The subtask failed with an exception. The {@link Subtask#exception()
 358              * Subtask.exception()} method can be used to obtain the exception. This is a
 359              * terminal state.
 360              */
 361             FAILED,
 362         }
 363 
 364         /**
 365          * {@return the state of the subtask}
 366          */
 367         State state();
 368 
 369         /**
 370          * Returns the result of the subtask.
 371          *




 372          * @return the possibly-null result
 373          * @throws IllegalStateException if the subtask has not completed or did not
 374          * complete successfully

 375          * @see State#SUCCESS
 376          */
 377         T get();
 378 
 379         /**
 380          * {@return the exception thrown by the subtask}
 381          *
 382          * @throws IllegalStateException if the subtask has not completed or completed
 383          * with a result rather than an exception





 384          * @see State#FAILED
 385          */
 386         Throwable exception();
 387     }
 388 
 389     /**
 390      * Creates a structured task scope with the given name and thread factory. The task
 391      * scope is optionally named for the purposes of monitoring and management. The thread
 392      * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
 393      * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
 394      * current thread.
 395      *
 396      * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
 397      * bindings for inheritance by threads started in the task scope. The
 398      * <a href="#TreeStructure">Tree Structure</a> section in the class description details
 399      * how parent-child relations are established implicitly for the purpose of inheritance
 400      * of scoped value bindings.
 401      *
 402      * @param name the name of the task scope, can be null
 403      * @param factory the thread factory

 440     }
 441 
 442     /**
 443      * Throws WrongThreadException if the current thread is not the owner.
 444      */
 445     private void ensureOwner() {
 446         if (Thread.currentThread() != flock.owner())
 447             throw new WrongThreadException("Current thread not owner");
 448     }
 449 
 450     /**
 451      * Throws WrongThreadException if the current thread is not the owner
 452      * or a thread contained in the tree.
 453      */
 454     private void ensureOwnerOrContainsThread() {
 455         Thread currentThread = Thread.currentThread();
 456         if (currentThread != flock.owner() && !flock.containsThread(currentThread))
 457             throw new WrongThreadException("Current thread not owner or thread in the tree");
 458     }
 459 










 460     /**
 461      * Ensures that the current thread is the owner of this task scope and that it joined
 462      * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
 463      * forking} subtasks.
 464      *
 465      * @apiNote This method can be used by subclasses that define methods to make available
 466      * results, state, or other outcome to code intended to execute after the join method.
 467      *
 468      * @throws WrongThreadException if the current thread is not the task scope owner
 469      * @throws IllegalStateException if the task scope is open and task scope owner did
 470      * not join after forking
 471      */
 472     protected final void ensureOwnerAndJoined() {
 473         ensureOwner();
 474         if (forkRound > lastJoinCompleted) {
 475             throw newIllegalStateExceptionNoJoin();
 476         }
 477     }
 478 
 479     /**

 537      * thread contained in the task scope
 538      * @throws StructureViolationException if the current scoped value bindings are not
 539      * the same as when the task scope was created
 540      * @throws RejectedExecutionException if the thread factory rejected creating a
 541      * thread to run the subtask
 542      */
 543     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
 544         Objects.requireNonNull(task, "'task' is null");
 545         int s = ensureOpen();   // throws ISE if closed
 546 
 547         // when forked by the owner, the subtask is forked in the current or next round
 548         int round = -1;
 549         if (Thread.currentThread() == flock.owner()) {
 550             round = forkRound;
 551             if (forkRound == lastJoinCompleted) {
 552                 // new round if first fork after join
 553                 round++;
 554             }
 555         }
 556 
 557         var subtask = new SubtaskImpl<U>(this, task);
 558         if (s < SHUTDOWN) {
 559             // create thread to run task
 560             Thread thread = factory.newThread(subtask);
 561             if (thread == null) {
 562                 throw new RejectedExecutionException("Rejected by thread factory");
 563             }
 564 
 565             // attempt to start the thread
 566             try {
 567                 flock.start(thread);
 568             } catch (IllegalStateException e) {
 569                 // shutdown by another thread, or underlying flock is shutdown due
 570                 // to unstructured use
 571             }
 572         }
 573 
 574         // force owner to join if this is the first fork in the round
 575         if (Thread.currentThread() == flock.owner() && round > forkRound) {
 576             forkRound = round;
 577         }

 805      * @throws WrongThreadException if the current thread is not the task scope owner
 806      * @throws StructureViolationException if a structure violation was detected
 807      */
 808     @Override
 809     public void close() {
 810         ensureOwner();
 811         int s = state;
 812         if (s == CLOSED)
 813             return;
 814 
 815         try {
 816             if (s < SHUTDOWN)
 817                 implShutdown();
 818             flock.close();
 819         } finally {
 820             state = CLOSED;
 821         }
 822 
 823         // throw ISE if the owner didn't attempt to join after forking
 824         if (forkRound > lastJoinAttempted) {
 825             lastJoinCompleted = forkRound;  // ensureOwnerAndJoined is a no-op after close
 826             throw newIllegalStateExceptionNoJoin();
 827         }
 828     }
 829 
 830     @Override
 831     public String toString() {
 832         String name = flock.name();
 833         return switch (state) {
 834             case OPEN     -> name;
 835             case SHUTDOWN -> name + "/shutdown";
 836             case CLOSED   -> name + "/closed";
 837             default -> throw new InternalError();
 838         };
 839     }
 840 
 841     /**
 842      * Subtask implementation, runs the task specified to the fork method.
 843      */
 844     private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
 845         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);
 846 
 847         private record AltResult(Subtask.State state, Throwable exception) {
 848             AltResult(Subtask.State state) {
 849                 this(state, null);
 850             }
 851         }
 852 
 853         private final StructuredTaskScope<? super T> scope;
 854         private final Callable<? extends T> task;

 855         private volatile Object result;
 856 
 857         SubtaskImpl(StructuredTaskScope<? super T> scope, Callable<? extends T> task) {


 858             this.scope = scope;
 859             this.task = task;

 860         }
 861 
 862         @Override
 863         public void run() {
 864             T result = null;
 865             Throwable ex = null;
 866             try {
 867                 result = task.call();
 868             } catch (Throwable e) {
 869                 ex = e;
 870             }
 871 
 872             // nothing to do if task scope is shutdown
 873             if (scope.isShutdown())
 874                 return;
 875 
 876             // capture result or exception, invoke handleComplete
 877             if (ex == null) {
 878                 this.result = (result != null) ? result : RESULT_NULL;
 879             } else {

 885         @Override
 886         public Callable<? extends T> task() {
 887             return task;
 888         }
 889 
 890         @Override
 891         public Subtask.State state() {
 892             Object result = this.result;
 893             if (result == null) {
 894                 return State.UNAVAILABLE;
 895             } else if (result instanceof AltResult alt) {
 896                 // null or failed
 897                 return alt.state();
 898             } else {
 899                 return State.SUCCESS;
 900             }
 901         }
 902 
 903         @Override
 904         public T get() {

 905             Object result = this.result;
 906             if (result instanceof AltResult) {
 907                 if (result == RESULT_NULL) return null;
 908             } else if (result != null) {
 909                 @SuppressWarnings("unchecked")
 910                 T r = (T) result;
 911                 return r;
 912             }
 913             throw new IllegalStateException(
 914                     "Result is unavailable or subtask did not complete successfully");
 915         }
 916 
 917         @Override
 918         public Throwable exception() {

 919             Object result = this.result;
 920             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
 921                 return alt.exception();
 922             }
 923             throw new IllegalStateException(
 924                     "Exception is unavailable or subtask did not complete with exception");
 925         }
 926 
 927         @Override
 928         public String toString() {
 929             String stateAsString = switch (state()) {
 930                 case UNAVAILABLE -> "[Unavailable]";
 931                 case SUCCESS     -> "[Completed successfully]";
 932                 case FAILED      -> {
 933                     Throwable ex = ((AltResult) result).exception();
 934                     yield "[Failed: " + ex + "]";
 935                 }
 936             };
 937             return Objects.toIdentityString(this) + stateAsString;
 938         }
< prev index next >