< prev index next > src/java.base/share/classes/java/lang/VirtualThread.java
Print this page
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.lang;
+ import java.lang.reflect.Constructor;
import java.security.AccessController;
import java.security.PrivilegedAction;
+ import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import jdk.internal.event.VirtualThreadEndEvent;
- import jdk.internal.event.VirtualThreadPinnedEvent;
import jdk.internal.event.VirtualThreadStartEvent;
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
import jdk.internal.misc.CarrierThread;
import jdk.internal.misc.InnocuousThread;
import jdk.internal.misc.Unsafe;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+ import java.util.stream.Stream;
import jdk.internal.event.VirtualThreadEndEvent;
import jdk.internal.event.VirtualThreadStartEvent;
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
import jdk.internal.misc.CarrierThread;
import jdk.internal.misc.InnocuousThread;
import jdk.internal.misc.Unsafe;
* A thread that is scheduled by the Java virtual machine rather than the operating system.
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
! private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
- private static final int TRACE_PINNING_MODE = tracePinningMode();
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
// scheduler and continuation
private final Executor scheduler;
private final Continuation cont;
private final Runnable runContinuation;
* A thread that is scheduled by the Java virtual machine rather than the operating system.
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
! private static final Executor DEFAULT_SCHEDULER = createDefaultScheduler();
private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
+ private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
// scheduler and continuation
private final Executor scheduler;
private final Continuation cont;
private final Runnable runContinuation;
* TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked
* TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier
* TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue
* TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
*
+ * RUNNING -> BLOCKING // blocking on monitor enter
+ * BLOCKING -> BLOCKED // blocked on monitor enter
+ * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue
+ * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter
+ *
+ * RUNNING -> WAITING // transitional state during wait on monitor
+ * WAITING -> WAITED // waiting on monitor
+ * WAITED -> BLOCKED // notified, waiting to be unblocked by monitor owner
+ * WAITED -> UNBLOCKED // timed-out/interrupted
+ *
+ * RUNNING -> TIMED_WAITING // transition state during timed-waiting on monitor
+ * TIMED_WAITING -> TIMED_WAITED // timed-waiting on monitor
+ * TIMED_WAITED -> BLOCKED // notified, waiting to be unblocked by monitor owner
+ * TIMED_WAITED -> UNBLOCKED // timed-out/interrupted
+ *
* RUNNING -> YIELDING // Thread.yield
* YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue
* YIELDING -> RUNNING // cont.yield failed
* YIELDED -> RUNNING // continue execution after Thread.yield
*/
// Thread.yield
private static final int YIELDING = 10;
private static final int YIELDED = 11; // unmounted but runnable
+ // monitor enter
+ private static final int BLOCKING = 12;
+ private static final int BLOCKED = 13; // unmounted
+ private static final int UNBLOCKED = 14; // unmounted but runnable
+
+ // monitor wait/timed-wait
+ private static final int WAITING = 15;
+ private static final int WAIT = 16; // waiting in Object.wait
+ private static final int TIMED_WAITING = 17;
+ private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait
+
private static final int TERMINATED = 99; // final state
// can be suspended from scheduling when unmounted
private static final int SUSPENDED = 1 << 8;
// parking permit
private volatile boolean parkPermit;
+ // used to mark thread as ready to be unblocked
+ private volatile boolean unblocked;
+
+ // notified by Object.notify/notifyAll while waiting in Object.wait
+ private volatile boolean notified;
+
+ // timed-wait support
+ private long waitTimeout;
+ private byte timedWaitNonce;
+ private volatile Future<?> waitTimeoutTask;
+
+ // a positive value if "responsible thread" blocked on monitor enter, accessed by VM
+ private volatile byte recheckInterval;
+
// carrier thread when mounted, accessed by VM
private volatile Thread carrierThread;
// termination object when joining, created lazily if needed
private volatile CountDownLatch termination;
+ // has the value 1 when on the list of virtual threads waiting to be unblocked
+ private volatile byte onWaitingList;
+
+ // next virtual thread on the list of virtual threads waiting to be unblocked
+ private volatile VirtualThread next;
+
+ /**
+ * Returns the default scheduler.
+ */
+ static Executor defaultScheduler() {
+ return DEFAULT_SCHEDULER;
+ }
+
+ /**
+ * Returns a stream of the delayed task schedulers used to support timed operations.
+ */
+ static Stream<ScheduledExecutorService> delayedTaskSchedulers() {
+ return Arrays.stream(DELAYED_TASK_SCHEDULERS);
+ }
+
/**
* Returns the continuation scope used for virtual threads.
*/
static ContinuationScope continuationScope() {
return VTHREAD_SCOPE;
VThreadContinuation(VirtualThread vthread, Runnable task) {
super(VTHREAD_SCOPE, wrap(vthread, task));
}
@Override
protected void onPinned(Continuation.Pinned reason) {
! if (TRACE_PINNING_MODE > 0) {
! boolean printAll = (TRACE_PINNING_MODE == 1);
- VirtualThread vthread = (VirtualThread) Thread.currentThread();
- int oldState = vthread.state();
- try {
- // avoid printing when in transition states
- vthread.setState(RUNNING);
- PinnedThreadPrinter.printStackTrace(System.out, reason, printAll);
- } finally {
- vthread.setState(oldState);
- }
- }
}
private static Runnable wrap(VirtualThread vthread, Runnable task) {
return new Runnable() {
@Hidden
public void run() {
VThreadContinuation(VirtualThread vthread, Runnable task) {
super(VTHREAD_SCOPE, wrap(vthread, task));
}
@Override
protected void onPinned(Continuation.Pinned reason) {
! // emit JFR event
! virtualThreadPinnedEvent(reason.reasonCode(), reason.reasonString());
}
private static Runnable wrap(VirtualThread vthread, Runnable task) {
return new Runnable() {
@Hidden
public void run() {
}
};
}
}
+ /**
+ * jdk.VirtualThreadPinned is emitted by HotSpot VM when pinned. Call into VM to
+ * emit event to avoid having a JFR event in Java with the same name (but different ID)
+ * to events emitted by the VM.
+ */
+ private static native void virtualThreadPinnedEvent(int reason, String reasonString);
+
/**
* Runs or continues execution on the current thread. The virtual thread is mounted
* on the current thread before the task runs or continues. It unmounts when the
* task completes or yields.
*/
throw new WrongThreadException();
}
// set state to RUNNING
int initialState = state();
! if (initialState == STARTED || initialState == UNPARKED || initialState == YIELDED) {
// newly started or continue after parking/blocking/Thread.yield
if (!compareAndSetState(initialState, RUNNING)) {
return;
}
// consume parking permit when continuing after parking
throw new WrongThreadException();
}
// set state to RUNNING
int initialState = state();
! if (initialState == STARTED || initialState == UNPARKED
+ || initialState == UNBLOCKED || initialState == YIELDED) {
// newly started or continue after parking/blocking/Thread.yield
if (!compareAndSetState(initialState, RUNNING)) {
return;
}
// consume parking permit when continuing after parking
notifyJvmtiHideFrames(true);
Thread carrier = this.carrierThread;
assert Thread.currentThread() == this
&& carrier == Thread.currentCarrierThread();
carrier.setCurrentThread(carrier);
+ Thread.setCurrentLockId(this.threadId()); // keep lock ID of virtual thread
}
/**
* Sets the current thread to the given virtual thread.
*/
submitRunContinuation();
}
return;
}
+ // blocking on monitorenter
+ if (s == BLOCKING) {
+ setState(BLOCKED);
+
+ // may have been unblocked while blocking
+ if (unblocked && compareAndSetState(BLOCKED, UNBLOCKED)) {
+ unblocked = false;
+ submitRunContinuation();
+ return;
+ }
+
+ // if thread is the designated responsible thread for a monitor then schedule
+ // it to wakeup so that it can check and recover. See objectMonitor.cpp.
+ int recheckInterval = this.recheckInterval;
+ if (recheckInterval > 0) {
+ assert recheckInterval >= 1 && recheckInterval <= 6;
+ // 4 ^ (recheckInterval - 1) = 1, 4, 16, ... 1024
+ long delay = 1 << (recheckInterval - 1) << (recheckInterval - 1);
+ schedule(this::unblock, delay, MILLISECONDS);
+ }
+ return;
+ }
+
+ // Object.wait
+ if (s == WAITING || s == TIMED_WAITING) {
+ byte nonce;
+ int newState;
+ if (s == WAITING) {
+ nonce = 0; // not used
+ setState(newState = WAIT);
+ } else {
+ // synchronize with timeout task (previous timed-wait may be running)
+ synchronized (timedWaitLock()) {
+ nonce = ++timedWaitNonce;
+ setState(newState = TIMED_WAIT);
+ }
+ }
+
+ // may have been notified while in transition to wait state
+ if (notified && compareAndSetState(newState, BLOCKED)) {
+ // may have even been unblocked already
+ if (unblocked && compareAndSetState(BLOCKED, UNBLOCKED)) {
+ unblocked = false;
+ submitRunContinuation();
+ }
+ return;
+ }
+
+ // may have been interrupted while in transition to wait state
+ if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
+ submitRunContinuation();
+ return;
+ }
+
+ // schedule wakeup
+ if (newState == TIMED_WAIT) {
+ assert waitTimeout > 0;
+ waitTimeoutTask = schedule(() -> waitTimeoutExpired(nonce), waitTimeout, MILLISECONDS);
+ }
+ return;
+ }
+
assert false;
}
/**
* Invoked after the continuation completes.
// park the thread
boolean yielded = false;
setState(PARKING);
try {
! yielded = yieldContinuation(); // may throw
} finally {
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
if (!yielded) {
assert state() == PARKING;
setState(RUNNING);
// park the thread
boolean yielded = false;
setState(PARKING);
try {
! yielded = yieldContinuation();
+ } catch (OutOfMemoryError e) {
+ // park on carrier
} finally {
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
if (!yielded) {
assert state() == PARKING;
setState(RUNNING);
// park the thread for the waiting time
if (nanos > 0) {
long startTime = System.nanoTime();
boolean yielded = false;
! Future<?> unparker = scheduleUnpark(nanos); // may throw OOME
- setState(TIMED_PARKING);
try {
! yielded = yieldContinuation(); // may throw
! } finally {
! assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
! if (!yielded) {
! assert state() == TIMED_PARKING;
! setState(RUNNING);
}
- cancel(unparker);
}
! // park on carrier thread for remaining time when pinned
if (!yielded) {
long remainingNanos = nanos - (System.nanoTime() - startTime);
parkOnCarrierThread(true, remainingNanos);
}
}
// park the thread for the waiting time
if (nanos > 0) {
long startTime = System.nanoTime();
boolean yielded = false;
! Future<?> unparker = null;
try {
! unparker = scheduleUnpark(nanos);
! } catch (OutOfMemoryError e) {
! // park on carrier
! }
! if (unparker != null) {
! setState(TIMED_PARKING);
+ try {
+ yielded = yieldContinuation();
+ } catch (OutOfMemoryError e) {
+ // park on carrier
+ } finally {
+ assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
+ if (!yielded) {
+ assert state() == TIMED_PARKING;
+ setState(RUNNING);
+ }
+ cancel(unparker);
}
}
! // park on carrier thread for remaining time when pinned (or OOME)
if (!yielded) {
long remainingNanos = nanos - (System.nanoTime() - startTime);
parkOnCarrierThread(true, remainingNanos);
}
}
* @param nanos the waiting time in nanoseconds
*/
private void parkOnCarrierThread(boolean timed, long nanos) {
assert state() == RUNNING;
- VirtualThreadPinnedEvent event;
- try {
- event = new VirtualThreadPinnedEvent();
- event.begin();
- } catch (OutOfMemoryError e) {
- event = null;
- }
-
setState(timed ? TIMED_PINNED : PINNED);
try {
if (!parkPermit) {
if (!timed) {
U.park(false, 0);
setState(RUNNING);
}
// consume parking permit
setParkPermit(false);
-
- if (event != null) {
- try {
- event.commit();
- } catch (OutOfMemoryError e) {
- // ignore
- }
- }
}
/**
! * Schedule this virtual thread to be unparked after a given delay.
*/
@ChangesCurrentThread
private Future<?> scheduleUnpark(long nanos) {
assert Thread.currentThread() == this;
// need to switch to current carrier thread to avoid nested parking
setState(RUNNING);
}
// consume parking permit
setParkPermit(false);
}
/**
! * Invoked by parkNanos to schedule this virtual thread to be unparked after
+ * a given delay.
*/
@ChangesCurrentThread
private Future<?> scheduleUnpark(long nanos) {
assert Thread.currentThread() == this;
// need to switch to current carrier thread to avoid nested parking
switchToVirtualThread(this);
}
}
/**
! * Cancels a task if it has not completed.
*/
@ChangesCurrentThread
private void cancel(Future<?> future) {
assert Thread.currentThread() == this;
if (!future.isDone()) {
switchToVirtualThread(this);
}
}
/**
! * Invoked by parkNanos to cancel the unpark timer.
*/
@ChangesCurrentThread
private void cancel(Future<?> future) {
assert Thread.currentThread() == this;
if (!future.isDone()) {
return;
}
}
}
+ /**
+ * Invoked by unblocker thread to unblock this virtual thread.
+ */
+ private void unblock() {
+ assert !Thread.currentThread().isVirtual();
+ unblocked = true;
+ if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
+ unblocked = false;
+ submitRunContinuation();
+ }
+ }
+
+ /**
+ * Invoked by timer thread when wait timeout for virtual thread has expired.
+ * If the virtual thread is in timed-wait then this method will unblock the thread
+ * and submit its task so that it continues and attempts to reenter the monitor.
+ * This method does nothing if the thread has been woken by notify or interrupt.
+ */
+ private void waitTimeoutExpired(byte nounce) {
+ assert !Thread.currentThread().isVirtual();
+ for (;;) {
+ boolean unblocked = false;
+ synchronized (timedWaitLock()) {
+ if (nounce != timedWaitNonce) {
+ // this timeout task is for a past timed-wait
+ return;
+ }
+ int s = state();
+ if (s == TIMED_WAIT) {
+ unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED);
+ } else if (s != (TIMED_WAIT | SUSPENDED)) {
+ // notified or interrupted, no longer waiting
+ return;
+ }
+ }
+ if (unblocked) {
+ submitRunContinuation();
+ return;
+ }
+ // need to retry when thread is suspended in time-wait
+ Thread.yield();
+ }
+ }
+
+ /**
+ * Invoked by Object.wait to cancel the wait timer.
+ */
+ void cancelWaitTimeout() {
+ assert Thread.currentThread() == this;
+ Future<?> timeoutTask = this.waitTimeoutTask;
+ if (timeoutTask != null) {
+ // Pin the continuation to prevent the virtual thread from unmounting
+ // when there is contention removing the task. This avoids deadlock that
+ // could arise due to carriers and virtual threads contending for a
+ // lock on the delay queue.
+ Continuation.pin();
+ try {
+ timeoutTask.cancel(false);
+ } finally {
+ Continuation.unpin();
+ }
+ }
+ }
+
/**
* Attempts to yield the current virtual thread (Thread.yield).
*/
void tryYield() {
assert Thread.currentThread() == this;
}
// make available parking permit, unpark thread if parked
unpark();
+ // if thread is waiting in Object.wait then schedule to try to reenter
+ int s = state();
+ if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) {
+ submitRunContinuation();
+ }
+
} else {
interrupted = true;
carrierThread.setInterrupt();
setParkPermit(true);
}
}
case UNPARKED:
case YIELDED:
// runnable, not mounted
return Thread.State.RUNNABLE;
+ case UNBLOCKED:
+ // if designated responsible thread for monitor then thread is blocked
+ if (isResponsibleForMonitor()) {
+ return Thread.State.BLOCKED;
+ } else {
+ return Thread.State.RUNNABLE;
+ }
case RUNNING:
+ // if designated responsible thread for monitor then thread is blocked
+ if (isResponsibleForMonitor()) {
+ return Thread.State.BLOCKED;
+ }
// if mounted then return state of carrier thread
if (Thread.currentThread() != this) {
disableSuspendAndPreempt();
try {
synchronized (carrierThreadAccessLock()) {
// runnable, mounted
return Thread.State.RUNNABLE;
case PARKING:
case TIMED_PARKING:
case YIELDING:
+ case WAITING:
+ case TIMED_WAITING:
// runnable, in transition
return Thread.State.RUNNABLE;
case PARKED:
case PINNED:
+ case WAIT:
return State.WAITING;
case TIMED_PARKED:
case TIMED_PINNED:
+ case TIMED_WAIT:
return State.TIMED_WAITING;
+ case BLOCKING:
+ case BLOCKED:
+ return State.BLOCKED;
case TERMINATED:
return Thread.State.TERMINATED;
default:
throw new InternalError();
}
}
+ /**
+ * Returns true if thread is the designated responsible thread for a monitor.
+ * See objectMonitor.cpp for details.
+ */
+ private boolean isResponsibleForMonitor() {
+ return (recheckInterval > 0);
+ }
+
@Override
boolean alive() {
int s = state;
return (s != NEW && s != TERMINATED);
}
return new StackTraceElement[0]; // unmounted, empty stack
}
case RUNNING, PINNED, TIMED_PINNED -> {
return null; // mounted
}
! case PARKED, TIMED_PARKED -> {
// unmounted, not runnable
}
! case UNPARKED, YIELDED -> {
// unmounted, runnable
}
! case PARKING, TIMED_PARKING, YIELDING -> {
return null; // in transition
}
default -> throw new InternalError("" + initialState);
}
return new StackTraceElement[0]; // unmounted, empty stack
}
case RUNNING, PINNED, TIMED_PINNED -> {
return null; // mounted
}
! case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
// unmounted, not runnable
}
! case UNPARKED, UNBLOCKED, YIELDED -> {
// unmounted, runnable
}
! case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
return null; // in transition
}
default -> throw new InternalError("" + initialState);
}
} finally {
assert state == suspendedState;
setState(initialState);
}
boolean resubmit = switch (initialState) {
! case UNPARKED, YIELDED -> {
// resubmit as task may have run while suspended
yield true;
}
case PARKED, TIMED_PARKED -> {
// resubmit if unparked while suspended
yield parkPermit && compareAndSetState(initialState, UNPARKED);
}
default -> throw new InternalError();
};
if (resubmit) {
submitRunContinuation();
}
} finally {
assert state == suspendedState;
setState(initialState);
}
boolean resubmit = switch (initialState) {
! case UNPARKED, UNBLOCKED, YIELDED -> {
// resubmit as task may have run while suspended
yield true;
}
case PARKED, TIMED_PARKED -> {
// resubmit if unparked while suspended
yield parkPermit && compareAndSetState(initialState, UNPARKED);
}
+ case BLOCKED -> {
+ // resubmit if unblocked while suspended or the responsible thread for a monitor
+ yield (unblocked || isResponsibleForMonitor())
+ && compareAndSetState(BLOCKED, UNBLOCKED);
+ }
+ case WAIT, TIMED_WAIT -> {
+ // resubmit if notified or interrupted while waiting (Object.wait)
+ // waitTimeoutExpired will retry if the timed expired when suspended
+ yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
+ }
default -> throw new InternalError();
};
if (resubmit) {
submitRunContinuation();
}
private Object carrierThreadAccessLock() {
// return interruptLock as unmount has to coordinate with interrupt
return interruptLock;
}
+ /**
+ * Returns a lock object to coordinating timed-wait setup and timeout handling.
+ */
+ private Object timedWaitLock() {
+ // use this object for now to avoid the overhead of introducing another lock
+ return runContinuation;
+ }
+
/**
* Disallow the current thread be suspended or preempted.
*/
private void disableSuspendAndPreempt() {
notifyJvmtiDisableSuspend(true);
private boolean compareAndSetState(int expectedValue, int newValue) {
return U.compareAndSetInt(this, STATE, expectedValue, newValue);
}
+ private boolean compareAndSetOnWaitingList(byte expectedValue, byte newValue) {
+ return U.compareAndSetByte(this, ON_WAITING_LIST, expectedValue, newValue);
+ }
+
private void setParkPermit(boolean newValue) {
if (parkPermit != newValue) {
parkPermit = newValue;
}
}
static {
registerNatives();
// ensure VTHREAD_GROUP is created, may be accessed by JVMTI
var group = Thread.virtualThreadGroup();
! // ensure VirtualThreadPinnedEvent is loaded/initialized
! U.ensureClassInitialized(VirtualThreadPinnedEvent.class);
}
/**
* Creates the default ForkJoinPool scheduler.
*/
@SuppressWarnings("removal")
! private static ForkJoinPool createDefaultScheduler() {
ForkJoinWorkerThreadFactory factory = pool -> {
PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
return AccessController.doPrivileged(pa);
};
PrivilegedAction<ForkJoinPool> pa = () -> {
static {
registerNatives();
// ensure VTHREAD_GROUP is created, may be accessed by JVMTI
var group = Thread.virtualThreadGroup();
+ }
! /**
! * Creates the default scheduler.
+ * If the system property {@code jdk.virtualThreadScheduler.implClass} is set then
+ * its value is the name of a class that implements java.util.concurrent.Executor.
+ * The class is public in an exported package, has a public no-arg constructor,
+ * and is visible to the system class loader.
+ * If the system property is not set then the default scheduler will be a
+ * ForkJoinPool instance.
+ */
+ private static Executor createDefaultScheduler() {
+ String propName = "jdk.virtualThreadScheduler.implClass";
+ String propValue = GetPropertyAction.privilegedGetProperty(propName);
+ if (propValue != null) {
+ try {
+ Class<?> clazz = Class.forName(propValue, true,
+ ClassLoader.getSystemClassLoader());
+ Constructor<?> ctor = clazz.getConstructor();
+ var scheduler = (Executor) ctor.newInstance();
+ System.err.println("""
+ WARNING: Using custom scheduler, this is an experimental feature.""");
+ return scheduler;
+ } catch (Exception ex) {
+ throw new Error(ex);
+ }
+ } else {
+ return createDefaultForkJoinPoolScheduler();
+ }
}
/**
* Creates the default ForkJoinPool scheduler.
*/
@SuppressWarnings("removal")
! private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
ForkJoinWorkerThreadFactory factory = pool -> {
PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
return AccessController.doPrivileged(pa);
};
PrivilegedAction<ForkJoinPool> pa = () -> {
}
return schedulers;
}
/**
! * Reads the value of the jdk.tracePinnedThreads property to determine if stack
! * traces should be printed when a carrier thread is pinned when a virtual thread
- * attempts to park.
*/
! private static int tracePinningMode() {
! String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads");
! if (propValue != null) {
! if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue))
! return 1;
! if ("short".equalsIgnoreCase(propValue))
! return 2;
}
! return 0;
}
}
}
return schedulers;
}
/**
! * Schedule virtual threads that are ready to be scheduled after they blocked on
! * monitor enter.
*/
! private static void unblockVirtualThreads() {
! while (true) {
! VirtualThread vthread = takeVirtualThreadListToUnblock();
! while (vthread != null) {
! assert vthread.onWaitingList == 1;
! VirtualThread nextThread = vthread.next;
!
+ // remove from list and unblock
+ vthread.next = null;
+ boolean changed = vthread.compareAndSetOnWaitingList((byte) 1, (byte) 0);
+ assert changed;
+ vthread.unblock();
+
+ vthread = nextThread;
+ }
}
! }
+
+ /**
+ * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
+ * if necessary until a list of one or more threads becomes available.
+ */
+ private static native VirtualThread takeVirtualThreadListToUnblock();
+
+ static {
+ var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
+ VirtualThread::unblockVirtualThreads);
+ unblocker.setDaemon(true);
+ unblocker.start();
}
}
< prev index next >