< prev index next > src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java
Print this page
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.time.Duration;
import java.util.Objects;
! import java.util.function.Function;
import jdk.internal.misc.ThreadFlock;
import jdk.internal.invoke.MhUtil;
/**
* StructuredTaskScope implementation.
*/
final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.time.Duration;
import java.util.Objects;
! import java.util.function.UnaryOperator;
import jdk.internal.misc.ThreadFlock;
import jdk.internal.invoke.MhUtil;
+ import jdk.internal.vm.annotation.Stable;
/**
* StructuredTaskScope implementation.
*/
final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
private final Joiner<? super T, ? extends R> joiner;
private final ThreadFactory threadFactory;
private final ThreadFlock flock;
! // state, only accessed by owner thread
! private static final int ST_NEW = 0,
- ST_FORKED = 1, // subtasks forked, need to join
ST_JOIN_STARTED = 2, // join started, can no longer fork
ST_JOIN_COMPLETED = 3, // join completed
ST_CLOSED = 4; // closed
! private int state;
-
- // timer task, only accessed by owner thread
- private Future<?> timerTask;
// set or read by any thread
private volatile boolean cancelled;
// set by the timer thread, read by the owner thread
private volatile boolean timeoutExpired;
/**
* Creates a scope that uses the given joiner and thread factory.
*
* The underlying ThreadFlock is opened with the given name, or with
* Objects.toIdentityString(this) when name is null. The state field is
* explicitly initialized to ST_NEW.
*
* NOTE(review): "this-escape" is presumably suppressed because this is passed
* to Objects.toIdentityString before the constructor has returned - confirm.
*
* @param joiner the joiner stored for this scope
* @param threadFactory the thread factory stored for this scope
* @param name the name for the flock, may be null
*/
@SuppressWarnings("this-escape")
private StructuredTaskScopeImpl(Joiner<? super T, ? extends R> joiner,
ThreadFactory threadFactory,
String name) {
this.joiner = joiner;
this.threadFactory = threadFactory;
// a null name falls back to the identity string of this object
this.flock = ThreadFlock.open((name != null) ? name : Objects.toIdentityString(this));
- this.state = ST_NEW;
}
/**
* Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
* and with configuration that is the result of applying the given function to the
* default configuration.
*/
static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
! Function<Configuration, Configuration> configFunction) {
Objects.requireNonNull(joiner);
! var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
// schedule timeout
Duration timeout = config.timeout();
if (timeout != null) {
private final Joiner<? super T, ? extends R> joiner;
private final ThreadFactory threadFactory;
private final ThreadFlock flock;
! // scope state, set by owner thread, read by any thread
! private static final int ST_FORKED = 1, // subtasks forked, need to join
ST_JOIN_STARTED = 2, // join started, can no longer fork
ST_JOIN_COMPLETED = 3, // join completed
ST_CLOSED = 4; // closed
! private volatile int state;
// set or read by any thread
private volatile boolean cancelled;
+ // timer task, only accessed by owner thread
+ private Future<?> timerTask;
+
// set by the timer thread, read by the owner thread
private volatile boolean timeoutExpired;
@SuppressWarnings("this-escape")
private StructuredTaskScopeImpl(Joiner<? super T, ? extends R> joiner,
ThreadFactory threadFactory,
String name) {
this.joiner = joiner;
this.threadFactory = threadFactory;
this.flock = ThreadFlock.open((name != null) ? name : Objects.toIdentityString(this));
}
/**
* Returns a new {@code StructuredTaskScope} to use the given {@code Joiner} object
* and with configuration that is the result of applying the given function to the
* default configuration.
*/
static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
! UnaryOperator<Configuration> configOperator) {
Objects.requireNonNull(joiner);
! var config = (ConfigImpl) configOperator.apply(ConfigImpl.defaultConfig());
var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
// schedule timeout
Duration timeout = config.timeout();
if (timeout != null) {
throw new WrongThreadException("Current thread not owner");
}
}
/**
! * Throws IllegalStateException if already joined or scope is closed.
*
* The condition tested is state > ST_FORKED, which covers ST_JOIN_STARTED,
* ST_JOIN_COMPLETED and ST_CLOSED. Ownership is only checked with an assert
* here, not with a thrown exception.
*
* @throws IllegalStateException if state is greater than ST_FORKED
*/
! private void ensureNotJoined() {
! assert Thread.currentThread() == flock.owner();
- if (state > ST_FORKED) {
- throw new IllegalStateException("Already joined or scope is closed");
- }
- }
-
- /**
- * Throws IllegalStateException if invoked by the owner thread and the owner thread
- * has not joined.
- *
- * "Not joined" is tested as state <= ST_JOIN_STARTED, so a join that has started
- * but not completed still counts as not joined. Callers on any thread other than
- * flock.owner() always pass this check.
- *
- * @throws IllegalStateException if the current thread is the owner and
- *         state <= ST_JOIN_STARTED
- */
- private void ensureJoinedIfOwner() {
- if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) {
- throw new IllegalStateException("join not called");
- }
}
/**
* Interrupts all threads in this scope, except the current thread.
*/
throw new WrongThreadException("Current thread not owner");
}
}
/**
! * Returns true if join has been invoked and there is an outcome.
*
* The test is state >= ST_JOIN_COMPLETED, so this also returns true when the
* state is ST_CLOSED. No owner check is made; SubtaskImpl calls this from
* whichever thread invokes get() or exception().
*
* @return true if state is ST_JOIN_COMPLETED or ST_CLOSED
*/
! private boolean isJoinCompleted() {
! return state >= ST_JOIN_COMPLETED;
}
/**
* Interrupts all threads in this scope, except the current thread.
*/
@Override
public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
Objects.requireNonNull(task);
ensureOwner();
! ensureNotJoined();
var subtask = new SubtaskImpl<U>(this, task);
// notify joiner, even if cancelled
if (joiner.onFork(subtask)) {
@Override
public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
Objects.requireNonNull(task);
ensureOwner();
! int s = state;
+ if (s > ST_FORKED) {
+ throw new IllegalStateException("join already called or scope is closed");
+ }
var subtask = new SubtaskImpl<U>(this, task);
// notify joiner, even if cancelled
if (joiner.onFork(subtask)) {
if (thread == null) {
throw new RejectedExecutionException("Rejected by thread factory");
}
// attempt to start the thread
try {
flock.start(thread);
} catch (IllegalStateException e) {
// shutdown by another thread, or underlying flock is shutdown due
// to unstructured use
}
}
// force owner to join
! state = ST_FORKED;
return subtask;
}
@Override
public <U extends T> Subtask<U> fork(Runnable task) {
if (thread == null) {
throw new RejectedExecutionException("Rejected by thread factory");
}
// attempt to start the thread
+ subtask.setThread(thread);
try {
flock.start(thread);
} catch (IllegalStateException e) {
// shutdown by another thread, or underlying flock is shutdown due
// to unstructured use
}
}
// force owner to join
! if (s < ST_FORKED) {
+ state = ST_FORKED;
+ }
return subtask;
}
@Override
public <U extends T> Subtask<U> fork(Runnable task) {
}
@Override
public R join() throws InterruptedException {
ensureOwner();
! ensureNotJoined();
!
! // join started
- state = ST_JOIN_STARTED;
// wait for all subtasks, the scope to be cancelled, or interrupt
! flock.awaitAll();
!
! // throw if timeout expired
! if (timeoutExpired) {
! throw new TimeoutException();
}
- cancelTimeout();
! // all subtasks completed or cancelled
state = ST_JOIN_COMPLETED;
// invoke joiner to get result
try {
return joiner.result();
} catch (Throwable e) {
throw new FailedException(e);
}
@Override
public R join() throws InterruptedException {
ensureOwner();
! if (state >= ST_JOIN_COMPLETED) {
! throw new IllegalStateException("Already joined or scope is closed");
! }
// wait for all subtasks, the scope to be cancelled, or interrupt
! try {
! flock.awaitAll();
! } catch (InterruptedException e) {
! state = ST_JOIN_STARTED; // joining not completed, prevent new forks
! throw e;
}
! // all subtasks completed or scope cancelled
state = ST_JOIN_COMPLETED;
+ // invoke joiner onTimeout if timeout expired
+ if (timeoutExpired) {
+ cancel(); // ensure cancelled before calling onTimeout
+ joiner.onTimeout();
+ } else {
+ cancelTimeout();
+ }
+
// invoke joiner to get result
try {
return joiner.result();
} catch (Throwable e) {
throw new FailedException(e);
}
private final StructuredTaskScopeImpl<? super T, ?> scope;
private final Callable<? extends T> task;
private volatile Object result;
+ @Stable private Thread thread;
/**
 * Creates a subtask bound to the given scope and task.
 *
 * @param scope the scope this subtask belongs to
 * @param task the task whose call() method is invoked by run()
 */
SubtaskImpl(StructuredTaskScopeImpl<? super T, ?> scope, Callable<? extends T> task) {
    // plain field stores, independent of each other
    this.task = task;
    this.scope = scope;
}
+ /**
+ * Sets the thread for this subtask.
+ *
+ * The thread is asserted to be in state NEW, i.e. it has not been started yet.
+ * The stored thread is the one that run() and ensureJoinedIfNotSubtask()
+ * compare the current thread against, so it is the only thread permitted to
+ * invoke run().
+ *
+ * @param thread the not-yet-started thread for this subtask
+ */
+ void setThread(Thread thread) {
+ assert thread.getState() == Thread.State.NEW;
+ this.thread = thread;
+ }
+
+ /**
+ * Throws IllegalStateException if the caller thread is not the subtask and
+ * the scope owner has not joined.
+ *
+ * "The subtask" means the thread stored in the thread field; that thread is
+ * never rejected. Any other caller is rejected until scope.isJoinCompleted()
+ * returns true. The exception is created without a detail message.
+ *
+ * @throws IllegalStateException if the current thread is not the subtask's
+ *         thread and the scope's join has not completed
+ */
+ private void ensureJoinedIfNotSubtask() {
+ if (Thread.currentThread() != thread && !scope.isJoinCompleted()) {
+ throw new IllegalStateException();
+ }
+ }
+
@Override
public void run() {
+ if (Thread.currentThread() != thread) {
+ throw new WrongThreadException();
+ }
+
T result = null;
Throwable ex = null;
try {
result = task.call();
} catch (Throwable e) {
}
}
@Override
public T get() {
! scope.ensureJoinedIfOwner();
Object result = this.result;
if (result instanceof AltResult) {
if (result == RESULT_NULL) return null;
} else if (result != null) {
@SuppressWarnings("unchecked")
}
}
@Override
public T get() {
! ensureJoinedIfNotSubtask();
Object result = this.result;
if (result instanceof AltResult) {
if (result == RESULT_NULL) return null;
} else if (result != null) {
@SuppressWarnings("unchecked")
"Result is unavailable or subtask did not complete successfully");
}
@Override
public Throwable exception() {
! scope.ensureJoinedIfOwner();
Object result = this.result;
if (result instanceof AltResult alt && alt.state() == State.FAILED) {
return alt.exception();
}
throw new IllegalStateException(
"Result is unavailable or subtask did not complete successfully");
}
@Override
public Throwable exception() {
! ensureJoinedIfNotSubtask();
Object result = this.result;
if (result instanceof AltResult alt && alt.state() == State.FAILED) {
return alt.exception();
}
throw new IllegalStateException(
< prev index next >