< prev index next > src/java.base/share/classes/java/util/concurrent/Joiners.java
Print this page
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.StructuredTaskScope.Joiner;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.function.Predicate;
- import java.util.stream.Stream;
import jdk.internal.invoke.MhUtil;
/**
* Built-in StructuredTaskScope.Joiner implementations.
*/
}
return state;
}
/**
! * A joiner that returns a stream of all subtasks when all subtasks complete
* successfully. Cancels the scope if any subtask fails.
*/
! static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {
// Handle used by onComplete to compare-and-set the firstException field.
private static final VarHandle FIRST_EXCEPTION =
MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
! // list of forked subtasks, only accessed by owner thread
! private final List<Subtask<T>> subtasks = new ArrayList<>();
// Exception of the first failed subtask that was recorded; null while none has been recorded.
private volatile Throwable firstException;
// Remembers the forked subtask so result() can expose it. Always returns false
// (presumably "do not cancel the scope" -- confirm against the Joiner contract).
@Override
! public boolean onFork(Subtask<? extends T> subtask) {
// Helper defined outside this hunk; presumably rejects a subtask that has
// already completed -- TODO confirm.
ensureUnavailable(subtask);
// The list is typed Subtask<T>, so the wildcard-typed argument is narrowed
// with an unchecked cast before it is stored.
! @SuppressWarnings("unchecked")
! var s = (Subtask<T>) subtask;
! subtasks.add(s);
return false;
}
// Returns true only when the subtask is FAILED, no exception has been recorded
// yet, and the compare-and-set of firstException from null to the subtask's
// exception succeeds; returns false in every other case. Per the class
// description a failure cancels the scope, so true presumably requests that.
@Override
! public boolean onComplete(Subtask<? extends T> subtask) {
Subtask.State state = ensureCompleted(subtask);
// The plain volatile read short-circuits before the CAS is attempted once an
// exception has already been recorded.
return (state == Subtask.State.FAILED)
&& (firstException == null)
&& FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
}
// Throws the recorded exception if there is one; otherwise returns a stream
// over the subtasks in the order onFork added them.
@Override
! public Stream<Subtask<T>> result() throws Throwable {
// Read the volatile field once so the null check and the throw see the same value.
Throwable ex = firstException;
! if (ex != null) {
! throw ex;
! } else {
! return subtasks.stream();
}
}
}
/**
}
return state;
}
/**
! * A joiner that returns a list of all results when all subtasks complete
* successfully. Cancels the scope if any subtask fails.
*/
! static final class AllSuccessful<T> implements Joiner<T, List<T>> {
// Handle used by onComplete to compare-and-set the firstException field.
private static final VarHandle FIRST_EXCEPTION =
MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
! // list of forked subtasks, created lazily, only accessed by owner thread
! private List<Subtask<T>> subtasks;
// Exception of the first failed subtask that was recorded; null while none has been recorded.
private volatile Throwable firstException;
// Remembers the forked subtask, creating the backing list on the first call.
// Always returns false (presumably "do not cancel the scope" -- confirm
// against the Joiner contract).
@Override
! public boolean onFork(Subtask<T> subtask) {
// Helper defined outside this hunk; presumably rejects a subtask that has
// already completed -- TODO confirm.
ensureUnavailable(subtask);
! if (subtasks == null) {
! subtasks = new ArrayList<>();
! }
+ subtasks.add(subtask);
return false;
}
// Returns true only when the subtask is FAILED, no exception has been recorded
// yet, and the compare-and-set of firstException from null to the subtask's
// exception succeeds; returns false in every other case. Per the class
// description a failure cancels the scope, so true presumably requests that.
@Override
! public boolean onComplete(Subtask<T> subtask) {
Subtask.State state = ensureCompleted(subtask);
// The plain volatile read short-circuits before the CAS is attempted once an
// exception has already been recorded.
return (state == Subtask.State.FAILED)
&& (firstException == null)
&& FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
}
// Throws the recorded exception if there is one; otherwise returns the list
// produced by calling get() on each remembered subtask, in the order onFork
// added them. Returns an empty list when the subtasks field is null, i.e. when
// onFork was never called or a previous result() call already cleared it.
@Override
! public List<T> result() throws Throwable {
// Read the volatile field once so the null check and the throw see the same value.
Throwable ex = firstException;
// The finally clause drops the subtask list on both the throwing and the
// returning path.
! try {
! if (ex != null) {
! throw ex;
! }
+ return (subtasks != null)
+ ? subtasks.stream().map(Subtask::get).toList()
+ : List.of();
+ } finally {
+ subtasks = null; // allow subtasks to be GC'ed
}
}
}
/**
case SUCCESS -> 2;
};
}
@Override
! public boolean onComplete(Subtask<? extends T> subtask) {
Subtask.State state = ensureCompleted(subtask);
Subtask<T> s;
while (((s = this.subtask) == null)
|| SUBTASK_STATE_COMPARATOR.compare(s.state(), state) < 0) {
if (SUBTASK.compareAndSet(this, s, subtask)) {
case SUCCESS -> 2;
};
}
@Override
! public boolean onComplete(Subtask<T> subtask) {
Subtask.State state = ensureCompleted(subtask);
Subtask<T> s;
while (((s = this.subtask) == null)
|| SUBTASK_STATE_COMPARATOR.compare(s.state(), state) < 0) {
if (SUBTASK.compareAndSet(this, s, subtask)) {
private static final VarHandle FIRST_EXCEPTION =
MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
private volatile Throwable firstException;
// Returns true only when the subtask is FAILED, no exception has been recorded
// yet, and the compare-and-set of firstException from null to the subtask's
// exception succeeds; returns false in every other case.
// NOTE(review): the enclosing class is not visible in this hunk; true
// presumably asks the scope to cancel -- confirm against the Joiner contract.
@Override
! public boolean onComplete(Subtask<? extends T> subtask) {
Subtask.State state = ensureCompleted(subtask);
// The plain volatile read short-circuits before the CAS is attempted once an
// exception has already been recorded.
return (state == Subtask.State.FAILED)
&& (firstException == null)
&& FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
}
private static final VarHandle FIRST_EXCEPTION =
MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
private volatile Throwable firstException;
// Returns true only when the subtask is FAILED, no exception has been recorded
// yet, and the compare-and-set of firstException from null to the subtask's
// exception succeeds; returns false in every other case.
// NOTE(review): the enclosing class is not visible in this hunk; true
// presumably asks the scope to cancel -- confirm against the Joiner contract.
@Override
! public boolean onComplete(Subtask<T> subtask) {
Subtask.State state = ensureCompleted(subtask);
// The plain volatile read short-circuits before the CAS is attempted once an
// exception has already been recorded.
return (state == Subtask.State.FAILED)
&& (firstException == null)
&& FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
}
}
}
}
/**
! * A joiner that returns a stream of all subtasks.
*/
! static final class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
// Caller-supplied predicate evaluated against every completed subtask in
// onComplete; its result is returned from that method unchanged.
! private final Predicate<Subtask<? extends T>> isDone;
! // list of forked subtasks, only accessed by owner thread
! private final List<Subtask<T>> subtasks = new ArrayList<>();
// Stores the predicate; throws NullPointerException if it is null.
! AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
this.isDone = Objects.requireNonNull(isDone);
}
// Remembers the forked subtask so result() can expose it. Always returns false
// (presumably "do not cancel the scope" -- confirm against the Joiner contract).
@Override
! public boolean onFork(Subtask<? extends T> subtask) {
// Helper defined outside this hunk; presumably rejects a subtask that has
// already completed -- TODO confirm.
ensureUnavailable(subtask);
// The list is typed Subtask<T>, so the wildcard-typed argument is narrowed
// with an unchecked cast before it is stored.
! @SuppressWarnings("unchecked")
! var s = (Subtask<T>) subtask;
! subtasks.add(s);
return false;
}
// Returns whatever the isDone predicate answers for the completed subtask
// (presumably true asks the scope to cancel -- confirm against the Joiner contract).
@Override
! public boolean onComplete(Subtask<? extends T> subtask) {
// Return value of the helper is intentionally unused here; only its check matters.
ensureCompleted(subtask);
return isDone.test(subtask);
}
// Returns a stream over the remembered subtasks in the order onFork added
// them; never throws on subtask failure.
@Override
! public Stream<Subtask<T>> result() {
! return subtasks.stream();
}
}
}
}
}
}
/**
! * A joiner that returns a list of all subtasks.
*/
! static final class AllSubtasks<T> implements Joiner<T, List<Subtask<T>>> {
// Caller-supplied predicate evaluated against every completed subtask in
// onComplete; its result is returned from that method unchanged.
! private final Predicate<Subtask<T>> isDone;
! // list of forked subtasks, created lazily, only accessed by owner thread
! private List<Subtask<T>> subtasks;
// Stores the predicate; throws NullPointerException if it is null.
! AllSubtasks(Predicate<Subtask<T>> isDone) {
this.isDone = Objects.requireNonNull(isDone);
}
// Remembers the forked subtask, creating the backing list on the first call.
// Always returns false (presumably "do not cancel the scope" -- confirm
// against the Joiner contract).
@Override
! public boolean onFork(Subtask<T> subtask) {
// Helper defined outside this hunk; presumably rejects a subtask that has
// already completed -- TODO confirm.
ensureUnavailable(subtask);
! if (subtasks == null) {
! subtasks = new ArrayList<>();
! }
+ subtasks.add(subtask);
return false;
}
// Returns whatever the isDone predicate answers for the completed subtask
// (presumably true asks the scope to cancel -- confirm against the Joiner contract).
@Override
! public boolean onComplete(Subtask<T> subtask) {
// Return value of the helper is intentionally unused here; only its check matters.
ensureCompleted(subtask);
return isDone.test(subtask);
}
// Deliberately empty override: a timeout is not turned into an exception by
// this joiner, so result() still returns the subtasks afterwards.
@Override
! public void onTimeout() {
! // do nothing, this joiner does not throw TimeoutException
+ }
+
// Returns an unmodifiable snapshot (List.copyOf) of the remembered subtasks and
// then drops the internal list. Returns an empty list when the subtasks field
// is null, i.e. when onFork was never called or a previous result() call
// already cleared it.
+ @Override
+ public List<Subtask<T>> result() {
+ if (subtasks != null) {
+ List<Subtask<T>> result = List.copyOf(subtasks);
+ subtasks = null; // allow subtasks to be GC'ed
+ return result;
+ } else {
+ return List.of();
+ }
}
}
}
< prev index next >