< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page
*** 22,10 ***
--- 22,11 ---
   * or visit www.oracle.com if you need additional information or have any
   * questions.
   */
  package java.lang;
  
+ import java.lang.reflect.Constructor;
  import java.util.Locale;
  import java.util.Objects;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.Executor;
  import java.util.concurrent.Executors;

*** 61,11 ***
   * A thread that is scheduled by the Java virtual machine rather than the operating system.
   */
  final class VirtualThread extends BaseVirtualThread {
      private static final Unsafe U = Unsafe.getUnsafe();
      private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
!     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
  
      private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
      private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
      private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
      private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
--- 62,11 ---
   * A thread that is scheduled by the Java virtual machine rather than the operating system.
   */
  final class VirtualThread extends BaseVirtualThread {
      private static final Unsafe U = Unsafe.getUnsafe();
      private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
!     private static final Executor DEFAULT_SCHEDULER = createDefaultScheduler();
  
      private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
      private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
      private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
      private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");

*** 166,10 ***
--- 167,13 ---
      private volatile VirtualThread next;
  
      // notified by Object.notify/notifyAll while waiting in Object.wait
      private volatile boolean notified;
  
+     // true when virtual thread is executing Java level Object.wait, false on VM internal Object.wait
+     private volatile boolean interruptableWait;
+ 
      // timed-wait support
      private byte timedWaitSeqNo;
  
      // timeout for timed-park and timed-wait, only accessed on current/carrier thread
      private long timeout;

*** 597,10 ***
--- 601,11 ---
          }
  
          // Object.wait
          if (s == WAITING || s == TIMED_WAITING) {
              int newState;
+             boolean interruptable = interruptableWait;
              if (s == WAITING) {
                  setState(newState = WAIT);
              } else {
                  // For timed-wait, a timeout task is scheduled to execute. The timeout
                  // task will change the thread state to UNBLOCKED and submit the thread

*** 626,11 ***
                  }
                  return;
              }
  
              // may have been interrupted while in transition to wait state
!             if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
                  submitRunContinuation();
                  return;
              }
              return;
          }
--- 631,11 ---
                  }
                  return;
              }
  
              // may have been interrupted while in transition to wait state
!             if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
                  submitRunContinuation();
                  return;
              }
              return;
          }

*** 662,11 ***
              termination.countDown();
          }
  
          // notify container
          if (notifyContainer) {
!             threadContainer().onExit(this);
          }
  
          // clear references to thread locals
          clearReferences();
      }
--- 667,11 ---
              termination.countDown();
          }
  
          // notify container
          if (notifyContainer) {
!             threadContainer().remove(this);
          }
  
          // clear references to thread locals
          clearReferences();
      }

*** 690,11 ***
  
          // start thread
          boolean addedToContainer = false;
          boolean started = false;
          try {
!             container.onStart(this);  // may throw
              addedToContainer = true;
  
              // scoped values may be inherited
              inheritScopedValueBindings(container);
  
--- 695,11 ---
  
          // start thread
          boolean addedToContainer = false;
          boolean started = false;
          try {
!             container.add(this);  // may throw
              addedToContainer = true;
  
              // scoped values may be inherited
              inheritScopedValueBindings(container);
  

*** 1348,11 ***
          notifyJvmtiDisableSuspend(false);
      }
  
      // -- wrappers for get/set of state, parking permit, and carrier thread --
  
!     private int state() {
          return state;  // volatile read
      }
  
      private void setState(int newValue) {
          state = newValue;  // volatile write
--- 1353,11 ---
          notifyJvmtiDisableSuspend(false);
      }
  
      // -- wrappers for get/set of state, parking permit, and carrier thread --
  
!     int state() {
          return state;  // volatile read
      }
  
      private void setState(int newValue) {
          state = newValue;  // volatile write

*** 1378,10 ***
--- 1383,14 ---
          } else {
              return newValue;
          }
      }
  
+     Thread carrierThread() {
+         return carrierThread;
+     }
+ 
      private void setCarrierThread(Thread carrier) {
          // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
          this.carrierThread = carrier;
      }
  

*** 1412,14 ***
  
          // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
          var group = Thread.virtualThreadGroup();
      }
  
      /**
       * Creates the default ForkJoinPool scheduler.
       */
!     private static ForkJoinPool createDefaultScheduler() {
          ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
          int parallelism, maxPoolSize, minRunnable;
          String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
          String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
          String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
--- 1421,42 ---
  
          // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
          var group = Thread.virtualThreadGroup();
      }
  
+     /**
+      * Creates the default scheduler.
+      * If the system property {@code jdk.virtualThreadScheduler.implClass} is set then
+      * its value is the name of a class that implements java.util.concurrent.Executor.
+      * The class is public in an exported package, has a public no-arg constructor,
+      * and is visible to the system class loader.
+      * If the system property is not set then the default scheduler will be a
+      * ForkJoinPool instance.
+      */
+     private static Executor createDefaultScheduler() {
+         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
+         if (propValue != null) {
+             try {
+                 Class<?> clazz = Class.forName(propValue, true,
+                         ClassLoader.getSystemClassLoader());
+                 Constructor<?> ctor = clazz.getConstructor();
+                 var scheduler = (Executor) ctor.newInstance();
+                 System.err.println("""
+                     WARNING: Using custom scheduler, this is an experimental feature.""");
+                 return scheduler;
+             } catch (Exception ex) {
+                 throw new Error(ex);
+             }
+         } else {
+             return createDefaultForkJoinPoolScheduler();
+         }
+     }
+ 
      /**
       * Creates the default ForkJoinPool scheduler.
       */
!     private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
          ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
          int parallelism, maxPoolSize, minRunnable;
          String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
          String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
          String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");

*** 1530,6 ***
          var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
                  VirtualThread::unblockVirtualThreads);
          unblocker.setDaemon(true);
          unblocker.start();
      }
! }
--- 1567,6 ---
          var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
                  VirtualThread::unblockVirtualThreads);
          unblocker.setDaemon(true);
          unblocker.start();
      }
! }
< prev index next >