< prev index next > src/java.base/share/classes/java/util/concurrent/Joiners.java
Print this page
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.StructuredTaskScope.Joiner;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.function.Predicate;
- import java.util.stream.Stream;
import jdk.internal.invoke.MhUtil;
/**
* Built-in StructuredTaskScope.Joiner implementations.
*/
}
return state;
}
/**
- * A joiner that returns a stream of all subtasks when all subtasks complete
+ * A joiner that returns a list of all results when all subtasks complete
* successfully. Cancels the scope if any subtask fails.
*
* The result list follows the order in which onFork received the subtasks.
* If a failure was recorded, result() throws that exception instead of
* returning a list.
*/
- static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {
+ static final class AllSuccessful<T> implements Joiner<T, List<T>> {
// VarHandle used to compare-and-set the firstException field, so that only
// one failure is ever recorded.
private static final VarHandle FIRST_EXCEPTION =
MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
- // list of forked subtasks, only accessed by owner thread
- private final List<Subtask<T>> subtasks = new ArrayList<>();
+ // list of forked subtasks, created lazily, only accessed by owner thread
+ private List<Subtask<T>> subtasks;
// exception of the first failed subtask to win the CAS in onComplete; null
// while no failure has been recorded
private volatile Throwable firstException;
/**
 * Records the forked subtask so its result can be collected by result().
 * The backing list is allocated on the first call. Always returns false.
 * NOTE(review): ensureUnavailable is defined outside this hunk; presumably
 * it rejects subtasks that are not in the UNAVAILABLE state - confirm.
 */
@Override
- public boolean onFork(Subtask<? extends T> subtask) {
+ public boolean onFork(Subtask<T> subtask) {
ensureUnavailable(subtask);
- @SuppressWarnings("unchecked")
- var s = (Subtask<T>) subtask;
- subtasks.add(s);
+ if (subtasks == null) {
+ subtasks = new ArrayList<>();
+ }
+ subtasks.add(subtask);
return false;
}
/**
 * Returns true only for a FAILED subtask whose exception is the one stored
 * in firstException by this call; returns false for every other completion.
 * Per the class description, a failure cancels the scope.
 */
@Override
- public boolean onComplete(Subtask<? extends T> subtask) {
+ public boolean onComplete(Subtask<T> subtask) {
Subtask.State state = ensureCompleted(subtask);
// The plain volatile read is a cheap pre-check; the compareAndSet decides
// which failure is recorded if several arrive at about the same time.
return (state == Subtask.State.FAILED)
&& (firstException == null)
&& FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
}
/**
 * Throws the recorded exception if a subtask failed. Otherwise returns the
 * results of the forked subtasks, or an empty list if none were forked.
 * The subtask list is released on both paths.
 */
@Override
- public Stream<Subtask<T>> result() throws Throwable {
+ public List<T> result() throws Throwable {
Throwable ex = firstException;
- if (ex != null) {
- throw ex;
- } else {
- return subtasks.stream();
+ try {
+ if (ex != null) {
+ throw ex;
+ }
+ return (subtasks != null)
+ ? subtasks.stream().map(Subtask::get).toList()
+ : List.of();
+ } finally {
+ subtasks = null; // allow subtasks to be GC'ed
}
}
}
/**
case SUCCESS -> 2;
};
}
@Override
- public boolean onComplete(Subtask<? extends T> subtask) {
+ public boolean onComplete(Subtask<T> subtask) {
Subtask.State state = ensureCompleted(subtask);
Subtask<T> s;
while (((s = this.subtask) == null)
|| SUBTASK_STATE_COMPARATOR.compare(s.state(), state) < 0) {
if (SUBTASK.compareAndSet(this, s, subtask)) {
private static final VarHandle FIRST_EXCEPTION =
MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
private volatile Throwable firstException;
/**
 * Returns true only for a FAILED subtask whose exception is the one stored
 * in firstException by this call; returns false for every other completion.
 * NOTE(review): the enclosing class is outside this hunk; presumably a true
 * return cancels the scope, as in the other joiners - confirm.
 */
@Override
- public boolean onComplete(Subtask<? extends T> subtask) {
+ public boolean onComplete(Subtask<T> subtask) {
Subtask.State state = ensureCompleted(subtask);
// The plain volatile read is a cheap pre-check; the compareAndSet ensures
// at most one exception is ever recorded.
return (state == Subtask.State.FAILED)
&& (firstException == null)
&& FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
}
}
}
}
/**
- * A joiner that returns a stream of all subtasks.
+ * A joiner that returns a list of all subtasks.
*
* The list follows the order in which onFork received the subtasks. The
* value returned by onComplete is decided by the isDone predicate supplied
* at construction.
*/
- static final class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
- private final Predicate<Subtask<? extends T>> isDone;
+ static final class AllSubtasks<T> implements Joiner<T, List<Subtask<T>>> {
+ private final Predicate<Subtask<T>> isDone;
- // list of forked subtasks, only accessed by owner thread
- private final List<Subtask<T>> subtasks = new ArrayList<>();
+ // list of forked subtasks, created lazily, only accessed by owner thread
+ private List<Subtask<T>> subtasks;
// Stores the predicate evaluated on every completed subtask; a null
// predicate is rejected with NullPointerException.
- AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
+ AllSubtasks(Predicate<Subtask<T>> isDone) {
this.isDone = Objects.requireNonNull(isDone);
}
/**
 * Records the forked subtask, allocating the backing list on the first
 * call. Always returns false.
 * NOTE(review): ensureUnavailable is defined outside this hunk; presumably
 * it rejects subtasks that are not in the UNAVAILABLE state - confirm.
 */
@Override
- public boolean onFork(Subtask<? extends T> subtask) {
+ public boolean onFork(Subtask<T> subtask) {
ensureUnavailable(subtask);
- @SuppressWarnings("unchecked")
- var s = (Subtask<T>) subtask;
- subtasks.add(s);
+ if (subtasks == null) {
+ subtasks = new ArrayList<>();
+ }
+ subtasks.add(subtask);
return false;
}
/**
 * Returns whatever the isDone predicate reports for the completed subtask.
 */
@Override
- public boolean onComplete(Subtask<? extends T> subtask) {
+ public boolean onComplete(Subtask<T> subtask) {
ensureCompleted(subtask);
return isDone.test(subtask);
}
// The new side adds an onTimeout override that deliberately does nothing,
// and changes result() to hand back an unmodifiable copy of the forked
// subtasks (an empty list if none were forked), dropping the internal list.
@Override
- public Stream<Subtask<T>> result() {
- return subtasks.stream();
+ public void onTimeout() {
+ // do nothing, this joiner does not throw TimeoutException
+ }
+
+ @Override
+ public List<Subtask<T>> result() {
+ if (subtasks != null) {
+ List<Subtask<T>> result = List.copyOf(subtasks);
+ subtasks = null; // allow subtasks to be GC'ed
+ return result;
+ } else {
+ return List.of();
+ }
}
}
}
< prev index next >