< prev index next >

src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java

Print this page
@@ -26,11 +26,11 @@
  
  import java.lang.invoke.MethodHandles;
  import java.lang.invoke.VarHandle;
  import java.time.Duration;
  import java.util.Objects;
- import java.util.function.Function;
+ import java.util.function.UnaryOperator;
  import jdk.internal.misc.ThreadFlock;
  import jdk.internal.invoke.MhUtil;
  
  /**
   * StructuredTaskScope implementation.

@@ -74,14 +74,14 @@
       * Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
       * and with configuration that is the result of applying the given function to the
       * default configuration.
       */
      static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
-                                                  Function<Configuration, Configuration> configFunction) {
+                                                  UnaryOperator<Configuration> configOperator) {
          Objects.requireNonNull(joiner);
  
-         var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
+         var config = (ConfigImpl) configOperator.apply(ConfigImpl.defaultConfig());
          var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
  
          // schedule timeout
          Duration timeout = config.timeout();
          if (timeout != null) {

@@ -106,26 +106,16 @@
          if (Thread.currentThread() != flock.owner()) {
              throw new WrongThreadException("Current thread not owner");
          }
      }
  
-     /**
-      * Throws IllegalStateException if already joined or scope is closed.
-      */
-     private void ensureNotJoined() {
-         assert Thread.currentThread() == flock.owner();
-         if (state > ST_FORKED) {
-             throw new IllegalStateException("Already joined or scope is closed");
-         }
-     }
- 
      /**
       * Throws IllegalStateException if invoked by the owner thread and the owner thread
       * has not joined.
       */
      private void ensureJoinedIfOwner() {
-         if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) {
+         if (Thread.currentThread() == flock.owner() && state < ST_JOIN_STARTED) {
              throw new IllegalStateException("join not called");
          }
      }
  
      /**

@@ -193,11 +183,13 @@
  
      @Override
      public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
          Objects.requireNonNull(task);
          ensureOwner();
-         ensureNotJoined();
+         if (state > ST_FORKED) {
+             throw new IllegalStateException("join already called or scope is closed");
+         }
  
          var subtask = new SubtaskImpl<U>(this, task);
  
          // notify joiner, even if cancelled
          if (joiner.onFork(subtask)) {

@@ -232,26 +224,30 @@
      }
  
      @Override
      public R join() throws InterruptedException {
          ensureOwner();
-         ensureNotJoined();
+         if (state >= ST_JOIN_COMPLETED) {
+             throw new IllegalStateException("Already joined or scope is closed");
+         }
  
          // join started
          state = ST_JOIN_STARTED;
  
          // wait for all subtasks, the scope to be cancelled, or interrupt
          flock.awaitAll();
  
-         // throw if timeout expired
+         // all subtasks completed or scope cancelled
+         state = ST_JOIN_COMPLETED;
+ 
+         // invoke joiner onTimeout if timeout expired
          if (timeoutExpired) {
-             throw new TimeoutException();
+             cancel();  // ensure cancelled before calling onTimeout
+             joiner.onTimeout();
+         } else {
+             cancelTimeout();
          }
-         cancelTimeout();
- 
-         // all subtasks completed or cancelled
-         state = ST_JOIN_COMPLETED;
  
          // invoke joiner to get result
          try {
              return joiner.result();
          } catch (Throwable e) {
< prev index next >