< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page
@@ -22,10 +22,13 @@
   * or visit www.oracle.com if you need additional information or have any
   * questions.
   */
  package java.lang;
  
+ import java.lang.invoke.MethodHandles;
+ import java.lang.invoke.VarHandle;
+ import java.lang.reflect.Constructor;
  import java.util.Locale;
  import java.util.Objects;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.Executor;
  import java.util.concurrent.Executors;

@@ -38,10 +41,11 @@
  import java.util.concurrent.ScheduledThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
  import jdk.internal.event.VirtualThreadEndEvent;
  import jdk.internal.event.VirtualThreadStartEvent;
  import jdk.internal.event.VirtualThreadSubmitFailedEvent;
+ import jdk.internal.invoke.MhUtil;
  import jdk.internal.misc.CarrierThread;
  import jdk.internal.misc.InnocuousThread;
  import jdk.internal.misc.Unsafe;
  import jdk.internal.vm.Continuation;
  import jdk.internal.vm.ContinuationScope;

@@ -61,11 +65,24 @@
   * A thread that is scheduled by the Java virtual machine rather than the operating system.
   */
  final class VirtualThread extends BaseVirtualThread {
      private static final Unsafe U = Unsafe.getUnsafe();
      private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
-     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
+ 
+     private static final Executor DEFAULT_SCHEDULER;  // scheduler used when the constructor is given a null scheduler
+     private static final boolean USE_CUSTOM_RUNNER;   // true when the default scheduler was loaded from the system property
+     static {
+         // experimental: when the property is set, its value names the class instantiated as the default scheduler
+         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
+         if (propValue != null) {
+             DEFAULT_SCHEDULER = createCustomDefaultScheduler(propValue);
+             USE_CUSTOM_RUNNER = true;
+         } else {
+             DEFAULT_SCHEDULER = createDefaultForkJoinPoolScheduler();  // property not set: use the ForkJoinPool scheduler
+             USE_CUSTOM_RUNNER = false;
+         }
+     }
  
      private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
      private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
      private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
      private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");

@@ -166,10 +183,13 @@
      private volatile VirtualThread next;
  
      // notified by Object.notify/notifyAll while waiting in Object.wait
      private volatile boolean notified;
  
+     // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
+     private volatile boolean interruptableWait;
+ 
      // timed-wait support
      private byte timedWaitSeqNo;
  
      // timeout for timed-park and timed-wait, only accessed on current/carrier thread
      private long timeout;

@@ -196,38 +216,40 @@
      static ContinuationScope continuationScope() {
          return VTHREAD_SCOPE;
      }
  
      /**
-      * Creates a new {@code VirtualThread} to run the given task with the given
-      * scheduler. If the given scheduler is {@code null} and the current thread
-      * is a platform thread then the newly created virtual thread will use the
-      * default scheduler. If given scheduler is {@code null} and the current
-      * thread is a virtual thread then the current thread's scheduler is used.
+      * Returns the scheduler for this thread.
+      */
+     Executor scheduler() {
+         return scheduler;  // assigned in the constructor, which substitutes DEFAULT_SCHEDULER for a null argument
+     }
+ 
+     /**
+      * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
+      * The task later submitted to the scheduler is a {@code CustomRunner} when the
+      * default scheduler is a custom implementation or the given scheduler is not the
+      * default scheduler, otherwise it is a method reference to {@code runContinuation}.
       *
-      * @param scheduler the scheduler or null
+      * @param scheduler the scheduler or null for default scheduler
       * @param name thread name
       * @param characteristics characteristics
       * @param task the task to execute
+      * @throws NullPointerException if {@code task} is null
       */
      VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
          super(name, characteristics, /*bound*/ false);
          Objects.requireNonNull(task);
  
-         // choose scheduler if not specified
+         // use default scheduler if not provided
          if (scheduler == null) {
-             Thread parent = Thread.currentThread();
-             if (parent instanceof VirtualThread vparent) {
-                 scheduler = vparent.scheduler;
-             } else {
-                 scheduler = DEFAULT_SCHEDULER;
-             }
+             scheduler = DEFAULT_SCHEDULER;
          }
  
          this.scheduler = scheduler;
          this.cont = new VThreadContinuation(this, task);
-         this.runContinuation = this::runContinuation;
+         if (USE_CUSTOM_RUNNER || (scheduler != DEFAULT_SCHEDULER)) {  // custom schedulers are handed a VirtualThreadTask
+             this.runContinuation = new CustomRunner(this);
+         } else {
+             this.runContinuation = this::runContinuation;
+         }
      }
  
      /**
       * The continuation that a virtual thread executes.
       */

@@ -252,10 +274,55 @@
                  }
              };
          }
      }
  
+     /**
+      * The task to execute when using a custom scheduler.
+      * It wraps a virtual thread: {@code run} invokes the thread's {@code runContinuation}
+      * method, {@code thread} returns the thread, and {@code toString} delegates to the
+      * thread's {@code toString}. The task also carries a single attachment:
+      * {@code attach} swaps in a new value with {@code VarHandle.getAndSet} and returns
+      * the previous value, and {@code attachment} reads the current value.
+      */
+     private static class CustomRunner implements VirtualThreadTask {
+         private static final VarHandle ATTACHMENT =
+                 MhUtil.findVarHandle(MethodHandles.lookup(), "attachment", Object.class);
+         private final VirtualThread vthread;
+         private volatile Object attachment;  // written through ATTACHMENT in attach(), read directly in attachment()
+         CustomRunner(VirtualThread vthread) {
+             this.vthread = vthread;
+         }
+         @Override
+         public void run() {
+             vthread.runContinuation();
+         }
+         @Override
+         public Thread thread() {
+             return vthread;
+         }
+         @Override
+         public Object attach(Object ob) {
+             return ATTACHMENT.getAndSet(this, ob);
+         }
+         @Override
+         public Object attachment() {
+             return attachment;
+         }
+         @Override
+         public String toString() {
+             return vthread.toString();
+         }
+     }
+ 
+     /**
+      * Returns the object attached to the virtual thread's task.
+      * Returns {@code null} when the thread's task is not a {@code CustomRunner}.
+      * Asserts that the current thread is this virtual thread.
+      */
+     Object currentTaskAttachment() {
+         assert Thread.currentThread() == this;
+         if (runContinuation instanceof CustomRunner runner) {
+             return runner.attachment();
+         } else {
+             return null;
+         }
+     }
+ 
      /**
       * Runs or continues execution on the current thread. The virtual thread is mounted
       * on the current thread before the task runs or continues. It unmounts when the
       * task completes or yields.
       */

@@ -310,10 +377,22 @@
              timeoutTask.cancel(false);
              timeoutTask = null;
          }
      }
  
+     /**
+      * Submits the given task to the given executor. If the scheduler is a
+      * ForkJoinPool then the task is first adapted to a ForkJoinTask.
+      * Any other executor receives the task through {@code Executor.execute}.
+      *
+      * @param executor the executor to submit the task to
+      * @param task the task to submit
+      */
+     private void submit(Executor executor, Runnable task) {
+         if (executor instanceof ForkJoinPool pool) {
+             pool.submit(ForkJoinTask.adapt(task));
+         } else {
+             executor.execute(task);
+         }
+     }
+ 
      /**
       * Submits the runContinuation task to the scheduler. For the default scheduler,
       * and calling it on a worker thread, the task will be pushed to the local queue,
       * otherwise it will be pushed to an external submission queue.
       * @param scheduler the scheduler

@@ -330,16 +409,16 @@
                  // it avoids deadlock that could arise due to carriers and virtual
                  // threads contending for a lock.
                  if (currentThread().isVirtual()) {
                      Continuation.pin();
                      try {
-                         scheduler.execute(runContinuation);
+                         submit(scheduler, runContinuation);
                      } finally {
                          Continuation.unpin();
                      }
                  } else {
-                     scheduler.execute(runContinuation);
+                     submit(scheduler, runContinuation);
                  }
                  done = true;
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;

@@ -597,10 +676,11 @@
          }
  
          // Object.wait
          if (s == WAITING || s == TIMED_WAITING) {
              int newState;
+             boolean interruptable = interruptableWait;
              if (s == WAITING) {
                  setState(newState = WAIT);
              } else {
                  // For timed-wait, a timeout task is scheduled to execute. The timeout
                  // task will change the thread state to UNBLOCKED and submit the thread

@@ -626,11 +706,11 @@
                  }
                  return;
              }
  
              // may have been interrupted while in transition to wait state
-             if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
+             if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
                  submitRunContinuation();
                  return;
              }
              return;
          }

@@ -1412,14 +1492,32 @@
  
          // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
          var group = Thread.virtualThreadGroup();
      }
  
+     /**
+      * Loads a java.util.concurrent.Executor with the given class name to use as the
+      * default scheduler. The class is public in an exported package, has a public
+      * no-arg constructor, and is visible to the system class loader.
+      * A warning that the feature is experimental is printed to {@code System.err}
+      * once the scheduler has been instantiated.
+      *
+      * @param cn the name of the scheduler class
+      * @return the newly created scheduler
+      * @throws Error wrapping any exception thrown while loading the class, looking up
+      *         or invoking its constructor, or casting the instance to {@code Executor}
+      */
+     private static Executor createCustomDefaultScheduler(String cn) {
+         try {
+             Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
+             Constructor<?> ctor = clazz.getConstructor();
+             var scheduler = (Executor) ctor.newInstance();
+             System.err.println("""
+                 WARNING: Using custom default scheduler, this is an experimental feature!""");
+             return scheduler;
+         } catch (Exception ex) {
+             throw new Error(ex);
+         }
+     }
+ 
      /**
       * Creates the default ForkJoinPool scheduler.
       */
-     private static ForkJoinPool createDefaultScheduler() {
+     private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
          ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
          int parallelism, maxPoolSize, minRunnable;
          String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
          String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
          String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");

@@ -1530,6 +1628,6 @@
          var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
                  VirtualThread::unblockVirtualThreads);
          unblocker.setDaemon(true);
          unblocker.start();
      }
- }
+ }
< prev index next >