< prev index next > src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java
Print this page
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.time.Duration;
import java.util.Objects;
- import java.util.function.Function;
+ import java.util.function.UnaryOperator;
import jdk.internal.misc.ThreadFlock;
import jdk.internal.invoke.MhUtil;
/**
* StructuredTaskScope implementation.
* Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
* and with configuration that is the result of applying the given function to the
* default configuration.
*/
static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
- Function<Configuration, Configuration> configFunction) {
+ UnaryOperator<Configuration> configOperator) {
Objects.requireNonNull(joiner);
- var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
+ var config = (ConfigImpl) configOperator.apply(ConfigImpl.defaultConfig());
var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
// schedule timeout
Duration timeout = config.timeout();
if (timeout != null) {
if (Thread.currentThread() != flock.owner()) {
throw new WrongThreadException("Current thread not owner");
}
}
- /**
- * Throws IllegalStateException if already joined or scope is closed.
- */
- private void ensureNotJoined() {
- assert Thread.currentThread() == flock.owner();
- if (state > ST_FORKED) {
- throw new IllegalStateException("Already joined or scope is closed");
- }
- }
-
/**
* Throws IllegalStateException if invoked by the owner thread and the owner thread
* has not joined.
*/
private void ensureJoinedIfOwner() {
- if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) {
+ if (Thread.currentThread() == flock.owner() && state < ST_JOIN_STARTED) {
throw new IllegalStateException("join not called");
}
}
/**
@Override
public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
Objects.requireNonNull(task);
ensureOwner();
- ensureNotJoined();
+ if (state > ST_FORKED) {
+ throw new IllegalStateException("join already called or scope is closed");
+ }
var subtask = new SubtaskImpl<U>(this, task);
// notify joiner, even if cancelled
if (joiner.onFork(subtask)) {
}
@Override
public R join() throws InterruptedException {
ensureOwner();
- ensureNotJoined();
+ if (state >= ST_JOIN_COMPLETED) {
+ throw new IllegalStateException("Already joined or scope is closed");
+ }
// join started
state = ST_JOIN_STARTED;
// wait for all subtasks, the scope to be cancelled, or interrupt
flock.awaitAll();
- // throw if timeout expired
+ // all subtasks completed or scope cancelled
+ state = ST_JOIN_COMPLETED;
+
+ // invoke joiner onTimeout if timeout expired
if (timeoutExpired) {
- throw new TimeoutException();
+ cancel(); // ensure cancelled before calling onTimeout
+ joiner.onTimeout();
+ } else {
+ cancelTimeout();
}
- cancelTimeout();
-
- // all subtasks completed or cancelled
- state = ST_JOIN_COMPLETED;
// invoke joiner to get result
try {
return joiner.result();
} catch (Throwable e) {
< prev index next >