< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page
*** 22,23 ***
   * or visit www.oracle.com if you need additional information or have any
   * questions.
   */
  package java.lang;
  
  import java.util.Locale;
  import java.util.Objects;
  import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.Executor;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  import java.util.concurrent.ForkJoinTask;
  import java.util.concurrent.Future;
  import java.util.concurrent.RejectedExecutionException;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ScheduledThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
  import jdk.internal.event.VirtualThreadEndEvent;
  import jdk.internal.event.VirtualThreadStartEvent;
  import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  import jdk.internal.misc.CarrierThread;
  import jdk.internal.misc.InnocuousThread;
--- 22,23 ---
   * or visit www.oracle.com if you need additional information or have any
   * questions.
   */
  package java.lang;
  
+ import java.lang.reflect.Constructor;
  import java.util.Locale;
  import java.util.Objects;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ForkJoinPool;
  import java.util.concurrent.ForkJoinTask;
  import java.util.concurrent.Future;
  import java.util.concurrent.RejectedExecutionException;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ScheduledThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
+ import java.util.function.Supplier;
  import jdk.internal.event.VirtualThreadEndEvent;
  import jdk.internal.event.VirtualThreadStartEvent;
  import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  import jdk.internal.misc.CarrierThread;
  import jdk.internal.misc.InnocuousThread;

*** 61,20 ***
   * A thread that is scheduled by the Java virtual machine rather than the operating system.
   */
  final class VirtualThread extends BaseVirtualThread {
      private static final Unsafe U = Unsafe.getUnsafe();
      private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
!     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
  
      private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
      private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
      private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
      private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
      private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  
      // scheduler and continuation
!     private final Executor scheduler;
      private final Continuation cont;
      private final Runnable runContinuation;
  
      // virtual thread state, accessed by VM
      private volatile int state;
--- 61,33 ---
   * A thread that is scheduled by the Java virtual machine rather than the operating system.
   */
  final class VirtualThread extends BaseVirtualThread {
      private static final Unsafe U = Unsafe.getUnsafe();
      private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
! 
+     private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
+     private static final boolean IS_CUSTOM_DEFAULT_SCHEDULER;
+     static {
+         // experimental
+         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
+         if (propValue != null) {
+             DEFAULT_SCHEDULER = createCustomDefaultScheduler(propValue);
+             IS_CUSTOM_DEFAULT_SCHEDULER = true;
+         } else {
+             DEFAULT_SCHEDULER = createDefaultForkJoinPoolScheduler();
+             IS_CUSTOM_DEFAULT_SCHEDULER = false;
+         }
+     }
  
      private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
      private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
      private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
      private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
      private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  
      // scheduler and continuation
!     private final VirtualThreadScheduler scheduler;
      private final Continuation cont;
      private final Runnable runContinuation;
  
      // virtual thread state, accessed by VM
      private volatile int state;

*** 166,10 ***
--- 179,13 ---
      private volatile VirtualThread next;
  
      // notified by Object.notify/notifyAll while waiting in Object.wait
      private volatile boolean notified;
  
+     // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
+     private volatile boolean interruptableWait;
+ 
      // timed-wait support
      private byte timedWaitSeqNo;
  
      // timeout for timed-park and timed-wait, only accessed on current/carrier thread
      private long timeout;

*** 184,45 ***
      private volatile CountDownLatch termination;
  
      /**
       * Returns the default scheduler.
       */
!     static Executor defaultScheduler() {
          return DEFAULT_SCHEDULER;
      }
  
      /**
       * Returns the continuation scope used for virtual threads.
       */
      static ContinuationScope continuationScope() {
          return VTHREAD_SCOPE;
      }
  
      /**
!      * Creates a new {@code VirtualThread} to run the given task with the given
!      * scheduler. If the given scheduler is {@code null} and the current thread
!      * is a platform thread then the newly created virtual thread will use the
!      * default scheduler. If given scheduler is {@code null} and the current
!      * thread is a virtual thread then the current thread's scheduler is used.
       *
!      * @param scheduler the scheduler or null
       * @param name thread name
       * @param characteristics characteristics
       * @param task the task to execute
       */
!     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
          super(name, characteristics, /*bound*/ false);
          Objects.requireNonNull(task);
  
!         // choose scheduler if not specified
          if (scheduler == null) {
!             Thread parent = Thread.currentThread();
-             if (parent instanceof VirtualThread vparent) {
-                 scheduler = vparent.scheduler;
-             } else {
-                 scheduler = DEFAULT_SCHEDULER;
-             }
          }
  
          this.scheduler = scheduler;
          this.cont = new VThreadContinuation(this, task);
          this.runContinuation = this::runContinuation;
--- 200,58 ---
      private volatile CountDownLatch termination;
  
      /**
       * Returns the default scheduler.
       */
!     static VirtualThreadScheduler defaultScheduler() {
          return DEFAULT_SCHEDULER;
      }
  
+     /**
+      * Returns true if using a custom default scheduler.
+      */
+     static boolean isCustomDefaultScheduler() {
+         return IS_CUSTOM_DEFAULT_SCHEDULER;
+     }
+ 
      /**
       * Returns the continuation scope used for virtual threads.
       */
      static ContinuationScope continuationScope() {
          return VTHREAD_SCOPE;
      }
  
      /**
!      * Returns the scheduler for this thread. When the scheduler is the built-in
+      * default scheduler and {@code revealBuiltin} is false, the scheduler's external
+      * view is returned in place of the scheduler itself.
!      * @param revealBuiltin true to reveal the built-in default scheduler, false to hide
+      * @return this thread's scheduler, or the external view of the built-in scheduler
!      */
!     VirtualThreadScheduler scheduler(boolean revealBuiltin) {
!         if (scheduler instanceof BuiltinDefaultScheduler builtin && !revealBuiltin) {
+             return builtin.externalView();
+         } else {
+             return scheduler;
+         }
+     }
+ 
+     /**
+      * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
       *
!      * @param scheduler the scheduler or null for default scheduler
       * @param name thread name
       * @param characteristics characteristics
       * @param task the task to execute
       */
!     VirtualThread(VirtualThreadScheduler scheduler,
+                   String name,
+                   int characteristics,
+                   Runnable task) {
          super(name, characteristics, /*bound*/ false);
          Objects.requireNonNull(task);
  
!         // use default scheduler if not provided
          if (scheduler == null) {
!             scheduler = DEFAULT_SCHEDULER;
          }
  
          this.scheduler = scheduler;
          this.cont = new VThreadContinuation(this, task);
          this.runContinuation = this::runContinuation;

*** 314,15 ***
  
      /**
       * Submits the runContinuation task to the scheduler. For the default scheduler,
       * and calling it on a worker thread, the task will be pushed to the local queue,
       * otherwise it will be pushed to an external submission queue.
-      * @param scheduler the scheduler
       * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
       * @throws RejectedExecutionException
       */
!     private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
          boolean done = false;
          while (!done) {
              try {
                  // Pin the continuation to prevent the virtual thread from unmounting
                  // when submitting a task. For the default scheduler this ensures that
--- 343,14 ---
  
      /**
       * Submits the runContinuation task to the scheduler. For the default scheduler,
       * and calling it on a worker thread, the task will be pushed to the local queue,
       * otherwise it will be pushed to an external submission queue.
       * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
       * @throws RejectedExecutionException
       */
!     private void submitRunContinuation(boolean retryOnOOME) {
          boolean done = false;
          while (!done) {
              try {
                  // Pin the continuation to prevent the virtual thread from unmounting
                  // when submitting a task. For the default scheduler this ensures that

*** 330,16 ***
                  // it avoids deadlock that could arise due to carriers and virtual
                  // threads contending for a lock.
                  if (currentThread().isVirtual()) {
                      Continuation.pin();
                      try {
!                         scheduler.execute(runContinuation);
                      } finally {
                          Continuation.unpin();
                      }
                  } else {
!                     scheduler.execute(runContinuation);
                  }
                  done = true;
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;
--- 358,16 ---
                  // it avoids deadlock that could arise due to carriers and virtual
                  // threads contending for a lock.
                  if (currentThread().isVirtual()) {
                      Continuation.pin();
                      try {
!                         scheduler.execute(this, runContinuation);
                      } finally {
                          Continuation.unpin();
                      }
                  } else {
!                     scheduler.execute(this, runContinuation);
                  }
                  done = true;
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;

*** 351,37 ***
                  }
              }
          }
      }
  
-     /**
-      * Submits the runContinuation task to the given scheduler as an external submit.
-      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
-      * @throws RejectedExecutionException
-      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
-      */
-     private void externalSubmitRunContinuation(ForkJoinPool pool) {
-         assert Thread.currentThread() instanceof CarrierThread;
-         try {
-             pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
-         } catch (RejectedExecutionException ree) {
-             submitFailed(ree);
-             throw ree;
-         } catch (OutOfMemoryError e) {
-             submitRunContinuation(pool, true);
-         }
-     }
- 
      /**
       * Submits the runContinuation task to the scheduler. For the default scheduler,
       * and calling it on a worker thread, the task will be pushed to the local queue,
       * otherwise it will be pushed to an external submission queue.
       * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
       * @throws RejectedExecutionException
       */
      private void submitRunContinuation() {
!         submitRunContinuation(scheduler, true);
      }
  
      /**
       * Lazy submit the runContinuation task if invoked on a carrier thread and its local
       * queue is empty. If not empty, or invoked by another thread, then this method works
--- 379,19 ---
                  }
              }
          }
      }
  
      /**
       * Submits the runContinuation task to the scheduler. For the default scheduler,
       * and calling it on a worker thread, the task will be pushed to the local queue,
       * otherwise it will be pushed to an external submission queue.
       * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
       * @throws RejectedExecutionException
       */
      private void submitRunContinuation() {
!         submitRunContinuation(true);
      }
  
      /**
       * Lazy submit the runContinuation task if invoked on a carrier thread and its local
       * queue is empty. If not empty, or invoked by another thread, then this method works

*** 389,14 ***
       * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
       * @throws RejectedExecutionException
       * @see ForkJoinPool#lazySubmit(ForkJoinTask)
       */
      private void lazySubmitRunContinuation() {
!         if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
!             ForkJoinPool pool = ct.getPool();
              try {
!                 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;
              } catch (OutOfMemoryError e) {
                  submitRunContinuation();
--- 399,36 ---
       * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
       * @throws RejectedExecutionException
       * @see ForkJoinPool#lazySubmit(ForkJoinTask)
       */
      private void lazySubmitRunContinuation() {
!         if (scheduler == DEFAULT_SCHEDULER
!                 && currentCarrierThread() instanceof CarrierThread ct
+                 && ct.getQueuedTaskCount() == 0) {
              try {
!                 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
+             } catch (RejectedExecutionException ree) {
+                 submitFailed(ree);
+                 throw ree;
+             } catch (OutOfMemoryError e) {
+                 submitRunContinuation();
+             }
+         } else {
+             submitRunContinuation();
+         }
+     }
+ 
+     /**
+      * Submits the runContinuation task to the scheduler. For the default scheduler, and
+      * calling it on a virtual thread that uses the default scheduler, the task will be
+      * pushed to an external submission queue.
+      * @throws RejectedExecutionException
+      */
+     private void externalSubmitRunContinuation() {
+         if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {
+             try {
+                 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;
              } catch (OutOfMemoryError e) {
                  submitRunContinuation();

*** 420,11 ***
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;
              }
          } else {
!             submitRunContinuation(scheduler, false);
          }
      }
  
      /**
       * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
--- 452,11 ---
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;
              }
          } else {
!             submitRunContinuation(false);
          }
      }
  
      /**
       * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.

*** 575,11 ***
          if (s == YIELDING) {
              setState(YIELDED);
  
              // external submit if there are no tasks in the local task queue
              if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
!                 externalSubmitRunContinuation(ct.getPool());
              } else {
                  submitRunContinuation();
              }
              return;
          }
--- 607,11 ---
          if (s == YIELDING) {
              setState(YIELDED);
  
              // external submit if there are no tasks in the local task queue
              if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
!                 externalSubmitRunContinuation();
              } else {
                  submitRunContinuation();
              }
              return;
          }

*** 597,10 ***
--- 629,11 ---
          }
  
          // Object.wait
          if (s == WAITING || s == TIMED_WAITING) {
              int newState;
+             boolean interruptable = interruptableWait;
              if (s == WAITING) {
                  setState(newState = WAIT);
              } else {
                  // For timed-wait, a timeout task is scheduled to execute. The timeout
                  // task will change the thread state to UNBLOCKED and submit the thread

*** 626,11 ***
                  }
                  return;
              }
  
              // may have been interrupted while in transition to wait state
!             if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
                  submitRunContinuation();
                  return;
              }
              return;
          }
--- 659,11 ---
                  }
                  return;
              }
  
              // may have been interrupted while in transition to wait state
!             if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
                  submitRunContinuation();
                  return;
              }
              return;
          }

*** 1162,82 ***
      @Override
      StackTraceElement[] asyncGetStackTrace() {
          StackTraceElement[] stackTrace;
          do {
              stackTrace = (carrierThread != null)
!                     ? super.asyncGetStackTrace()  // mounted
!                     : tryGetStackTrace();         // unmounted
              if (stackTrace == null) {
                  Thread.yield();
              }
          } while (stackTrace == null);
          return stackTrace;
      }
  
      /**
!      * Returns the stack trace for this virtual thread if it is unmounted.
!      * Returns null if the thread is mounted or in transition.
       */
!     private StackTraceElement[] tryGetStackTrace() {
          int initialState = state() & ~SUSPENDED;
          switch (initialState) {
              case NEW, STARTED, TERMINATED -> {
!                 return new StackTraceElement[0];  // unmounted, empty stack
              }
              case RUNNING, PINNED, TIMED_PINNED -> {
!                 return null;   // mounted
              }
              case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
                  // unmounted, not runnable
              }
              case UNPARKED, UNBLOCKED, YIELDED -> {
                  // unmounted, runnable
              }
              case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
!                 return null;  // in transition
              }
              default -> throw new InternalError("" + initialState);
          }
  
          // thread is unmounted, prevent it from continuing
          int suspendedState = initialState | SUSPENDED;
          if (!compareAndSetState(initialState, suspendedState)) {
              return null;
          }
  
-         // get stack trace and restore state
-         StackTraceElement[] stack;
          try {
!             stack = cont.getStackTrace();
          } finally {
              assert state == suspendedState;
              setState(initialState);
!         }
!         boolean resubmit = switch (initialState) {
!             case UNPARKED, UNBLOCKED, YIELDED -> {
!                 // resubmit as task may have run while suspended
!                 yield true;
!             }
!             case PARKED, TIMED_PARKED -> {
!                 // resubmit if unparked while suspended
!                 yield parkPermit && compareAndSetState(initialState, UNPARKED);
!             }
!             case BLOCKED -> {
!                 // resubmit if unblocked while suspended
!                 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
!             }
!             case WAIT, TIMED_WAIT -> {
!                 // resubmit if notified or interrupted while waiting (Object.wait)
!                 // waitTimeoutExpired will retry if the timeout expired when suspended
!                 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
              }
-             default -> throw new InternalError();
-         };
-         if (resubmit) {
-             submitRunContinuation();
          }
!         return stack;
      }
  
      @Override
      public String toString() {
          StringBuilder sb = new StringBuilder("VirtualThread[#");
--- 1195,84 ---
      @Override
      StackTraceElement[] asyncGetStackTrace() {
          StackTraceElement[] stackTrace;
          do {
              stackTrace = (carrierThread != null)
!                 ? super.asyncGetStackTrace()                          // mounted
!                 : supplyIfUnmounted(cont::getStackTrace,              // unmounted
+                                     () -> new StackTraceElement[0]);
              if (stackTrace == null) {
                  Thread.yield();
              }
          } while (stackTrace == null);
          return stackTrace;
      }
  
      /**
!      * Invokes a supplier to produce a non-null result if this virtual thread is not mounted.
+      * While {@code supplier1} runs, the SUSPENDED bit is set in the thread state to prevent
+      * the thread from continuing. The original state is restored afterwards, also when the
+      * supplier throws, and the continuation is resubmitted if the thread was runnable or
+      * was unparked, unblocked, notified or interrupted while suspended.
!      * @param supplier1 invoked if this virtual thread is alive and unmounted
+      * @param supplier2 invoked if this virtual thread is not alive
+      * @return the result; {@code null} if this virtual thread is mounted or in transition,
+      *         or if its state changed before it could be marked as suspended
       */
!     <T> T supplyIfUnmounted(Supplier<T> supplier1, Supplier<T> supplier2) {
          int initialState = state() & ~SUSPENDED;
          switch (initialState) {
              case NEW, STARTED, TERMINATED -> {
!                 return supplier2.get();  // terminated or not started
              }
              case RUNNING, PINNED, TIMED_PINNED -> {
!                 return null; // mounted
              }
              case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
                  // unmounted, not runnable
              }
              case UNPARKED, UNBLOCKED, YIELDED -> {
                  // unmounted, runnable
              }
              case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
!                 return null; // in transition
              }
              default -> throw new InternalError("" + initialState);
          }
  
          // thread is unmounted, prevent it from continuing
          int suspendedState = initialState | SUSPENDED;
          if (!compareAndSetState(initialState, suspendedState)) {
              return null;
          }
  
          try {
!             return supplier1.get();
          } finally {
              assert state == suspendedState;
              setState(initialState);
! 
!             boolean resubmit = switch (initialState) {
!                 case UNPARKED, UNBLOCKED, YIELDED -> {
!                     // resubmit as task may have run while suspended
!                     yield true;
!                 }
!                 case PARKED, TIMED_PARKED -> {
!                     // resubmit if unparked while suspended
!                     yield parkPermit && compareAndSetState(initialState, UNPARKED);
!                 }
!                 case BLOCKED -> {
!                     // resubmit if unblocked while suspended
!                     yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
!                 }
!                 case WAIT, TIMED_WAIT -> {
!                     // resubmit if notified or interrupted while waiting (Object.wait)
!                     // waitTimeoutExpired will retry if the timeout expired when suspended
!                     yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
+                 }
+                 default -> throw new InternalError();
+             };
+             if (resubmit) {
+                 submitRunContinuation();
              }
          }
! 
      }
  
      @Override
      public String toString() {
          StringBuilder sb = new StringBuilder("VirtualThread[#");

*** 1413,14 ***
          // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
          var group = Thread.virtualThreadGroup();
      }
  
      /**
!      * Creates the default ForkJoinPool scheduler.
       */
!     private static ForkJoinPool createDefaultScheduler() {
!         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
          int parallelism, maxPoolSize, minRunnable;
          String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
          String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
          String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
          if (parallelismValue != null) {
--- 1448,40 ---
          // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
          var group = Thread.virtualThreadGroup();
      }
  
      /**
!      * Loads a VirtualThreadScheduler with the given class name to use as the
+      * default scheduler. The class is public in an exported package, has a public
+      * one-arg or no-arg constructor, and is visible to the system class loader.
+      * The one-arg constructor, taking a VirtualThreadScheduler, is tried first and is
+      * passed the external view of a newly created built-in scheduler. The no-arg
+      * constructor is used only when no such one-arg constructor is found.
+      * A warning is printed to System.err once the scheduler has been created.
+      * @param cn the name of the scheduler implementation class
+      * @return the custom scheduler instance
+      * @throws Error wrapping any exception thrown while loading or instantiating the class
       */
!     private static VirtualThreadScheduler createCustomDefaultScheduler(String cn) {
!         try {
+             Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
+             VirtualThreadScheduler scheduler;
+             try {
+                 // 1-arg constructor
+                 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class);
+                 var builtin = createDefaultForkJoinPoolScheduler();
+                 scheduler = (VirtualThreadScheduler) ctor.newInstance(builtin.externalView());
+             } catch (NoSuchMethodException e) {
+                 // 0-arg constructor
+                 Constructor<?> ctor = clazz.getConstructor();
+                 scheduler = (VirtualThreadScheduler) ctor.newInstance();
+             }
+             System.err.println("""
+                 WARNING: Using custom default scheduler, this is an experimental feature!""");
+             return scheduler;
+         } catch (Exception ex) {
+             throw new Error(ex);
+         }
+     }
+ 
+     /**
+      * Creates the built-in default ForkJoinPool scheduler.
+      */
+     private static BuiltinDefaultScheduler createDefaultForkJoinPoolScheduler() {
          int parallelism, maxPoolSize, minRunnable;
          String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
          String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
          String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
          if (parallelismValue != null) {

*** 1437,14 ***
          if (minRunnableValue != null) {
              minRunnable = Integer.parseInt(minRunnableValue);
          } else {
              minRunnable = Integer.max(parallelism / 2, 1);
          }
!         Thread.UncaughtExceptionHandler handler = (t, e) -> { };
!         boolean asyncMode = true; // FIFO
!         return new ForkJoinPool(parallelism, factory, handler, asyncMode,
!                      0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
      }
  
      /**
       * Schedule a runnable task to run after a delay.
       */
--- 1498,58 ---
          if (minRunnableValue != null) {
              minRunnable = Integer.parseInt(minRunnableValue);
          } else {
              minRunnable = Integer.max(parallelism / 2, 1);
          }
!         return new BuiltinDefaultScheduler(parallelism, maxPoolSize, minRunnable);
!     }
! 
!     /**
+      * The built-in default ForkJoinPool scheduler.
+      * The pool is created in async (FIFO) mode with a thread factory that creates
+      * {@code CarrierThread}s and an uncaught exception handler that does nothing.
+      * Tasks passed to {@code execute(Thread, Runnable)} are adapted to a
+      * {@code ForkJoinTask} and executed by the pool; the thread argument is not used.
+      */
+     private static class BuiltinDefaultScheduler
+             extends ForkJoinPool implements VirtualThreadScheduler {
+ 
+         private static final StableValue<VirtualThreadScheduler> VIEW = StableValue.of();
+ 
+         BuiltinDefaultScheduler(int parallelism, int maxPoolSize, int minRunnable) {
+             ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
+             Thread.UncaughtExceptionHandler handler = (t, e) -> { };
+             boolean asyncMode = true; // FIFO
+             super(parallelism, factory, handler, asyncMode,
+                     0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
+         }
+ 
+         @Override
+         public void execute(Thread vthread, Runnable task) {
+             execute(ForkJoinTask.adapt(task));
+         }
+ 
+         /**
+          * Wraps the scheduler to avoid leaking a direct reference. The wrapper is
+          * obtained through the static {@code VIEW} holder. Its {@code execute} method
+          * forwards to the wrapped scheduler only for a virtual thread whose scheduler
+          * is the wrapper itself or {@code DEFAULT_SCHEDULER}. It throws
+          * {@code IllegalArgumentException} for any other virtual thread,
+          * {@code UnsupportedOperationException} for a thread that is not a
+          * {@code VirtualThread}, and {@code NullPointerException} for a null thread.
+          * NOTE(review): VIEW is static while the wrapper captures the instance that
+          * creates it; presumably only one instance ever calls this method. Confirm.
+          */
+         VirtualThreadScheduler externalView() {
+             VirtualThreadScheduler builtin = this;
+             return VIEW.orElseSet(() -> {
+                 return new VirtualThreadScheduler() {
+                     @Override
+                     public void execute(Thread thread, Runnable task) {
+                         Objects.requireNonNull(thread);
+                         if (thread instanceof VirtualThread vthread) {
+                             VirtualThreadScheduler scheduler = vthread.scheduler;
+                             if (scheduler == this || scheduler == DEFAULT_SCHEDULER) {
+                                 builtin.execute(thread, task);
+                             } else {
+                                 throw new IllegalArgumentException();
+                             }
+                         } else {
+                             throw new UnsupportedOperationException();
+                         }
+                     }
+                 };
+             });
+         }
      }
  
      /**
       * Schedule a runnable task to run after a delay.
       */

*** 1530,6 ***
          var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
                  VirtualThread::unblockVirtualThreads);
          unblocker.setDaemon(true);
          unblocker.start();
      }
! }
--- 1635,6 ---
          var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
                  VirtualThread::unblockVirtualThreads);
          unblocker.setDaemon(true);
          unblocker.start();
      }
! }
< prev index next >