< prev index next > src/java.base/share/classes/java/lang/VirtualThread.java
Print this page
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.lang;
+ import java.lang.reflect.Constructor;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
* A thread that is scheduled by the Java virtual machine rather than the operating system.
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
! private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
* A thread that is scheduled by the Java virtual machine rather than the operating system.
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
! private static final Executor DEFAULT_SCHEDULER = createDefaultScheduler();
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
private volatile VirtualThread next;
// notified by Object.notify/notifyAll while waiting in Object.wait
private volatile boolean notified;
+ // true when the virtual thread is executing a Java-level Object.wait, false for a VM-internal Object.wait
+ private volatile boolean interruptableWait;
+
// timed-wait support
private byte timedWaitSeqNo;
// timeout for timed-park and timed-wait, only accessed on current/carrier thread
private long timeout;
}
// Object.wait
if (s == WAITING || s == TIMED_WAITING) {
int newState;
+ boolean interruptable = interruptableWait;
if (s == WAITING) {
setState(newState = WAIT);
} else {
// For timed-wait, a timeout task is scheduled to execute. The timeout
// task will change the thread state to UNBLOCKED and submit the thread
}
return;
}
// may have been interrupted while in transition to wait state
! if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
submitRunContinuation();
return;
}
return;
}
}
return;
}
// may have been interrupted while in transition to wait state
! if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
submitRunContinuation();
return;
}
return;
}
termination.countDown();
}
// notify container
if (notifyContainer) {
! threadContainer().onExit(this);
}
// clear references to thread locals
clearReferences();
}
termination.countDown();
}
// notify container
if (notifyContainer) {
! threadContainer().remove(this);
}
// clear references to thread locals
clearReferences();
}
// start thread
boolean addedToContainer = false;
boolean started = false;
try {
! container.onStart(this); // may throw
addedToContainer = true;
// scoped values may be inherited
inheritScopedValueBindings(container);
// start thread
boolean addedToContainer = false;
boolean started = false;
try {
! container.add(this); // may throw
addedToContainer = true;
// scoped values may be inherited
inheritScopedValueBindings(container);
notifyJvmtiDisableSuspend(false);
}
// -- wrappers for get/set of state, parking permit, and carrier thread --
! private int state() { // private accessor for this thread's state field
return state; // volatile read
}
private void setState(int newValue) {
state = newValue; // volatile write
notifyJvmtiDisableSuspend(false);
}
// -- wrappers for get/set of state, parking permit, and carrier thread --
! int state() { // no access modifier: accessor for the state field is package-private
return state; // volatile read
}
private void setState(int newValue) {
state = newValue; // volatile write
} else {
return newValue;
}
}
+ Thread carrierThread() { // package-private accessor: returns the current value of the carrierThread field
+ return carrierThread;
+ }
+
/**
 * Stores the given thread in the {@code carrierThread} field.
 *
 * @param carrier the value to assign to {@code carrierThread}
 */
private void setCarrierThread(Thread carrier) {
// U.putReferenceRelease(this, CARRIER_THREAD, carrier);
// NOTE(review): the disabled line above would have stored the reference through Unsafe
// (presumably with release ordering); a plain field assignment is used instead.
// Confirm whether the commented-out call is still needed or can be deleted.
this.carrierThread = carrier;
}
// ensure VTHREAD_GROUP is created, may be accessed by JVMTI
var group = Thread.virtualThreadGroup();
}
/**
* Creates the default ForkJoinPool scheduler.
*/
! private static ForkJoinPool createDefaultScheduler() {
ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
// ensure VTHREAD_GROUP is created, may be accessed by JVMTI
var group = Thread.virtualThreadGroup();
}
+ /**
+ * Creates the default scheduler.
+ * If the system property {@code jdk.virtualThreadScheduler.implClass} is set then
+ * its value is the name of a class that implements {@code java.util.concurrent.Executor}.
+ * The class is loaded and initialized with the system class loader and is instantiated
+ * through its public no-arg constructor, so it must be public in an exported package,
+ * have a public no-arg constructor, and be visible to the system class loader.
+ * A warning is printed to {@code System.err} when a custom scheduler is used.
+ * If the system property is not set then the default scheduler will be a
+ * ForkJoinPool instance, as returned by {@code createDefaultForkJoinPoolScheduler()}.
+ *
+ * @return the scheduler instance
+ * @throws Error if the property is set and an exception is thrown while loading the
+ *         class, invoking its constructor, or casting the new instance to Executor;
+ *         the exception is the cause
+ */
+ private static Executor createDefaultScheduler() {
+ String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
+ if (propValue != null) {
+ try {
+ Class<?> clazz = Class.forName(propValue, true,
+ ClassLoader.getSystemClassLoader()); // 'true' => the class is initialized when loaded
+ Constructor<?> ctor = clazz.getConstructor(); // looks up the public no-arg constructor
+ var scheduler = (Executor) ctor.newInstance(); // a class that is not an Executor fails the cast here
+ System.err.println("""
+ WARNING: Using custom scheduler, this is an experimental feature.""");
+ return scheduler;
+ } catch (Exception ex) { // exceptions (not Errors) from loading, construction, or the cast
+ throw new Error(ex); // the original exception is kept as the cause
+ }
+ } else {
+ return createDefaultForkJoinPoolScheduler(); // property not set: use the built-in scheduler
+ }
+ }
+
/**
* Creates the default ForkJoinPool scheduler.
*/
! private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
VirtualThread::unblockVirtualThreads);
unblocker.setDaemon(true);
unblocker.start();
}
! }
var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
VirtualThread::unblockVirtualThreads);
unblocker.setDaemon(true);
unblocker.start();
}
! }
< prev index next >