< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page
*** 22,10 ***
--- 22,11 ---
   * or visit www.oracle.com if you need additional information or have any
   * questions.
   */
  package java.lang;
  
+ import java.nio.charset.StandardCharsets;
  import java.security.AccessController;
  import java.security.PrivilegedAction;
  import java.util.Locale;
  import java.util.Objects;
  import java.util.concurrent.Callable;

*** 67,11 ***
   */
  final class VirtualThread extends BaseVirtualThread {
      private static final Unsafe U = Unsafe.getUnsafe();
      private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
      private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
!     private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
      private static final int TRACE_PINNING_MODE = tracePinningMode();
  
      private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
      private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
      private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
--- 68,11 ---
   */
  final class VirtualThread extends BaseVirtualThread {
      private static final Unsafe U = Unsafe.getUnsafe();
      private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
      private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
!     private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
      private static final int TRACE_PINNING_MODE = tracePinningMode();
  
      private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
      private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
      private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");

*** 105,10 ***
--- 106,14 ---
       *  TIMED_PARKED -> RUNNABLE        // unparked, schedule to continue
       *  TIMED_PINNED -> RUNNING         // unparked, continue execution on same carrier
       *
       * RUNNABLE -> RUNNING         // continue execution
       *
+      *  RUNNABLE -> BLOCKING       // blocking on monitor enter
+      *  BLOCKING -> BLOCKED        // blocked on monitor enter
+      *   BLOCKED -> RUNNABLE       // unblocked
+      *
       *  RUNNING -> YIELDING        // Thread.yield
       * YIELDING -> RUNNABLE        // yield successful
       * YIELDING -> RUNNING         // yield failed
       */
      private static final int NEW      = 0;

*** 126,18 ***
--- 131,25 ---
      private static final int TIMED_PARKED  = 8;
      private static final int TIMED_PINNED  = 9;
  
      private static final int YIELDING   = 10;  // Thread.yield
  
+     // monitor enter
+     private static final int BLOCKING   = 11;
+     private static final int BLOCKED    = 12;
+ 
      private static final int TERMINATED = 99;  // final state
  
      // can be suspended from scheduling when unmounted
      private static final int SUSPENDED = 1 << 8;
  
      // parking permit
      private volatile boolean parkPermit;
  
+     // unblocked
+     private volatile boolean unblocked;
+ 
      // carrier thread when mounted, accessed by VM
      private volatile Thread carrierThread;
  
      // termination object when joining, created lazily if needed
      private volatile CountDownLatch termination;

*** 189,11 ***
          }
          @Override
          protected void onPinned(Continuation.Pinned reason) {
              if (TRACE_PINNING_MODE > 0) {
                  boolean printAll = (TRACE_PINNING_MODE == 1);
!                 PinnedThreadPrinter.printStackTrace(System.out, printAll);
              }
          }
          private static Runnable wrap(VirtualThread vthread, Runnable task) {
              return new Runnable() {
                  @Hidden
--- 201,19 ---
          }
          @Override
          protected void onPinned(Continuation.Pinned reason) {
              if (TRACE_PINNING_MODE > 0) {
                  boolean printAll = (TRACE_PINNING_MODE == 1);
!                 VirtualThread vthread = (VirtualThread) Thread.currentThread();
+                 int oldState = vthread.state();
+                 try {
+                     // avoid printing when in transition states
+                     vthread.setState(RUNNING);
+                     PinnedThreadPrinter.printStackTrace(System.out, printAll);
+                 } finally {
+                     vthread.setState(oldState);
+                 }
              }
          }
          private static Runnable wrap(VirtualThread vthread, Runnable task) {
              return new Runnable() {
                  @Hidden

*** 242,12 ***
      }
  
      /**
       * Submits the runContinuation task to the scheduler. For the default scheduler,
       * and calling it on a worker thread, the task will be pushed to the local queue,
!      * otherwise it will be pushed to a submission queue.
-      *
       * @throws RejectedExecutionException
       */
      private void submitRunContinuation() {
          try {
              scheduler.execute(runContinuation);
--- 262,11 ---
      }
  
      /**
       * Submits the runContinuation task to the scheduler. For the default scheduler,
       * and calling it on a worker thread, the task will be pushed to the local queue,
!      * otherwise it will be pushed to an external submission queue.
       * @throws RejectedExecutionException
       */
      private void submitRunContinuation() {
          try {
              scheduler.execute(runContinuation);

*** 256,11 ***
              throw ree;
          }
      }
  
      /**
!      * Submits the runContinuation task to the scheduler with a lazy submit.
       * @throws RejectedExecutionException
       * @see ForkJoinPool#lazySubmit(ForkJoinTask)
       */
      private void lazySubmitRunContinuation(ForkJoinPool pool) {
          try {
--- 275,25 ---
              throw ree;
          }
      }
  
      /**
!      * Submits the runContinuation task the scheduler. For the default scheduler, the task
+      * will be pushed to an external submission queue.
+      * @throws RejectedExecutionException
+      */
+     private void externalSubmitRunContinuation() {
+         if (scheduler == DEFAULT_SCHEDULER
+                 && currentCarrierThread() instanceof CarrierThread ct) {
+             externalSubmitRunContinuation(ct.getPool());
+         } else {
+             submitRunContinuation();
+         }
+     }
+ 
+     /**
+      * Submits the runContinuation task to given scheduler with a lazy submit.
       * @throws RejectedExecutionException
       * @see ForkJoinPool#lazySubmit(ForkJoinTask)
       */
      private void lazySubmitRunContinuation(ForkJoinPool pool) {
          try {

*** 270,11 ***
              throw ree;
          }
      }
  
      /**
!      * Submits the runContinuation task to the scheduler as an external submit.
       * @throws RejectedExecutionException
       * @see ForkJoinPool#externalSubmit(ForkJoinTask)
       */
      private void externalSubmitRunContinuation(ForkJoinPool pool) {
          try {
--- 303,11 ---
              throw ree;
          }
      }
  
      /**
!      * Submits the runContinuation task to the given scheduler as an external submit.
       * @throws RejectedExecutionException
       * @see ForkJoinPool#externalSubmit(ForkJoinTask)
       */
      private void externalSubmitRunContinuation(ForkJoinPool pool) {
          try {

*** 372,10 ***
--- 405,12 ---
       * current thread is the current platform thread.
       */
      @ChangesCurrentThread
      @ReservedStackAccess
      private void unmount() {
+         assert !Thread.holdsLock(interruptLock);
+ 
          // set Thread.currentThread() to return the platform thread
          Thread carrier = this.carrierThread;
          carrier.setCurrentThread(carrier);
  
          // break connection to carrier thread, synchronized with interrupt

*** 480,10 ***
--- 515,20 ---
                  submitRunContinuation();
              }
              return;
          }
  
+         // blocking on monitorenter
+         if (s == BLOCKING) {
+             setState(BLOCKED);
+             if (unblocked && compareAndSetState(BLOCKED, RUNNABLE)) {
+                 unblocked = false;
+                 submitRunContinuation();
+             }
+             return;
+         }
+ 
          assert false;
      }
  
      /**
       * Invoked after the continuation completes.

*** 543,12 ***
              addedToContainer = true;
  
              // scoped values may be inherited
              inheritScopedValueBindings(container);
  
!             // submit task to run thread
!             submitRunContinuation();
              started = true;
          } finally {
              if (!started) {
                  afterDone(addedToContainer);
              }
--- 588,12 ---
              addedToContainer = true;
  
              // scoped values may be inherited
              inheritScopedValueBindings(container);
  
!             // submit task to run thread, using externalSubmit if possible
!             externalSubmitRunContinuation();
              started = true;
          } finally {
              if (!started) {
                  afterDone(addedToContainer);
              }

*** 616,11 ***
          // park the thread for the waiting time
          if (nanos > 0) {
              long startTime = System.nanoTime();
  
              boolean yielded = false;
!             Future<?> unparker = scheduleUnpark(this::unpark, nanos);
              setState(TIMED_PARKING);
              try {
                  yielded = yieldContinuation();  // may throw
              } finally {
                  assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
--- 661,11 ---
          // park the thread for the waiting time
          if (nanos > 0) {
              long startTime = System.nanoTime();
  
              boolean yielded = false;
!             Future<?> unparker = scheduleUnpark(nanos);  // may throw OOME
              setState(TIMED_PARKING);
              try {
                  yielded = yieldContinuation();  // may throw
              } finally {
                  assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));

*** 681,28 ***
              }
          }
      }
  
      /**
!      * Schedule an unpark task to run after a given delay.
       */
      @ChangesCurrentThread
!     private Future<?> scheduleUnpark(Runnable unparker, long nanos) {
          // need to switch to current carrier thread to avoid nested parking
          switchToCarrierThread();
          try {
!             return UNPARKER.schedule(unparker, nanos, NANOSECONDS);
          } finally {
              switchToVirtualThread(this);
          }
      }
  
      /**
       * Cancels a task if it has not completed.
       */
      @ChangesCurrentThread
      private void cancel(Future<?> future) {
          if (!future.isDone()) {
              // need to switch to current carrier thread to avoid nested parking
              switchToCarrierThread();
              try {
                  future.cancel(false);
--- 726,30 ---
              }
          }
      }
  
      /**
!      * Schedule this virtual thread to be unparked after a given delay.
       */
      @ChangesCurrentThread
!     private Future<?> scheduleUnpark(long nanos) {
+         assert Thread.currentThread() == this;
          // need to switch to current carrier thread to avoid nested parking
          switchToCarrierThread();
          try {
!             return delayedTaskScheduler().schedule(this::unpark, nanos, NANOSECONDS);
          } finally {
              switchToVirtualThread(this);
          }
      }
  
      /**
       * Cancels a task if it has not completed.
       */
      @ChangesCurrentThread
      private void cancel(Future<?> future) {
+         assert Thread.currentThread() == this;
          if (!future.isDone()) {
              // need to switch to current carrier thread to avoid nested parking
              switchToCarrierThread();
              try {
                  future.cancel(false);

*** 736,21 ***
                      }
                  } else {
                      submitRunContinuation();
                  }
              } else if ((s == PINNED) || (s == TIMED_PINNED)) {
!                 // unpark carrier thread when pinned.
                  synchronized (carrierThreadAccessLock()) {
                      Thread carrier = carrierThread;
                      if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                          U.unpark(carrier);
                      }
                  }
              }
          }
      }
  
      /**
       * Attempts to yield the current virtual thread (Thread.yield).
       */
      void tryYield() {
          assert Thread.currentThread() == this;
--- 783,34 ---
                      }
                  } else {
                      submitRunContinuation();
                  }
              } else if ((s == PINNED) || (s == TIMED_PINNED)) {
!                 // unpark carrier thread when pinned
                  synchronized (carrierThreadAccessLock()) {
                      Thread carrier = carrierThread;
                      if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                          U.unpark(carrier);
                      }
                  }
              }
          }
      }
  
+     /**
+      * Re-enables this virtual thread for scheduling after blocking on monitor enter.
+      * @throws RejectedExecutionException if the scheduler cannot accept a task
+      */
+     private void unblock() {
+         assert !Thread.currentThread().isVirtual();
+         unblocked = true;
+         if (state() == BLOCKED && compareAndSetState(BLOCKED, RUNNABLE)) {
+             unblocked = false;
+             submitRunContinuation();
+         }
+     }
+ 
      /**
       * Attempts to yield the current virtual thread (Thread.yield).
       */
      void tryYield() {
          assert Thread.currentThread() == this;

*** 842,11 ***
              checkAccess();
              synchronized (interruptLock) {
                  interrupted = true;
                  Interruptible b = nioBlocker;
                  if (b != null) {
!                     b.interrupt(this);
                  }
  
                  // interrupt carrier thread if mounted
                  Thread carrier = carrierThread;
                  if (carrier != null) carrier.setInterrupt();
--- 902,17 ---
              checkAccess();
              synchronized (interruptLock) {
                  interrupted = true;
                  Interruptible b = nioBlocker;
                  if (b != null) {
!                     // ensure current thread doesn't unmount while holding interruptLock
+                     Continuation.pin();
+                     try {
+                         b.interrupt(this);
+                     } finally {
+                         Continuation.unpin();
+                     }
                  }
  
                  // interrupt carrier thread if mounted
                  Thread carrier = carrierThread;
                  if (carrier != null) carrier.setInterrupt();

*** 866,13 ***
      @Override
      boolean getAndClearInterrupt() {
          assert Thread.currentThread() == this;
          boolean oldValue = interrupted;
          if (oldValue) {
!             synchronized (interruptLock) {
!                 interrupted = false;
!                 carrierThread.clearInterrupt();
              }
          }
          return oldValue;
      }
  
--- 932,19 ---
      @Override
      boolean getAndClearInterrupt() {
          assert Thread.currentThread() == this;
          boolean oldValue = interrupted;
          if (oldValue) {
!             // ensure current thread doesn't unmount trying to enter interruptLock
!             Continuation.pin();
!             try {
+                 synchronized (interruptLock) {
+                     interrupted = false;
+                     carrierThread.clearInterrupt();
+                 }
+             } finally {
+                 Continuation.unpin();
              }
          }
          return oldValue;
      }
  

*** 892,29 ***
              case RUNNABLE:
                  // runnable, not mounted
                  return Thread.State.RUNNABLE;
              case RUNNING:
                  // if mounted then return state of carrier thread
!                 synchronized (carrierThreadAccessLock()) {
!                     Thread carrierThread = this.carrierThread;
!                     if (carrierThread != null) {
!                         return carrierThread.threadState();
                      }
                  }
                  // runnable, mounted
                  return Thread.State.RUNNABLE;
              case PARKING:
              case TIMED_PARKING:
              case YIELDING:
!                 // runnable, mounted, not yet waiting
                  return Thread.State.RUNNABLE;
              case PARKED:
              case PINNED:
                  return State.WAITING;
              case TIMED_PARKED:
              case TIMED_PINNED:
                  return State.TIMED_WAITING;
              case TERMINATED:
                  return Thread.State.TERMINATED;
              default:
                  throw new InternalError();
          }
--- 964,34 ---
              case RUNNABLE:
                  // runnable, not mounted
                  return Thread.State.RUNNABLE;
              case RUNNING:
                  // if mounted then return state of carrier thread
!                 if (Thread.currentThread() != this) {
!                     synchronized (carrierThreadAccessLock()) {
!                         Thread carrier = this.carrierThread;
!                         if (carrier != null) {
+                             return carrier.threadState();
+                         }
                      }
                  }
                  // runnable, mounted
                  return Thread.State.RUNNABLE;
              case PARKING:
              case TIMED_PARKING:
              case YIELDING:
!             case BLOCKING:
+                 // runnable, not yet waiting/blocked
                  return Thread.State.RUNNABLE;
              case PARKED:
              case PINNED:
                  return State.WAITING;
              case TIMED_PARKED:
              case TIMED_PINNED:
                  return State.TIMED_WAITING;
+             case BLOCKED:
+                 return State.BLOCKED;
              case TERMINATED:
                  return Thread.State.TERMINATED;
              default:
                  throw new InternalError();
          }

*** 950,23 ***
       * Returns null if the thread is in another state.
       */
      private StackTraceElement[] tryGetStackTrace() {
          int initialState = state();
          return switch (initialState) {
!             case RUNNABLE, PARKED, TIMED_PARKED -> {
                  int suspendedState = initialState | SUSPENDED;
                  if (compareAndSetState(initialState, suspendedState)) {
                      try {
                          yield cont.getStackTrace();
                      } finally {
                          assert state == suspendedState;
                          setState(initialState);
  
                          // re-submit if runnable
!                         // re-submit if unparked while suspended
                          if (initialState == RUNNABLE
!                             || (parkPermit && compareAndSetState(initialState, RUNNABLE))) {
                              try {
                                  submitRunContinuation();
                              } catch (RejectedExecutionException ignore) { }
                          }
                      }
--- 1027,24 ---
       * Returns null if the thread is in another state.
       */
      private StackTraceElement[] tryGetStackTrace() {
          int initialState = state();
          return switch (initialState) {
!             case RUNNABLE, PARKED, BLOCKED, TIMED_PARKED -> {
                  int suspendedState = initialState | SUSPENDED;
                  if (compareAndSetState(initialState, suspendedState)) {
                      try {
                          yield cont.getStackTrace();
                      } finally {
                          assert state == suspendedState;
                          setState(initialState);
  
                          // re-submit if runnable
!                         // re-submit if unparked or unblocked while suspended
                          if (initialState == RUNNABLE
!                                 || ((parkPermit || unblocked)
+                                     && compareAndSetState(initialState, RUNNABLE))) {
                              try {
                                  submitRunContinuation();
                              } catch (RejectedExecutionException ignore) { }
                          }
                      }

*** 986,41 ***
          if (!name.isEmpty()) {
              sb.append(",");
              sb.append(name);
          }
          sb.append("]/");
          Thread carrier = carrierThread;
!         if (carrier != null) {
!             // include the carrier thread state and name when mounted
!             synchronized (carrierThreadAccessLock()) {
!                 carrier = carrierThread;
!                 if (carrier != null) {
!                     String stateAsString = carrier.threadState().toString();
!                     sb.append(stateAsString.toLowerCase(Locale.ROOT));
-                     sb.append('@');
-                     sb.append(carrier.getName());
                  }
              }
          }
          // include virtual thread state when not mounted
          if (carrier == null) {
              String stateAsString = threadState().toString();
              sb.append(stateAsString.toLowerCase(Locale.ROOT));
          }
          return sb.toString();
      }
  
      @Override
      public int hashCode() {
          return (int) threadId();
      }
  
      @Override
      public boolean equals(Object obj) {
          return obj == this;
      }
  
      /**
       * Returns the termination object, creating it if needed.
       */
      private CountDownLatch getTermination() {
          CountDownLatch termination = this.termination;
--- 1064,63 ---
          if (!name.isEmpty()) {
              sb.append(",");
              sb.append(name);
          }
          sb.append("]/");
+ 
+         // include the carrier thread state and name when mounted
          Thread carrier = carrierThread;
!         if (Thread.currentThread() == this) {
!             appendCarrierInfo(sb, carrier);
!         } else if (carrier != null) {
!             if (Thread.currentThread() != this) {
!                 synchronized (carrierThreadAccessLock()) {
!                     carrier = carrierThread;  // re-read
!                     appendCarrierInfo(sb, carrier);
                  }
              }
          }
+ 
          // include virtual thread state when not mounted
          if (carrier == null) {
              String stateAsString = threadState().toString();
              sb.append(stateAsString.toLowerCase(Locale.ROOT));
          }
          return sb.toString();
      }
  
+     /**
+      * Appends the carrier's state and name to the given string builder when mounted.
+      */
+     private void appendCarrierInfo(StringBuilder sb, Thread carrier) {
+         if (carrier != null) {
+             String stateAsString = carrier.threadState().toString();
+             sb.append(stateAsString.toLowerCase(Locale.ROOT));
+             sb.append('@');
+             sb.append(carrier.getName());
+         }
+     }
+ 
      @Override
      public int hashCode() {
          return (int) threadId();
      }
  
      @Override
      public boolean equals(Object obj) {
          return obj == this;
      }
  
+     /**
+      * Returns a ScheduledExecutorService to execute a delayed task.
+      */
+     private ScheduledExecutorService delayedTaskScheduler() {
+         long tid = Thread.currentThread().threadId();
+         int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1);
+         return DELAYED_TASK_SCHEDULERS[index];
+     }
+ 
      /**
       * Returns the termination object, creating it if needed.
       */
      private CountDownLatch getTermination() {
          CountDownLatch termination = this.termination;

*** 1115,17 ***
              int parallelism, maxPoolSize, minRunnable;
              String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
              String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
              String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
              if (parallelismValue != null) {
!                 parallelism = Integer.parseInt(parallelismValue);
              } else {
                  parallelism = Runtime.getRuntime().availableProcessors();
              }
              if (maxPoolSizeValue != null) {
                  maxPoolSize = Integer.parseInt(maxPoolSizeValue);
!                 parallelism = Integer.min(parallelism, maxPoolSize);
              } else {
                  maxPoolSize = Integer.max(parallelism, 256);
              }
              if (minRunnableValue != null) {
                  minRunnable = Integer.parseInt(minRunnableValue);
--- 1215,21 ---
              int parallelism, maxPoolSize, minRunnable;
              String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
              String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
              String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
              if (parallelismValue != null) {
!                 parallelism = Integer.max(Integer.parseInt(parallelismValue), 1);
              } else {
                  parallelism = Runtime.getRuntime().availableProcessors();
              }
              if (maxPoolSizeValue != null) {
                  maxPoolSize = Integer.parseInt(maxPoolSizeValue);
!                 if (maxPoolSize > 0) {
+                     parallelism = Integer.min(parallelism, maxPoolSize);
+                 } else {
+                     maxPoolSize = parallelism;  // no spares
+                 }
              } else {
                  maxPoolSize = Integer.max(parallelism, 256);
              }
              if (minRunnableValue != null) {
                  minRunnable = Integer.parseInt(minRunnableValue);

*** 1139,26 ***
          };
          return AccessController.doPrivileged(pa);
      }
  
      /**
!      * Creates the ScheduledThreadPoolExecutor used for timed unpark.
       */
!     private static ScheduledExecutorService createDelayedTaskScheduler() {
!         String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize");
!         int poolSize;
          if (propValue != null) {
!             poolSize = Integer.parseInt(propValue);
          } else {
!             poolSize = 1;
          }
!         ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
!             Executors.newScheduledThreadPool(poolSize, task -> {
!                 return InnocuousThread.newThread("VirtualThread-unparker", task);
!             });
!         stpe.setRemoveOnCancelPolicy(true);
!         return stpe;
      }
  
      /**
       * Reads the value of the jdk.tracePinnedThreads property to determine if stack
       * traces should be printed when a carrier thread is pinned when a virtual thread
--- 1243,45 ---
          };
          return AccessController.doPrivileged(pa);
      }
  
      /**
!      * Invoked by the VM for the Thread.vthread_scheduler diagnostic command.
       */
!     private static byte[] printDefaultScheduler() {
!         return String.format("%s%n", DEFAULT_SCHEDULER.toString())
!                 .getBytes(StandardCharsets.UTF_8);
+     }
+ 
+     /**
+      * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks.
+      */
+     private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
+         String propName = "jdk.virtualThreadScheduler.timerQueues";
+         String propValue = GetPropertyAction.privilegedGetProperty(propName);
+         int queueCount;
          if (propValue != null) {
!             queueCount = Integer.parseInt(propValue);
+             if (queueCount != Integer.highestOneBit(queueCount)) {
+                 throw new RuntimeException("Value of " + propName + " must be power of 2");
+             }
          } else {
!             int ncpus = Runtime.getRuntime().availableProcessors();
+             queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
          }
!         var schedulers = new ScheduledExecutorService[queueCount];
!         for (int i = 0; i < queueCount; i++) {
!             ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
!                 Executors.newScheduledThreadPool(1, task -> {
!                     Thread t = InnocuousThread.newThread("VirtualThread-unparker", task);
!                     t.setDaemon(true);
+                     return t;
+                 });
+             stpe.setRemoveOnCancelPolicy(true);
+             schedulers[i] = stpe;
+         }
+         return schedulers;
      }
  
      /**
       * Reads the value of the jdk.tracePinnedThreads property to determine if stack
       * traces should be printed when a carrier thread is pinned when a virtual thread

*** 1172,6 ***
--- 1295,20 ---
              if ("short".equalsIgnoreCase(propValue))
                  return 2;
          }
          return 0;
      }
+ 
+     /**
+      * Unblock virtual threads that are ready to be scheduled again.
+      */
+     private static void processPendingList() {
+         // TBD invoke unblock
+     }
+ 
+     static {
+         var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
+                 VirtualThread::processPendingList);
+         unblocker.setDaemon(true);
+         unblocker.start();
+     }
  }
< prev index next >