< prev index next >

src/java.base/share/classes/java/util/concurrent/Joiners.java

Print this page
@@ -32,11 +32,10 @@
  import java.util.NoSuchElementException;
  import java.util.Objects;
  import java.util.concurrent.StructuredTaskScope.Joiner;
  import java.util.concurrent.StructuredTaskScope.Subtask;
  import java.util.function.Predicate;
- import java.util.stream.Stream;
  import jdk.internal.invoke.MhUtil;
  
  /**
   * Built-in StructuredTaskScope.Joiner implementations.
   */

@@ -62,46 +61,52 @@
          }
          return state;
      }
  
      /**
-      * A joiner that returns a stream of all subtasks when all subtasks complete
+      * A joiner that returns a list of all results, in fork order, when all subtasks complete
       * successfully. Cancels the scope if any subtask fails.
       */
-     static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {
+     static final class AllSuccessful<T> implements Joiner<T, List<T>> {
          private static final VarHandle FIRST_EXCEPTION =
                  MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
  
-         // list of forked subtasks, only accessed by owner thread
-         private final List<Subtask<T>> subtasks = new ArrayList<>();
+         // list of forked subtasks, created lazily, only accessed by owner thread
+         private List<Subtask<T>> subtasks;
  
          private volatile Throwable firstException;
  
          @Override
-         public boolean onFork(Subtask<? extends T> subtask) {
+         public boolean onFork(Subtask<T> subtask) {
              ensureUnavailable(subtask);
-             @SuppressWarnings("unchecked")
-             var s = (Subtask<T>) subtask;
-             subtasks.add(s);
+             if (subtasks == null) {
+                 subtasks = new ArrayList<>();
+             }
+             subtasks.add(subtask);
              return false;
          }
  
          @Override
-         public boolean onComplete(Subtask<? extends T> subtask) {
+         public boolean onComplete(Subtask<T> subtask) {
              Subtask.State state = ensureCompleted(subtask);
              return (state == Subtask.State.FAILED)
                      && (firstException == null)
                      && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
          }
  
          @Override
-         public Stream<Subtask<T>> result() throws Throwable {
+         public List<T> result() throws Throwable {
              Throwable ex = firstException;
-             if (ex != null) {
-                 throw ex;
-             } else {
-                 return subtasks.stream();
+             try {
+                 if (ex != null) {
+                     throw ex;
+                 }
+                 return (subtasks != null)
+                         ? subtasks.stream().map(Subtask::get).toList()
+                         : List.of();
+             } finally {
+                 subtasks = null;  // allow subtasks to be GC'ed
              }
          }
      }
  
      /**

@@ -128,11 +133,11 @@
                  case SUCCESS     -> 2;
              };
          }
  
          @Override
-         public boolean onComplete(Subtask<? extends T> subtask) {
+         public boolean onComplete(Subtask<T> subtask) {
              Subtask.State state = ensureCompleted(subtask);
              Subtask<T> s;
              while (((s = this.subtask) == null)
                      || SUBTASK_STATE_COMPARATOR.compare(s.state(), state) < 0) {
                  if (SUBTASK.compareAndSet(this, s, subtask)) {

@@ -164,11 +169,11 @@
          private static final VarHandle FIRST_EXCEPTION =
                  MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
          private volatile Throwable firstException;
  
          @Override
-         public boolean onComplete(Subtask<? extends T> subtask) {
+         public boolean onComplete(Subtask<T> subtask) {
              Subtask.State state = ensureCompleted(subtask);
              return (state == Subtask.State.FAILED)
                      && (firstException == null)
                      && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
          }

@@ -183,38 +188,50 @@
              }
          }
      }
  
      /**
-      * A joiner that returns a stream of all subtasks.
+      * A joiner that returns a list of all forked subtasks, in fork order.
       */
-     static final class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
-         private final Predicate<Subtask<? extends T>> isDone;
+     static final class AllSubtasks<T> implements Joiner<T, List<Subtask<T>>> {
+         private final Predicate<Subtask<T>> isDone;
  
-         // list of forked subtasks, only accessed by owner thread
-         private final List<Subtask<T>> subtasks = new ArrayList<>();
+         // list of forked subtasks, created lazily, only accessed by owner thread
+         private List<Subtask<T>> subtasks;
  
-         AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
+         AllSubtasks(Predicate<Subtask<T>> isDone) {
              this.isDone = Objects.requireNonNull(isDone);
          }
  
          @Override
-         public boolean onFork(Subtask<? extends T> subtask) {
+         public boolean onFork(Subtask<T> subtask) {
              ensureUnavailable(subtask);
-             @SuppressWarnings("unchecked")
-             var s = (Subtask<T>) subtask;
-             subtasks.add(s);
+             if (subtasks == null) {
+                 subtasks = new ArrayList<>();
+             }
+             subtasks.add(subtask);
              return false;
          }
  
          @Override
-         public boolean onComplete(Subtask<? extends T> subtask) {
+         public boolean onComplete(Subtask<T> subtask) {
              ensureCompleted(subtask);
              return isDone.test(subtask);
          }
  
          @Override
-         public Stream<Subtask<T>> result() {
-             return subtasks.stream();
+         public void onTimeout() {
+             // do nothing, this joiner does not throw TimeoutException
+         }
+ 
+         @Override
+         public List<Subtask<T>> result() {
+             if (subtasks != null) {
+                 List<Subtask<T>> result = List.copyOf(subtasks);
+                 subtasks = null;  // allow subtasks to be GC'ed
+                 return result;
+             } else {
+                 return List.of();
+             }
          }
      }
  }
< prev index next >