< prev index next >

src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java

Print this page

 295  *
 296  * @jls 17.4.5 Happens-before Order
 297  *
 298  * @param <T> the result type of tasks executed in the task scope
 299  * @since 21
 300  */
 301 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 302 public class StructuredTaskScope<T> implements AutoCloseable {
 303     private final ThreadFactory factory;   // creates the thread that runs each forked subtask
 304     private final ThreadFlock flock;       // starts subtask threads; flock.owner() is the scope owner
 305     private final ReentrantLock shutdownLock = new ReentrantLock();   // NOTE(review): uses are outside this excerpt; presumably guards shutdown - confirm
 306 
 307     // states: OPEN -> SHUTDOWN -> CLOSED
 308     private static final int OPEN     = 0;   // initial state
 309     private static final int SHUTDOWN = 1;   // fork only creates a thread while the observed state is below this
 310     private static final int CLOSED   = 2;   // final state, assigned by close()
 311 
 312     // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
 313     private volatile int state;
 314 
 315     // Counters to support checking that the task scope owner joins before processing
 316     // results and attempts join before closing the task scope. These counters are
 317     // accessed only by the owner thread.
 318     private int forkRound;         // incremented when the first subtask is forked after join
 319     private int lastJoinAttempted; // set to the current fork round when join is attempted
 320     private int lastJoinCompleted; // set to the current fork round when join completes
 321 
 322     /**
 323      * Represents a subtask forked with {@link #fork(Callable)}.
 324      * @param <T> the result type
 325      * @since 21
 326      */
 327     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 328     public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
 329         /**
 330          * {@return the value-returning task provided to the {@code fork} method}
 331          *
 332          * @apiNote Task objects with unique identity may be used for correlation by
 333          * implementations of {@link #handleComplete(Subtask) handleComplete}.
 334          */
 335         Callable<? extends T> task();
 336 
 337         /**

 353              * Subtask.get()} method can be used to obtain the result. This is a terminal
 354              * state.
 355              */
 356             SUCCESS,
 357             /**
 358              * The subtask failed with an exception. The {@link Subtask#exception()
 359              * Subtask.exception()} method can be used to obtain the exception. This is a
 360              * terminal state.
 361              */
 362             FAILED,
 363         }
 364 
 365         /**
 366          * {@return the state of the subtask}
 367          */
 368         State state();
 369 
 370         /**
 371          * Returns the result of the subtask.
 372          *
 373          * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
 374          * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
 375          * joinUntil}) before it can obtain the result of the subtask.
 376          *
 377          * @return the possibly-null result
 378          * @throws IllegalStateException if the subtask has not completed, did not complete
 379          * successfully, or the current thread is the task scope owner and did not join
 380          * after forking
 381          * @see State#SUCCESS
 382          */
 383         T get();
 384 
 385         /**
 386          * {@return the exception thrown by the subtask}
 387          *
 388          * <p> To ensure correct usage, if the scope owner {@linkplain #fork(Callable) forks}
 389          * a subtask, then it must join (with {@link #join() join} or {@link #joinUntil(Instant)
 390          * joinUntil}) before it can obtain the exception thrown by the subtask.
 391          *
 392          * @throws IllegalStateException if the subtask has not completed, completed with
 393          * a result, or the current thread is the task scope owner and did not join after
 394          * forking
 395          * @see State#FAILED
 396          */
 397         Throwable exception();
 398     }
 399 
 400     /**
 401      * Creates a structured task scope with the given name and thread factory. The task
 402      * scope is optionally named for the purposes of monitoring and management. The thread
 403      * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
 404      * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
 405      * current thread.
 406      *
 407      * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
 408      * bindings for inheritance by threads started in the task scope. The
 409      * <a href="#TreeStructure">Tree Structure</a> section in the class description details
 410      * how parent-child relations are established implicitly for the purpose of inheritance
 411      * of scoped value bindings.
 412      *
 413      * @param name the name of the task scope, can be null
 414      * @param factory the thread factory

 452     }
 453 
 454     /**
 455      * Throws WrongThreadException if the current thread is not the owner.
          * The owner is the thread returned by {@code flock.owner()}; the check is a
          * reference comparison against {@code Thread.currentThread()}.
          *
          * @throws WrongThreadException if the current thread is not the owner
 456      */
 457     private void ensureOwner() {
 458         if (Thread.currentThread() != flock.owner())
 459             throw new WrongThreadException("Current thread not owner");
 460     }
 461 
 462     /**
 463      * Throws WrongThreadException if the current thread is not the owner
 464      * or a thread contained in the tree.
          * Containment is decided by {@code flock.containsThread(Thread)}.
          *
          * @throws WrongThreadException if the current thread is neither the owner nor
          * reported as contained by the flock
 465      */
 466     private void ensureOwnerOrContainsThread() {
 467         Thread currentThread = Thread.currentThread();
              // the owner is accepted without consulting containsThread (short-circuit &&)
 468         if (currentThread != flock.owner() && !flock.containsThread(currentThread))
 469             throw new WrongThreadException("Current thread not owner or thread in the tree");
 470     }
 471 
 472     /**
 473      * Throws IllegalStateException if the current thread is the owner, and the owner did
 474      * not join after forking a subtask in the given fork round.
          * Threads other than the owner are never rejected by this check.
          *
          * @param round the fork round to test; a round greater than
          * {@code lastJoinCompleted} has not been joined yet
          * @throws IllegalStateException if called by the owner before it joined that round
 475      */
 476     private void ensureJoinedIfOwner(int round) {
              // lastJoinCompleted is only compared when the caller is the owner thread
 477         if (Thread.currentThread() == flock.owner() && (round > lastJoinCompleted)) {
 478             throw newIllegalStateExceptionNoJoin();
 479         }
 480     }
 481 
 482     /**
 483      * Ensures that the current thread is the owner of this task scope and that it joined
 484      * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
 485      * forking} subtasks.
 486      *
 487      * @apiNote This method can be used by subclasses that define methods to make available
 488      * results, state, or other outcome to code intended to execute after the join method.
 489      *
 490      * @throws WrongThreadException if the current thread is not the task scope owner
 491      * @throws IllegalStateException if the task scope is open and the task scope owner did
 492      * not join after forking
 493      */
 494     protected final void ensureOwnerAndJoined() {
              // owner check first: the counters below are owner-only state
 495         ensureOwner();
              // a fork round newer than the last completed join means the owner forked
              // without joining since; close() sets lastJoinCompleted to forkRound when it
              // reports a missing join, so this comparison no longer fails after that
 496         if (forkRound > lastJoinCompleted) {
 497             throw newIllegalStateExceptionNoJoin();
 498         }
 499     }
 500 
 501     /**

 559      * thread contained in the task scope
 560      * @throws StructureViolationException if the current scoped value bindings are not
 561      * the same as when the task scope was created
 562      * @throws RejectedExecutionException if the thread factory rejected creating a
 563      * thread to run the subtask
 564      */
 565     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
 566         Objects.requireNonNull(task, "'task' is null");
 567         int s = ensureOpen();   // throws ISE if closed
 568 
 569         // when forked by the owner, the subtask is forked in the current or next round
 570         int round = -1;
 571         if (Thread.currentThread() == flock.owner()) {
 572             round = forkRound;
 573             if (forkRound == lastJoinCompleted) {
 574                 // new round if first fork after join
 575                 round++;
 576             }
 577         }
 578 
 579         SubtaskImpl<U> subtask = new SubtaskImpl<>(this, task, round);
 580         if (s < SHUTDOWN) {
 581             // create thread to run task
 582             Thread thread = factory.newThread(subtask);
 583             if (thread == null) {
 584                 throw new RejectedExecutionException("Rejected by thread factory");
 585             }
 586 
 587             // attempt to start the thread
 588             try {
 589                 flock.start(thread);
 590             } catch (IllegalStateException e) {
 591                 // shutdown by another thread, or underlying flock is shutdown due
 592                 // to unstructured use
 593             }
 594         }
 595 
 596         // force owner to join if this is the first fork in the round
 597         if (Thread.currentThread() == flock.owner() && round > forkRound) {
 598             forkRound = round;
 599         }

 827      * @throws WrongThreadException if the current thread is not the task scope owner
 828      * @throws StructureViolationException if a structure violation was detected
 829      */
 830     @Override
 831     public void close() {
              // Only the owner may close; closing an already-closed scope returns immediately.
 832         ensureOwner();
 833         int s = state;
 834         if (s == CLOSED)
 835             return;
 836 
              // Shut down first if the scope was still open, then close the flock. The state
              // is assigned in the finally block, so it becomes CLOSED even if either call throws.
 837         try {
 838             if (s < SHUTDOWN)
 839                 implShutdown();
 840             flock.close();
 841         } finally {
 842             state = CLOSED;
 843         }
 844 
 845         // throw ISE if the owner didn't attempt to join after forking
 846         if (forkRound > lastJoinAttempted) {
 847             lastJoinCompleted = forkRound;   // record the round as joined so later join checks stop throwing
 848             throw newIllegalStateExceptionNoJoin();
 849         }
 850     }
 851 
 852     @Override
 853     public String toString() {
              // The flock name, with a "/shutdown" or "/closed" suffix once the scope has
              // left the OPEN state. The volatile state is read once by the switch.
 854         String name = flock.name();
 855         return switch (state) {
 856             case OPEN     -> name;
 857             case SHUTDOWN -> name + "/shutdown";
 858             case CLOSED   -> name + "/closed";
 859             default -> throw new InternalError();   // no other state constant is defined
 860         };
 861     }
 862 
 863     /**
 864      * Subtask implementation, runs the task specified to the fork method.
 865      */
 866     private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
 867         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);   // stored when the task returned null, so a null field can mean "no result yet"
 868 
              // Wrapper stored in the result field for outcomes that are not a plain non-null
              // value: a state plus, where applicable, the exception.
 869         private record AltResult(Subtask.State state, Throwable exception) {
                  // convenience constructor for outcomes without an exception
 870             AltResult(Subtask.State state) {
 871                 this(state, null);
 872             }
 873         }
 874 
 875         private final StructuredTaskScope<? super T> scope;   // scope that forked this subtask; consulted for shutdown and join checks
 876         private final Callable<? extends T> task;             // the task handed to fork
 877         private final int round;                              // fork round assigned by fork; -1 when forked by a non-owner thread
 878         private volatile Object result;                       // null until captured; then the value, RESULT_NULL, or another AltResult
 879 
 880         SubtaskImpl(StructuredTaskScope<? super T> scope,
 881                     Callable<? extends T> task,
 882                     int round) {
                  // Plain field capture; no thread is created here (fork obtains one from the
                  // thread factory). The round is later passed to scope.ensureJoinedIfOwner.
 883             this.scope = scope;
 884             this.task = task;
 885             this.round = round;
 886         }
 887 
 888         @Override
 889         public void run() {
 890             T result = null;
 891             Throwable ex = null;
 892             try {
 893                 result = task.call();
 894             } catch (Throwable e) {
 895                 ex = e;
 896             }
 897 
 898             // nothing to do if task scope is shutdown
 899             if (scope.isShutdown())
 900                 return;
 901 
 902             // capture result or exception, invoke handleComplete
 903             if (ex == null) {
 904                 this.result = (result != null) ? result : RESULT_NULL;
 905             } else {

 911         @Override
 912         public Callable<? extends T> task() {
                  // the same Callable instance that was passed to the constructor
 913             return task;
 914         }
 915 
 916         @Override
 917         public Subtask.State state() {
                  // Single volatile read; the branches below all work on this snapshot.
 918             Object result = this.result;
 919             if (result == null) {
                      // nothing captured (yet)
 920                 return State.UNAVAILABLE;
 921             } else if (result instanceof AltResult alt) {
 922                 // null or failed
 923                 return alt.state();
 924             } else {
                      // any other object is the task's non-null return value
 925                 return State.SUCCESS;
 926             }
 927         }
 928 
 929         @Override
 930         public T get() {
                  // Owner must have joined this subtask's fork round; other threads pass through.
 931             scope.ensureJoinedIfOwner(round);
 932             Object result = this.result;
 933             if (result instanceof AltResult) {
                      // RESULT_NULL stands for a task that returned null; any other
                      // AltResult falls through to the exception below
 934                 if (result == RESULT_NULL) return null;
 935             } else if (result != null) {
                      // a non-null, non-AltResult value is the object run() stored from task.call()
 936                 @SuppressWarnings("unchecked")
 937                 T r = (T) result;
 938                 return r;
 939             }
                  // reached when no result was captured or the captured outcome is not a success
 940             throw new IllegalStateException(
 941                     "Result is unavailable or subtask did not complete successfully");
 942         }
 943 
 944         @Override
 945         public Throwable exception() {
                  // Owner must have joined this subtask's fork round; other threads pass through.
 946             scope.ensureJoinedIfOwner(round);
 947             Object result = this.result;
                  // only an AltResult in the FAILED state carries an exception to return
 948             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
 949                 return alt.exception();
 950             }
                  // reached when no result was captured or the subtask did not fail
 951             throw new IllegalStateException(
 952                     "Exception is unavailable or subtask did not complete with exception");
 953         }
 954 
 955         @Override
 956         public String toString() {
                  // Identity string of this object followed by a bracketed description of its state.
 957             String stateAsString = switch (state()) {
 958                 case UNAVAILABLE -> "[Unavailable]";
 959                 case SUCCESS     -> "[Completed successfully]";
 960                 case FAILED      -> {
                          // state() yields FAILED only via an AltResult, so the cast matches what
                          // it saw. NOTE(review): the field is re-read here; this relies on
                          // result not changing after it is first set - confirm against run().
 961                     Throwable ex = ((AltResult) result).exception();
 962                     yield "[Failed: " + ex + "]";
 963                 }
 964             };
 965             return Objects.toIdentityString(this) + stateAsString;
 966         }

 295  *
 296  * @jls 17.4.5 Happens-before Order
 297  *
 298  * @param <T> the result type of tasks executed in the task scope
 299  * @since 21
 300  */
 301 @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 302 public class StructuredTaskScope<T> implements AutoCloseable {
 303     private final ThreadFactory factory;   // creates the thread that runs each forked subtask
 304     private final ThreadFlock flock;       // starts subtask threads; flock.owner() is the scope owner
 305     private final ReentrantLock shutdownLock = new ReentrantLock();   // NOTE(review): uses are outside this excerpt; presumably guards shutdown - confirm
 306 
 307     // states: OPEN -> SHUTDOWN -> CLOSED
 308     private static final int OPEN     = 0;   // initial state
 309     private static final int SHUTDOWN = 1;   // fork only creates a thread while the observed state is below this
 310     private static final int CLOSED   = 2;   // final state, assigned by close()
 311 
 312     // state: set to SHUTDOWN by any thread, set to CLOSED by owner, read by any thread
 313     private volatile int state;
 314 
 315     // Counters to support checking that the task scope owner joins before closing the task
 316     // scope. These counters are accessed only by the owner thread.

 317     private int forkRound;         // incremented when the first subtask is forked after join
 318     private int lastJoinAttempted; // set to the current fork round when join is attempted
 319     private int lastJoinCompleted; // set to the current fork round when join completes
 320 
 321     /**
 322      * Represents a subtask forked with {@link #fork(Callable)}.
 323      * @param <T> the result type
 324      * @since 21
 325      */
 326     @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
 327     public sealed interface Subtask<T> extends Supplier<T> permits SubtaskImpl {
 328         /**
 329          * {@return the value-returning task provided to the {@code fork} method}
 330          *
 331          * @apiNote Task objects with unique identity may be used for correlation by
 332          * implementations of {@link #handleComplete(Subtask) handleComplete}.
 333          */
 334         Callable<? extends T> task();
 335 
 336         /**

 352              * Subtask.get()} method can be used to obtain the result. This is a terminal
 353              * state.
 354              */
 355             SUCCESS,
 356             /**
 357              * The subtask failed with an exception. The {@link Subtask#exception()
 358              * Subtask.exception()} method can be used to obtain the exception. This is a
 359              * terminal state.
 360              */
 361             FAILED,
 362         }
 363 
 364         /**
 365          * {@return the state of the subtask}
 366          */
 367         State state();
 368 
 369         /**
 370          * Returns the result of the subtask.
 371          *




 372          * @return the possibly-null result
 373          * @throws IllegalStateException if the subtask has not completed or did not
 374          * complete successfully

 375          * @see State#SUCCESS
 376          */
 377         T get();
 378 
 379         /**
 380          * {@return the exception thrown by the subtask}
 381          *
 382          * @throws IllegalStateException if the subtask has not completed or completed
 383          * with a result rather than an exception





 384          * @see State#FAILED
 385          */
 386         Throwable exception();
 387     }
 388 
 389     /**
 390      * Creates a structured task scope with the given name and thread factory. The task
 391      * scope is optionally named for the purposes of monitoring and management. The thread
 392      * factory is used to {@link ThreadFactory#newThread(Runnable) create} threads when
 393      * subtasks are {@linkplain #fork(Callable) forked}. The task scope is owned by the
 394      * current thread.
 395      *
 396      * <p> Construction captures the current thread's {@linkplain ScopedValue scoped value}
 397      * bindings for inheritance by threads started in the task scope. The
 398      * <a href="#TreeStructure">Tree Structure</a> section in the class description details
 399      * how parent-child relations are established implicitly for the purpose of inheritance
 400      * of scoped value bindings.
 401      *
 402      * @param name the name of the task scope, can be null
 403      * @param factory the thread factory

 441     }
 442 
 443     /**
 444      * Throws WrongThreadException if the current thread is not the owner.
          * The owner is the thread returned by {@code flock.owner()}; the check is a
          * reference comparison against {@code Thread.currentThread()}.
          *
          * @throws WrongThreadException if the current thread is not the owner
 445      */
 446     private void ensureOwner() {
 447         if (Thread.currentThread() != flock.owner())
 448             throw new WrongThreadException("Current thread not owner");
 449     }
 450 
 451     /**
 452      * Throws WrongThreadException if the current thread is not the owner
 453      * or a thread contained in the tree.
          * Containment is decided by {@code flock.containsThread(Thread)}.
          *
          * @throws WrongThreadException if the current thread is neither the owner nor
          * reported as contained by the flock
 454      */
 455     private void ensureOwnerOrContainsThread() {
 456         Thread currentThread = Thread.currentThread();
              // the owner is accepted without consulting containsThread (short-circuit &&)
 457         if (currentThread != flock.owner() && !flock.containsThread(currentThread))
 458             throw new WrongThreadException("Current thread not owner or thread in the tree");
 459     }
 460 










 461     /**
 462      * Ensures that the current thread is the owner of this task scope and that it joined
 463      * (with {@link #join()} or {@link #joinUntil(Instant)}) after {@linkplain #fork(Callable)
 464      * forking} subtasks.
 465      *
 466      * @apiNote This method can be used by subclasses that define methods to make available
 467      * results, state, or other outcome to code intended to execute after the join method.
 468      *
 469      * @throws WrongThreadException if the current thread is not the task scope owner
 470      * @throws IllegalStateException if the task scope is open and the task scope owner did
 471      * not join after forking
 472      */
 473     protected final void ensureOwnerAndJoined() {
              // owner check first: the counters below are owner-only state
 474         ensureOwner();
              // a fork round newer than the last completed join means the owner forked
              // without joining since; close() sets lastJoinCompleted to forkRound when it
              // reports a missing join, so this comparison no longer fails after that
 475         if (forkRound > lastJoinCompleted) {
 476             throw newIllegalStateExceptionNoJoin();
 477         }
 478     }
 479 
 480     /**

 538      * thread contained in the task scope
 539      * @throws StructureViolationException if the current scoped value bindings are not
 540      * the same as when the task scope was created
 541      * @throws RejectedExecutionException if the thread factory rejected creating a
 542      * thread to run the subtask
 543      */
 544     public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
 545         Objects.requireNonNull(task, "'task' is null");
 546         int s = ensureOpen();   // throws ISE if closed
 547 
 548         // when forked by the owner, the subtask is forked in the current or next round
 549         int round = -1;
 550         if (Thread.currentThread() == flock.owner()) {
 551             round = forkRound;
 552             if (forkRound == lastJoinCompleted) {
 553                 // new round if first fork after join
 554                 round++;
 555             }
 556         }
 557 
 558         var subtask = new SubtaskImpl<U>(this, task);
 559         if (s < SHUTDOWN) {
 560             // create thread to run task
 561             Thread thread = factory.newThread(subtask);
 562             if (thread == null) {
 563                 throw new RejectedExecutionException("Rejected by thread factory");
 564             }
 565 
 566             // attempt to start the thread
 567             try {
 568                 flock.start(thread);
 569             } catch (IllegalStateException e) {
 570                 // shutdown by another thread, or underlying flock is shutdown due
 571                 // to unstructured use
 572             }
 573         }
 574 
 575         // force owner to join if this is the first fork in the round
 576         if (Thread.currentThread() == flock.owner() && round > forkRound) {
 577             forkRound = round;
 578         }

 806      * @throws WrongThreadException if the current thread is not the task scope owner
 807      * @throws StructureViolationException if a structure violation was detected
 808      */
 809     @Override
 810     public void close() {
              // Only the owner may close; closing an already-closed scope returns immediately.
 811         ensureOwner();
 812         int s = state;
 813         if (s == CLOSED)
 814             return;
 815 
              // Shut down first if the scope was still open, then close the flock. The state
              // is assigned in the finally block, so it becomes CLOSED even if either call throws.
 816         try {
 817             if (s < SHUTDOWN)
 818                 implShutdown();
 819             flock.close();
 820         } finally {
 821             state = CLOSED;
 822         }
 823 
 824         // throw ISE if the owner didn't attempt to join after forking
 825         if (forkRound > lastJoinAttempted) {
 826             lastJoinCompleted = forkRound;  // ensureOwnerAndJoined is a no-op after close
 827             throw newIllegalStateExceptionNoJoin();
 828         }
 829     }
 830 
 831     @Override
 832     public String toString() {
              // The flock name, with a "/shutdown" or "/closed" suffix once the scope has
              // left the OPEN state. The volatile state is read once by the switch.
 833         String name = flock.name();
 834         return switch (state) {
 835             case OPEN     -> name;
 836             case SHUTDOWN -> name + "/shutdown";
 837             case CLOSED   -> name + "/closed";
 838             default -> throw new InternalError();   // no other state constant is defined
 839         };
 840     }
 841 
 842     /**
 843      * Subtask implementation, runs the task specified to the fork method.
 844      */
 845     private static final class SubtaskImpl<T> implements Subtask<T>, Runnable {
 846         private static final AltResult RESULT_NULL = new AltResult(Subtask.State.SUCCESS);   // stored when the task returned null, so a null field can mean "no result yet"
 847 
              // Wrapper stored in the result field for outcomes that are not a plain non-null
              // value: a state plus, where applicable, the exception.
 848         private record AltResult(Subtask.State state, Throwable exception) {
                  // convenience constructor for outcomes without an exception
 849             AltResult(Subtask.State state) {
 850                 this(state, null);
 851             }
 852         }
 853 
 854         private final StructuredTaskScope<? super T> scope;   // scope that forked this subtask; run() checks scope.isShutdown()
 855         private final Callable<? extends T> task;             // the task handed to fork

 856         private volatile Object result;                       // null until captured; then the value, RESULT_NULL, or another AltResult
 857 
 858         SubtaskImpl(StructuredTaskScope<? super T> scope, Callable<? extends T> task) {
                  // Plain field capture; no thread is created here (fork obtains one from the
                  // thread factory).


 859             this.scope = scope;
 860             this.task = task;

 861         }
 862 
 863         @Override
 864         public void run() {
 865             T result = null;
 866             Throwable ex = null;
 867             try {
 868                 result = task.call();
 869             } catch (Throwable e) {
 870                 ex = e;
 871             }
 872 
 873             // nothing to do if task scope is shutdown
 874             if (scope.isShutdown())
 875                 return;
 876 
 877             // capture result or exception, invoke handleComplete
 878             if (ex == null) {
 879                 this.result = (result != null) ? result : RESULT_NULL;
 880             } else {

 886         @Override
 887         public Callable<? extends T> task() {
                  // the same Callable instance that was passed to the constructor
 888             return task;
 889         }
 890 
 891         @Override
 892         public Subtask.State state() {
                  // Single volatile read; the branches below all work on this snapshot.
 893             Object result = this.result;
 894             if (result == null) {
                      // nothing captured (yet)
 895                 return State.UNAVAILABLE;
 896             } else if (result instanceof AltResult alt) {
 897                 // null or failed
 898                 return alt.state();
 899             } else {
                      // any other object is the task's non-null return value
 900                 return State.SUCCESS;
 901             }
 902         }
 903 
 904         @Override
 905         public T get() {
                  // Decides purely from the captured result; the volatile field is read once.

 906             Object result = this.result;
 907             if (result instanceof AltResult) {
                      // RESULT_NULL stands for a task that returned null; any other
                      // AltResult falls through to the exception below
 908                 if (result == RESULT_NULL) return null;
 909             } else if (result != null) {
                      // a non-null, non-AltResult value is the object run() stored from task.call()
 910                 @SuppressWarnings("unchecked")
 911                 T r = (T) result;
 912                 return r;
 913             }
                  // reached when no result was captured or the captured outcome is not a success
 914             throw new IllegalStateException(
 915                     "Result is unavailable or subtask did not complete successfully");
 916         }
 917 
 918         @Override
 919         public Throwable exception() {
                  // Decides purely from the captured result; the volatile field is read once.

 920             Object result = this.result;
                  // only an AltResult in the FAILED state carries an exception to return
 921             if (result instanceof AltResult alt && alt.state() == State.FAILED) {
 922                 return alt.exception();
 923             }
                  // reached when no result was captured or the subtask did not fail
 924             throw new IllegalStateException(
 925                     "Exception is unavailable or subtask did not complete with exception");
 926         }
 927 
 928         @Override
 929         public String toString() {
                  // Identity string of this object followed by a bracketed description of its state.
 930             String stateAsString = switch (state()) {
 931                 case UNAVAILABLE -> "[Unavailable]";
 932                 case SUCCESS     -> "[Completed successfully]";
 933                 case FAILED      -> {
                          // state() yields FAILED only via an AltResult, so the cast matches what
                          // it saw. NOTE(review): the field is re-read here; this relies on
                          // result not changing after it is first set - confirm against run().
 934                     Throwable ex = ((AltResult) result).exception();
 935                     yield "[Failed: " + ex + "]";
 936                 }
 937             };
 938             return Objects.toIdentityString(this) + stateAsString;
 939         }
< prev index next >