< prev index next > src/java.base/share/classes/java/lang/VirtualThread.java
Print this page
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.lang;
+ import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.Callable;
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
! private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
private static final int TRACE_PINNING_MODE = tracePinningMode();
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
! private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
private static final int TRACE_PINNING_MODE = tracePinningMode();
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
* TIMED_PARKED -> RUNNABLE // unparked, schedule to continue
* TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
*
* RUNNABLE -> RUNNING // continue execution
*
+ * RUNNABLE -> BLOCKING // blocking on monitor enter
+ * BLOCKING -> BLOCKED // blocked on monitor enter
+ * BLOCKED -> RUNNABLE // unblocked
+ *
* RUNNING -> YIELDING // Thread.yield
* YIELDING -> RUNNABLE // yield successful
* YIELDING -> RUNNING // yield failed
*/
private static final int NEW = 0;
private static final int TIMED_PARKED = 8;
private static final int TIMED_PINNED = 9;
private static final int YIELDING = 10; // Thread.yield
+ // monitor enter
+ private static final int BLOCKING = 11;
+ private static final int BLOCKED = 12;
+
private static final int TERMINATED = 99; // final state
// can be suspended from scheduling when unmounted
private static final int SUSPENDED = 1 << 8;
// parking permit
private volatile boolean parkPermit;
+ // unblocked
+ private volatile boolean unblocked;
+
// carrier thread when mounted, accessed by VM
private volatile Thread carrierThread;
// termination object when joining, created lazily if needed
private volatile CountDownLatch termination;
}
@Override
protected void onPinned(Continuation.Pinned reason) {
if (TRACE_PINNING_MODE > 0) {
boolean printAll = (TRACE_PINNING_MODE == 1);
! PinnedThreadPrinter.printStackTrace(System.out, printAll);
}
}
private static Runnable wrap(VirtualThread vthread, Runnable task) {
return new Runnable() {
@Hidden
}
/**
 * Pinned-continuation callback. When TRACE_PINNING_MODE is greater than zero the
 * stack trace is printed to System.out; mode 1 requests the full trace (printAll),
 * any other positive mode the reduced one. The virtual thread's state is forced to
 * RUNNING for the duration of the printing and restored afterwards.
 */
@Override
protected void onPinned(Continuation.Pinned reason) {
if (TRACE_PINNING_MODE > 0) {
boolean printAll = (TRACE_PINNING_MODE == 1);
! VirtualThread vthread = (VirtualThread) Thread.currentThread();
+ int oldState = vthread.state();
+ try {
+ // avoid printing when in transition states
+ vthread.setState(RUNNING);
+ PinnedThreadPrinter.printStackTrace(System.out, printAll);
+ } finally {
// put back whatever state was in effect before printing, even if printing throws
+ vthread.setState(oldState);
+ }
}
}
private static Runnable wrap(VirtualThread vthread, Runnable task) {
return new Runnable() {
@Hidden
}
/**
* Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue,
! * otherwise it will be pushed to a submission queue.
- *
* @throws RejectedExecutionException
*/
private void submitRunContinuation() {
try {
scheduler.execute(runContinuation);
}
/**
* Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue,
! * otherwise it will be pushed to an external submission queue.
* @throws RejectedExecutionException
*/
private void submitRunContinuation() {
try {
scheduler.execute(runContinuation);
throw ree;
}
}
/**
! * Submits the runContinuation task to the scheduler with a lazy submit.
* @throws RejectedExecutionException
* @see ForkJoinPool#lazySubmit(ForkJoinTask)
*/
private void lazySubmitRunContinuation(ForkJoinPool pool) {
try {
throw ree;
}
}
/**
! * Submits the runContinuation task to the scheduler. For the default scheduler, the task
+ * will be pushed to an external submission queue.
+ * @throws RejectedExecutionException
+ */
+ private void externalSubmitRunContinuation() {
+ if (scheduler == DEFAULT_SCHEDULER
+ && currentCarrierThread() instanceof CarrierThread ct) {
+ externalSubmitRunContinuation(ct.getPool());
+ } else {
+ submitRunContinuation();
+ }
+ }
+
+ /**
+ * Submits the runContinuation task to the given scheduler with a lazy submit.
* @throws RejectedExecutionException
* @see ForkJoinPool#lazySubmit(ForkJoinTask)
*/
private void lazySubmitRunContinuation(ForkJoinPool pool) {
try {
throw ree;
}
}
/**
! * Submits the runContinuation task to the scheduler as an external submit.
* @throws RejectedExecutionException
* @see ForkJoinPool#externalSubmit(ForkJoinTask)
*/
private void externalSubmitRunContinuation(ForkJoinPool pool) {
try {
throw ree;
}
}
/**
! * Submits the runContinuation task to the given scheduler as an external submit.
* @throws RejectedExecutionException
* @see ForkJoinPool#externalSubmit(ForkJoinTask)
*/
private void externalSubmitRunContinuation(ForkJoinPool pool) {
try {
* current thread is the current platform thread.
*/
@ChangesCurrentThread
@ReservedStackAccess
private void unmount() {
+ assert !Thread.holdsLock(interruptLock);
+
// set Thread.currentThread() to return the platform thread
Thread carrier = this.carrierThread;
carrier.setCurrentThread(carrier);
// break connection to carrier thread, synchronized with interrupt
submitRunContinuation();
}
return;
}
+ // blocking on monitorenter
+ if (s == BLOCKING) {
+ setState(BLOCKED);
+ if (unblocked && compareAndSetState(BLOCKED, RUNNABLE)) {
+ unblocked = false;
+ submitRunContinuation();
+ }
+ return;
+ }
+
assert false;
}
/**
* Invoked after the continuation completes.
addedToContainer = true;
// scoped values may be inherited
inheritScopedValueBindings(container);
! // submit task to run thread
! submitRunContinuation();
started = true;
} finally {
if (!started) {
afterDone(addedToContainer);
}
addedToContainer = true;
// scoped values may be inherited
inheritScopedValueBindings(container);
! // submit task to run thread, using externalSubmit if possible
! externalSubmitRunContinuation();
started = true;
} finally {
if (!started) {
afterDone(addedToContainer);
}
// park the thread for the waiting time
if (nanos > 0) {
long startTime = System.nanoTime();
boolean yielded = false;
! Future<?> unparker = scheduleUnpark(this::unpark, nanos);
setState(TIMED_PARKING);
try {
yielded = yieldContinuation(); // may throw
} finally {
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
// park the thread for the waiting time
if (nanos > 0) {
long startTime = System.nanoTime();
boolean yielded = false;
! Future<?> unparker = scheduleUnpark(nanos); // may throw OOME
setState(TIMED_PARKING);
try {
yielded = yieldContinuation(); // may throw
} finally {
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
}
}
}
/**
! * Schedule an unpark task to run after a given delay.
*/
@ChangesCurrentThread
! private Future<?> scheduleUnpark(Runnable unparker, long nanos) {
// need to switch to current carrier thread to avoid nested parking
switchToCarrierThread();
try {
! return UNPARKER.schedule(unparker, nanos, NANOSECONDS);
} finally {
switchToVirtualThread(this);
}
}
/**
* Cancels a task if it has not completed.
*/
@ChangesCurrentThread
private void cancel(Future<?> future) {
if (!future.isDone()) {
// need to switch to current carrier thread to avoid nested parking
switchToCarrierThread();
try {
future.cancel(false);
}
}
}
/**
! * Schedule this virtual thread to be unparked after a given delay. The unpark task
! * is handed to the scheduler returned by delayedTaskScheduler() while the current
! * thread identity is temporarily switched to the carrier thread; the identity is
! * switched back to this virtual thread before returning, also when scheduling throws.
! * Must be invoked by this virtual thread (asserted).
! * @param nanos the delay before unpark is invoked, in nanoseconds
! * @return the Future representing the scheduled unpark task
*/
@ChangesCurrentThread
! private Future<?> scheduleUnpark(long nanos) {
+ assert Thread.currentThread() == this;
// need to switch to current carrier thread to avoid nested parking
switchToCarrierThread();
try {
! return delayedTaskScheduler().schedule(this::unpark, nanos, NANOSECONDS);
} finally {
// always restore the virtual thread as the current thread
switchToVirtualThread(this);
}
}
/**
* Cancels a task if it has not completed.
*/
@ChangesCurrentThread
private void cancel(Future<?> future) {
+ assert Thread.currentThread() == this;
if (!future.isDone()) {
// need to switch to current carrier thread to avoid nested parking
switchToCarrierThread();
try {
future.cancel(false);
}
} else {
submitRunContinuation();
}
} else if ((s == PINNED) || (s == TIMED_PINNED)) {
! // unpark carrier thread when pinned.
synchronized (carrierThreadAccessLock()) {
Thread carrier = carrierThread;
if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
U.unpark(carrier);
}
}
}
}
}
/**
* Attempts to yield the current virtual thread (Thread.yield).
*/
void tryYield() {
assert Thread.currentThread() == this;
}
} else {
submitRunContinuation();
}
} else if ((s == PINNED) || (s == TIMED_PINNED)) {
! // unpark carrier thread when pinned
synchronized (carrierThreadAccessLock()) {
Thread carrier = carrierThread;
if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
U.unpark(carrier);
}
}
}
}
}
+ /**
+ * Re-enables this virtual thread for scheduling after blocking on monitor enter.
+ * The unblocked flag is set before the state is examined. If the state is already
+ * BLOCKED and the CAS to RUNNABLE succeeds, the flag is cleared again and the
+ * continuation is submitted. If the state is not BLOCKED, the flag stays set and
+ * nothing is submitted here; the code handling the BLOCKING to BLOCKED transition
+ * checks the flag and performs the submit instead.
+ * Must not be invoked by a virtual thread (asserted).
+ * @throws RejectedExecutionException if the scheduler cannot accept a task
+ */
+ private void unblock() {
+ assert !Thread.currentThread().isVirtual();
+ unblocked = true;
+ if (state() == BLOCKED && compareAndSetState(BLOCKED, RUNNABLE)) {
+ unblocked = false;
+ submitRunContinuation();
+ }
+ }
+
/**
* Attempts to yield the current virtual thread (Thread.yield).
*/
void tryYield() {
assert Thread.currentThread() == this;
checkAccess();
synchronized (interruptLock) {
interrupted = true;
Interruptible b = nioBlocker;
if (b != null) {
! b.interrupt(this);
}
// interrupt carrier thread if mounted
Thread carrier = carrierThread;
if (carrier != null) carrier.setInterrupt();
checkAccess();
synchronized (interruptLock) {
interrupted = true;
Interruptible b = nioBlocker;
if (b != null) {
! // ensure current thread doesn't unmount while holding interruptLock
+ Continuation.pin();
+ try {
+ b.interrupt(this);
+ } finally {
+ Continuation.unpin();
+ }
}
// interrupt carrier thread if mounted
Thread carrier = carrierThread;
if (carrier != null) carrier.setInterrupt();
@Override
boolean getAndClearInterrupt() {
assert Thread.currentThread() == this;
boolean oldValue = interrupted;
if (oldValue) {
! synchronized (interruptLock) {
! interrupted = false;
! carrierThread.clearInterrupt();
}
}
return oldValue;
}
/**
 * Returns the interrupt status of this virtual thread and clears it when set.
 * Clearing is done under interruptLock and also clears the interrupt status of the
 * carrier thread. The continuation is pinned around the synchronized block and
 * unpinned in a finally clause. Must be invoked by this virtual thread (asserted).
 * @return the value of the interrupted flag read before any clearing took place
 */
@Override
boolean getAndClearInterrupt() {
assert Thread.currentThread() == this;
boolean oldValue = interrupted;
// fast path: nothing to clear, so the lock is not taken
if (oldValue) {
! // ensure current thread doesn't unmount trying to enter interruptLock
! Continuation.pin();
! try {
+ synchronized (interruptLock) {
+ interrupted = false;
+ carrierThread.clearInterrupt();
+ }
+ } finally {
+ Continuation.unpin();
}
}
return oldValue;
}
case RUNNABLE:
// runnable, not mounted
return Thread.State.RUNNABLE;
case RUNNING:
// if mounted then return state of carrier thread
! synchronized (carrierThreadAccessLock()) {
! Thread carrierThread = this.carrierThread;
! if (carrierThread != null) {
! return carrierThread.threadState();
}
}
// runnable, mounted
return Thread.State.RUNNABLE;
case PARKING:
case TIMED_PARKING:
case YIELDING:
! // runnable, mounted, not yet waiting
return Thread.State.RUNNABLE;
case PARKED:
case PINNED:
return State.WAITING;
case TIMED_PARKED:
case TIMED_PINNED:
return State.TIMED_WAITING;
case TERMINATED:
return Thread.State.TERMINATED;
default:
throw new InternalError();
}
case RUNNABLE:
// runnable, not mounted
return Thread.State.RUNNABLE;
case RUNNING:
// if mounted then return state of carrier thread
! if (Thread.currentThread() != this) {
! synchronized (carrierThreadAccessLock()) {
! Thread carrier = this.carrierThread;
! if (carrier != null) {
+ return carrier.threadState();
+ }
}
}
// runnable, mounted
return Thread.State.RUNNABLE;
case PARKING:
case TIMED_PARKING:
case YIELDING:
! case BLOCKING:
+ // runnable, not yet waiting/blocked
return Thread.State.RUNNABLE;
case PARKED:
case PINNED:
return State.WAITING;
case TIMED_PARKED:
case TIMED_PINNED:
return State.TIMED_WAITING;
+ case BLOCKED:
+ return State.BLOCKED;
case TERMINATED:
return Thread.State.TERMINATED;
default:
throw new InternalError();
}
* Returns null if the thread is in another state.
*/
private StackTraceElement[] tryGetStackTrace() {
int initialState = state();
return switch (initialState) {
! case RUNNABLE, PARKED, TIMED_PARKED -> {
int suspendedState = initialState | SUSPENDED;
if (compareAndSetState(initialState, suspendedState)) {
try {
yield cont.getStackTrace();
} finally {
assert state == suspendedState;
setState(initialState);
// re-submit if runnable
! // re-submit if unparked while suspended
if (initialState == RUNNABLE
! || (parkPermit && compareAndSetState(initialState, RUNNABLE))) {
try {
submitRunContinuation();
} catch (RejectedExecutionException ignore) { }
}
}
* Returns null if the thread is in another state.
*/
private StackTraceElement[] tryGetStackTrace() {
int initialState = state();
return switch (initialState) {
! case RUNNABLE, PARKED, BLOCKED, TIMED_PARKED -> {
int suspendedState = initialState | SUSPENDED;
if (compareAndSetState(initialState, suspendedState)) {
try {
yield cont.getStackTrace();
} finally {
assert state == suspendedState;
setState(initialState);
// re-submit if runnable
! // re-submit if unparked or unblocked while suspended
if (initialState == RUNNABLE
! || ((parkPermit || unblocked)
+ && compareAndSetState(initialState, RUNNABLE))) {
try {
submitRunContinuation();
} catch (RejectedExecutionException ignore) { }
}
}
if (!name.isEmpty()) {
sb.append(",");
sb.append(name);
}
sb.append("]/");
Thread carrier = carrierThread;
! if (carrier != null) {
! // include the carrier thread state and name when mounted
! synchronized (carrierThreadAccessLock()) {
! carrier = carrierThread;
! if (carrier != null) {
! String stateAsString = carrier.threadState().toString();
! sb.append(stateAsString.toLowerCase(Locale.ROOT));
- sb.append('@');
- sb.append(carrier.getName());
}
}
}
// include virtual thread state when not mounted
if (carrier == null) {
String stateAsString = threadState().toString();
sb.append(stateAsString.toLowerCase(Locale.ROOT));
}
return sb.toString();
}
@Override
public int hashCode() {
return (int) threadId();
}
@Override
public boolean equals(Object obj) {
return obj == this;
}
/**
* Returns the termination object, creating it if needed.
*/
private CountDownLatch getTermination() {
CountDownLatch termination = this.termination;
if (!name.isEmpty()) {
sb.append(",");
sb.append(name);
}
sb.append("]/");
+
+ // include the carrier thread state and name when mounted
Thread carrier = carrierThread;
! if (Thread.currentThread() == this) {
! appendCarrierInfo(sb, carrier);
! } else if (carrier != null) {
! if (Thread.currentThread() != this) {
! synchronized (carrierThreadAccessLock()) {
! carrier = carrierThread; // re-read
! appendCarrierInfo(sb, carrier);
}
}
}
+
// include virtual thread state when not mounted
if (carrier == null) {
String stateAsString = threadState().toString();
sb.append(stateAsString.toLowerCase(Locale.ROOT));
}
return sb.toString();
}
+ /**
+ * Appends the carrier's state and name to the given string builder when mounted.
+ */
+ private void appendCarrierInfo(StringBuilder sb, Thread carrier) {
+ if (carrier != null) {
+ String stateAsString = carrier.threadState().toString();
+ sb.append(stateAsString.toLowerCase(Locale.ROOT));
+ sb.append('@');
+ sb.append(carrier.getName());
+ }
+ }
+
@Override
public int hashCode() {
return (int) threadId();
}
@Override
public boolean equals(Object obj) {
return obj == this;
}
+ /**
+ * Returns a ScheduledExecutorService to execute a delayed task.
+ */
+ private ScheduledExecutorService delayedTaskScheduler() {
+ long tid = Thread.currentThread().threadId();
+ int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1);
+ return DELAYED_TASK_SCHEDULERS[index];
+ }
+
/**
* Returns the termination object, creating it if needed.
*/
private CountDownLatch getTermination() {
CountDownLatch termination = this.termination;
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
if (parallelismValue != null) {
! parallelism = Integer.parseInt(parallelismValue);
} else {
parallelism = Runtime.getRuntime().availableProcessors();
}
if (maxPoolSizeValue != null) {
maxPoolSize = Integer.parseInt(maxPoolSizeValue);
! parallelism = Integer.min(parallelism, maxPoolSize);
} else {
maxPoolSize = Integer.max(parallelism, 256);
}
if (minRunnableValue != null) {
minRunnable = Integer.parseInt(minRunnableValue);
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
if (parallelismValue != null) {
! parallelism = Integer.max(Integer.parseInt(parallelismValue), 1);
} else {
parallelism = Runtime.getRuntime().availableProcessors();
}
if (maxPoolSizeValue != null) {
maxPoolSize = Integer.parseInt(maxPoolSizeValue);
! if (maxPoolSize > 0) {
+ parallelism = Integer.min(parallelism, maxPoolSize);
+ } else {
+ maxPoolSize = parallelism; // no spares
+ }
} else {
maxPoolSize = Integer.max(parallelism, 256);
}
if (minRunnableValue != null) {
minRunnable = Integer.parseInt(minRunnableValue);
};
return AccessController.doPrivileged(pa);
}
/**
! * Creates the ScheduledThreadPoolExecutor used for timed unpark.
*/
! private static ScheduledExecutorService createDelayedTaskScheduler() {
! String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize");
! int poolSize;
if (propValue != null) {
! poolSize = Integer.parseInt(propValue);
} else {
! poolSize = 1;
}
! ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
! Executors.newScheduledThreadPool(poolSize, task -> {
! return InnocuousThread.newThread("VirtualThread-unparker", task);
! });
! stpe.setRemoveOnCancelPolicy(true);
! return stpe;
}
/**
* Reads the value of the jdk.tracePinnedThreads property to determine if stack
* traces should be printed when a carrier thread is pinned when a virtual thread
};
return AccessController.doPrivileged(pa);
}
/**
! * Invoked by the VM for the Thread.vthread_scheduler diagnostic command.
*/
! private static byte[] printDefaultScheduler() {
! return String.format("%s%n", DEFAULT_SCHEDULER.toString())
! .getBytes(StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks.
+ */
+ private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
+ String propName = "jdk.virtualThreadScheduler.timerQueues";
+ String propValue = GetPropertyAction.privilegedGetProperty(propName);
+ int queueCount;
if (propValue != null) {
! queueCount = Integer.parseInt(propValue);
+ if (queueCount != Integer.highestOneBit(queueCount)) {
+ throw new RuntimeException("Value of " + propName + " must be power of 2");
+ }
} else {
! int ncpus = Runtime.getRuntime().availableProcessors();
+ queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
}
! var schedulers = new ScheduledExecutorService[queueCount];
! for (int i = 0; i < queueCount; i++) {
! ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
! Executors.newScheduledThreadPool(1, task -> {
! Thread t = InnocuousThread.newThread("VirtualThread-unparker", task);
! t.setDaemon(true);
+ return t;
+ });
+ stpe.setRemoveOnCancelPolicy(true);
+ schedulers[i] = stpe;
+ }
+ return schedulers;
}
/**
* Reads the value of the jdk.tracePinnedThreads property to determine if stack
* traces should be printed when a carrier thread is pinned when a virtual thread
if ("short".equalsIgnoreCase(propValue))
return 2;
}
return 0;
}
+
+ /**
+ * Unblock virtual threads that are ready to be scheduled again.
+ * This method is the task of the "VirtualThread-unblocker" daemon thread that the
+ * static initializer of this class starts.
+ * NOTE(review): the body is still a placeholder (see TBD below); as written it
+ * returns immediately without unblocking anything.
+ */
+ private static void processPendingList() {
+ // TBD invoke unblock
+ }
+
// start the daemon thread that runs processPendingList
static {
    Thread unblockerThread = InnocuousThread.newThread(
            "VirtualThread-unblocker", VirtualThread::processPendingList);
    unblockerThread.setDaemon(true);
    unblockerThread.start();
}
}
< prev index next >