< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page
*** 22,24 ***
   * or visit www.oracle.com if you need additional information or have any
   * questions.
   */
  package java.lang;
  
  import java.util.Locale;
  import java.util.Objects;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.Executor;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ForkJoinPool;
  import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  import java.util.concurrent.ForkJoinTask;
- import java.util.concurrent.ForkJoinWorkerThread;
  import java.util.concurrent.Future;
  import java.util.concurrent.RejectedExecutionException;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ScheduledThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
  import jdk.internal.event.VirtualThreadEndEvent;
  import jdk.internal.event.VirtualThreadStartEvent;
  import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  import jdk.internal.misc.CarrierThread;
  import jdk.internal.misc.InnocuousThread;
--- 22,26 ---
   * or visit www.oracle.com if you need additional information or have any
   * questions.
   */
  package java.lang;
  
+ import java.lang.reflect.Constructor;
+ import java.util.Arrays;
  import java.util.Locale;
  import java.util.Objects;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.Executor;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ForkJoinPool;
  import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  import java.util.concurrent.ForkJoinTask;
  import java.util.concurrent.Future;
  import java.util.concurrent.RejectedExecutionException;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ScheduledThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
+ import java.util.stream.Stream;
  import jdk.internal.event.VirtualThreadEndEvent;
  import jdk.internal.event.VirtualThreadStartEvent;
  import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  import jdk.internal.misc.CarrierThread;
  import jdk.internal.misc.InnocuousThread;

*** 62,11 ***
   * A thread that is scheduled by the Java virtual machine rather than the operating system.
   */
  final class VirtualThread extends BaseVirtualThread {
      private static final Unsafe U = Unsafe.getUnsafe();
      private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
!     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
      private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
  
      private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
      private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
      private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
--- 64,11 ---
   * A thread that is scheduled by the Java virtual machine rather than the operating system.
   */
  final class VirtualThread extends BaseVirtualThread {
      private static final Unsafe U = Unsafe.getUnsafe();
      private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
!     private static final Executor DEFAULT_SCHEDULER = createDefaultScheduler();
      private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
  
      private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
      private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
      private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");

*** 190,10 ***
--- 192,17 ---
       */
      static Executor defaultScheduler() {
          return DEFAULT_SCHEDULER;
      }
  
+     /**
+      * Returns a stream of the delayed task schedulers used to support timed operations.
+      */
+     static Stream<ScheduledExecutorService> delayedTaskSchedulers() {
+         return Arrays.stream(DELAYED_TASK_SCHEDULERS);
+     }
+ 
      /**
       * Returns the continuation scope used for virtual threads.
       */
      static ContinuationScope continuationScope() {
          return VTHREAD_SCOPE;

*** 1400,14 ***
  
          // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
          var group = Thread.virtualThreadGroup();
      }
  
      /**
       * Creates the default ForkJoinPool scheduler.
       */
!     private static ForkJoinPool createDefaultScheduler() {
          ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
          int parallelism, maxPoolSize, minRunnable;
          String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
          String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
          String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
--- 1409,42 ---
  
          // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
          var group = Thread.virtualThreadGroup();
      }
  
+     /**
+      * Creates the default scheduler.
+      * If the system property {@code jdk.virtualThreadScheduler.implClass} is set then
+      * its value is the name of a class that implements java.util.concurrent.Executor.
+      * The class is public in an exported package, has a public no-arg constructor,
+      * and is visible to the system class loader.
+      * If the system property is not set then the default scheduler will be a
+      * ForkJoinPool instance.
+      */
+     private static Executor createDefaultScheduler() {
+         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
+         if (propValue != null) {
+             try {
+                 Class<?> clazz = Class.forName(propValue, true,
+                         ClassLoader.getSystemClassLoader());
+                 Constructor<?> ctor = clazz.getConstructor();
+                 var scheduler = (Executor) ctor.newInstance();
+                 System.err.println("""
+                     WARNING: Using custom scheduler, this is an experimental feature.""");
+                 return scheduler;
+             } catch (Exception ex) {
+                 throw new Error(ex);
+             }
+         } else {
+             return createDefaultForkJoinPoolScheduler();
+         }
+     }
+ 
      /**
       * Creates the default ForkJoinPool scheduler.
       */
!     private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
          ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
          int parallelism, maxPoolSize, minRunnable;
          String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
          String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
          String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
< prev index next >