< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 



  27 import java.util.Locale;
  28 import java.util.Objects;
  29 import java.util.concurrent.CountDownLatch;
  30 import java.util.concurrent.Executor;
  31 import java.util.concurrent.Executors;
  32 import java.util.concurrent.ForkJoinPool;
  33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  34 import java.util.concurrent.ForkJoinTask;
  35 import java.util.concurrent.Future;

  36 import java.util.concurrent.RejectedExecutionException;
  37 import java.util.concurrent.ScheduledExecutorService;

  38 import java.util.concurrent.ScheduledThreadPoolExecutor;


  39 import java.util.concurrent.TimeUnit;
  40 import jdk.internal.event.VirtualThreadEndEvent;

  41 import jdk.internal.event.VirtualThreadStartEvent;
  42 import jdk.internal.event.VirtualThreadSubmitFailedEvent;

  43 import jdk.internal.misc.CarrierThread;
  44 import jdk.internal.misc.InnocuousThread;
  45 import jdk.internal.misc.Unsafe;
  46 import jdk.internal.vm.Continuation;
  47 import jdk.internal.vm.ContinuationScope;
  48 import jdk.internal.vm.StackableScope;
  49 import jdk.internal.vm.ThreadContainer;
  50 import jdk.internal.vm.ThreadContainers;
  51 import jdk.internal.vm.annotation.ChangesCurrentThread;
  52 import jdk.internal.vm.annotation.Hidden;
  53 import jdk.internal.vm.annotation.IntrinsicCandidate;
  54 import jdk.internal.vm.annotation.JvmtiHideEvents;
  55 import jdk.internal.vm.annotation.JvmtiMountTransition;
  56 import jdk.internal.vm.annotation.ReservedStackAccess;
  57 import sun.nio.ch.Interruptible;
  58 import static java.util.concurrent.TimeUnit.*;
  59 
  60 /**
  61  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  62  */
  63 final class VirtualThread extends BaseVirtualThread {
  64     private static final Unsafe U = Unsafe.getUnsafe();
  65     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  66     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();






















  67 
  68     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
         // Each constant is the Unsafe field offset of the VirtualThread instance field named
         // in its string argument. NOTE(review): presumably used for atomic access through U
         // (e.g. by compareAndSetState) — confirm against the accessor methods.
  69     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  70     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  71     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
  72     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  73 
  74     // scheduler and continuation
  75     private final Executor scheduler;
  76     private final Continuation cont;
  77     private final Runnable runContinuation;
  78 
  79     // virtual thread state, accessed by VM
  80     private volatile int state;
  81 
  82     /*
  83      * Virtual thread state transitions:
  84      *
  85      *      NEW -> STARTED         // Thread.start, schedule to run
  86      *  STARTED -> TERMINATED      // failed to start
  87      *  STARTED -> RUNNING         // first run
  88      *  RUNNING -> TERMINATED      // done
  89      *
  90      *  RUNNING -> PARKING         // Thread parking with LockSupport.park
  91      *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
  92      *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
  93      * UNPARKED -> RUNNING         // continue execution after park
  94      *
  95      *  PARKING -> RUNNING         // cont.yield failed, need to park on carrier
  96      *  RUNNING -> PINNED          // park on carrier
  97      *   PINNED -> RUNNING         // unparked, continue execution on same carrier

 171 
 172     // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
 173     private volatile boolean interruptibleWait;
 174 
 175     // timed-wait support
 176     private byte timedWaitSeqNo;
 177 
 178     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 179     private long timeout;
 180 
 181     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 182     private Future<?> timeoutTask;
 183 
 184     // carrier thread when mounted, accessed by VM
 185     private volatile Thread carrierThread;
 186 
 187     // termination object when joining, created lazily if needed
 188     private volatile CountDownLatch termination;
 189 
 190     /**
 191      * Returns the default scheduler.

 192      */
 193     static Executor defaultScheduler() {







 194         return DEFAULT_SCHEDULER;
 195     }
 196 
 197     /**
 198      * Returns the continuation scope used for virtual threads.
 199      */
 200     static ContinuationScope continuationScope() {
 201         return VTHREAD_SCOPE;
 202     }
 203 
 204     /**
 205      * Creates a new {@code VirtualThread} to run the given task with the given
 206      * scheduler. If the given scheduler is {@code null} and the current thread
 207      * is a platform thread then the newly created virtual thread will use the
 208      * default scheduler. If given scheduler is {@code null} and the current
 209      * thread is a virtual thread then the current thread's scheduler is used.



 210      *
 211      * @param scheduler the scheduler or null

 212      * @param name thread name
 213      * @param characteristics characteristics
 214      * @param task the task to execute
 215      */
 216     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {




 217         super(name, characteristics, /*bound*/ false);
 218         Objects.requireNonNull(task);
 219 
 220         // choose scheduler if not specified
 221         if (scheduler == null) {
 222             Thread parent = Thread.currentThread();
 223             if (parent instanceof VirtualThread vparent) {
 224                 scheduler = vparent.scheduler;
 225             } else {
 226                 scheduler = DEFAULT_SCHEDULER;
 227             }
 228         }
 229 
 230         this.scheduler = scheduler;
 231         this.cont = new VThreadContinuation(this, task);
 232         this.runContinuation = this::runContinuation;





























































 233     }
 234 
 235     /**
 236      * The continuation that a virtual thread executes.
 237      */
 238     private static class VThreadContinuation extends Continuation {
 239         VThreadContinuation(VirtualThread vthread, Runnable task) {
 240             super(VTHREAD_SCOPE, wrap(vthread, task));
 241         }
 242         @Override
 243         protected void onPinned(Continuation.Pinned reason) {
 244         }
 245         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 246             return new Runnable() {
 247                 @Hidden
 248                 @JvmtiHideEvents
 249                 public void run() {
 250                     vthread.endFirstTransition();
 251                     try {
 252                         vthread.run(task);

 300             if (cont.isDone()) {
 301                 afterDone();
 302             } else {
 303                 afterYield();
 304             }
 305         }
 306     }
 307 
 308     /**
 309      * Cancel timeout task when continuing after timed-park or timed-wait.
 310      * The timeout task may be executing, or may have already completed.
 311      */
 312     private void cancelTimeoutTask() {
 313         if (timeoutTask != null) {
 314             timeoutTask.cancel(false);
 315             timeoutTask = null;
 316         }
 317     }
 318 
 319     /**
 320      * Submits the given task to the given executor. If the scheduler is a
 321      * ForkJoinPool then the task is first adapted to a ForkJoinTask.
 322      */
 323     private void submit(Executor executor, Runnable task) {
 324         if (executor instanceof ForkJoinPool pool) {
 325             pool.submit(ForkJoinTask.adapt(task));
 326         } else {
 327             executor.execute(task);
 328         }
 329     }
 330 
 331     /**
 332      * Submits the runContinuation task to the scheduler. For the default scheduler,
 333      * and calling it on a worker thread, the task will be pushed to the local queue,
 334      * otherwise it will be pushed to an external submission queue.
 335      * @param scheduler the scheduler
 336      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 337      * @throws RejectedExecutionException
 338      */
 339     private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
 340         boolean done = false;
 341         while (!done) {
 342             try {
 343                 // Pin the continuation to prevent the virtual thread from unmounting
 344                 // when submitting a task. For the default scheduler this ensures that
 345                 // the carrier doesn't change when pushing a task. For other schedulers
 346                 // it avoids deadlock that could arise due to carriers and virtual
 347                 // threads contending for a lock.
 348                 if (currentThread().isVirtual()) {
 349                     Continuation.pin();
 350                     try {
 351                         submit(scheduler, runContinuation);
 352                     } finally {
 353                         Continuation.unpin();
 354                     }
 355                 } else {
 356                     submit(scheduler, runContinuation);
 357                 }
 358                 done = true;
 359             } catch (RejectedExecutionException ree) {
 360                 submitFailed(ree);
 361                 throw ree;
 362             } catch (OutOfMemoryError e) {
 363                 if (retryOnOOME) {
 364                     U.park(false, 100_000_000); // 100ms
 365                 } else {
 366                     throw e;
 367                 }
 368             }
 369         }
 370     }
 371 
 372     /**
 373      * Submits the runContinuation task to the given scheduler as an external submit.
 374      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 375      * @throws RejectedExecutionException
 376      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
 377      */
 378     private void externalSubmitRunContinuation(ForkJoinPool pool) {
 379         assert Thread.currentThread() instanceof CarrierThread;
 380         try {
 381             pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
 382         } catch (RejectedExecutionException ree) {
 383             submitFailed(ree);
 384             throw ree;
 385         } catch (OutOfMemoryError e) {
 386             submitRunContinuation(pool, true);
 387         }
 388     }
 389 
 390     /**
 391      * Submits the runContinuation task to the scheduler. For the default scheduler,
 392      * and calling it on a worker thread, the task will be pushed to the local queue,
 393      * otherwise it will be pushed to an external submission queue.
 394      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 395      * @throws RejectedExecutionException
 396      */
 397     private void submitRunContinuation() {
 398         submitRunContinuation(scheduler, true);
 399     }
 400 
 401     /**
 402      * Lazy submit the runContinuation task if invoked on a carrier thread and its local
 403      * queue is empty. If not empty, or invoked by another thread, then this method works
 404      * like submitRunContinuation and just submits the task to the scheduler.
 405      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 406      * @throws RejectedExecutionException
 407      * @see ForkJoinPool#lazySubmit(ForkJoinTask)
 408      */
 409     private void lazySubmitRunContinuation() {

 410         if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 411             ForkJoinPool pool = ct.getPool();
 412             try {
 413                 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
 414             } catch (RejectedExecutionException ree) {
 415                 submitFailed(ree);
 416                 throw ree;
 417             } catch (OutOfMemoryError e) {
 418                 submitRunContinuation();
 419             }
 420         } else {
 421             submitRunContinuation();
 422         }
 423     }
 424 
 425     /**
 426      * Submits the runContinuation task to the scheduler. For the default scheduler, and
 427      * calling it a virtual thread that uses the default scheduler, the task will be
 428      * pushed to an external submission queue. This method may throw OutOfMemoryError.

 429      * @throws RejectedExecutionException
 430      * @throws OutOfMemoryError
 431      */
 432     private void externalSubmitRunContinuationOrThrow() {
 433         if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {

 434             try {
 435                 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 436             } catch (RejectedExecutionException ree) {
 437                 submitFailed(ree);
 438                 throw ree;


 439             }
 440         } else {
 441             submitRunContinuation(scheduler, false);




































 442         }
 443     }
 444 
 445     /**
 446      * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
 447      */
 448     private void submitFailed(RejectedExecutionException ree) {
 449         var event = new VirtualThreadSubmitFailedEvent();
 450         if (event.isEnabled()) {
 451             event.javaThreadId = threadId();
 452             event.exceptionMessage = ree.getMessage();
 453             event.commit();
 454         }
 455     }
 456 
 457     /**
 458      * Runs a task in the context of this virtual thread.
 459      */
 460     private void run(Runnable task) {
 461         assert Thread.currentThread() == this && state == RUNNING;

 578                 long timeout = this.timeout;
 579                 assert timeout > 0;
 580                 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
 581                 setState(newState = TIMED_PARKED);
 582             }
 583 
 584             // may have been unparked while parking
 585             if (parkPermit && compareAndSetState(newState, UNPARKED)) {
 586                 // lazy submit if local queue is empty
 587                 lazySubmitRunContinuation();
 588             }
 589             return;
 590         }
 591 
 592         // Thread.yield
 593         if (s == YIELDING) {
 594             setState(YIELDED);
 595 
 596             // external submit if there are no tasks in the local task queue
 597             if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 598                 externalSubmitRunContinuation(ct.getPool());
 599             } else {
 600                 submitRunContinuation();
 601             }
 602             return;
 603         }
 604 
 605         // blocking on monitorenter
 606         if (s == BLOCKING) {
 607             setState(BLOCKED);
 608 
 609             // may have been unblocked while blocking
 610             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 611                 // lazy submit if local queue is empty
 612                 lazySubmitRunContinuation();
 613             }
 614             return;
 615         }
 616 
 617         // Object.wait
 618         if (s == WAITING || s == TIMED_WAITING) {

 738     @Override
 739     public void run() {
 740         // do nothing
 741     }
 742 
 743     /**
 744      * Parks until unparked or interrupted. If already unparked then the parking
 745      * permit is consumed and this method completes immediately (meaning it doesn't
 746      * yield). It also completes immediately if the interrupted status is set.
         * If the continuation cannot be yielded (yieldContinuation returns false or
         * throws OutOfMemoryError) the state is restored to RUNNING and the thread
         * parks on its carrier thread instead.
 747      */
 748     @Override
 749     void park() {
 750         assert Thread.currentThread() == this;
 751 
 752         // complete immediately if parking permit available or interrupted
             // (the permit is cleared as a side effect of the check)
 753         if (getAndSetParkPermit(false) || interrupted)
 754             return;
 755 
 756         // park the thread
 757         boolean yielded = false;
             // enter PARKING before attempting to yield the continuation
 758         setState(PARKING);
 759         try {
 760             yielded = yieldContinuation();
 761         } catch (OutOfMemoryError e) {
 762             // park on carrier
 763         } finally {
 764             assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
 765             if (!yielded) {
                     // the yield did not happen, so the state is still PARKING;
                     // go back to RUNNING (PARKING -> RUNNING in the transition table)
 766                 assert state() == PARKING;
 767                 setState(RUNNING);
 768             }
 769         }
 770 
 771         // park on the carrier thread when pinned
 772         if (!yielded) {
 773             parkOnCarrierThread(false, 0);
 774         }
 775     }
 776 
 777     /**
 778      * Parks up to the given waiting time or until unparked or interrupted.
 779      * If already unparked then the parking permit is consumed and this method
 780      * completes immediately (meaning it doesn't yield). It also completes immediately
 781      * if the interrupted status is set or the waiting time is {@code <= 0}.
         * Note that the parking permit is cleared before the waiting time is examined.
         * If the continuation cannot be yielded (yieldContinuation returns false or
         * throws OutOfMemoryError) the state is restored to RUNNING and the thread
         * parks on its carrier thread for the remaining time.
 782      *
 783      * @param nanos the maximum number of nanoseconds to wait.
 784      */
 785     @Override
 786     void parkNanos(long nanos) {
 787         assert Thread.currentThread() == this;
 788 
 789         // complete immediately if parking permit available or interrupted
 790         if (getAndSetParkPermit(false) || interrupted)
 791             return;
 792 
 793         // park the thread for the waiting time
 794         if (nanos > 0) {
                 // remember when the park began so the remaining time can be computed below
 795             long startTime = System.nanoTime();
 796 
 797             // park the thread, afterYield will schedule the thread to unpark
 798             boolean yielded = false;
                 // store the wait time in the timeout field before entering TIMED_PARKING
 799             timeout = nanos;
 800             setState(TIMED_PARKING);
 801             try {
 802                 yielded = yieldContinuation();
 803             } catch (OutOfMemoryError e) {
 804                 // park on carrier
 805             } finally {
 806                 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
 807                 if (!yielded) {
                         // the yield did not happen, so the state is still TIMED_PARKING;
                         // go back to RUNNING before parking on the carrier
 808                     assert state() == TIMED_PARKING;
 809                     setState(RUNNING);
 810                 }
 811             }
 812 
 813             // park on carrier thread for remaining time when pinned (or OOME)
 814             if (!yielded) {
                     // NOTE(review): the result can be <= 0 if the wait time has already
                     // elapsed; presumably parkOnCarrierThread copes with that — confirm
 815                 long remainingNanos = nanos - (System.nanoTime() - startTime);
 816                 parkOnCarrierThread(true, remainingNanos);
 817             }
 818         }
 819     }
 820 
 821     /**
 822      * Parks the current carrier thread up to the given waiting time or until
 823      * unparked or interrupted. If the virtual thread is interrupted then the
 824      * interrupted status will be propagated to the carrier thread.
 825      * @param timed true for a timed park, false for untimed
 826      * @param nanos the waiting time in nanoseconds
 827      */

1344     @JvmtiMountTransition
1345     private native void startFinalTransition();
1346 
1347     @IntrinsicCandidate
1348     @JvmtiMountTransition
         // Native method taking a "mount" flag.
         // NOTE(review): presumably marks the start of a mount (true) or unmount (false)
         // transition for JVMTI — confirm against the VM side.
1349     private native void startTransition(boolean mount);
1350 
1351     @IntrinsicCandidate
1352     @JvmtiMountTransition
         // Native counterpart of startTransition with the same "mount" flag.
         // NOTE(review): presumably marks the end of the transition — confirm.
1353     private native void endTransition(boolean mount);
1354 
1355     @IntrinsicCandidate
         // Static native method taking an "enter" flag.
         // NOTE(review): presumably brackets a region in which JVMTI suspend is
         // disabled (true on entry, false on exit) — confirm.
1356     private static native void notifyJvmtiDisableSuspend(boolean enter);
1357 
1358     private static native void registerNatives();
1359     static {
1360         registerNatives();
1361 
1362         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1363         var group = Thread.virtualThreadGroup();


































1364     }
1365 
1366     /**
1367      * Creates the default ForkJoinPool scheduler.

1368      */
1369     private static ForkJoinPool createDefaultScheduler() {
1370         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1371         int parallelism, maxPoolSize, minRunnable;
1372         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1373         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1374         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1375         if (parallelismValue != null) {
1376             parallelism = Integer.parseInt(parallelismValue);
1377         } else {
1378             parallelism = Runtime.getRuntime().availableProcessors();
1379         }
1380         if (maxPoolSizeValue != null) {
1381             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1382             parallelism = Integer.min(parallelism, maxPoolSize);
1383         } else {
1384             maxPoolSize = Integer.max(parallelism, 256);
1385         }
1386         if (minRunnableValue != null) {
1387             minRunnable = Integer.parseInt(minRunnableValue);
1388         } else {
1389             minRunnable = Integer.max(parallelism / 2, 1);
1390         }
1391         Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1392         boolean asyncMode = true; // FIFO
1393         return new ForkJoinPool(parallelism, factory, handler, asyncMode,
1394                      0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);




























































































1395     }
1396 
1397     /**
1398      * Schedule a runnable task to run after a delay.
1399      */
1400     private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1401         if (scheduler instanceof ForkJoinPool pool) {
1402             return pool.schedule(command, delay, unit);
1403         } else {
1404             return DelayedTaskSchedulers.schedule(command, delay, unit);


1405         }
1406     }
1407 
1408     /**
1409      * Supports scheduling a runnable task to run after a delay. It uses a number
1410      * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1411      * work queue used. This class is used when using a custom scheduler.
1412      */
1413     private static class DelayedTaskSchedulers {
1414         private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();
1415 
1416         static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1417             long tid = Thread.currentThread().threadId();
1418             int index = (int) tid & (INSTANCE.length - 1);
1419             return INSTANCE[index].schedule(command, delay, unit);
1420         }
1421 
1422         private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
1423             String propName = "jdk.virtualThreadScheduler.timerQueues";
1424             String propValue = System.getProperty(propName);
1425             int queueCount;
1426             if (propValue != null) {
1427                 queueCount = Integer.parseInt(propValue);
1428                 if (queueCount != Integer.highestOneBit(queueCount)) {
1429                     throw new RuntimeException("Value of " + propName + " must be power of 2");
1430                 }
1431             } else {
1432                 int ncpus = Runtime.getRuntime().availableProcessors();
1433                 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.lang.reflect.Constructor;
  30 import java.util.Locale;
  31 import java.util.Objects;
  32 import java.util.concurrent.CountDownLatch;

  33 import java.util.concurrent.Executors;
  34 import java.util.concurrent.ForkJoinPool;

  35 import java.util.concurrent.ForkJoinTask;
  36 import java.util.concurrent.Future;
  37 import java.util.concurrent.LinkedTransferQueue;
  38 import java.util.concurrent.RejectedExecutionException;
  39 import java.util.concurrent.ScheduledExecutorService;
  40 import java.util.concurrent.ScheduledFuture;
  41 import java.util.concurrent.ScheduledThreadPoolExecutor;
  42 import java.util.concurrent.ThreadFactory;
  43 import java.util.concurrent.ThreadPoolExecutor;
  44 import java.util.concurrent.TimeUnit;
  45 import jdk.internal.event.VirtualThreadEndEvent;
  46 import jdk.internal.event.VirtualThreadParkEvent;
  47 import jdk.internal.event.VirtualThreadStartEvent;
  48 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  49 import jdk.internal.invoke.MhUtil;
  50 import jdk.internal.misc.CarrierThread;
  51 import jdk.internal.misc.InnocuousThread;
  52 import jdk.internal.misc.Unsafe;
  53 import jdk.internal.vm.Continuation;
  54 import jdk.internal.vm.ContinuationScope;
  55 import jdk.internal.vm.StackableScope;
  56 import jdk.internal.vm.ThreadContainer;
  57 import jdk.internal.vm.ThreadContainers;
  58 import jdk.internal.vm.annotation.ChangesCurrentThread;
  59 import jdk.internal.vm.annotation.Hidden;
  60 import jdk.internal.vm.annotation.IntrinsicCandidate;
  61 import jdk.internal.vm.annotation.JvmtiHideEvents;
  62 import jdk.internal.vm.annotation.JvmtiMountTransition;
  63 import jdk.internal.vm.annotation.ReservedStackAccess;
  64 import sun.nio.ch.Interruptible;
  65 import static java.util.concurrent.TimeUnit.*;
  66 
  67 /**
  68  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  69  */
  70 final class VirtualThread extends BaseVirtualThread {
  71     private static final Unsafe U = Unsafe.getUnsafe();
  72     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  73 
  74     private static final VirtualThreadScheduler BUILTIN_SCHEDULER;   // result of createBuiltinScheduler
  75     private static final VirtualThreadScheduler DEFAULT_SCHEDULER;   // custom scheduler if configured, else the built-in one
  76     private static final VirtualThreadScheduler EXTERNAL_VIEW;       // result of createExternalView(built-in scheduler)
  77     private static final boolean USE_STPE;                           // value of jdk.virtualThreadScheduler.useSTPE
  78     static {
  79         // experimental
             // jdk.virtualThreadScheduler.implClass, when set, is passed to loadCustomScheduler
             // together with the external view of the built-in scheduler
  80         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
  81         if (propValue != null) {
                 // all three schedulers are created before any of the static fields is assigned
  82             VirtualThreadScheduler builtinScheduler = createBuiltinScheduler(true);
  83             VirtualThreadScheduler externalView = createExternalView(builtinScheduler);
  84             VirtualThreadScheduler defaultScheduler = loadCustomScheduler(externalView, propValue);
  85             BUILTIN_SCHEDULER = builtinScheduler;
  86             DEFAULT_SCHEDULER = defaultScheduler;
  87             EXTERNAL_VIEW = externalView;
  88         } else {
                 // no custom scheduler: the built-in scheduler is also the default
  89             var builtinScheduler = createBuiltinScheduler(false);
  90             BUILTIN_SCHEDULER = builtinScheduler;
  91             DEFAULT_SCHEDULER = builtinScheduler;
  92             EXTERNAL_VIEW = createExternalView(builtinScheduler);
  93         }
             // NOTE(review): "STPE" presumably refers to ScheduledThreadPoolExecutor — confirm at the use site
  94         USE_STPE = Boolean.getBoolean("jdk.virtualThreadScheduler.useSTPE");
  95     }
  96 
  97     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
         // Each constant is the Unsafe field offset of the VirtualThread instance field named
         // in its string argument. NOTE(review): presumably used for atomic access through U
         // — confirm against the accessor methods.
  98     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  99     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
 100     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
 101     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
 102 
 103     // scheduler and continuation
 104     private final VirtualThreadScheduler scheduler;
 105     private final Continuation cont;
 106     private final VThreadRunner runContinuation;
 107 
 108     // virtual thread state, accessed by VM
 109     private volatile int state;
 110 
 111     /*
 112      * Virtual thread state transitions:
 113      *
 114      *      NEW -> STARTED         // Thread.start, schedule to run
 115      *  STARTED -> TERMINATED      // failed to start
 116      *  STARTED -> RUNNING         // first run
 117      *  RUNNING -> TERMINATED      // done
 118      *
 119      *  RUNNING -> PARKING         // Thread parking with LockSupport.park
 120      *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
 121      *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
 122      * UNPARKED -> RUNNING         // continue execution after park
 123      *
 124      *  PARKING -> RUNNING         // cont.yield failed, need to park on carrier
 125      *  RUNNING -> PINNED          // park on carrier
 126      *   PINNED -> RUNNING         // unparked, continue execution on same carrier

 200 
 201     // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
 202     private volatile boolean interruptibleWait;
 203 
 204     // timed-wait support
 205     private byte timedWaitSeqNo;
 206 
 207     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 208     private long timeout;
 209 
 210     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 211     private Future<?> timeoutTask;
 212 
 213     // carrier thread when mounted, accessed by VM
 214     private volatile Thread carrierThread;
 215 
 216     // termination object when joining, created lazily if needed
 217     private volatile CountDownLatch termination;
 218 
 219     /**
 220      * Return the built-in scheduler.
 221      * @param trusted true if caller is trusted, false if not trusted
 222      */
 223     static VirtualThreadScheduler builtinScheduler(boolean trusted) {
 224         return trusted ? BUILTIN_SCHEDULER : EXTERNAL_VIEW;
 225     }
 226 
    /**
     * Returns the default scheduler, usually the same as the built-in scheduler.
     * This is the scheduler that the constructor selects when it is given a
     * {@code null} scheduler.
     * @return {@code DEFAULT_SCHEDULER}
     */
    static VirtualThreadScheduler defaultScheduler() {
        return DEFAULT_SCHEDULER;
    }
 233 
    /**
     * Returns the continuation scope used for virtual threads. It is the scope
     * that {@code VThreadContinuation} passes to the {@code Continuation} constructor.
     * @return {@code VTHREAD_SCOPE}
     */
    static ContinuationScope continuationScope() {
        return VTHREAD_SCOPE;
    }
 240 
    /**
     * Returns the task to start/continue this virtual thread. This is the same
     * object that is handed to the scheduler's {@code onStart} and
     * {@code onContinue} methods.
     * @return the {@code runContinuation} task created in the constructor
     */
    VirtualThreadTask virtualThreadTask() {
        return runContinuation;
    }
 247 
 248     /**
 249      * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
 250      *
 251      * @param scheduler the scheduler or null for default scheduler
 252      * @param preferredCarrier the preferred carrier or null
 253      * @param name thread name
 254      * @param characteristics characteristics
 255      * @param task the task to execute
 256      */
 257     VirtualThread(VirtualThreadScheduler scheduler,
 258                   Thread preferredCarrier,
 259                   String name,
 260                   int characteristics,
 261                   Runnable task) {
 262         super(name, characteristics, /*bound*/ false);
 263         Objects.requireNonNull(task);
 264 
 265         // use default scheduler if not provided
 266         if (scheduler == null) {
 267             scheduler = DEFAULT_SCHEDULER;
 268         } else if (scheduler == EXTERNAL_VIEW) {
 269             throw new UnsupportedOperationException();



 270         }

 271         this.scheduler = scheduler;
 272         this.cont = new VThreadContinuation(this, task);
 273 
 274         if (scheduler == BUILTIN_SCHEDULER) {
 275             this.runContinuation = new VThreadRunner(this);
 276         } else {
 277             this.runContinuation = new CustomSchedulerVThreadRunner(this, preferredCarrier);
 278         }
 279     }
 280 
 281     /**
 282      * The task to start/continue a virtual thread.
 283      */
 284     static non-sealed class VThreadRunner implements VirtualThreadTask {
 285         private final VirtualThread vthread;
 286         VThreadRunner(VirtualThread vthread) {
 287             this.vthread = vthread;
 288         }
 289         @Override
 290         public final Thread thread() {
 291             return vthread;
 292         }
 293         @Override
 294         public final void run() {
 295             vthread.runContinuation();;
 296         }
 297         @Override
 298         public Thread preferredCarrier() {
 299             throw new UnsupportedOperationException();
 300         }
 301         @Override
 302         public Object attach(Object att) {
 303             throw new UnsupportedOperationException();
 304         }
 305         @Override
 306         public Object attachment() {
 307             throw new UnsupportedOperationException();
 308         }
 309     }
 310 
 311     /**
 312      * The task to start/continue a virtual thread when using a custom scheduler.
 313      */
 314     static final class CustomSchedulerVThreadRunner extends VThreadRunner {
 315         private static final VarHandle ATT =
 316                 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class);
 317         private final Thread preferredCarrier;
 318         private volatile Object att;
 319         CustomSchedulerVThreadRunner(VirtualThread vthread, Thread preferredCarrier) {
 320             super(vthread);
 321             this.preferredCarrier = preferredCarrier;
 322         }
 323         @Override
 324         public Thread preferredCarrier() {
 325             return preferredCarrier;
 326         }
 327         @Override
 328         public Object attach(Object att) {
 329             return ATT.getAndSet(this, att);
 330         }
 331         @Override
 332         public Object attachment() {
 333             return att;
 334         }
 335     }
 336 
 337     /**
 338      * The continuation that a virtual thread executes.
 339      */
 340     private static class VThreadContinuation extends Continuation {
 341         VThreadContinuation(VirtualThread vthread, Runnable task) {
 342             super(VTHREAD_SCOPE, wrap(vthread, task));
 343         }
 344         @Override
 345         protected void onPinned(Continuation.Pinned reason) {
 346         }
 347         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 348             return new Runnable() {
 349                 @Hidden
 350                 @JvmtiHideEvents
 351                 public void run() {
 352                     vthread.endFirstTransition();
 353                     try {
 354                         vthread.run(task);

 402             if (cont.isDone()) {
 403                 afterDone();
 404             } else {
 405                 afterYield();
 406             }
 407         }
 408     }
 409 
 410     /**
 411      * Cancel timeout task when continuing after timed-park or timed-wait.
 412      * The timeout task may be executing, or may have already completed.
 413      */
 414     private void cancelTimeoutTask() {
 415         if (timeoutTask != null) {
 416             timeoutTask.cancel(false);
 417             timeoutTask = null;
 418         }
 419     }
 420 
 421     /**
 422      * Submits the runContinuation task to the scheduler. For the built-in scheduler,
 423      * the task will be pushed to the local queue if possible, otherwise it will be
 424      * pushed to an external submission queue.













 425      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 426      * @throws RejectedExecutionException
 427      */
 428     private void submitRunContinuation(boolean retryOnOOME) {
 429         boolean done = false;
 430         while (!done) {
 431             try {
 432                 // Pin the continuation to prevent the virtual thread from unmounting
 433                 // when submitting a task. For the default scheduler this ensures that
 434                 // the carrier doesn't change when pushing a task. For other schedulers
 435                 // it avoids deadlock that could arise due to carriers and virtual
 436                 // threads contending for a lock.
 437                 if (currentThread().isVirtual()) {
 438                     Continuation.pin();
 439                     try {
 440                         scheduler.onContinue(runContinuation);
 441                     } finally {
 442                         Continuation.unpin();
 443                     }
 444                 } else {
 445                     scheduler.onContinue(runContinuation);
 446                 }
 447                 done = true;
 448             } catch (RejectedExecutionException ree) {
 449                 submitFailed(ree);
 450                 throw ree;
 451             } catch (OutOfMemoryError e) {
 452                 if (retryOnOOME) {
 453                     U.park(false, 100_000_000); // 100ms
 454                 } else {
 455                     throw e;
 456                 }
 457             }
 458         }
 459     }
 460 


















    /**
     * Submits the runContinuation task to the scheduler. For the default scheduler,
     * and calling it on a worker thread, the task will be pushed to the local queue,
     * otherwise it will be pushed to an external submission queue.
     * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
     * Equivalent to {@code submitRunContinuation(true)}.
     * @throws RejectedExecutionException if the scheduler rejects the task
     */
    private void submitRunContinuation() {
        submitRunContinuation(true);
    }
 471 
 472     /**
 473      * Invoked from a carrier thread to lazy submit the runContinuation task to the
 474      * carrier's local queue if the queue is empty. If not empty, or invoked by a thread
 475      * for a custom scheduler, then it just submits the task to the scheduler.
 476      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 477      * @throws RejectedExecutionException
 478      * @see ForkJoinPool#lazySubmit(ForkJoinTask)
 479      */
 480     private void lazySubmitRunContinuation() {
 481         assert !currentThread().isVirtual();
 482         if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {

 483             try {
 484                 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
 485             } catch (RejectedExecutionException ree) {
 486                 submitFailed(ree);
 487                 throw ree;
 488             } catch (OutOfMemoryError e) {
 489                 submitRunContinuation();
 490             }
 491         } else {
 492             submitRunContinuation();
 493         }
 494     }
 495 
 496     /**
 497      * Invoked from a carrier thread to externally submit the runContinuation task to the
 498      * scheduler. If invoked by a thread for a custom scheduler, then it just submits the
 499      * task to the scheduler.
 500      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 501      * @throws RejectedExecutionException
 502      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
 503      */
 504     private void externalSubmitRunContinuation() {
 505         assert !currentThread().isVirtual();
 506         if (currentThread() instanceof CarrierThread ct) {
 507             try {
 508                 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 509             } catch (RejectedExecutionException ree) {
 510                 submitFailed(ree);
 511                 throw ree;
 512             } catch (OutOfMemoryError e) {
 513                 submitRunContinuation();
 514             }
 515         } else {
 516             submitRunContinuation();
 517         }
 518     }
 519 
 520     /**
 521      * Invoked from Thread.start to externally submit the runContinuation task to the
 522      * scheduler. If this virtual thread is scheduled by the built-in scheduler,
 523      * and this method is called from a virtual thread scheduled by the built-in
 524      * scheduler, then it uses externalSubmit to ensure that the task is pushed to an
 525      * external submission queue rather than the local queue.
 526      * @throws RejectedExecutionException
 527      * @throws OutOfMemoryError
 528      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
 529      */
 530     private void externalSubmitRunContinuationOrThrow() {
 531         try {
 532             if (currentThread().isVirtual()) {
 533                 // Pin the continuation to prevent the virtual thread from unmounting
 534                 // when submitting a task. This avoids deadlock that could arise due to
 535                 // carriers and virtual threads contending for a lock.
 536                 Continuation.pin();
 537                 try {
 538                     if (scheduler == BUILTIN_SCHEDULER
 539                             && currentCarrierThread() instanceof CarrierThread ct) {
 540                         ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 541                     } else {
 542                         scheduler.onStart(runContinuation);
 543                     }
 544                 } finally {
 545                     Continuation.unpin();
 546                 }
 547             } else {
 548                 scheduler.onStart(runContinuation);
 549             }
 550         } catch (RejectedExecutionException ree) {
 551             submitFailed(ree);
 552             throw ree;
 553         }
 554     }
 555 
 556     /**
 557      * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
 558      */
 559     private void submitFailed(RejectedExecutionException ree) {
 560         var event = new VirtualThreadSubmitFailedEvent();
 561         if (event.isEnabled()) {
 562             event.javaThreadId = threadId();
 563             event.exceptionMessage = ree.getMessage();
 564             event.commit();
 565         }
 566     }
 567 
 568     /**
 569      * Runs a task in the context of this virtual thread.
 570      */
 571     private void run(Runnable task) {
 572         assert Thread.currentThread() == this && state == RUNNING;

 689                 long timeout = this.timeout;
 690                 assert timeout > 0;
 691                 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
 692                 setState(newState = TIMED_PARKED);
 693             }
 694 
 695             // may have been unparked while parking
 696             if (parkPermit && compareAndSetState(newState, UNPARKED)) {
 697                 // lazy submit if local queue is empty
 698                 lazySubmitRunContinuation();
 699             }
 700             return;
 701         }
 702 
 703         // Thread.yield
 704         if (s == YIELDING) {
 705             setState(YIELDED);
 706 
 707             // external submit if there are no tasks in the local task queue
 708             if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 709                 externalSubmitRunContinuation();
 710             } else {
 711                 submitRunContinuation();
 712             }
 713             return;
 714         }
 715 
 716         // blocking on monitorenter
 717         if (s == BLOCKING) {
 718             setState(BLOCKED);
 719 
 720             // may have been unblocked while blocking
 721             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 722                 // lazy submit if local queue is empty
 723                 lazySubmitRunContinuation();
 724             }
 725             return;
 726         }
 727 
 728         // Object.wait
 729         if (s == WAITING || s == TIMED_WAITING) {

    /**
     * Does nothing.
     * NOTE(review): the thread's task appears to be executed by the private
     * run(Runnable) method, invoked from the continuation, rather than by this
     * method -- confirm.
     */
    @Override
    public void run() {
        // do nothing
    }
 853 
 854     /**
 855      * Parks until unparked or interrupted. If already unparked then the parking
 856      * permit is consumed and this method completes immediately (meaning it doesn't
 857      * yield). It also completes immediately if the interrupted status is set.
 858      */
 859     @Override
 860     void park() {
 861         assert Thread.currentThread() == this;
 862 
 863         // complete immediately if parking permit available or interrupted
 864         if (getAndSetParkPermit(false) || interrupted)
 865             return;
 866 
 867         // park the thread
 868         boolean yielded = false;
 869         long eventStartTime = VirtualThreadParkEvent.eventStartTime();
 870         setState(PARKING);
 871         try {
 872             yielded = yieldContinuation();
 873         } catch (OutOfMemoryError e) {
 874             // park on carrier
 875         } finally {
 876             assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
 877             if (yielded) {
 878                 VirtualThreadParkEvent.offer(eventStartTime, Long.MIN_VALUE);
 879             } else {
 880                 assert state() == PARKING;
 881                 setState(RUNNING);
 882             }
 883         }
 884 
 885         // park on the carrier thread when pinned
 886         if (!yielded) {
 887             parkOnCarrierThread(false, 0);
 888         }
 889     }
 890 
 891     /**
 892      * Parks up to the given waiting time or until unparked or interrupted.
 893      * If already unparked then the parking permit is consumed and this method
 894      * completes immediately (meaning it doesn't yield). It also completes immediately
 895      * if the interrupted status is set or the waiting time is {@code <= 0}.
 896      *
 897      * @param nanos the maximum number of nanoseconds to wait.
 898      */
 899     @Override
 900     void parkNanos(long nanos) {
 901         assert Thread.currentThread() == this;
 902 
 903         // complete immediately if parking permit available or interrupted
 904         if (getAndSetParkPermit(false) || interrupted)
 905             return;
 906 
 907         // park the thread for the waiting time
 908         if (nanos > 0) {
 909             long startTime = System.nanoTime();
 910 
 911             // park the thread, afterYield will schedule the thread to unpark
 912             boolean yielded = false;
 913             long eventStartTime = VirtualThreadParkEvent.eventStartTime();
 914             timeout = nanos;
 915             setState(TIMED_PARKING);
 916             try {
 917                 yielded = yieldContinuation();
 918             } catch (OutOfMemoryError e) {
 919                 // park on carrier
 920             } finally {
 921                 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
 922                 if (yielded) {
 923                     VirtualThreadParkEvent.offer(eventStartTime, nanos);
 924                 } else {
 925                     assert state() == TIMED_PARKING;
 926                     setState(RUNNING);
 927                 }
 928             }
 929 
 930             // park on carrier thread for remaining time when pinned (or OOME)
 931             if (!yielded) {
 932                 long remainingNanos = nanos - (System.nanoTime() - startTime);
 933                 parkOnCarrierThread(true, remainingNanos);
 934             }
 935         }
 936     }
 937 
 938     /**
 939      * Parks the current carrier thread up to the given waiting time or until
 940      * unparked or interrupted. If the virtual thread is interrupted then the
 941      * interrupted status will be propagated to the carrier thread.
 942      * @param timed true for a timed park, false for untimed
 943      * @param nanos the waiting time in nanoseconds
 944      */

1461     @JvmtiMountTransition
1462     private native void startFinalTransition();
1463 
    // NOTE(review): presumably marks the start of a mount (mount == true) or
    // unmount (mount == false) transition for the VM/JVMTI; callers are outside
    // this view -- confirm
    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void startTransition(boolean mount);

    // NOTE(review): presumably the counterpart of startTransition that marks the
    // end of the transition -- confirm
    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void endTransition(boolean mount);

    // NOTE(review): presumably enter == true begins, and enter == false ends, a
    // region in which JVMTI suspension is disabled; callers are outside this
    // view -- confirm
    @IntrinsicCandidate
    private static native void notifyJvmtiDisableSuspend(boolean enter);
1474 
1475     private static native void registerNatives();
1476     static {
1477         registerNatives();
1478 
1479         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1480         var group = Thread.virtualThreadGroup();
1481 
1482         // ensure event class is initialized
1483         try {
1484             MethodHandles.lookup().ensureInitialized(VirtualThreadParkEvent.class);
1485         } catch (IllegalAccessException e) {
1486             throw new ExceptionInInitializerError(e);
1487         }
1488     }
1489 
1490     /**
1491      * Loads a VirtualThreadScheduler with the given class name. The class must be public
1492      * in an exported package, with public one-arg or no-arg constructor, and be visible
1493      * to the system class loader.
1494      * @param delegate the scheduler that the custom scheduler may delegate to
1495      * @param cn the class name of the custom scheduler
1496      */
1497     private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) {
1498         VirtualThreadScheduler scheduler;
1499         try {
1500             Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
1501             // 1-arg constructor
1502             try {
1503                 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class);
1504                 return (VirtualThreadScheduler) ctor.newInstance(delegate);
1505             } catch (NoSuchMethodException e) {
1506                 // 0-arg constructor
1507                 Constructor<?> ctor = clazz.getConstructor();
1508                 scheduler = (VirtualThreadScheduler) ctor.newInstance();
1509             }
1510         } catch (Exception ex) {
1511             throw new Error(ex);
1512         }
1513         System.err.println("WARNING: Using custom default scheduler, this is an experimental feature!");
1514         return scheduler;
1515     }
1516 
1517     /**
1518      * Creates the built-in ForkJoinPool scheduler.
1519      * @param wrapped true if wrapped by a custom default scheduler
1520      */
1521     private static VirtualThreadScheduler createBuiltinScheduler(boolean wrapped) {

1522         int parallelism, maxPoolSize, minRunnable;
1523         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1524         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1525         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1526         if (parallelismValue != null) {
1527             parallelism = Integer.parseInt(parallelismValue);
1528         } else {
1529             parallelism = Runtime.getRuntime().availableProcessors();
1530         }
1531         if (maxPoolSizeValue != null) {
1532             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1533             parallelism = Integer.min(parallelism, maxPoolSize);
1534         } else {
1535             maxPoolSize = Integer.max(parallelism, 256);
1536         }
1537         if (minRunnableValue != null) {
1538             minRunnable = Integer.parseInt(minRunnableValue);
1539         } else {
1540             minRunnable = Integer.max(parallelism / 2, 1);
1541         }
1542         if (Boolean.getBoolean("jdk.virtualThreadScheduler.useTPE")) {
1543             return new BuiltinThreadPoolExecutorScheduler(parallelism);
1544         } else {
1545             return new BuiltinForkJoinPoolScheduler(parallelism, maxPoolSize, minRunnable, wrapped);
1546         }
1547     }
1548 
1549     /**
1550      * The built-in ForkJoinPool scheduler.
1551      */
1552     private static class BuiltinForkJoinPoolScheduler
1553             extends ForkJoinPool implements VirtualThreadScheduler {
1554 
1555         BuiltinForkJoinPoolScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) {
1556             ForkJoinWorkerThreadFactory factory = wrapped
1557                     ? ForkJoinPool.defaultForkJoinWorkerThreadFactory
1558                     : CarrierThread::new;
1559             Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1560             boolean asyncMode = true; // FIFO
1561             super(parallelism, factory, handler, asyncMode,
1562                     0, maxPoolSize, minRunnable, pool -> true, 30L, SECONDS);
1563         }
1564 
1565         @Override
1566         public void onStart(VirtualThreadTask task) {
1567             execute(ForkJoinTask.adapt(task));
1568         }
1569 
1570         @Override
1571         public void onContinue(VirtualThreadTask task) {
1572             execute(ForkJoinTask.adapt(task));
1573         }
1574 
1575         @Override
1576         public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
1577             return super.schedule(task, delay, unit);
1578         }
1579     }
1580 
1581     /**
1582      * Built-in ThreadPoolExecutor scheduler.
1583      */
1584     private static class BuiltinThreadPoolExecutorScheduler
1585             extends ThreadPoolExecutor implements VirtualThreadScheduler {
1586 
1587         BuiltinThreadPoolExecutorScheduler(int maxPoolSize) {
1588             ThreadFactory factory = task -> {
1589                 Thread t = InnocuousThread.newThread(task);
1590                 t.setDaemon(true);
1591                 return t;
1592             };
1593             super(maxPoolSize, maxPoolSize,
1594                     0L, SECONDS,
1595                     new LinkedTransferQueue<>(),
1596                     factory);
1597         }
1598 
1599         @Override
1600         public void onStart(VirtualThreadTask task) {
1601             execute(task);
1602         }
1603 
1604         @Override
1605         public void onContinue(VirtualThreadTask task) {
1606             execute(task);
1607         }
1608     }
1609 
1610     /**
1611      * Wraps the scheduler to avoid leaking a direct reference with
1612      * {@link VirtualThreadScheduler#current()}.
1613      */
1614     static VirtualThreadScheduler createExternalView(VirtualThreadScheduler delegate) {
1615         return new VirtualThreadScheduler() {
1616             private void check(VirtualThreadTask task) {
1617                 var vthread = (VirtualThread) task.thread();
1618                 VirtualThreadScheduler scheduler = vthread.scheduler;
1619                 if (scheduler != this && scheduler != DEFAULT_SCHEDULER) {
1620                     throw new IllegalArgumentException();
1621                 }
1622             }
1623             @Override
1624             public void onStart(VirtualThreadTask task) {
1625                 check(task);
1626                 delegate.onStart(task);
1627             }
1628             @Override
1629             public void onContinue(VirtualThreadTask task) {
1630                 check(task);
1631                 delegate.onContinue(task);
1632             }
1633             @Override
1634             public String toString() {
1635                 return delegate.toString();
1636             }
1637         };
1638     }
1639 
1640     /**
1641      * Schedule a runnable task to run after a delay.
1642      */
1643     private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1644         if (USE_STPE) {


1645             return DelayedTaskSchedulers.schedule(command, delay, unit);
1646         } else {
1647             return scheduler.schedule(command, delay, unit);
1648         }
1649     }
1650 
1651     /**
1652      * Supports scheduling a runnable task to run after a delay. It uses a number
1653      * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1654      * work queue used. This class is used when using a custom scheduler.
1655      */
1656     static class DelayedTaskSchedulers {
1657         private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();
1658 
1659         static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1660             long tid = Thread.currentThread().threadId();
1661             int index = (int) tid & (INSTANCE.length - 1);
1662             return INSTANCE[index].schedule(command, delay, unit);
1663         }
1664 
1665         private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
1666             String propName = "jdk.virtualThreadScheduler.timerQueues";
1667             String propValue = System.getProperty(propName);
1668             int queueCount;
1669             if (propValue != null) {
1670                 queueCount = Integer.parseInt(propValue);
1671                 if (queueCount != Integer.highestOneBit(queueCount)) {
1672                     throw new RuntimeException("Value of " + propName + " must be power of 2");
1673                 }
1674             } else {
1675                 int ncpus = Runtime.getRuntime().availableProcessors();
1676                 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
< prev index next >