< prev index next > src/java.base/share/classes/java/lang/VirtualThread.java
Print this page
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.lang;
+ import java.lang.invoke.MethodHandles;
+ import java.lang.invoke.VarHandle;
+ import java.lang.reflect.Constructor;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import jdk.internal.event.VirtualThreadEndEvent;
+ import jdk.internal.event.VirtualThreadParkEvent;
import jdk.internal.event.VirtualThreadStartEvent;
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
+ import jdk.internal.invoke.MhUtil;
import jdk.internal.misc.CarrierThread;
import jdk.internal.misc.InnocuousThread;
import jdk.internal.misc.Unsafe;
import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;
* A thread that is scheduled by the Java virtual machine rather than the operating system.
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
- private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
+
+ private static final BuiltinScheduler BUILTIN_SCHEDULER;
+ private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
+ private static final VirtualThreadScheduler EXTERNAL_VIEW;
+ static {
+ // experimental
+ String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
+ if (propValue != null) {
+ BuiltinScheduler builtinScheduler = createBuiltinScheduler(true);
+ VirtualThreadScheduler externalView = builtinScheduler.createExternalView();
+ VirtualThreadScheduler defaultScheduler = loadCustomScheduler(externalView, propValue);
+ BUILTIN_SCHEDULER = builtinScheduler;
+ DEFAULT_SCHEDULER = defaultScheduler;
+ EXTERNAL_VIEW = externalView;
+ } else {
+ var builtinScheduler = createBuiltinScheduler(false);
+ BUILTIN_SCHEDULER = builtinScheduler;
+ DEFAULT_SCHEDULER = builtinScheduler;
+ EXTERNAL_VIEW = builtinScheduler.createExternalView();
+ }
+ }
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
// scheduler and continuation
- private final Executor scheduler;
+ private final VirtualThreadScheduler scheduler;
private final Continuation cont;
- private final Runnable runContinuation;
+ private final VirtualThreadTask runContinuation;
// virtual thread state, accessed by VM
private volatile int state;
/*
// termination object when joining, created lazily if needed
private volatile CountDownLatch termination;
/**
- * Returns the default scheduler.
+ * Return the built-in scheduler.
+ */
+ static VirtualThreadScheduler builtinScheduler() {
+ return BUILTIN_SCHEDULER;
+ }
+
+ /**
+ * Returns the default scheduler, usually the same as the built-in scheduler.
*/
- static Executor defaultScheduler() {
+ static VirtualThreadScheduler defaultScheduler() {
return DEFAULT_SCHEDULER;
}
/**
* Returns the continuation scope used for virtual threads.
static ContinuationScope continuationScope() {
return VTHREAD_SCOPE;
}
/**
- * Creates a new {@code VirtualThread} to run the given task with the given
- * scheduler. If the given scheduler is {@code null} and the current thread
- * is a platform thread then the newly created virtual thread will use the
- * default scheduler. If given scheduler is {@code null} and the current
- * thread is a virtual thread then the current thread's scheduler is used.
+ * Return the scheduler for this thread.
+ * @param trusted true if caller is trusted, false if not trusted
+ */
+ VirtualThreadScheduler scheduler(boolean trusted) {
+ if (scheduler == BUILTIN_SCHEDULER && !trusted) {
+ return EXTERNAL_VIEW;
+ } else {
+ return scheduler;
+ }
+ }
+
+ /**
+ * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
*
- * @param scheduler the scheduler or null
+ * @param scheduler the scheduler or null for default scheduler
+ * @param preferredCarrier the preferred carrier or null
* @param name thread name
* @param characteristics characteristics
* @param task the task to execute
*/
- VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
+ VirtualThread(VirtualThreadScheduler scheduler,
+ Thread preferredCarrier,
+ String name,
+ int characteristics,
+ Runnable task,
+ Object att) {
super(name, characteristics, /*bound*/ false);
Objects.requireNonNull(task);
- // choose scheduler if not specified
+ // use default scheduler if not provided
if (scheduler == null) {
- Thread parent = Thread.currentThread();
- if (parent instanceof VirtualThread vparent) {
- scheduler = vparent.scheduler;
- } else {
- scheduler = DEFAULT_SCHEDULER;
- }
+ scheduler = DEFAULT_SCHEDULER;
+ } else if (scheduler == EXTERNAL_VIEW) {
+ throw new UnsupportedOperationException();
}
-
this.scheduler = scheduler;
this.cont = new VThreadContinuation(this, task);
- this.runContinuation = this::runContinuation;
+
+ if (scheduler == BUILTIN_SCHEDULER) {
+ this.runContinuation = new BuiltinSchedulerTask(this);
+ } else {
+ this.runContinuation = new CustomSchedulerTask(this, preferredCarrier, att);
+ }
+ }
+
+ /**
+ * The task to execute when using the built-in scheduler.
+ */
+ static final class BuiltinSchedulerTask implements VirtualThreadTask {
+ private final VirtualThread vthread;
+ BuiltinSchedulerTask(VirtualThread vthread) {
+ this.vthread = vthread;
+ }
+ @Override
+ public Thread thread() {
+ return vthread;
+ }
+ @Override
+ public void run() {
+ vthread.runContinuation();;
+ }
+ @Override
+ public Thread preferredCarrier() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public Object attach(Object att) {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public Object attachment() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * The task to execute when using a custom scheduler.
+ */
+ static final class CustomSchedulerTask implements VirtualThreadTask {
+ private static final VarHandle ATT =
+ MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class);
+ private final VirtualThread vthread;
+ private final Thread preferredCarrier;
+ private volatile Object att;
+ CustomSchedulerTask(VirtualThread vthread, Thread preferredCarrier, Object att) {
+ this.vthread = vthread;
+ this.preferredCarrier = preferredCarrier;
+ if (att != null) {
+ this.att = att;
+ }
+ }
+ @Override
+ public Thread thread() {
+ return vthread;
+ }
+ @Override
+ public void run() {
+ vthread.runContinuation();;
+ }
+ @Override
+ public Thread preferredCarrier() {
+ return preferredCarrier;
+ }
+ @Override
+ public Object attach(Object att) {
+ return ATT.getAndSet(this, att);
+ }
+ @Override
+ public Object attachment() {
+ return att;
+ }
}
/**
* The continuation that a virtual thread executes.
*/
timeoutTask = null;
}
}
/**
- * Submits the given task to the given executor. If the scheduler is a
- * ForkJoinPool then the task is first adapted to a ForkJoinTask.
- */
- private void submit(Executor executor, Runnable task) {
- if (executor instanceof ForkJoinPool pool) {
- pool.submit(ForkJoinTask.adapt(task));
- } else {
- executor.execute(task);
- }
- }
-
- /**
- * Submits the runContinuation task to the scheduler. For the default scheduler,
- * and calling it on a worker thread, the task will be pushed to the local queue,
- * otherwise it will be pushed to an external submission queue.
- * @param scheduler the scheduler
+ * Submits the runContinuation task to the scheduler. For the built-in scheduler,
+ * the task will be pushed to the local queue if possible, otherwise it will be
+ * pushed to an external submission queue.
* @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
* @throws RejectedExecutionException
*/
- private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
+ private void submitRunContinuation(boolean retryOnOOME) {
boolean done = false;
while (!done) {
try {
// Pin the continuation to prevent the virtual thread from unmounting
// when submitting a task. For the default scheduler this ensures that
// it avoids deadlock that could arise due to carriers and virtual
// threads contending for a lock.
if (currentThread().isVirtual()) {
Continuation.pin();
try {
- submit(scheduler, runContinuation);
+ scheduler.onContinue(runContinuation);
} finally {
Continuation.unpin();
}
} else {
- submit(scheduler, runContinuation);
+ scheduler.onContinue(runContinuation);
}
done = true;
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
}
}
}
}
- /**
- * Submits the runContinuation task to the given scheduler as an external submit.
- * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
- * @throws RejectedExecutionException
- * @see ForkJoinPool#externalSubmit(ForkJoinTask)
- */
- private void externalSubmitRunContinuation(ForkJoinPool pool) {
- assert Thread.currentThread() instanceof CarrierThread;
- try {
- pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
- } catch (RejectedExecutionException ree) {
- submitFailed(ree);
- throw ree;
- } catch (OutOfMemoryError e) {
- submitRunContinuation(pool, true);
- }
- }
-
/**
* Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue,
* otherwise it will be pushed to an external submission queue.
* If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
*/
private void submitRunContinuation() {
- submitRunContinuation(scheduler, true);
+ submitRunContinuation(true);
}
/**
- * Lazy submit the runContinuation task if invoked on a carrier thread and its local
- * queue is empty. If not empty, or invoked by another thread, then this method works
- * like submitRunContinuation and just submits the task to the scheduler.
+ * Invoked from a carrier thread to lazy submit the runContinuation task to the
+ * carrier's local queue if the queue is empty. If not empty, or invoked by a thread
+ * for a custom scheduler, then it just submits the task to the scheduler.
* If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
* @see ForkJoinPool#lazySubmit(ForkJoinTask)
*/
private void lazySubmitRunContinuation() {
+ assert !currentThread().isVirtual();
if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
- ForkJoinPool pool = ct.getPool();
try {
- pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
+ ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
} catch (OutOfMemoryError e) {
submitRunContinuation();
submitRunContinuation();
}
}
/**
- * Submits the runContinuation task to the scheduler. For the default scheduler, and
- * calling it a virtual thread that uses the default scheduler, the task will be
- * pushed to an external submission queue. This method may throw OutOfMemoryError.
+ * Invoked from a carrier thread to externally submit the runContinuation task to the
+ * scheduler. If invoked by a thread for a custom scheduler, then it just submits the
+ * task to the scheduler.
+ * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
- * @throws OutOfMemoryError
+ * @see ForkJoinPool#externalSubmit(ForkJoinTask)
*/
- private void externalSubmitRunContinuationOrThrow() {
- if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {
+ private void externalSubmitRunContinuation() {
+ assert !currentThread().isVirtual();
+ if (currentThread() instanceof CarrierThread ct) {
try {
ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
+ } catch (OutOfMemoryError e) {
+ submitRunContinuation();
}
} else {
- submitRunContinuation(scheduler, false);
+ submitRunContinuation();
+ }
+ }
+
+ /**
+ * Invoked from Thread.start to externally submit the runContinuation task to the
+ * scheduler. If this virtual thread is scheduled by the built-in scheduler,
+ * and this method is called from a virtual thread scheduled by the built-in
+ * scheduler, then it uses externalSubmit to ensure that the task is pushed to an
+ * external submission queue rather than the local queue.
+ * @throws RejectedExecutionException
+ * @throws OutOfMemoryError
+ * @see ForkJoinPool#externalSubmit(ForkJoinTask)
+ */
+ private void externalSubmitRunContinuationOrThrow() {
+ try {
+ if (currentThread().isVirtual()) {
+ // Pin the continuation to prevent the virtual thread from unmounting
+ // when submitting a task. This avoids deadlock that could arise due to
+ // carriers and virtual threads contending for a lock.
+ Continuation.pin();
+ try {
+ if (scheduler == BUILTIN_SCHEDULER
+ && currentCarrierThread() instanceof CarrierThread ct) {
+ ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
+ } else {
+ scheduler.onStart(runContinuation);
+ }
+ } finally {
+ Continuation.unpin();
+ }
+ } else {
+ scheduler.onStart(runContinuation);
+ }
+ } catch (RejectedExecutionException ree) {
+ submitFailed(ree);
+ throw ree;
}
}
/**
* If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
if (s == YIELDING) {
setState(YIELDED);
// external submit if there are no tasks in the local task queue
if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
- externalSubmitRunContinuation(ct.getPool());
+ externalSubmitRunContinuation();
} else {
submitRunContinuation();
}
return;
}
if (getAndSetParkPermit(false) || interrupted)
return;
// park the thread
boolean yielded = false;
+ long eventStartTime = VirtualThreadParkEvent.eventStartTime();
setState(PARKING);
try {
yielded = yieldContinuation();
} catch (OutOfMemoryError e) {
// park on carrier
} finally {
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
- if (!yielded) {
+ if (yielded) {
+ VirtualThreadParkEvent.offer(eventStartTime, Long.MIN_VALUE);
+ } else {
assert state() == PARKING;
setState(RUNNING);
}
}
if (nanos > 0) {
long startTime = System.nanoTime();
// park the thread, afterYield will schedule the thread to unpark
boolean yielded = false;
+ long eventStartTime = VirtualThreadParkEvent.eventStartTime();
timeout = nanos;
setState(TIMED_PARKING);
try {
yielded = yieldContinuation();
} catch (OutOfMemoryError e) {
// park on carrier
} finally {
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
- if (!yielded) {
+ if (yielded) {
+ VirtualThreadParkEvent.offer(eventStartTime, nanos);
+ } else {
assert state() == TIMED_PARKING;
setState(RUNNING);
}
}
static {
registerNatives();
// ensure VTHREAD_GROUP is created, may be accessed by JVMTI
var group = Thread.virtualThreadGroup();
+
+ // ensure event class is initialized
+ try {
+ MethodHandles.lookup().ensureInitialized(VirtualThreadParkEvent.class);
+ } catch (IllegalAccessException e) {
+ throw new ExceptionInInitializerError(e);
+ }
}
/**
- * Creates the default ForkJoinPool scheduler.
+ * Loads a VirtualThreadScheduler with the given class name. The class must be public
+ * in an exported package, with public one-arg or no-arg constructor, and be visible
+ * to the system class loader.
+ * @param delegate the scheduler that the custom scheduler may delegate to
+ * @param cn the class name of the custom scheduler
*/
- private static ForkJoinPool createDefaultScheduler() {
- ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
+ private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) {
+ VirtualThreadScheduler scheduler;
+ try {
+ Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
+ // 1-arg constructor
+ try {
+ Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class);
+ return (VirtualThreadScheduler) ctor.newInstance(delegate);
+ } catch (NoSuchMethodException e) {
+ // 0-arg constructor
+ Constructor<?> ctor = clazz.getConstructor();
+ scheduler = (VirtualThreadScheduler) ctor.newInstance();
+ }
+ } catch (Exception ex) {
+ throw new Error(ex);
+ }
+ System.err.println("WARNING: Using custom default scheduler, this is an experimental feature!");
+ return scheduler;
+ }
+
+ /**
+ * Creates the built-in ForkJoinPool scheduler.
+ * @param wrapped true if wrapped by a custom default scheduler
+ */
+ private static BuiltinScheduler createBuiltinScheduler(boolean wrapped) {
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
if (parallelismValue != null) {
if (minRunnableValue != null) {
minRunnable = Integer.parseInt(minRunnableValue);
} else {
minRunnable = Integer.max(parallelism / 2, 1);
}
- Thread.UncaughtExceptionHandler handler = (t, e) -> { };
- boolean asyncMode = true; // FIFO
- return new ForkJoinPool(parallelism, factory, handler, asyncMode,
- 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
+ return new BuiltinScheduler(parallelism, maxPoolSize, minRunnable, wrapped);
+ }
+
+ /**
+ * The built-in ForkJoinPool scheduler.
+ */
+ private static class BuiltinScheduler
+ extends ForkJoinPool implements VirtualThreadScheduler {
+
+ BuiltinScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) {
+ ForkJoinWorkerThreadFactory factory = wrapped
+ ? ForkJoinPool.defaultForkJoinWorkerThreadFactory
+ : CarrierThread::new;
+ Thread.UncaughtExceptionHandler handler = (t, e) -> { };
+ boolean asyncMode = true; // FIFO
+ super(parallelism, factory, handler, asyncMode,
+ 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
+ }
+
+ private void adaptAndExecute(Runnable task) {
+ execute(ForkJoinTask.adapt(task));
+ }
+
+ @Override
+ public void onStart(VirtualThreadTask task) {
+ adaptAndExecute(task);
+ }
+
+ @Override
+ public void onContinue(VirtualThreadTask task) {
+ adaptAndExecute(task);
+ }
+
+ /**
+ * Wraps the scheduler to avoid leaking a direct reference with
+ * {@link VirtualThreadScheduler#current()}.
+ */
+ VirtualThreadScheduler createExternalView() {
+ BuiltinScheduler builtin = this;
+ return new VirtualThreadScheduler() {
+ private void execute(VirtualThreadTask task) {
+ var vthread = (VirtualThread) task.thread();
+ VirtualThreadScheduler scheduler = vthread.scheduler;
+ if (scheduler == this || scheduler == DEFAULT_SCHEDULER) {
+ builtin.adaptAndExecute(task);
+ } else {
+ throw new IllegalArgumentException();
+ }
+ }
+ @Override
+ public void onStart(VirtualThreadTask task) {
+ execute(task);
+ }
+ @Override
+ public void onContinue(VirtualThreadTask task) {
+ execute(task);
+ }
+ @Override
+ public String toString() {
+ return builtin.toString();
+ }
+ };
+ }
}
/**
* Schedule a runnable task to run after a delay.
*/
private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
- if (scheduler instanceof ForkJoinPool pool) {
- return pool.schedule(command, delay, unit);
+ if (scheduler == BUILTIN_SCHEDULER) {
+ return BUILTIN_SCHEDULER.schedule(command, delay, unit);
} else {
return DelayedTaskSchedulers.schedule(command, delay, unit);
}
}
< prev index next >