< prev index next >

src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java

Print this page
*** 26,11 ***
  
  import java.lang.invoke.MethodHandles;
  import java.lang.invoke.VarHandle;
  import java.time.Duration;
  import java.util.Objects;
! import java.util.function.Function;
  import jdk.internal.misc.ThreadFlock;
  import jdk.internal.invoke.MhUtil;
  
  /**
   * StructuredTaskScope implementation.
--- 26,11 ---
  
  import java.lang.invoke.MethodHandles;
  import java.lang.invoke.VarHandle;
  import java.time.Duration;
  import java.util.Objects;
! import java.util.function.UnaryOperator;
  import jdk.internal.misc.ThreadFlock;
  import jdk.internal.invoke.MhUtil;
  
  /**
   * StructuredTaskScope implementation.

*** 74,14 ***
       * Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
       * and with configuration that is the result of applying the given function to the
       * default configuration.
       */
      static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
!                                                  Function<Configuration, Configuration> configFunction) {
          Objects.requireNonNull(joiner);
  
!         var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
          var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
  
          // schedule timeout
          Duration timeout = config.timeout();
          if (timeout != null) {
--- 74,14 ---
       * Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
       * and with configuration that is the result of applying the given function to the
       * default configuration.
       */
      static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
!                                                  UnaryOperator<Configuration> configOperator) {
          Objects.requireNonNull(joiner);
  
!         var config = (ConfigImpl) configOperator.apply(ConfigImpl.defaultConfig());
          var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
  
          // schedule timeout
          Duration timeout = config.timeout();
          if (timeout != null) {

*** 106,26 ***
          if (Thread.currentThread() != flock.owner()) {
              throw new WrongThreadException("Current thread not owner");
          }
      }
  
-     /**
-      * Throws IllegalStateException if already joined or scope is closed.
-      */
-     private void ensureNotJoined() {
-         assert Thread.currentThread() == flock.owner();
-         if (state > ST_FORKED) {
-             throw new IllegalStateException("Already joined or scope is closed");
-         }
-     }
- 
      /**
       * Throws IllegalStateException if invoked by the owner thread and the owner thread
       * has not joined.
       */
      private void ensureJoinedIfOwner() {
!         if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) {
              throw new IllegalStateException("join not called");
          }
      }
  
      /**
--- 106,16 ---
          if (Thread.currentThread() != flock.owner()) {
              throw new WrongThreadException("Current thread not owner");
          }
      }
  
      /**
       * Throws IllegalStateException if invoked by the owner thread and the owner thread
       * has not joined.
       */
      private void ensureJoinedIfOwner() {
!         if (Thread.currentThread() == flock.owner() && state < ST_JOIN_STARTED) {
              throw new IllegalStateException("join not called");
          }
      }
  
      /**

*** 193,11 ***
  
      @Override
      public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
          Objects.requireNonNull(task);
          ensureOwner();
!         ensureNotJoined();
  
          var subtask = new SubtaskImpl<U>(this, task);
  
          // notify joiner, even if cancelled
          if (joiner.onFork(subtask)) {
--- 183,13 ---
  
      @Override
      public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
          Objects.requireNonNull(task);
          ensureOwner();
!         if (state > ST_FORKED) {
+             throw new IllegalStateException("join already called or scope is closed");
+         }
  
          var subtask = new SubtaskImpl<U>(this, task);
  
          // notify joiner, even if cancelled
          if (joiner.onFork(subtask)) {

*** 232,26 ***
      }
  
      @Override
      public R join() throws InterruptedException {
          ensureOwner();
!         ensureNotJoined();
  
          // join started
          state = ST_JOIN_STARTED;
  
          // wait for all subtasks, the scope to be cancelled, or interrupt
          flock.awaitAll();
  
!         // throw if timeout expired
          if (timeoutExpired) {
!             throw new TimeoutException();
          }
-         cancelTimeout();
- 
-         // all subtasks completed or cancelled
-         state = ST_JOIN_COMPLETED;
  
          // invoke joiner to get result
          try {
              return joiner.result();
          } catch (Throwable e) {
--- 224,30 ---
      }
  
      @Override
      public R join() throws InterruptedException {
          ensureOwner();
!         if (state >= ST_JOIN_COMPLETED) {
+             throw new IllegalStateException("Already joined or scope is closed");
+         }
  
          // join started
          state = ST_JOIN_STARTED;
  
          // wait for all subtasks, the scope to be cancelled, or interrupt
          flock.awaitAll();
  
!         // all subtasks completed or scope cancelled
+         state = ST_JOIN_COMPLETED;
+ 
+         // invoke joiner onTimeout if timeout expired
          if (timeoutExpired) {
!             cancel();  // ensure cancelled before calling onTimeout
+             joiner.onTimeout();
+         } else {
+             cancelTimeout();
          }
  
          // invoke joiner to get result
          try {
              return joiner.result();
          } catch (Throwable e) {
< prev index next >