< prev index next > src/java.base/share/classes/java/lang/VirtualThread.java
Print this page
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.lang;
+ import java.lang.invoke.MethodHandles;
+ import java.lang.invoke.VarHandle;
+ import java.lang.reflect.Constructor;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import jdk.internal.event.VirtualThreadEndEvent;
import jdk.internal.event.VirtualThreadStartEvent;
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
+ import jdk.internal.invoke.MhUtil;
import jdk.internal.misc.CarrierThread;
import jdk.internal.misc.InnocuousThread;
import jdk.internal.misc.Unsafe;
import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;
* A thread that is scheduled by the Java virtual machine rather than the operating system.
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
- private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
+
+ private static final Executor DEFAULT_SCHEDULER;
+ private static final boolean USE_CUSTOM_RUNNER;
+ static {
+ // experimental
+ String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
+ if (propValue != null) {
+ DEFAULT_SCHEDULER = createCustomDefaultScheduler(propValue);
+ USE_CUSTOM_RUNNER = true;
+ } else {
+ DEFAULT_SCHEDULER = createDefaultForkJoinPoolScheduler();
+ USE_CUSTOM_RUNNER = false;
+ }
+ }
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
private volatile VirtualThread next;
// notified by Object.notify/notifyAll while waiting in Object.wait
private volatile boolean notified;
+ // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
+ private volatile boolean interruptableWait;
+
// timed-wait support
private byte timedWaitSeqNo;
// timeout for timed-park and timed-wait, only accessed on current/carrier thread
private long timeout;
static ContinuationScope continuationScope() {
return VTHREAD_SCOPE;
}
/**
- * Creates a new {@code VirtualThread} to run the given task with the given
- * scheduler. If the given scheduler is {@code null} and the current thread
- * is a platform thread then the newly created virtual thread will use the
- * default scheduler. If given scheduler is {@code null} and the current
- * thread is a virtual thread then the current thread's scheduler is used.
+ * Return the scheduler for this thread.
+ */
+ Executor scheduler() {
+ return scheduler;
+ }
+
+ /**
+ * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
*
- * @param scheduler the scheduler or null
+ * @param scheduler the scheduler or null for default scheduler
* @param name thread name
* @param characteristics characteristics
* @param task the task to execute
*/
VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
super(name, characteristics, /*bound*/ false);
Objects.requireNonNull(task);
- // choose scheduler if not specified
+ // use default scheduler if not provided
if (scheduler == null) {
- Thread parent = Thread.currentThread();
- if (parent instanceof VirtualThread vparent) {
- scheduler = vparent.scheduler;
- } else {
- scheduler = DEFAULT_SCHEDULER;
- }
+ scheduler = DEFAULT_SCHEDULER;
}
this.scheduler = scheduler;
this.cont = new VThreadContinuation(this, task);
- this.runContinuation = this::runContinuation;
+ if (USE_CUSTOM_RUNNER || (scheduler != DEFAULT_SCHEDULER)) {
+ this.runContinuation = new CustomRunner(this);
+ } else {
+ this.runContinuation = this::runContinuation;
+ }
}
/**
* The continuation that a virtual thread executes.
*/
}
};
}
}
+ /**
+ * The task to execute when using a custom scheduler.
+ */
+ private static class CustomRunner implements VirtualThreadTask {
+ private static final VarHandle ATTACHMENT =
+ MhUtil.findVarHandle(MethodHandles.lookup(), "attachment", Object.class);
+ private final VirtualThread vthread;
+ private volatile Object attachment;
+ CustomRunner(VirtualThread vthread) {
+ this.vthread = vthread;
+ }
+ @Override
+ public void run() {
+ vthread.runContinuation();
+ }
+ @Override
+ public Thread thread() {
+ return vthread;
+ }
+ @Override
+ public Object attach(Object ob) {
+ return ATTACHMENT.getAndSet(this, ob);
+ }
+ @Override
+ public Object attachment() {
+ return attachment;
+ }
+ @Override
+ public String toString() {
+ return vthread.toString();
+ }
+ }
+
+ /**
+ * Returns the object attached to the virtual thread's task.
+ */
+ Object currentTaskAttachment() {
+ assert Thread.currentThread() == this;
+ if (runContinuation instanceof CustomRunner runner) {
+ return runner.attachment();
+ } else {
+ return null;
+ }
+ }
+
/**
* Runs or continues execution on the current thread. The virtual thread is mounted
* on the current thread before the task runs or continues. It unmounts when the
* task completes or yields.
*/
timeoutTask.cancel(false);
timeoutTask = null;
}
}
+ /**
+ * Submits the given task to the given executor. If the scheduler is a
+ * ForkJoinPool then the task is first adapted to a ForkJoinTask.
+ */
+ private void submit(Executor executor, Runnable task) {
+ if (executor instanceof ForkJoinPool pool) {
+ pool.submit(ForkJoinTask.adapt(task));
+ } else {
+ executor.execute(task);
+ }
+ }
+
/**
* Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue,
* otherwise it will be pushed to an external submission queue.
* @param scheduler the scheduler
// it avoids deadlock that could arise due to carriers and virtual
// threads contending for a lock.
if (currentThread().isVirtual()) {
Continuation.pin();
try {
- scheduler.execute(runContinuation);
+ submit(scheduler, runContinuation);
} finally {
Continuation.unpin();
}
} else {
- scheduler.execute(runContinuation);
+ submit(scheduler, runContinuation);
}
done = true;
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
}
// Object.wait
if (s == WAITING || s == TIMED_WAITING) {
int newState;
+ boolean interruptable = interruptableWait;
if (s == WAITING) {
setState(newState = WAIT);
} else {
// For timed-wait, a timeout task is scheduled to execute. The timeout
// task will change the thread state to UNBLOCKED and submit the thread
}
return;
}
// may have been interrupted while in transition to wait state
- if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
+ if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
submitRunContinuation();
return;
}
return;
}
// ensure VTHREAD_GROUP is created, may be accessed by JVMTI
var group = Thread.virtualThreadGroup();
}
+ /**
+ * Loads a java.util.concurrent.Executor with the given class name to use at the
+ * default scheduler. The class is public in an exported package, has a public
+ * no-arg constructor, and is visible to the system class loader.
+ */
+ private static Executor createCustomDefaultScheduler(String cn) {
+ try {
+ Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
+ Constructor<?> ctor = clazz.getConstructor();
+ var scheduler = (Executor) ctor.newInstance();
+ System.err.println("""
+ WARNING: Using custom default scheduler, this is an experimental feature!""");
+ return scheduler;
+ } catch (Exception ex) {
+ throw new Error(ex);
+ }
+ }
+
/**
* Creates the default ForkJoinPool scheduler.
*/
- private static ForkJoinPool createDefaultScheduler() {
+ private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
VirtualThread::unblockVirtualThreads);
unblocker.setDaemon(true);
unblocker.start();
}
- }
+ }
< prev index next >