< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page
*** 22,26 ***
   * or visit www.oracle.com if you need additional information or have any
   * questions.
   */
  package java.lang;
  
  import java.util.Locale;
  import java.util.Objects;
  import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.Executor;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  import java.util.concurrent.ForkJoinTask;
  import java.util.concurrent.Future;
  import java.util.concurrent.RejectedExecutionException;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ScheduledThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
  import jdk.internal.event.VirtualThreadEndEvent;
  import jdk.internal.event.VirtualThreadStartEvent;
  import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  import jdk.internal.misc.CarrierThread;
  import jdk.internal.misc.InnocuousThread;
  import jdk.internal.misc.Unsafe;
  import jdk.internal.vm.Continuation;
  import jdk.internal.vm.ContinuationScope;
--- 22,29 ---
   * or visit www.oracle.com if you need additional information or have any
   * questions.
   */
  package java.lang;
  
+ import java.lang.invoke.MethodHandles;
+ import java.lang.invoke.VarHandle;
+ import java.lang.reflect.Constructor;
  import java.util.Locale;
  import java.util.Objects;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ForkJoinPool;
  import java.util.concurrent.ForkJoinTask;
  import java.util.concurrent.Future;
  import java.util.concurrent.RejectedExecutionException;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ScheduledThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
  import jdk.internal.event.VirtualThreadEndEvent;
+ import jdk.internal.event.VirtualThreadParkEvent;
  import jdk.internal.event.VirtualThreadStartEvent;
  import jdk.internal.event.VirtualThreadSubmitFailedEvent;
+ import jdk.internal.invoke.MhUtil;
  import jdk.internal.misc.CarrierThread;
  import jdk.internal.misc.InnocuousThread;
  import jdk.internal.misc.Unsafe;
  import jdk.internal.vm.Continuation;
  import jdk.internal.vm.ContinuationScope;

*** 61,22 ***
   * A thread that is scheduled by the Java virtual machine rather than the operating system.
   */
  final class VirtualThread extends BaseVirtualThread {
      private static final Unsafe U = Unsafe.getUnsafe();
      private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
!     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
  
      private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
      private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
      private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
      private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
      private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  
      // scheduler and continuation
!     private final Executor scheduler;
      private final Continuation cont;
!     private final Runnable runContinuation;
  
      // virtual thread state, accessed by VM
      private volatile int state;
  
      /*
--- 64,42 ---
   * A thread that is scheduled by the Java virtual machine rather than the operating system.
   */
  final class VirtualThread extends BaseVirtualThread {
      private static final Unsafe U = Unsafe.getUnsafe();
      private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
! 
+     private static final BuiltinScheduler BUILTIN_SCHEDULER;
+     private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
+     private static final VirtualThreadScheduler EXTERNAL_VIEW;
+     static {
+         // experimental
+         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
+         if (propValue != null) {
+             BuiltinScheduler builtinScheduler = createBuiltinScheduler(true);
+             VirtualThreadScheduler externalView = builtinScheduler.createExternalView();
+             VirtualThreadScheduler defaultScheduler = loadCustomScheduler(externalView, propValue);
+             BUILTIN_SCHEDULER = builtinScheduler;
+             DEFAULT_SCHEDULER = defaultScheduler;
+             EXTERNAL_VIEW = externalView;
+         } else {
+             var builtinScheduler = createBuiltinScheduler(false);
+             BUILTIN_SCHEDULER = builtinScheduler;
+             DEFAULT_SCHEDULER = builtinScheduler;
+             EXTERNAL_VIEW = builtinScheduler.createExternalView();
+         }
+     }
  
      private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
      private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
      private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
      private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
      private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  
      // scheduler and continuation
!     private final VirtualThreadScheduler scheduler;
      private final Continuation cont;
!     private final VirtualThreadTask runContinuation;
  
      // virtual thread state, accessed by VM
      private volatile int state;
  
      /*

*** 87,30 ***
       *  STARTED -> RUNNING         // first run
       *  RUNNING -> TERMINATED      // done
       *
       *  RUNNING -> PARKING         // Thread parking with LockSupport.park
       *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
-      *  PARKING -> PINNED          // cont.yield failed, parked indefinitely on carrier
       *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
-      *   PINNED -> RUNNING         // unparked, continue execution on same carrier
       * UNPARKED -> RUNNING         // continue execution after park
       *
       *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos
       * TIMED_PARKING -> TIMED_PARKED    // cont.yield successful, timed-parked
-      * TIMED_PARKING -> TIMED_PINNED    // cont.yield failed, timed-parked on carrier
       *  TIMED_PARKED -> UNPARKED        // unparked, may be scheduled to continue
       *  TIMED_PINNED -> RUNNING         // unparked, continue execution on same carrier
       *
       *   RUNNING -> BLOCKING       // blocking on monitor enter
       *  BLOCKING -> BLOCKED        // blocked on monitor enter
       *   BLOCKED -> UNBLOCKED      // unblocked, may be scheduled to continue
       * UNBLOCKED -> RUNNING        // continue execution after blocked on monitor enter
       *
       *   RUNNING -> WAITING        // transitional state during wait on monitor
       *   WAITING -> WAIT           // waiting on monitor
       *      WAIT -> BLOCKED        // notified, waiting to be unblocked by monitor owner
!      *      WAIT -> UNBLOCKED      // timed-out/interrupted
       *
       *       RUNNING -> TIMED_WAITING   // transition state during timed-waiting on monitor
       * TIMED_WAITING -> TIMED_WAIT      // timed-waiting on monitor
       *    TIMED_WAIT -> BLOCKED         // notified, waiting to be unblocked by monitor owner
       *    TIMED_WAIT -> UNBLOCKED       // timed-out/interrupted
--- 110,34 ---
       *  STARTED -> RUNNING         // first run
       *  RUNNING -> TERMINATED      // done
       *
       *  RUNNING -> PARKING         // Thread parking with LockSupport.park
       *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
       *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
       * UNPARKED -> RUNNING         // continue execution after park
       *
+      *  PARKING -> RUNNING         // cont.yield failed, need to park on carrier
+      *  RUNNING -> PINNED          // park on carrier
+      *   PINNED -> RUNNING         // unparked, continue execution on same carrier
+      *
       *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos
       * TIMED_PARKING -> TIMED_PARKED    // cont.yield successful, timed-parked
       *  TIMED_PARKED -> UNPARKED        // unparked, may be scheduled to continue
+      *
+      * TIMED_PARKING -> RUNNING         // cont.yield failed, need to park on carrier
+      *       RUNNING -> TIMED_PINNED    // park on carrier
       *  TIMED_PINNED -> RUNNING         // unparked, continue execution on same carrier
       *
       *   RUNNING -> BLOCKING       // blocking on monitor enter
       *  BLOCKING -> BLOCKED        // blocked on monitor enter
       *   BLOCKED -> UNBLOCKED      // unblocked, may be scheduled to continue
       * UNBLOCKED -> RUNNING        // continue execution after blocked on monitor enter
       *
       *   RUNNING -> WAITING        // transitional state during wait on monitor
       *   WAITING -> WAIT           // waiting on monitor
       *      WAIT -> BLOCKED        // notified, waiting to be unblocked by monitor owner
!      *      WAIT -> UNBLOCKED      // interrupted
       *
       *       RUNNING -> TIMED_WAITING   // transition state during timed-waiting on monitor
       * TIMED_WAITING -> TIMED_WAIT      // timed-waiting on monitor
       *    TIMED_WAIT -> BLOCKED         // notified, waiting to be unblocked by monitor owner
       *    TIMED_WAIT -> UNBLOCKED       // timed-out/interrupted

*** 148,13 ***
      private static final int TIMED_WAITING = 17;
      private static final int TIMED_WAIT    = 18;    // waiting in timed-Object.wait
  
      private static final int TERMINATED = 99;  // final state
  
-     // can be suspended from scheduling when unmounted
-     private static final int SUSPENDED = 1 << 8;
- 
      // parking permit made available by LockSupport.unpark
      private volatile boolean parkPermit;
  
      // blocking permit made available by unblocker thread when another thread exits monitor
      private volatile boolean blockPermit;
--- 175,10 ---

*** 185,13 ***
  
      // termination object when joining, created lazily if needed
      private volatile CountDownLatch termination;
  
      /**
!      * Returns the default scheduler.
       */
!     static Executor defaultScheduler() {
          return DEFAULT_SCHEDULER;
      }
  
      /**
       * Returns the continuation scope used for virtual threads.
--- 209,20 ---
  
      // termination object when joining, created lazily if needed
      private volatile CountDownLatch termination;
  
      /**
!      * Return the built-in scheduler.
       */
!     static VirtualThreadScheduler builtinScheduler() {
+         return BUILTIN_SCHEDULER;
+     }
+ 
+     /**
+      * Returns the default scheduler, usually the same as the built-in scheduler.
+      */
+     static VirtualThreadScheduler defaultScheduler() {
          return DEFAULT_SCHEDULER;
      }
  
      /**
       * Returns the continuation scope used for virtual threads.

*** 199,38 ***
      static ContinuationScope continuationScope() {
          return VTHREAD_SCOPE;
      }
  
      /**
!      * Creates a new {@code VirtualThread} to run the given task with the given
!      * scheduler. If the given scheduler is {@code null} and the current thread
!      * is a platform thread then the newly created virtual thread will use the
!      * default scheduler. If given scheduler is {@code null} and the current
!      * thread is a virtual thread then the current thread's scheduler is used.
       *
!      * @param scheduler the scheduler or null
       * @param name thread name
       * @param characteristics characteristics
       * @param task the task to execute
       */
!     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
          super(name, characteristics, /*bound*/ false);
          Objects.requireNonNull(task);
  
!         // choose scheduler if not specified
          if (scheduler == null) {
!             Thread parent = Thread.currentThread();
!             if (parent instanceof VirtualThread vparent) {
!                 scheduler = vparent.scheduler;
-             } else {
-                 scheduler = DEFAULT_SCHEDULER;
-             }
          }
- 
          this.scheduler = scheduler;
          this.cont = new VThreadContinuation(this, task);
!         this.runContinuation = this::runContinuation;
      }
  
      /**
       * The continuation that a virtual thread executes.
       */
--- 230,121 ---
      static ContinuationScope continuationScope() {
          return VTHREAD_SCOPE;
      }
  
      /**
!      * Return the scheduler for this thread.
!      * @param trusted true if caller is trusted, false if not trusted
!      */
!     VirtualThreadScheduler scheduler(boolean trusted) {
!         if (scheduler == BUILTIN_SCHEDULER && !trusted) {
+             return EXTERNAL_VIEW;
+         } else {
+             return scheduler;
+         }
+     }
+ 
+     /**
+      * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
       *
!      * @param scheduler the scheduler or null for default scheduler
+      * @param preferredCarrier the preferred carrier or null
       * @param name thread name
       * @param characteristics characteristics
       * @param task the task to execute
       */
!     VirtualThread(VirtualThreadScheduler scheduler,
+                   Thread preferredCarrier,
+                   String name,
+                   int characteristics,
+                   Runnable task,
+                   Object att) {
          super(name, characteristics, /*bound*/ false);
          Objects.requireNonNull(task);
  
!         // use default scheduler if not provided
          if (scheduler == null) {
!             scheduler = DEFAULT_SCHEDULER;
!         } else if (scheduler == EXTERNAL_VIEW) {
!             throw new UnsupportedOperationException();
          }
          this.scheduler = scheduler;
          this.cont = new VThreadContinuation(this, task);
! 
+         if (scheduler == BUILTIN_SCHEDULER) {
+             this.runContinuation = new BuiltinSchedulerTask(this);
+         } else {
+             this.runContinuation = new CustomSchedulerTask(this, preferredCarrier, att);
+         }
+     }
+ 
+     /**
+      * The task to execute when using the built-in scheduler.
+      */
+     static final class BuiltinSchedulerTask implements VirtualThreadTask {
+         private final VirtualThread vthread;
+         BuiltinSchedulerTask(VirtualThread vthread) {
+             this.vthread = vthread;
+         }
+         @Override
+         public Thread thread() {
+             return vthread;
+         }
+         @Override
+         public void run() {
+             vthread.runContinuation();;
+         }
+         @Override
+         public Thread preferredCarrier() {
+             throw new UnsupportedOperationException();
+         }
+         @Override
+         public Object attach(Object att) {
+             throw new UnsupportedOperationException();
+         }
+         @Override
+         public Object attachment() {
+             throw new UnsupportedOperationException();
+         }
+     }
+ 
+     /**
+      * The task to execute when using a custom scheduler.
+      */
+     static final class CustomSchedulerTask implements VirtualThreadTask {
+         private static final VarHandle ATT =
+                 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class);
+         private final VirtualThread vthread;
+         private final Thread preferredCarrier;
+         private volatile Object att;
+         CustomSchedulerTask(VirtualThread vthread, Thread preferredCarrier, Object att) {
+             this.vthread = vthread;
+             this.preferredCarrier = preferredCarrier;
+             if (att != null) {
+                 this.att = att;
+             }
+         }
+         @Override
+         public Thread thread() {
+             return vthread;
+         }
+         @Override
+         public void run() {
+             vthread.runContinuation();;
+         }
+         @Override
+         public Thread preferredCarrier() {
+             return preferredCarrier;
+         }
+         @Override
+         public Object attach(Object att) {
+             return ATT.getAndSet(this, att);
+         }
+         @Override
+         public Object attachment() {
+             return att;
+         }
      }
  
      /**
       * The continuation that a virtual thread executes.
       */

*** 314,30 ***
              timeoutTask = null;
          }
      }
  
      /**
!      * Submits the given task to the given executor. If the scheduler is a
!      * ForkJoinPool then the task is first adapted to a ForkJoinTask.
!      */
-     private void submit(Executor executor, Runnable task) {
-         if (executor instanceof ForkJoinPool pool) {
-             pool.submit(ForkJoinTask.adapt(task));
-         } else {
-             executor.execute(task);
-         }
-     }
- 
-     /**
-      * Submits the runContinuation task to the scheduler. For the default scheduler,
-      * and calling it on a worker thread, the task will be pushed to the local queue,
-      * otherwise it will be pushed to an external submission queue.
-      * @param scheduler the scheduler
       * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
       * @throws RejectedExecutionException
       */
!     private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
          boolean done = false;
          while (!done) {
              try {
                  // Pin the continuation to prevent the virtual thread from unmounting
                  // when submitting a task. For the default scheduler this ensures that
--- 428,17 ---
              timeoutTask = null;
          }
      }
  
      /**
!      * Submits the runContinuation task to the scheduler. For the built-in scheduler,
!      * the task will be pushed to the local queue if possible, otherwise it will be
!      * pushed to an external submission queue.
       * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
       * @throws RejectedExecutionException
       */
!     private void submitRunContinuation(boolean retryOnOOME) {
          boolean done = false;
          while (!done) {
              try {
                  // Pin the continuation to prevent the virtual thread from unmounting
                  // when submitting a task. For the default scheduler this ensures that

*** 345,16 ***
                  // it avoids deadlock that could arise due to carriers and virtual
                  // threads contending for a lock.
                  if (currentThread().isVirtual()) {
                      Continuation.pin();
                      try {
!                         submit(scheduler, runContinuation);
                      } finally {
                          Continuation.unpin();
                      }
                  } else {
!                     submit(scheduler, runContinuation);
                  }
                  done = true;
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;
--- 446,16 ---
                  // it avoids deadlock that could arise due to carriers and virtual
                  // threads contending for a lock.
                  if (currentThread().isVirtual()) {
                      Continuation.pin();
                      try {
!                         scheduler.onContinue(runContinuation);
                      } finally {
                          Continuation.unpin();
                      }
                  } else {
!                     scheduler.onContinue(runContinuation);
                  }
                  done = true;
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;

*** 366,52 ***
                  }
              }
          }
      }
  
-     /**
-      * Submits the runContinuation task to the given scheduler as an external submit.
-      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
-      * @throws RejectedExecutionException
-      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
-      */
-     private void externalSubmitRunContinuation(ForkJoinPool pool) {
-         assert Thread.currentThread() instanceof CarrierThread;
-         try {
-             pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
-         } catch (RejectedExecutionException ree) {
-             submitFailed(ree);
-             throw ree;
-         } catch (OutOfMemoryError e) {
-             submitRunContinuation(pool, true);
-         }
-     }
- 
      /**
       * Submits the runContinuation task to the scheduler. For the default scheduler,
       * and calling it on a worker thread, the task will be pushed to the local queue,
       * otherwise it will be pushed to an external submission queue.
       * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
       * @throws RejectedExecutionException
       */
      private void submitRunContinuation() {
!         submitRunContinuation(scheduler, true);
      }
  
      /**
!      * Lazy submit the runContinuation task if invoked on a carrier thread and its local
!      * queue is empty. If not empty, or invoked by another thread, then this method works
!      * like submitRunContinuation and just submits the task to the scheduler.
       * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
       * @throws RejectedExecutionException
       * @see ForkJoinPool#lazySubmit(ForkJoinTask)
       */
      private void lazySubmitRunContinuation() {
          if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
-             ForkJoinPool pool = ct.getPool();
              try {
!                 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;
              } catch (OutOfMemoryError e) {
                  submitRunContinuation();
--- 467,34 ---
                  }
              }
          }
      }
  
      /**
       * Submits the runContinuation task to the scheduler. For the default scheduler,
       * and calling it on a worker thread, the task will be pushed to the local queue,
       * otherwise it will be pushed to an external submission queue.
       * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
       * @throws RejectedExecutionException
       */
      private void submitRunContinuation() {
!         submitRunContinuation(true);
      }
  
      /**
!      * Invoked from a carrier thread to lazy submit the runContinuation task to the
!      * carrier's local queue if the queue is empty. If not empty, or invoked by a thread
!      * for a custom scheduler, then it just submits the task to the scheduler.
       * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
       * @throws RejectedExecutionException
       * @see ForkJoinPool#lazySubmit(ForkJoinTask)
       */
      private void lazySubmitRunContinuation() {
+         assert !currentThread().isVirtual();
          if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
              try {
!                 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;
              } catch (OutOfMemoryError e) {
                  submitRunContinuation();

*** 420,26 ***
              submitRunContinuation();
          }
      }
  
      /**
!      * Submits the runContinuation task to the scheduler. For the default scheduler, and
!      * calling it a virtual thread that uses the default scheduler, the task will be
!      * pushed to an external submission queue. This method may throw OutOfMemoryError.
       * @throws RejectedExecutionException
!      * @throws OutOfMemoryError
       */
!     private void externalSubmitRunContinuationOrThrow() {
!         if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {
              try {
                  ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;
              }
          } else {
!             submitRunContinuation(scheduler, false);
          }
      }
  
      /**
       * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
--- 503,66 ---
              submitRunContinuation();
          }
      }
  
      /**
!      * Invoked from a carrier thread to externally submit the runContinuation task to the
!      * scheduler. If invoked by a thread for a custom scheduler, then it just submits the
!      * task to the scheduler.
+      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
       * @throws RejectedExecutionException
!      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
       */
!     private void externalSubmitRunContinuation() {
!         assert !currentThread().isVirtual();
+         if (currentThread() instanceof CarrierThread ct) {
              try {
                  ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;
+             } catch (OutOfMemoryError e) {
+                 submitRunContinuation();
              }
          } else {
!             submitRunContinuation();
+         }
+     }
+ 
+     /**
+      * Invoked from Thread.start to externally submit the runContinuation task to the
+      * scheduler. If this virtual thread is scheduled by the built-in scheduler,
+      * and this method is called from a virtual thread scheduled by the built-in
+      * scheduler, then it uses externalSubmit to ensure that the task is pushed to an
+      * external submission queue rather than the local queue.
+      * @throws RejectedExecutionException
+      * @throws OutOfMemoryError
+      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
+      */
+     private void externalSubmitRunContinuationOrThrow() {
+         try {
+             if (currentThread().isVirtual()) {
+                 // Pin the continuation to prevent the virtual thread from unmounting
+                 // when submitting a task. This avoids deadlock that could arise due to
+                 // carriers and virtual threads contending for a lock.
+                 Continuation.pin();
+                 try {
+                     if (scheduler == BUILTIN_SCHEDULER
+                             && currentCarrierThread() instanceof CarrierThread ct) {
+                         ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
+                     } else {
+                         scheduler.onStart(runContinuation);
+                     }
+                 } finally {
+                     Continuation.unpin();
+                 }
+             } else {
+                 scheduler.onStart(runContinuation);
+             }
+         } catch (RejectedExecutionException ree) {
+             submitFailed(ree);
+             throw ree;
          }
      }
  
      /**
       * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.

*** 489,11 ***
       * return, the current thread is the virtual thread.
       */
      @ChangesCurrentThread
      @ReservedStackAccess
      private void mount() {
!         startTransition(/*is_mount*/true);
          // We assume following volatile accesses provide equivalent
          // of acquire ordering, otherwise we need U.loadFence() here.
  
          // sets the carrier thread
          Thread carrier = Thread.currentCarrierThread();
--- 612,11 ---
       * return, the current thread is the virtual thread.
       */
      @ChangesCurrentThread
      @ReservedStackAccess
      private void mount() {
!         startTransition(/*mount*/true);
          // We assume following volatile accesses provide equivalent
          // of acquire ordering, otherwise we need U.loadFence() here.
  
          // sets the carrier thread
          Thread carrier = Thread.currentCarrierThread();

*** 534,24 ***
          }
          carrier.clearInterrupt();
  
          // We assume previous volatile accesses provide equivalent
          // of release ordering, otherwise we need U.storeFence() here.
!         endTransition(/*is_mount*/false);
      }
  
      /**
       * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
       * the continuation continues.
       */
      @Hidden
      private boolean yieldContinuation() {
!         startTransition(/*is_mount*/false);
          try {
              return Continuation.yield(VTHREAD_SCOPE);
          } finally {
!             endTransition(/*is_mount*/true);
          }
      }
  
      /**
       * Invoked in the context of the carrier thread after the Continuation yields when
--- 657,24 ---
          }
          carrier.clearInterrupt();
  
          // We assume previous volatile accesses provide equivalent
          // of release ordering, otherwise we need U.storeFence() here.
!         endTransition(/*mount*/false);
      }
  
      /**
       * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
       * the continuation continues.
       */
      @Hidden
      private boolean yieldContinuation() {
!         startTransition(/*mount*/false);
          try {
              return Continuation.yield(VTHREAD_SCOPE);
          } finally {
!             endTransition(/*mount*/true);
          }
      }
  
      /**
       * Invoked in the context of the carrier thread after the Continuation yields when

*** 592,11 ***
          if (s == YIELDING) {
              setState(YIELDED);
  
              // external submit if there are no tasks in the local task queue
              if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
!                 externalSubmitRunContinuation(ct.getPool());
              } else {
                  submitRunContinuation();
              }
              return;
          }
--- 715,11 ---
          if (s == YIELDING) {
              setState(YIELDED);
  
              // external submit if there are no tasks in the local task queue
              if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
!                 externalSubmitRunContinuation();
              } else {
                  submitRunContinuation();
              }
              return;
          }

*** 749,18 ***
          if (getAndSetParkPermit(false) || interrupted)
              return;
  
          // park the thread
          boolean yielded = false;
          setState(PARKING);
          try {
              yielded = yieldContinuation();
          } catch (OutOfMemoryError e) {
              // park on carrier
          } finally {
              assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
!             if (!yielded) {
                  assert state() == PARKING;
                  setState(RUNNING);
              }
          }
  
--- 872,21 ---
          if (getAndSetParkPermit(false) || interrupted)
              return;
  
          // park the thread
          boolean yielded = false;
+         long eventStartTime = VirtualThreadParkEvent.eventStartTime();
          setState(PARKING);
          try {
              yielded = yieldContinuation();
          } catch (OutOfMemoryError e) {
              // park on carrier
          } finally {
              assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
!             if (yielded) {
+                 VirtualThreadParkEvent.offer(eventStartTime, Long.MIN_VALUE);
+             } else {
                  assert state() == PARKING;
                  setState(RUNNING);
              }
          }
  

*** 790,19 ***
          if (nanos > 0) {
              long startTime = System.nanoTime();
  
              // park the thread, afterYield will schedule the thread to unpark
              boolean yielded = false;
              timeout = nanos;
              setState(TIMED_PARKING);
              try {
                  yielded = yieldContinuation();
              } catch (OutOfMemoryError e) {
                  // park on carrier
              } finally {
                  assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
!                 if (!yielded) {
                      assert state() == TIMED_PARKING;
                      setState(RUNNING);
                  }
              }
  
--- 916,22 ---
          if (nanos > 0) {
              long startTime = System.nanoTime();
  
              // park the thread, afterYield will schedule the thread to unpark
              boolean yielded = false;
+             long eventStartTime = VirtualThreadParkEvent.eventStartTime();
              timeout = nanos;
              setState(TIMED_PARKING);
              try {
                  yielded = yieldContinuation();
              } catch (OutOfMemoryError e) {
                  // park on carrier
              } finally {
                  assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
!                 if (yielded) {
+                     VirtualThreadParkEvent.offer(eventStartTime, nanos);
+                 } else {
                      assert state() == TIMED_PARKING;
                      setState(RUNNING);
                  }
              }
  

*** 902,14 ***
      /**
       * Invoked by FJP worker thread or STPE thread when park timeout expires.
       */
      private void parkTimeoutExpired() {
          assert !VirtualThread.currentThread().isVirtual();
!         if (!getAndSetParkPermit(true)
!                 && (state() == TIMED_PARKED)
!                 && compareAndSetState(TIMED_PARKED, UNPARKED)) {
!             lazySubmitRunContinuation();
          }
      }
  
      /**
       * Invoked by FJP worker thread or STPE thread when wait timeout expires.
--- 1031,15 ---
      /**
       * Invoked by FJP worker thread or STPE thread when park timeout expires.
       */
      private void parkTimeoutExpired() {
          assert !VirtualThread.currentThread().isVirtual();
!         if (!getAndSetParkPermit(true)) {
!             int s = state();
!             if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
!                 lazySubmitRunContinuation();
+             }
          }
      }
  
      /**
       * Invoked by FJP worker thread or STPE thread when wait timeout expires.

*** 917,32 ***
       * and submit its task so that it continues and attempts to reenter the monitor.
       * This method does nothing if the thread has been woken by notify or interrupt.
       */
      private void waitTimeoutExpired(byte seqNo) {
          assert !Thread.currentThread().isVirtual();
!         for (;;) {
!             boolean unblocked = false;
!             synchronized (timedWaitLock()) {
!                 if (seqNo != timedWaitSeqNo) {
!                     // this timeout task is for a past timed-wait
-                     return;
-                 }
-                 int s = state();
-                 if (s == TIMED_WAIT) {
-                     unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED);
-                 } else if (s != (TIMED_WAIT | SUSPENDED)) {
-                     // notified or interrupted, no longer waiting
-                     return;
-                 }
              }
!             if (unblocked) {
!                 lazySubmitRunContinuation();
                  return;
              }
-             // need to retry when thread is suspended in time-wait
-             Thread.yield();
          }
      }
  
      /**
       * Attempts to yield the current virtual thread (Thread.yield).
       */
--- 1047,23 ---
       * and submit its task so that it continues and attempts to reenter the monitor.
       * This method does nothing if the thread has been woken by notify or interrupt.
       */
      private void waitTimeoutExpired(byte seqNo) {
          assert !Thread.currentThread().isVirtual();
! 
!         synchronized (timedWaitLock()) {
!             if (seqNo != timedWaitSeqNo) {
!                 // this timeout task is for a past timed-wait
!                 return;
              }
!             if (!compareAndSetState(TIMED_WAIT, UNBLOCKED)) {
!                 // already unblocked
                  return;
              }
          }
+ 
+         lazySubmitRunContinuation();
      }
  
      /**
       * Attempts to yield the current virtual thread (Thread.yield).
       */

*** 1106,12 ***
          return oldValue;
      }
  
      @Override
      Thread.State threadState() {
!         int s = state();
-         switch (s & ~SUSPENDED) {
              case NEW:
                  return Thread.State.NEW;
              case STARTED:
                  // return NEW if thread container not yet set
                  if (threadContainer() == null) {
--- 1227,11 ---
          return oldValue;
      }
  
      @Override
      Thread.State threadState() {
!         switch (state()) {
              case NEW:
                  return Thread.State.NEW;
              case STARTED:
                  // return NEW if thread container not yet set
                  if (threadContainer() == null) {

*** 1175,89 ***
      @Override
      boolean isTerminated() {
          return (state == TERMINATED);
      }
  
-     @Override
-     StackTraceElement[] asyncGetStackTrace() {
-         StackTraceElement[] stackTrace;
-         do {
-             stackTrace = (carrierThread != null)
-                     ? super.asyncGetStackTrace()  // mounted
-                     : tryGetStackTrace();         // unmounted
-             if (stackTrace == null) {
-                 Thread.yield();
-             }
-         } while (stackTrace == null);
-         return stackTrace;
-     }
- 
-     /**
-      * Returns the stack trace for this virtual thread if it is unmounted.
-      * Returns null if the thread is mounted or in transition.
-      */
-     private StackTraceElement[] tryGetStackTrace() {
-         int initialState = state() & ~SUSPENDED;
-         switch (initialState) {
-             case NEW, STARTED, TERMINATED -> {
-                 return new StackTraceElement[0];  // unmounted, empty stack
-             }
-             case RUNNING, PINNED, TIMED_PINNED -> {
-                 return null;   // mounted
-             }
-             case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
-                 // unmounted, not runnable
-             }
-             case UNPARKED, UNBLOCKED, YIELDED -> {
-                 // unmounted, runnable
-             }
-             case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
-                 return null;  // in transition
-             }
-             default -> throw new InternalError("" + initialState);
-         }
- 
-         // thread is unmounted, prevent it from continuing
-         int suspendedState = initialState | SUSPENDED;
-         if (!compareAndSetState(initialState, suspendedState)) {
-             return null;
-         }
- 
-         // get stack trace and restore state
-         StackTraceElement[] stack;
-         try {
-             stack = cont.getStackTrace();
-         } finally {
-             assert state == suspendedState;
-             setState(initialState);
-         }
-         boolean resubmit = switch (initialState) {
-             case UNPARKED, UNBLOCKED, YIELDED -> {
-                 // resubmit as task may have run while suspended
-                 yield true;
-             }
-             case PARKED, TIMED_PARKED -> {
-                 // resubmit if unparked while suspended
-                 yield parkPermit && compareAndSetState(initialState, UNPARKED);
-             }
-             case BLOCKED -> {
-                 // resubmit if unblocked while suspended
-                 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
-             }
-             case WAIT, TIMED_WAIT -> {
-                 // resubmit if notified or interrupted while waiting (Object.wait)
-                 // waitTimeoutExpired will retry if the timed expired when suspended
-                 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
-             }
-             default -> throw new InternalError();
-         };
-         if (resubmit) {
-             submitRunContinuation();
-         }
-         return stack;
-     }
- 
      @Override
      public String toString() {
          StringBuilder sb = new StringBuilder("VirtualThread[#");
          sb.append(threadId());
          String name = getName();
--- 1295,10 ---

*** 1424,32 ***
      @JvmtiMountTransition
      private native void startFinalTransition();
  
      @IntrinsicCandidate
      @JvmtiMountTransition
!     private native void startTransition(boolean is_mount);
  
      @IntrinsicCandidate
      @JvmtiMountTransition
!     private native void endTransition(boolean is_mount);
  
      @IntrinsicCandidate
      private static native void notifyJvmtiDisableSuspend(boolean enter);
  
      private static native void registerNatives();
      static {
          registerNatives();
  
          // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
          var group = Thread.virtualThreadGroup();
      }
  
      /**
!      * Creates the default ForkJoinPool scheduler.
       */
!     private static ForkJoinPool createDefaultScheduler() {
-         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
          int parallelism, maxPoolSize, minRunnable;
          String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
          String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
          String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
          if (parallelismValue != null) {
--- 1465,66 ---
      @JvmtiMountTransition
      private native void startFinalTransition();
  
      @IntrinsicCandidate
      @JvmtiMountTransition
!     private native void startTransition(boolean mount);
  
      @IntrinsicCandidate
      @JvmtiMountTransition
!     private native void endTransition(boolean mount);
  
      @IntrinsicCandidate
      private static native void notifyJvmtiDisableSuspend(boolean enter);
  
      private static native void registerNatives();
      static {
          registerNatives();
  
          // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
          var group = Thread.virtualThreadGroup();
+ 
+         // ensure event class is initialized
+         try {
+             MethodHandles.lookup().ensureInitialized(VirtualThreadParkEvent.class);
+         } catch (IllegalAccessException e) {
+             throw new ExceptionInInitializerError(e);
+         }
+     }
+ 
+     /**
+      * Loads a VirtualThreadScheduler with the given class name. The class must be public
+      * in an exported package, with public one-arg or no-arg constructor, and be visible
+      * to the system class loader.
+      * @param delegate the scheduler that the custom scheduler may delegate to
+      * @param cn the class name of the custom scheduler
+      */
+     private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) {
+         VirtualThreadScheduler scheduler;
+         try {
+             Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
+             // 1-arg constructor
+             try {
+                 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class);
+                 return (VirtualThreadScheduler) ctor.newInstance(delegate);
+             } catch (NoSuchMethodException e) {
+                 // 0-arg constructor
+                 Constructor<?> ctor = clazz.getConstructor();
+                 scheduler = (VirtualThreadScheduler) ctor.newInstance();
+             }
+         } catch (Exception ex) {
+             throw new Error(ex);
+         }
+         System.err.println("WARNING: Using custom default scheduler, this is an experimental feature!");
+         return scheduler;
      }
  
      /**
!      * Creates the built-in ForkJoinPool scheduler.
+      * @param wrapped true if wrapped by a custom default scheduler
       */
!     private static BuiltinScheduler createBuiltinScheduler(boolean wrapped) {
          int parallelism, maxPoolSize, minRunnable;
          String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
          String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
          String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
          if (parallelismValue != null) {

*** 1466,22 ***
          if (minRunnableValue != null) {
              minRunnable = Integer.parseInt(minRunnableValue);
          } else {
              minRunnable = Integer.max(parallelism / 2, 1);
          }
!         Thread.UncaughtExceptionHandler handler = (t, e) -> { };
!         boolean asyncMode = true; // FIFO
!         return new ForkJoinPool(parallelism, factory, handler, asyncMode,
!                      0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
      }
  
      /**
       * Schedule a runnable task to run after a delay.
       */
      private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
!         if (scheduler instanceof ForkJoinPool pool) {
!             return pool.schedule(command, delay, unit);
          } else {
              return DelayedTaskSchedulers.schedule(command, delay, unit);
          }
      }
  
--- 1541,81 ---
          if (minRunnableValue != null) {
              minRunnable = Integer.parseInt(minRunnableValue);
          } else {
              minRunnable = Integer.max(parallelism / 2, 1);
          }
!         return new BuiltinScheduler(parallelism, maxPoolSize, minRunnable, wrapped);
!     }
! 
!     /**
+      * The built-in ForkJoinPool scheduler.
+      */
+     private static class BuiltinScheduler
+             extends ForkJoinPool implements VirtualThreadScheduler {
+ 
+         BuiltinScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) {
+             ForkJoinWorkerThreadFactory factory = wrapped
+                     ? ForkJoinPool.defaultForkJoinWorkerThreadFactory
+                     : CarrierThread::new;
+             Thread.UncaughtExceptionHandler handler = (t, e) -> { };
+             boolean asyncMode = true; // FIFO
+             super(parallelism, factory, handler, asyncMode,
+                     0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
+         }
+ 
+         private void adaptAndExecute(Runnable task) {
+             execute(ForkJoinTask.adapt(task));
+         }
+ 
+         @Override
+         public void onStart(VirtualThreadTask task) {
+             adaptAndExecute(task);
+         }
+ 
+         @Override
+         public void onContinue(VirtualThreadTask task) {
+             adaptAndExecute(task);
+         }
+ 
+         /**
+          * Wraps the scheduler to avoid leaking a direct reference with
+          * {@link VirtualThreadScheduler#current()}.
+          */
+         VirtualThreadScheduler createExternalView() {
+             BuiltinScheduler builtin = this;
+             return new VirtualThreadScheduler() {
+                 private void execute(VirtualThreadTask task) {
+                     var vthread = (VirtualThread) task.thread();
+                     VirtualThreadScheduler scheduler = vthread.scheduler;
+                     if (scheduler == this || scheduler == DEFAULT_SCHEDULER) {
+                         builtin.adaptAndExecute(task);
+                     } else {
+                         throw new IllegalArgumentException();
+                     }
+                 }
+                 @Override
+                 public void onStart(VirtualThreadTask task) {
+                     execute(task);
+                 }
+                 @Override
+                 public void onContinue(VirtualThreadTask task) {
+                     execute(task);
+                 }
+                 @Override
+                 public String toString() {
+                     return builtin.toString();
+                 }
+             };
+         }
      }
  
      /**
       * Schedule a runnable task to run after a delay.
       */
      private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
!         if (scheduler == BUILTIN_SCHEDULER) {
!             return BUILTIN_SCHEDULER.schedule(command, delay, unit);
          } else {
              return DelayedTaskSchedulers.schedule(command, delay, unit);
          }
      }
  
< prev index next >