< prev index next > src/java.base/share/classes/java/lang/VirtualThread.java
Print this page
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.lang;
+ import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import jdk.internal.event.VirtualThreadEndEvent;
- import jdk.internal.event.VirtualThreadPinnedEvent;
import jdk.internal.event.VirtualThreadStartEvent;
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
import jdk.internal.misc.CarrierThread;
import jdk.internal.misc.InnocuousThread;
import jdk.internal.misc.Unsafe;
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
! private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
- private static final int TRACE_PINNING_MODE = tracePinningMode();
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
// scheduler and continuation
private final Executor scheduler;
private final Continuation cont;
private final Runnable runContinuation;
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
! private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
+ private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
// scheduler and continuation
private final Executor scheduler;
private final Continuation cont;
private final Runnable runContinuation;
* TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked
* TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier
* TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue
* TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
*
+ * RUNNABLE -> BLOCKING // blocking on monitor enter
+ * BLOCKING -> BLOCKED // blocked on monitor enter
+ * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue
+ * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter
+ *
* RUNNING -> YIELDING // Thread.yield
* YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue
* YIELDING -> RUNNING // cont.yield failed
* YIELDED -> RUNNING // continue execution after Thread.yield
*/
// Thread.yield
private static final int YIELDING = 10;
private static final int YIELDED = 11; // unmounted but runnable
+ // monitor enter
+ private static final int BLOCKING = 12;
+ private static final int BLOCKED = 13; // unmounted
+ private static final int UNBLOCKED = 14; // unmounted but runnable
+
private static final int TERMINATED = 99; // final state
// can be suspended from scheduling when unmounted
private static final int SUSPENDED = 1 << 8;
// parking permit
private volatile boolean parkPermit;
+ // used to mark thread as ready to be unblocked while it is concurrently blocking
+ private volatile boolean unblocked;
+
+ // a positive value if "responsible thread" blocked on monitor enter, accessed by VM
+ private volatile byte recheckInterval;
+
// carrier thread when mounted, accessed by VM
private volatile Thread carrierThread;
// termination object when joining, created lazily if needed
private volatile CountDownLatch termination;
+ // has the value 1 when on the list of virtual threads waiting to be unblocked
+ private volatile byte onWaitingList;
+
+ // next virtual thread on the list of virtual threads waiting to be unblocked
+ private volatile VirtualThread next;
+
/**
* Returns the continuation scope used for virtual threads.
*/
static ContinuationScope continuationScope() {
return VTHREAD_SCOPE;
VThreadContinuation(VirtualThread vthread, Runnable task) {
super(VTHREAD_SCOPE, wrap(vthread, task));
}
@Override
protected void onPinned(Continuation.Pinned reason) {
! if (TRACE_PINNING_MODE > 0) {
! boolean printAll = (TRACE_PINNING_MODE == 1);
- VirtualThread vthread = (VirtualThread) Thread.currentThread();
- int oldState = vthread.state();
- try {
- // avoid printing when in transition states
- vthread.setState(RUNNING);
- PinnedThreadPrinter.printStackTrace(System.out, reason, printAll);
- } finally {
- vthread.setState(oldState);
- }
- }
}
private static Runnable wrap(VirtualThread vthread, Runnable task) {
return new Runnable() {
@Hidden
public void run() {
VThreadContinuation(VirtualThread vthread, Runnable task) {
super(VTHREAD_SCOPE, wrap(vthread, task));
}
@Override
protected void onPinned(Continuation.Pinned reason) {
! // emit JFR event
! virtualThreadPinnedEvent(reason.value());
}
private static Runnable wrap(VirtualThread vthread, Runnable task) {
return new Runnable() {
@Hidden
public void run() {
throw new WrongThreadException();
}
// set state to RUNNING
int initialState = state();
! if (initialState == STARTED || initialState == UNPARKED || initialState == YIELDED) {
// newly started or continue after parking/blocking/Thread.yield
if (!compareAndSetState(initialState, RUNNING)) {
return;
}
// consume parking permit when continuing after parking
throw new WrongThreadException();
}
// set state to RUNNING
int initialState = state();
! if (initialState == STARTED || initialState == UNPARKED
+ || initialState == UNBLOCKED || initialState == YIELDED) {
// newly started or continue after parking/blocking/Thread.yield
if (!compareAndSetState(initialState, RUNNING)) {
return;
}
// consume parking permit when continuing after parking
submitFailed(ree);
throw ree;
}
}
+ /**
+ * Submits the runContinuation task the scheduler. For the default scheduler, the task
+ * will be pushed to an external submission queue.
+ * @throws RejectedExecutionException
+ */
+ private void externalSubmitRunContinuation() {
+ if (scheduler == DEFAULT_SCHEDULER
+ && currentCarrierThread() instanceof CarrierThread ct) {
+ externalSubmitRunContinuation(ct.getPool());
+ } else {
+ submitRunContinuation();
+ }
+ }
+
/**
* Submits the runContinuation task to given scheduler with a lazy submit.
* @throws RejectedExecutionException
* @see ForkJoinPool#lazySubmit(ForkJoinTask)
*/
* current thread is the current platform thread.
*/
@ChangesCurrentThread
@ReservedStackAccess
private void unmount() {
+ assert !Thread.holdsLock(interruptLock);
+
// set Thread.currentThread() to return the platform thread
Thread carrier = this.carrierThread;
carrier.setCurrentThread(carrier);
// break connection to carrier thread, synchronized with interrupt
notifyJvmtiHideFrames(true);
Thread carrier = this.carrierThread;
assert Thread.currentThread() == this
&& carrier == Thread.currentCarrierThread();
carrier.setCurrentThread(carrier);
+ setLockId(this.threadId()); // keep lockid of vthread
}
/**
* Sets the current thread to the given virtual thread.
*/
* If yielding due to Thread.yield then it just submits the task to continue.
*/
private void afterYield() {
assert carrierThread == null;
+ // re-adjust parallelism if the virtual thread yielded when compensating
+ if (currentThread() instanceof CarrierThread ct) {
+ ct.endBlocking();
+ }
+
int s = state();
// LockSupport.park/parkNanos
if (s == PARKING || s == TIMED_PARKING) {
int newState = (s == PARKING) ? PARKED : TIMED_PARKED;
submitRunContinuation();
}
return;
}
+ // blocking on monitorenter
+ if (s == BLOCKING) {
+ setState(BLOCKED);
+
+ // may have been unblocked while blocking
+ if (unblocked && compareAndSetState(BLOCKED, UNBLOCKED)) {
+ unblocked = false;
+ submitRunContinuation();
+ return;
+ }
+
+ // if thread is the designated responsible thread for a monitor then schedule
+ // it to wakeup so that it can check and recover. See objectMonitor.cpp.
+ int recheckInterval = this.recheckInterval;
+ if (recheckInterval > 0 && state() == BLOCKED) {
+ assert recheckInterval >= 1 && recheckInterval <= 6;
+ // 4 ^ (recheckInterval - 1) = 1, 4, 16, ... 1024
+ long delay = 1 << (recheckInterval - 1) << (recheckInterval - 1);
+ Future<?> unblocker = delayedTaskScheduler().schedule(this::unblock, delay, MILLISECONDS);
+ // cancel if unblocked while scheduling the unblock
+ if (state() != BLOCKED) {
+ unblocker.cancel(false);
+ }
+ }
+ return;
+ }
+
assert false;
}
/**
* Invoked after the continuation completes.
addedToContainer = true;
// scoped values may be inherited
inheritScopedValueBindings(container);
! // submit task to run thread
! submitRunContinuation();
started = true;
} finally {
if (!started) {
afterDone(addedToContainer);
}
addedToContainer = true;
// scoped values may be inherited
inheritScopedValueBindings(container);
! // submit task to run thread, using externalSubmit if possible
! externalSubmitRunContinuation();
started = true;
} finally {
if (!started) {
afterDone(addedToContainer);
}
* @param nanos the waiting time in nanoseconds
*/
private void parkOnCarrierThread(boolean timed, long nanos) {
assert state() == RUNNING;
- VirtualThreadPinnedEvent event;
- try {
- event = new VirtualThreadPinnedEvent();
- event.begin();
- } catch (OutOfMemoryError e) {
- event = null;
- }
-
setState(timed ? TIMED_PINNED : PINNED);
try {
if (!parkPermit) {
if (!timed) {
U.park(false, 0);
setState(RUNNING);
}
// consume parking permit
setParkPermit(false);
-
- if (event != null) {
- try {
- event.commit();
- } catch (OutOfMemoryError e) {
- // ignore
- }
- }
}
/**
* Schedule this virtual thread to be unparked after a given delay.
*/
@ChangesCurrentThread
private Future<?> scheduleUnpark(long nanos) {
assert Thread.currentThread() == this;
// need to switch to current carrier thread to avoid nested parking
switchToCarrierThread();
try {
! return UNPARKER.schedule(this::unpark, nanos, NANOSECONDS);
} finally {
switchToVirtualThread(this);
}
}
/**
* Cancels a task if it has not completed.
*/
@ChangesCurrentThread
private void cancel(Future<?> future) {
if (!future.isDone()) {
// need to switch to current carrier thread to avoid nested parking
switchToCarrierThread();
try {
future.cancel(false);
setState(RUNNING);
}
// consume parking permit
setParkPermit(false);
}
+ /**
+ * jdk.VirtualThreadPinned is emitted by HotSpot VM when pinned. Call into VM to
+ * emit event to avoid having a JFR in Java with the same name (but different ID)
+ * to events emitted by the VM.
+ */
+ private static native void virtualThreadPinnedEvent(int reason);
+
/**
* Schedule this virtual thread to be unparked after a given delay.
*/
@ChangesCurrentThread
private Future<?> scheduleUnpark(long nanos) {
assert Thread.currentThread() == this;
// need to switch to current carrier thread to avoid nested parking
switchToCarrierThread();
try {
! return delayedTaskScheduler().schedule(this::unpark, nanos, NANOSECONDS);
} finally {
switchToVirtualThread(this);
}
}
/**
* Cancels a task if it has not completed.
*/
@ChangesCurrentThread
private void cancel(Future<?> future) {
+ assert Thread.currentThread() == this;
if (!future.isDone()) {
// need to switch to current carrier thread to avoid nested parking
switchToCarrierThread();
try {
future.cancel(false);
} else {
submitRunContinuation();
}
} else if ((s == PINNED) || (s == TIMED_PINNED)) {
// unpark carrier thread when pinned
! notifyJvmtiDisableSuspend(true);
try {
synchronized (carrierThreadAccessLock()) {
Thread carrier = carrierThread;
if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
U.unpark(carrier);
}
}
} finally {
! notifyJvmtiDisableSuspend(false);
}
}
}
}
/**
* Attempts to yield the current virtual thread (Thread.yield).
*/
void tryYield() {
assert Thread.currentThread() == this;
} else {
submitRunContinuation();
}
} else if ((s == PINNED) || (s == TIMED_PINNED)) {
// unpark carrier thread when pinned
! disableSuspendAndPreempt();
try {
synchronized (carrierThreadAccessLock()) {
Thread carrier = carrierThread;
if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
U.unpark(carrier);
}
}
} finally {
! enableSuspendAndPreempt();
}
}
}
}
+ /**
+ * Re-enables this virtual thread for scheduling after blocking on monitor enter.
+ * @throws RejectedExecutionException if the scheduler cannot accept a task
+ */
+ private void unblock() {
+ assert !Thread.currentThread().isVirtual();
+ unblocked = true;
+ if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
+ unblocked = false;
+ submitRunContinuation();
+ }
+ }
+
/**
* Attempts to yield the current virtual thread (Thread.yield).
*/
void tryYield() {
assert Thread.currentThread() == this;
return true;
}
@Override
void blockedOn(Interruptible b) {
! notifyJvmtiDisableSuspend(true);
try {
super.blockedOn(b);
} finally {
! notifyJvmtiDisableSuspend(false);
}
}
@Override
@SuppressWarnings("removal")
public void interrupt() {
if (Thread.currentThread() != this) {
checkAccess();
// if current thread is a virtual thread then prevent it from being
! // suspended when entering or holding interruptLock
Interruptible blocker;
! notifyJvmtiDisableSuspend(true);
try {
synchronized (interruptLock) {
interrupted = true;
blocker = nioBlocker();
if (blocker != null) {
return true;
}
@Override
void blockedOn(Interruptible b) {
! disableSuspendAndPreempt();
try {
super.blockedOn(b);
} finally {
! enableSuspendAndPreempt();
}
}
@Override
@SuppressWarnings("removal")
public void interrupt() {
if (Thread.currentThread() != this) {
checkAccess();
// if current thread is a virtual thread then prevent it from being
! // suspended or unmounted when entering or holding interruptLock
Interruptible blocker;
! disableSuspendAndPreempt();
try {
synchronized (interruptLock) {
interrupted = true;
blocker = nioBlocker();
if (blocker != null) {
// interrupt carrier thread if mounted
Thread carrier = carrierThread;
if (carrier != null) carrier.setInterrupt();
}
} finally {
! notifyJvmtiDisableSuspend(false);
}
// notify blocker after releasing interruptLock
if (blocker != null) {
blocker.postInterrupt();
}
} else {
interrupted = true;
carrierThread.setInterrupt();
}
unpark();
// interrupt carrier thread if mounted
Thread carrier = carrierThread;
if (carrier != null) carrier.setInterrupt();
}
} finally {
! enableSuspendAndPreempt();
}
// notify blocker after releasing interruptLock
if (blocker != null) {
blocker.postInterrupt();
}
+
} else {
interrupted = true;
carrierThread.setInterrupt();
}
unpark();
@Override
boolean getAndClearInterrupt() {
assert Thread.currentThread() == this;
boolean oldValue = interrupted;
if (oldValue) {
! notifyJvmtiDisableSuspend(true);
try {
synchronized (interruptLock) {
interrupted = false;
carrierThread.clearInterrupt();
}
} finally {
! notifyJvmtiDisableSuspend(false);
}
}
return oldValue;
}
@Override
boolean getAndClearInterrupt() {
assert Thread.currentThread() == this;
boolean oldValue = interrupted;
if (oldValue) {
! disableSuspendAndPreempt();
try {
synchronized (interruptLock) {
interrupted = false;
carrierThread.clearInterrupt();
}
} finally {
! enableSuspendAndPreempt();
}
}
return oldValue;
}
}
case UNPARKED:
case YIELDED:
// runnable, not mounted
return Thread.State.RUNNABLE;
case RUNNING:
// if mounted then return state of carrier thread
! notifyJvmtiDisableSuspend(true);
! try {
! synchronized (carrierThreadAccessLock()) {
! Thread carrierThread = this.carrierThread;
! if (carrierThread != null) {
! return carrierThread.threadState();
}
}
- } finally {
- notifyJvmtiDisableSuspend(false);
}
// runnable, mounted
return Thread.State.RUNNABLE;
case PARKING:
case TIMED_PARKING:
}
case UNPARKED:
case YIELDED:
// runnable, not mounted
return Thread.State.RUNNABLE;
+ case UNBLOCKED:
+ // if designated responsible thread for monitor then thread is blocked
+ if (isResponsibleForMonitor()) {
+ return Thread.State.BLOCKED;
+ } else {
+ return Thread.State.RUNNABLE;
+ }
case RUNNING:
+ // if designated responsible thread for monitor then thread is blocked
+ if (isResponsibleForMonitor()) {
+ return Thread.State.BLOCKED;
+ }
// if mounted then return state of carrier thread
! if (Thread.currentThread() != this) {
! disableSuspendAndPreempt();
! try {
! synchronized (carrierThreadAccessLock()) {
! Thread carrierThread = this.carrierThread;
! if (carrierThread != null) {
+ return carrierThread.threadState();
+ }
}
+ } finally {
+ enableSuspendAndPreempt();
}
}
// runnable, mounted
return Thread.State.RUNNABLE;
case PARKING:
case TIMED_PARKING:
case PINNED:
return State.WAITING;
case TIMED_PARKED:
case TIMED_PINNED:
return State.TIMED_WAITING;
+ case BLOCKING:
+ case BLOCKED:
+ return State.BLOCKED;
case TERMINATED:
return Thread.State.TERMINATED;
default:
throw new InternalError();
}
}
+ /**
+ * Returns true if thread is the designated responsible thread for a monitor.
+ * See objectMonitor.cpp for details.
+ */
+ private boolean isResponsibleForMonitor() {
+ return (recheckInterval > 0);
+ }
+
@Override
boolean alive() {
int s = state;
return (s != NEW && s != TERMINATED);
}
return new StackTraceElement[0]; // unmounted, empty stack
}
case RUNNING, PINNED, TIMED_PINNED -> {
return null; // mounted
}
! case PARKED, TIMED_PARKED -> {
// unmounted, not runnable
}
! case UNPARKED, YIELDED -> {
// unmounted, runnable
}
! case PARKING, TIMED_PARKING, YIELDING -> {
return null; // in transition
}
default -> throw new InternalError("" + initialState);
}
return new StackTraceElement[0]; // unmounted, empty stack
}
case RUNNING, PINNED, TIMED_PINNED -> {
return null; // mounted
}
! case PARKED, TIMED_PARKED, BLOCKED -> {
// unmounted, not runnable
}
! case UNPARKED, UNBLOCKED, YIELDED -> {
// unmounted, runnable
}
! case PARKING, TIMED_PARKING, BLOCKING, YIELDING -> {
return null; // in transition
}
default -> throw new InternalError("" + initialState);
}
} finally {
assert state == suspendedState;
setState(initialState);
}
boolean resubmit = switch (initialState) {
! case UNPARKED, YIELDED -> {
// resubmit as task may have run while suspended
yield true;
}
case PARKED, TIMED_PARKED -> {
// resubmit if unparked while suspended
yield parkPermit && compareAndSetState(initialState, UNPARKED);
}
default -> throw new InternalError();
};
if (resubmit) {
submitRunContinuation();
}
} finally {
assert state == suspendedState;
setState(initialState);
}
boolean resubmit = switch (initialState) {
! case UNPARKED, UNBLOCKED, YIELDED -> {
// resubmit as task may have run while suspended
yield true;
}
case PARKED, TIMED_PARKED -> {
// resubmit if unparked while suspended
yield parkPermit && compareAndSetState(initialState, UNPARKED);
}
+ case BLOCKED -> {
+ // resubmit if unblocked while suspended
+ yield unblocked && compareAndSetState(BLOCKED, UNBLOCKED);
+ }
default -> throw new InternalError();
};
if (resubmit) {
submitRunContinuation();
}
if (!name.isEmpty()) {
sb.append(",");
sb.append(name);
}
sb.append("]/");
! Thread carrier = carrierThread;
! if (carrier != null) {
! // include the carrier thread state and name when mounted
! notifyJvmtiDisableSuspend(true);
try {
synchronized (carrierThreadAccessLock()) {
! carrier = carrierThread;
- if (carrier != null) {
- String stateAsString = carrier.threadState().toString();
- sb.append(stateAsString.toLowerCase(Locale.ROOT));
- sb.append('@');
- sb.append(carrier.getName());
- }
}
} finally {
! notifyJvmtiDisableSuspend(false);
}
}
! // include virtual thread state when not mounted
! if (carrier == null) {
String stateAsString = threadState().toString();
sb.append(stateAsString.toLowerCase(Locale.ROOT));
}
return sb.toString();
}
@Override
public int hashCode() {
return (int) threadId();
}
@Override
public boolean equals(Object obj) {
return obj == this;
}
/**
* Returns the termination object, creating it if needed.
*/
private CountDownLatch getTermination() {
CountDownLatch termination = this.termination;
if (!name.isEmpty()) {
sb.append(",");
sb.append(name);
}
sb.append("]/");
!
! // add the carrier state and thread name when mounted
! boolean mounted;
! if (Thread.currentThread() == this) {
+ mounted = appendCarrierInfo(sb);
+ } else {
+ disableSuspendAndPreempt();
try {
synchronized (carrierThreadAccessLock()) {
! mounted = appendCarrierInfo(sb);
}
} finally {
! enableSuspendAndPreempt();
}
}
!
! // add virtual thread state when not mounted
+ if (!mounted) {
String stateAsString = threadState().toString();
sb.append(stateAsString.toLowerCase(Locale.ROOT));
}
+
return sb.toString();
}
+ /**
+ * Appends the carrier state and thread name to the string buffer if mounted.
+ * @return true if mounted, false if not mounted
+ */
+ private boolean appendCarrierInfo(StringBuilder sb) {
+ assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock());
+ Thread carrier = carrierThread;
+ if (carrier != null) {
+ String stateAsString = carrier.threadState().toString();
+ sb.append(stateAsString.toLowerCase(Locale.ROOT));
+ sb.append('@');
+ sb.append(carrier.getName());
+ return true;
+ } else {
+ return false;
+ }
+ }
+
@Override
public int hashCode() {
return (int) threadId();
}
@Override
public boolean equals(Object obj) {
return obj == this;
}
+ /**
+ * Returns a ScheduledExecutorService to execute a delayed task.
+ */
+ private ScheduledExecutorService delayedTaskScheduler() {
+ long tid = Thread.currentThread().threadId();
+ int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1);
+ return DELAYED_TASK_SCHEDULERS[index];
+ }
+
/**
* Returns the termination object, creating it if needed.
*/
private CountDownLatch getTermination() {
CountDownLatch termination = this.termination;
private Object carrierThreadAccessLock() {
// return interruptLock as unmount has to coordinate with interrupt
return interruptLock;
}
+ /**
+ * Disallow the current thread be suspended or preempted.
+ */
+ private void disableSuspendAndPreempt() {
+ notifyJvmtiDisableSuspend(true);
+ Continuation.pin();
+ }
+
+ /**
+ * Allow the current thread be suspended or preempted.
+ */
+ private void enableSuspendAndPreempt() {
+ Continuation.unpin();
+ notifyJvmtiDisableSuspend(false);
+ }
+
// -- wrappers for get/set of state, parking permit, and carrier thread --
private int state() {
return state; // volatile read
}
private boolean compareAndSetState(int expectedValue, int newValue) {
return U.compareAndSetInt(this, STATE, expectedValue, newValue);
}
+ private boolean compareAndSetOnWaitingList(byte expectedValue, byte newValue) {
+ return U.compareAndSetByte(this, ON_WAITING_LIST, expectedValue, newValue);
+ }
+
private void setParkPermit(boolean newValue) {
if (parkPermit != newValue) {
parkPermit = newValue;
}
}
private void setCarrierThread(Thread carrier) {
// U.putReferenceRelease(this, CARRIER_THREAD, carrier);
this.carrierThread = carrier;
}
+ @IntrinsicCandidate
+ private static native void setLockId(long tid);
+
// -- JVM TI support --
@IntrinsicCandidate
@JvmtiMountTransition
private native void notifyJvmtiStart();
};
return AccessController.doPrivileged(pa);
}
/**
! * Creates the ScheduledThreadPoolExecutor used for timed unpark.
*/
! private static ScheduledExecutorService createDelayedTaskScheduler() {
! String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize");
! int poolSize;
if (propValue != null) {
! poolSize = Integer.parseInt(propValue);
} else {
! poolSize = 1;
}
! ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
- Executors.newScheduledThreadPool(poolSize, task -> {
- return InnocuousThread.newThread("VirtualThread-unparker", task);
- });
- stpe.setRemoveOnCancelPolicy(true);
- return stpe;
}
/**
! * Reads the value of the jdk.tracePinnedThreads property to determine if stack
! * traces should be printed when a carrier thread is pinned when a virtual thread
- * attempts to park.
*/
! private static int tracePinningMode() {
! String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads");
! if (propValue != null) {
! if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue))
! return 1;
! if ("short".equalsIgnoreCase(propValue))
! return 2;
}
! return 0;
}
}
};
return AccessController.doPrivileged(pa);
}
/**
! * Invoked by the VM for the Thread.vthread_scheduler diagnostic command.
*/
! private static byte[] printDefaultScheduler() {
! return String.format("%s%n", DEFAULT_SCHEDULER.toString())
! .getBytes(StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks.
+ */
+ private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
+ String propName = "jdk.virtualThreadScheduler.timerQueues";
+ String propValue = GetPropertyAction.privilegedGetProperty(propName);
+ int queueCount;
if (propValue != null) {
! queueCount = Integer.parseInt(propValue);
+ if (queueCount != Integer.highestOneBit(queueCount)) {
+ throw new RuntimeException("Value of " + propName + " must be power of 2");
+ }
} else {
! int ncpus = Runtime.getRuntime().availableProcessors();
+ queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
+ }
+ var schedulers = new ScheduledExecutorService[queueCount];
+ for (int i = 0; i < queueCount; i++) {
+ ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
+ Executors.newScheduledThreadPool(1, task -> {
+ Thread t = InnocuousThread.newThread("VirtualThread-unparker", task);
+ t.setDaemon(true);
+ return t;
+ });
+ stpe.setRemoveOnCancelPolicy(true);
+ schedulers[i] = stpe;
}
! return schedulers;
}
/**
! * Schedule virtual threads that are ready to be scheduled after they blocked on
! * monitor enter.
*/
! private static void unblockVirtualThreads() {
! while (true) {
! VirtualThread vthread = takeVirtualThreadListToUnblock();
! while (vthread != null) {
! assert vthread.onWaitingList == 1;
! VirtualThread nextThread = vthread.next;
!
+ // remove from list and unblock
+ vthread.next = null;
+ boolean changed = vthread.compareAndSetOnWaitingList((byte) 1, (byte) 0);
+ assert changed;
+ vthread.unblock();
+
+ vthread = nextThread;
+ }
}
! }
+
+ /**
+ * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
+ * if necessary until a list of one or more threads becomes available.
+ */
+ private static native VirtualThread takeVirtualThreadListToUnblock();
+
+ static {
+ var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
+ VirtualThread::unblockVirtualThreads);
+ unblocker.setDaemon(true);
+ unblocker.start();
}
}
< prev index next >