< prev index next > src/java.base/share/classes/java/lang/VirtualThread.java
Print this page
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.lang;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import jdk.internal.event.VirtualThreadEndEvent;
import jdk.internal.event.VirtualThreadStartEvent;
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
import jdk.internal.misc.CarrierThread;
import jdk.internal.misc.InnocuousThread;
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.lang;
+ import java.lang.reflect.Constructor;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+ import java.util.function.Supplier;
import jdk.internal.event.VirtualThreadEndEvent;
import jdk.internal.event.VirtualThreadStartEvent;
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
import jdk.internal.misc.CarrierThread;
import jdk.internal.misc.InnocuousThread;
* A thread that is scheduled by the Java virtual machine rather than the operating system.
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
! private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
// scheduler and continuation
! private final Executor scheduler;
private final Continuation cont;
private final Runnable runContinuation;
// virtual thread state, accessed by VM
private volatile int state;
* A thread that is scheduled by the Java virtual machine rather than the operating system.
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
!
+ private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
+ private static final boolean IS_CUSTOM_DEFAULT_SCHEDULER;
+ static {
+ // experimental
+ String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
+ if (propValue != null) {
+ DEFAULT_SCHEDULER = createCustomDefaultScheduler(propValue);
+ IS_CUSTOM_DEFAULT_SCHEDULER = true;
+ } else {
+ DEFAULT_SCHEDULER = createDefaultForkJoinPoolScheduler();
+ IS_CUSTOM_DEFAULT_SCHEDULER = false;
+ }
+ }
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
// scheduler and continuation
! private final VirtualThreadScheduler scheduler;
private final Continuation cont;
private final Runnable runContinuation;
// virtual thread state, accessed by VM
private volatile int state;
private volatile VirtualThread next;
// notified by Object.notify/notifyAll while waiting in Object.wait
private volatile boolean notified;
+ // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
+ private volatile boolean interruptableWait;
+
// timed-wait support
private byte timedWaitSeqNo;
// timeout for timed-park and timed-wait, only accessed on current/carrier thread
private long timeout;
private volatile CountDownLatch termination;
/**
* Returns the default scheduler.
*/
! static Executor defaultScheduler() {
return DEFAULT_SCHEDULER;
}
/**
* Returns the continuation scope used for virtual threads.
*/
static ContinuationScope continuationScope() {
return VTHREAD_SCOPE;
}
/**
! * Creates a new {@code VirtualThread} to run the given task with the given
! * scheduler. If the given scheduler is {@code null} and the current thread
! * is a platform thread then the newly created virtual thread will use the
! * default scheduler. If given scheduler is {@code null} and the current
! * thread is a virtual thread then the current thread's scheduler is used.
*
! * @param scheduler the scheduler or null
* @param name thread name
* @param characteristics characteristics
* @param task the task to execute
*/
! VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
super(name, characteristics, /*bound*/ false);
Objects.requireNonNull(task);
! // choose scheduler if not specified
if (scheduler == null) {
! Thread parent = Thread.currentThread();
- if (parent instanceof VirtualThread vparent) {
- scheduler = vparent.scheduler;
- } else {
- scheduler = DEFAULT_SCHEDULER;
- }
}
this.scheduler = scheduler;
this.cont = new VThreadContinuation(this, task);
this.runContinuation = this::runContinuation;
private volatile CountDownLatch termination;
/**
* Returns the default scheduler.
*/
! static VirtualThreadScheduler defaultScheduler() {
return DEFAULT_SCHEDULER;
}
+ /**
+ * Returns true if using a custom default scheduler.
+ */
+ static boolean isCustomDefaultScheduler() {
+ return IS_CUSTOM_DEFAULT_SCHEDULER;
+ }
+
/**
* Returns the continuation scope used for virtual threads.
*/
static ContinuationScope continuationScope() {
return VTHREAD_SCOPE;
}
/**
! * Returns the scheduler for this thread. If this thread's scheduler is the built-in
! * default scheduler and {@code revealBuiltin} is false then the scheduler's external
! * view is returned rather than the scheduler itself.
! * @param revealBuiltin true to reveal the built-in default scheduler, false to hide
! * @return this thread's scheduler, or the external view of the built-in default scheduler
! */
! VirtualThreadScheduler scheduler(boolean revealBuiltin) {
! if (scheduler instanceof BuiltinDefaultScheduler builtin && !revealBuiltin) {
+ return builtin.externalView();
+ } else {
+ return scheduler;
+ }
+ }
+
+ /**
+ * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
*
! * @param scheduler the scheduler or null for default scheduler
* @param name thread name
* @param characteristics characteristics
* @param task the task to execute
*/
! VirtualThread(VirtualThreadScheduler scheduler,
+ String name,
+ int characteristics,
+ Runnable task) {
super(name, characteristics, /*bound*/ false);
Objects.requireNonNull(task);
! // use default scheduler if not provided
if (scheduler == null) {
! scheduler = DEFAULT_SCHEDULER;
}
this.scheduler = scheduler;
this.cont = new VThreadContinuation(this, task);
this.runContinuation = this::runContinuation;
/**
* Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue,
* otherwise it will be pushed to an external submission queue.
- * @param scheduler the scheduler
* @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
* @throws RejectedExecutionException
*/
! private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
boolean done = false;
while (!done) {
try {
// Pin the continuation to prevent the virtual thread from unmounting
// when submitting a task. For the default scheduler this ensures that
/**
* Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue,
* otherwise it will be pushed to an external submission queue.
* @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
* @throws RejectedExecutionException
*/
! private void submitRunContinuation(boolean retryOnOOME) {
boolean done = false;
while (!done) {
try {
// Pin the continuation to prevent the virtual thread from unmounting
// when submitting a task. For the default scheduler this ensures that
// it avoids deadlock that could arise due to carriers and virtual
// threads contending for a lock.
if (currentThread().isVirtual()) {
Continuation.pin();
try {
! scheduler.execute(runContinuation);
} finally {
Continuation.unpin();
}
} else {
! scheduler.execute(runContinuation);
}
done = true;
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
// it avoids deadlock that could arise due to carriers and virtual
// threads contending for a lock.
if (currentThread().isVirtual()) {
Continuation.pin();
try {
! scheduler.execute(this, runContinuation);
} finally {
Continuation.unpin();
}
} else {
! scheduler.execute(this, runContinuation);
}
done = true;
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
}
}
}
}
- /**
- * Submits the runContinuation task to the given scheduler as an external submit.
- * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
- * @throws RejectedExecutionException
- * @see ForkJoinPool#externalSubmit(ForkJoinTask)
- */
- private void externalSubmitRunContinuation(ForkJoinPool pool) {
- assert Thread.currentThread() instanceof CarrierThread;
- try {
- pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
- } catch (RejectedExecutionException ree) {
- submitFailed(ree);
- throw ree;
- } catch (OutOfMemoryError e) {
- submitRunContinuation(pool, true);
- }
- }
-
/**
* Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue,
* otherwise it will be pushed to an external submission queue.
* If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
*/
private void submitRunContinuation() {
! submitRunContinuation(scheduler, true);
}
/**
* Lazy submit the runContinuation task if invoked on a carrier thread and its local
* queue is empty. If not empty, or invoked by another thread, then this method works
}
}
}
}
/**
* Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue,
* otherwise it will be pushed to an external submission queue.
* If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
*/
private void submitRunContinuation() {
! submitRunContinuation(true);
}
/**
* Lazy submit the runContinuation task if invoked on a carrier thread and its local
* queue is empty. If not empty, or invoked by another thread, then this method works
* If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
* @see ForkJoinPool#lazySubmit(ForkJoinTask)
*/
private void lazySubmitRunContinuation() {
! if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
! ForkJoinPool pool = ct.getPool();
try {
! pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
} catch (OutOfMemoryError e) {
submitRunContinuation();
* If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
* @see ForkJoinPool#lazySubmit(ForkJoinTask)
*/
private void lazySubmitRunContinuation() {
! if (scheduler == DEFAULT_SCHEDULER
! && currentCarrierThread() instanceof CarrierThread ct
+ && ct.getQueuedTaskCount() == 0) {
try {
! ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
+ } catch (RejectedExecutionException ree) {
+ submitFailed(ree);
+ throw ree;
+ } catch (OutOfMemoryError e) {
+ submitRunContinuation();
+ }
+ } else {
+ submitRunContinuation();
+ }
+ }
+
+ /**
+ * Submits the runContinuation task to the scheduler. For the default scheduler, and
+ * calling it on a virtual thread that uses the default scheduler, the task will be
+ * pushed to an external submission queue.
+ * @throws RejectedExecutionException
+ */
+ private void externalSubmitRunContinuation() {
+ if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {
+ try {
+ ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
} catch (OutOfMemoryError e) {
submitRunContinuation();
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
}
} else {
! submitRunContinuation(scheduler, false);
}
}
/**
* If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
}
} else {
! submitRunContinuation(false);
}
}
/**
* If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
if (s == YIELDING) {
setState(YIELDED);
// external submit if there are no tasks in the local task queue
if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
! externalSubmitRunContinuation(ct.getPool());
} else {
submitRunContinuation();
}
return;
}
if (s == YIELDING) {
setState(YIELDED);
// external submit if there are no tasks in the local task queue
if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
! externalSubmitRunContinuation();
} else {
submitRunContinuation();
}
return;
}
}
// Object.wait
if (s == WAITING || s == TIMED_WAITING) {
int newState;
+ boolean interruptable = interruptableWait;
if (s == WAITING) {
setState(newState = WAIT);
} else {
// For timed-wait, a timeout task is scheduled to execute. The timeout
// task will change the thread state to UNBLOCKED and submit the thread
}
return;
}
// may have been interrupted while in transition to wait state
! if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
submitRunContinuation();
return;
}
return;
}
}
return;
}
// may have been interrupted while in transition to wait state
! if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
submitRunContinuation();
return;
}
return;
}
@Override
StackTraceElement[] asyncGetStackTrace() {
StackTraceElement[] stackTrace;
do {
stackTrace = (carrierThread != null)
! ? super.asyncGetStackTrace() // mounted
! : tryGetStackTrace(); // unmounted
if (stackTrace == null) {
Thread.yield();
}
} while (stackTrace == null);
return stackTrace;
}
/**
! * Returns the stack trace for this virtual thread if it is unmounted.
! * Returns null if the thread is mounted or in transition.
*/
! private StackTraceElement[] tryGetStackTrace() {
int initialState = state() & ~SUSPENDED;
switch (initialState) {
case NEW, STARTED, TERMINATED -> {
! return new StackTraceElement[0]; // unmounted, empty stack
}
case RUNNING, PINNED, TIMED_PINNED -> {
! return null; // mounted
}
case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
// unmounted, not runnable
}
case UNPARKED, UNBLOCKED, YIELDED -> {
// unmounted, runnable
}
case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
! return null; // in transition
}
default -> throw new InternalError("" + initialState);
}
// thread is unmounted, prevent it from continuing
int suspendedState = initialState | SUSPENDED;
if (!compareAndSetState(initialState, suspendedState)) {
return null;
}
- // get stack trace and restore state
- StackTraceElement[] stack;
try {
! stack = cont.getStackTrace();
} finally {
assert state == suspendedState;
setState(initialState);
! }
! boolean resubmit = switch (initialState) {
! case UNPARKED, UNBLOCKED, YIELDED -> {
! // resubmit as task may have run while suspended
! yield true;
! }
! case PARKED, TIMED_PARKED -> {
! // resubmit if unparked while suspended
! yield parkPermit && compareAndSetState(initialState, UNPARKED);
! }
! case BLOCKED -> {
! // resubmit if unblocked while suspended
! yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
! }
! case WAIT, TIMED_WAIT -> {
! // resubmit if notified or interrupted while waiting (Object.wait)
! // waitTimeoutExpired will retry if the timeout expired while suspended
! yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
}
- default -> throw new InternalError();
- };
- if (resubmit) {
- submitRunContinuation();
}
! return stack;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("VirtualThread[#");
@Override
StackTraceElement[] asyncGetStackTrace() {
StackTraceElement[] stackTrace;
do {
stackTrace = (carrierThread != null)
! ? super.asyncGetStackTrace() // mounted
! : supplyIfUnmounted(cont::getStackTrace, // unmounted
+ () -> new StackTraceElement[0]);
if (stackTrace == null) {
Thread.yield();
}
} while (stackTrace == null);
return stackTrace;
}
/**
! * Invokes a supplier to produce a non-null result if this virtual thread is not mounted.
! * <p> When the thread is alive and unmounted, its state is first changed to the same
! * state with the SUSPENDED bit set so that it cannot continue while {@code supplier1}
! * runs. The original state is restored afterwards, and the runContinuation task is
! * submitted if the thread was runnable, or became runnable while it was suspended.
! * @param <T> the type of the result
! * @param supplier1 invoked if this virtual thread is alive and unmounted
+ * @param supplier2 invoked if this virtual thread is not alive
+ * @return the result; {@code null} if this virtual thread is mounted or in transition,
+ * or if its state changed before it could be suspended
*/
! <T> T supplyIfUnmounted(Supplier<T> supplier1, Supplier<T> supplier2) {
int initialState = state() & ~SUSPENDED;
switch (initialState) {
case NEW, STARTED, TERMINATED -> {
! return supplier2.get(); // terminated or not started
}
case RUNNING, PINNED, TIMED_PINNED -> {
! return null; // mounted
}
case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
// unmounted, not runnable
}
case UNPARKED, UNBLOCKED, YIELDED -> {
// unmounted, runnable
}
case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
! return null; // in transition
}
default -> throw new InternalError("" + initialState);
}
// thread is unmounted, prevent it from continuing
int suspendedState = initialState | SUSPENDED;
if (!compareAndSetState(initialState, suspendedState)) {
return null;
}
try {
! return supplier1.get();
} finally {
assert state == suspendedState;
setState(initialState);
!
! boolean resubmit = switch (initialState) {
! case UNPARKED, UNBLOCKED, YIELDED -> {
! // resubmit as task may have run while suspended
! yield true;
! }
! case PARKED, TIMED_PARKED -> {
! // resubmit if unparked while suspended
! yield parkPermit && compareAndSetState(initialState, UNPARKED);
! }
! case BLOCKED -> {
! // resubmit if unblocked while suspended
! yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
! }
! case WAIT, TIMED_WAIT -> {
! // resubmit if notified or interrupted while waiting (Object.wait)
! // waitTimeoutExpired will retry if the timeout expired while suspended
! yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
+ }
+ default -> throw new InternalError();
+ };
+ if (resubmit) {
+ submitRunContinuation();
}
}
!
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("VirtualThread[#");
// ensure VTHREAD_GROUP is created, may be accessed by JVMTI
var group = Thread.virtualThreadGroup();
}
/**
! * Creates the default ForkJoinPool scheduler.
*/
! private static ForkJoinPool createDefaultScheduler() {
! ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
if (parallelismValue != null) {
// ensure VTHREAD_GROUP is created, may be accessed by JVMTI
var group = Thread.virtualThreadGroup();
}
/**
! * Loads a VirtualThreadScheduler with the given class name to use as the
+ * default scheduler. The class is public in an exported package, has a public
+ * one-arg or no-arg constructor, and is visible to the system class loader.
+ * <p> A public constructor with a {@code VirtualThreadScheduler} parameter is tried
+ * first and is invoked with the external view of a newly created built-in scheduler.
+ * If there is no such constructor then the public no-arg constructor is used.
+ * A warning is printed to {@code System.err} once the scheduler has been created.
+ * @param cn the class name of the scheduler implementation
+ * @return the custom scheduler
+ * @throws Error if an exception is thrown loading or instantiating the class, the
+ * exception is the cause
*/
! private static VirtualThreadScheduler createCustomDefaultScheduler(String cn) {
! try {
+ Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
+ VirtualThreadScheduler scheduler;
+ try {
+ // 1-arg constructor, invoked with the external view of a new built-in scheduler
+ Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class);
+ var builtin = createDefaultForkJoinPoolScheduler();
+ scheduler = (VirtualThreadScheduler) ctor.newInstance(builtin.externalView());
+ } catch (NoSuchMethodException e) {
+ // no 1-arg constructor, fall back to the 0-arg constructor
+ Constructor<?> ctor = clazz.getConstructor();
+ scheduler = (VirtualThreadScheduler) ctor.newInstance();
+ }
+ System.err.println("""
+ WARNING: Using custom default scheduler, this is an experimental feature!""");
+ return scheduler;
+ } catch (Exception ex) {
+ throw new Error(ex);
+ }
+ }
+
+ /**
+ * Creates the built-in default ForkJoinPool scheduler.
+ */
+ private static BuiltinDefaultScheduler createDefaultForkJoinPoolScheduler() {
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
if (parallelismValue != null) {
if (minRunnableValue != null) {
minRunnable = Integer.parseInt(minRunnableValue);
} else {
minRunnable = Integer.max(parallelism / 2, 1);
}
! Thread.UncaughtExceptionHandler handler = (t, e) -> { };
! boolean asyncMode = true; // FIFO
! return new ForkJoinPool(parallelism, factory, handler, asyncMode,
! 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
}
/**
* Schedule a runnable task to run after a delay.
*/
if (minRunnableValue != null) {
minRunnable = Integer.parseInt(minRunnableValue);
} else {
minRunnable = Integer.max(parallelism / 2, 1);
}
! return new BuiltinDefaultScheduler(parallelism, maxPoolSize, minRunnable);
! }
!
! /**
+ * The built-in default ForkJoinPool scheduler.
+ * <p> The pool is created in async (FIFO) mode with a thread factory that creates
+ * {@code CarrierThread} workers and an uncaught exception handler that does nothing.
+ * {@code execute(Thread, Runnable)} adapts the task to a {@code ForkJoinTask} and
+ * executes it on this pool, its thread parameter is not used.
+ * <p> NOTE(review): {@code VIEW} is static but {@code externalView()} captures the
+ * instance that it is invoked on. Assuming {@code StableValue.orElseSet} computes the
+ * value at most once, a second instance would be handed the wrapper of the first
+ * instance - confirm that at most one instance ever creates the view.
+ */
+ private static class BuiltinDefaultScheduler
+ extends ForkJoinPool implements VirtualThreadScheduler {
+
+ private static final StableValue<VirtualThreadScheduler> VIEW = StableValue.of();
+
+ BuiltinDefaultScheduler(int parallelism, int maxPoolSize, int minRunnable) {
+ ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
+ Thread.UncaughtExceptionHandler handler = (t, e) -> { };
+ boolean asyncMode = true; // FIFO
+ super(parallelism, factory, handler, asyncMode,
+ 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
+ }
+
+ @Override
+ public void execute(Thread vthread, Runnable task) {
+ execute(ForkJoinTask.adapt(task));
+ }
+
+ /**
+ * Wraps the scheduler to avoid leaking a direct reference.
+ * The wrapper's {@code execute} method delegates to the wrapped scheduler when the
+ * given thread is a virtual thread whose scheduler is the wrapper itself or the
+ * default scheduler. It throws {@code NullPointerException} if the thread is null,
+ * {@code IllegalArgumentException} if the virtual thread uses any other scheduler,
+ * and {@code UnsupportedOperationException} if the thread is not a virtual thread.
+ * @return the wrapper held by {@code VIEW}
+ */
+ VirtualThreadScheduler externalView() {
+ VirtualThreadScheduler builtin = this;
+ return VIEW.orElseSet(() -> {
+ return new VirtualThreadScheduler() {
+ @Override
+ public void execute(Thread thread, Runnable task) {
+ Objects.requireNonNull(thread);
+ if (thread instanceof VirtualThread vthread) {
+ VirtualThreadScheduler scheduler = vthread.scheduler;
+ if (scheduler == this || scheduler == DEFAULT_SCHEDULER) {
+ builtin.execute(thread, task);
+ } else {
+ throw new IllegalArgumentException();
+ }
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ }
+ };
+ });
+ }
}
/**
* Schedule a runnable task to run after a delay.
*/
var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
VirtualThread::unblockVirtualThreads);
unblocker.setDaemon(true);
unblocker.start();
}
! }
var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
VirtualThread::unblockVirtualThreads);
unblocker.setDaemon(true);
unblocker.start();
}
! }
< prev index next >