< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 



  27 import java.util.Locale;
  28 import java.util.Objects;
  29 import java.util.concurrent.CountDownLatch;
  30 import java.util.concurrent.Executor;
  31 import java.util.concurrent.Executors;
  32 import java.util.concurrent.ForkJoinPool;
  33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  34 import java.util.concurrent.ForkJoinTask;
  35 import java.util.concurrent.Future;
  36 import java.util.concurrent.RejectedExecutionException;
  37 import java.util.concurrent.ScheduledExecutorService;
  38 import java.util.concurrent.ScheduledThreadPoolExecutor;
  39 import java.util.concurrent.TimeUnit;
  40 import jdk.internal.event.VirtualThreadEndEvent;

  41 import jdk.internal.event.VirtualThreadStartEvent;
  42 import jdk.internal.event.VirtualThreadSubmitFailedEvent;

  43 import jdk.internal.misc.CarrierThread;
  44 import jdk.internal.misc.InnocuousThread;
  45 import jdk.internal.misc.Unsafe;
  46 import jdk.internal.vm.Continuation;
  47 import jdk.internal.vm.ContinuationScope;
  48 import jdk.internal.vm.StackableScope;
  49 import jdk.internal.vm.ThreadContainer;
  50 import jdk.internal.vm.ThreadContainers;
  51 import jdk.internal.vm.annotation.ChangesCurrentThread;
  52 import jdk.internal.vm.annotation.Hidden;
  53 import jdk.internal.vm.annotation.IntrinsicCandidate;
  54 import jdk.internal.vm.annotation.JvmtiHideEvents;
  55 import jdk.internal.vm.annotation.JvmtiMountTransition;
  56 import jdk.internal.vm.annotation.ReservedStackAccess;
  57 import sun.nio.ch.Interruptible;
  58 import static java.util.concurrent.TimeUnit.*;
  59 
  60 /**
  61  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  62  */
  63 final class VirtualThread extends BaseVirtualThread {
  64     private static final Unsafe U = Unsafe.getUnsafe();
  65     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  66     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();



















  67 
  68     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
  69     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  70     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  71     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
  72     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  73 
  74     // scheduler and continuation
  75     private final Executor scheduler;
  76     private final Continuation cont;
  77     private final Runnable runContinuation;
  78 
  79     // virtual thread state, accessed by VM
  80     private volatile int state;
  81 
  82     /*
  83      * Virtual thread state transitions:
  84      *
  85      *      NEW -> STARTED         // Thread.start, schedule to run
  86      *  STARTED -> TERMINATED      // failed to start
  87      *  STARTED -> RUNNING         // first run
  88      *  RUNNING -> TERMINATED      // done
  89      *
  90      *  RUNNING -> PARKING         // Thread parking with LockSupport.park
  91      *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
  92      *  PARKING -> PINNED          // cont.yield failed, parked indefinitely on carrier
  93      *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
  94      *   PINNED -> RUNNING         // unparked, continue execution on same carrier
  95      * UNPARKED -> RUNNING         // continue execution after park
  96      *
  97      *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos

 172     private volatile boolean interruptibleWait;
 173 
 174     // timed-wait support
 175     private byte timedWaitSeqNo;
 176 
 177     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 178     private long timeout;
 179 
 180     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 181     private Future<?> timeoutTask;
 182 
 183     // carrier thread when mounted, accessed by VM
 184     private volatile Thread carrierThread;
 185 
 186     // termination object when joining, created lazily if needed
 187     private volatile CountDownLatch termination;
 188 
 189     /**
 190      * Returns the default scheduler.
 191      */
 192     static Executor defaultScheduler() {
 193         return DEFAULT_SCHEDULER;
 194     }
 195 







 196     /**
 197      * Returns the continuation scope used for virtual threads.
 198      */
 199     static ContinuationScope continuationScope() {
 200         return VTHREAD_SCOPE;
 201     }
 202 
 203     /**
 204      * Creates a new {@code VirtualThread} to run the given task with the given
 205      * scheduler. If the given scheduler is {@code null} and the current thread
 206      * is a platform thread then the newly created virtual thread will use the
 207      * default scheduler. If given scheduler is {@code null} and the current
 208      * thread is a virtual thread then the current thread's scheduler is used.








 209      *
 210      * @param scheduler the scheduler or null
 211      * @param name thread name
 212      * @param characteristics characteristics
 213      * @param task the task to execute
 214      */
 215     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {



 216         super(name, characteristics, /*bound*/ false);
 217         Objects.requireNonNull(task);
 218 
 219         // choose scheduler if not specified
 220         if (scheduler == null) {
 221             Thread parent = Thread.currentThread();
 222             if (parent instanceof VirtualThread vparent) {
 223                 scheduler = vparent.scheduler;
 224             } else {
 225                 scheduler = DEFAULT_SCHEDULER;
 226             }
 227         }
 228 
 229         this.scheduler = scheduler;
 230         this.cont = new VThreadContinuation(this, task);
 231         this.runContinuation = this::runContinuation;




























































 232     }
 233 
 234     /**
 235      * The continuation that a virtual thread executes.
 236      */
 237     private static class VThreadContinuation extends Continuation {
 238         VThreadContinuation(VirtualThread vthread, Runnable task) {
 239             super(VTHREAD_SCOPE, wrap(vthread, task));
 240         }
 241         @Override
 242         protected void onPinned(Continuation.Pinned reason) {
 243         }
 244         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 245             return new Runnable() {
 246                 @Hidden
 247                 @JvmtiHideEvents
 248                 public void run() {
 249                     vthread.notifyJvmtiStart(); // notify JVMTI
 250                     try {
 251                         vthread.run(task);

 299             if (cont.isDone()) {
 300                 afterDone();
 301             } else {
 302                 afterYield();
 303             }
 304         }
 305     }
 306 
 307     /**
 308      * Cancel timeout task when continuing after timed-park or timed-wait.
 309      * The timeout task may be executing, or may have already completed.
 310      */
 311     private void cancelTimeoutTask() {
 312         if (timeoutTask != null) {
 313             timeoutTask.cancel(false);
 314             timeoutTask = null;
 315         }
 316     }
 317 
 318     /**
 319      * Submits the runContinuation task to the scheduler. For the default scheduler,
 320      * and calling it on a worker thread, the task will be pushed to the local queue,
 321      * otherwise it will be pushed to an external submission queue.
 322      * @param scheduler the scheduler
 323      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 324      * @throws RejectedExecutionException
 325      */
 326     private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
 327         boolean done = false;
 328         while (!done) {
 329             try {
 330                 // Pin the continuation to prevent the virtual thread from unmounting
 331                 // when submitting a task. For the default scheduler this ensures that
 332                 // the carrier doesn't change when pushing a task. For other schedulers
 333                 // it avoids deadlock that could arise due to carriers and virtual
 334                 // threads contending for a lock.
 335                 if (currentThread().isVirtual()) {
 336                     Continuation.pin();
 337                     try {
 338                         scheduler.execute(runContinuation);
 339                     } finally {
 340                         Continuation.unpin();
 341                     }
 342                 } else {
 343                     scheduler.execute(runContinuation);
 344                 }
 345                 done = true;
 346             } catch (RejectedExecutionException ree) {
 347                 submitFailed(ree);
 348                 throw ree;
 349             } catch (OutOfMemoryError e) {
 350                 if (retryOnOOME) {
 351                     U.park(false, 100_000_000); // 100ms
 352                 } else {
 353                     throw e;
 354                 }
 355             }
 356         }
 357     }
 358 
 359     /**
 360      * Submits the runContinuation task to the given scheduler as an external submit.
 361      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 362      * @throws RejectedExecutionException
 363      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
 364      */
 365     private void externalSubmitRunContinuation(ForkJoinPool pool) {
 366         assert Thread.currentThread() instanceof CarrierThread;
 367         try {
 368             pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
 369         } catch (RejectedExecutionException ree) {
 370             submitFailed(ree);
 371             throw ree;
 372         } catch (OutOfMemoryError e) {
 373             submitRunContinuation(pool, true);
 374         }
 375     }
 376 
 377     /**
 378      * Submits the runContinuation task to the scheduler. For the default scheduler,
 379      * and calling it on a worker thread, the task will be pushed to the local queue,
 380      * otherwise it will be pushed to an external submission queue.
 381      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 382      * @throws RejectedExecutionException
 383      */
 384     private void submitRunContinuation() {
 385         submitRunContinuation(scheduler, true);
 386     }
 387 
 388     /**
 389      * Lazy submit the runContinuation task if invoked on a carrier thread and its local
 390      * queue is empty. If not empty, or invoked by another thread, then this method works
 391      * like submitRunContinuation and just submits the task to the scheduler.
 392      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 393      * @throws RejectedExecutionException
 394      * @see ForkJoinPool#lazySubmit(ForkJoinTask)
 395      */
 396     private void lazySubmitRunContinuation() {

 397         if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 398             ForkJoinPool pool = ct.getPool();
 399             try {
 400                 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
 401             } catch (RejectedExecutionException ree) {
 402                 submitFailed(ree);
 403                 throw ree;
 404             } catch (OutOfMemoryError e) {
 405                 submitRunContinuation();
 406             }
 407         } else {
 408             submitRunContinuation();
 409         }
 410     }
 411 
 412     /**
 413      * Submits the runContinuation task to the scheduler. For the default scheduler, and
 414      * calling it a virtual thread that uses the default scheduler, the task will be
 415      * pushed to an external submission queue. This method may throw OutOfMemoryError.

 416      * @throws RejectedExecutionException
 417      * @throws OutOfMemoryError
 418      */
 419     private void externalSubmitRunContinuationOrThrow() {
 420         if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {

 421             try {
 422                 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 423             } catch (RejectedExecutionException ree) {
 424                 submitFailed(ree);
 425                 throw ree;


 426             }
 427         } else {
 428             submitRunContinuation(scheduler, false);




































 429         }
 430     }
 431 
 432     /**
 433      * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
 434      */
 435     private void submitFailed(RejectedExecutionException ree) {
 436         var event = new VirtualThreadSubmitFailedEvent();
 437         if (event.isEnabled()) {
 438             event.javaThreadId = threadId();
 439             event.exceptionMessage = ree.getMessage();
 440             event.commit();
 441         }
 442     }
 443 
 444     /**
 445      * Runs a task in the context of this virtual thread.
 446      */
 447     private void run(Runnable task) {
 448         assert Thread.currentThread() == this && state == RUNNING;

 563                 long timeout = this.timeout;
 564                 assert timeout > 0;
 565                 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
 566                 setState(newState = TIMED_PARKED);
 567             }
 568 
 569             // may have been unparked while parking
 570             if (parkPermit && compareAndSetState(newState, UNPARKED)) {
 571                 // lazy submit if local queue is empty
 572                 lazySubmitRunContinuation();
 573             }
 574             return;
 575         }
 576 
 577         // Thread.yield
 578         if (s == YIELDING) {
 579             setState(YIELDED);
 580 
 581             // external submit if there are no tasks in the local task queue
 582             if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 583                 externalSubmitRunContinuation(ct.getPool());
 584             } else {
 585                 submitRunContinuation();
 586             }
 587             return;
 588         }
 589 
 590         // blocking on monitorenter
 591         if (s == BLOCKING) {
 592             setState(BLOCKED);
 593 
 594             // may have been unblocked while blocking
 595             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 596                 // lazy submit if local queue is empty
 597                 lazySubmitRunContinuation();
 598             }
 599             return;
 600         }
 601 
 602         // Object.wait
 603         if (s == WAITING || s == TIMED_WAITING) {

 720     @Override
 721     public void run() {
 722         // do nothing
 723     }
 724 
 725     /**
 726      * Parks until unparked or interrupted. If already unparked then the parking
 727      * permit is consumed and this method completes immediately (meaning it doesn't
 728      * yield). It also completes immediately if the interrupted status is set.
 729      */
 730     @Override
 731     void park() {
 732         assert Thread.currentThread() == this;
 733 
 734         // complete immediately if parking permit available or interrupted
 735         if (getAndSetParkPermit(false) || interrupted)
 736             return;
 737 
 738         // park the thread
 739         boolean yielded = false;

 740         setState(PARKING);
 741         try {
 742             yielded = yieldContinuation();
 743         } catch (OutOfMemoryError e) {
 744             // park on carrier
 745         } finally {
 746             assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
 747             if (!yielded) {


 748                 assert state() == PARKING;
 749                 setState(RUNNING);
 750             }
 751         }
 752 
 753         // park on the carrier thread when pinned
 754         if (!yielded) {
 755             parkOnCarrierThread(false, 0);
 756         }
 757     }
 758 
 759     /**
 760      * Parks up to the given waiting time or until unparked or interrupted.
 761      * If already unparked then the parking permit is consumed and this method
 762      * completes immediately (meaning it doesn't yield). It also completes immediately
 763      * if the interrupted status is set or the waiting time is {@code <= 0}.
 764      *
 765      * @param nanos the maximum number of nanoseconds to wait.
 766      */
 767     @Override
 768     void parkNanos(long nanos) {
 769         assert Thread.currentThread() == this;
 770 
 771         // complete immediately if parking permit available or interrupted
 772         if (getAndSetParkPermit(false) || interrupted)
 773             return;
 774 
 775         // park the thread for the waiting time
 776         if (nanos > 0) {
 777             long startTime = System.nanoTime();
 778 
 779             // park the thread, afterYield will schedule the thread to unpark
 780             boolean yielded = false;

 781             timeout = nanos;
 782             setState(TIMED_PARKING);
 783             try {
 784                 yielded = yieldContinuation();
 785             } catch (OutOfMemoryError e) {
 786                 // park on carrier
 787             } finally {
 788                 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
 789                 if (!yielded) {


 790                     assert state() == TIMED_PARKING;
 791                     setState(RUNNING);
 792                 }
 793             }
 794 
 795             // park on carrier thread for remaining time when pinned (or OOME)
 796             if (!yielded) {
 797                 long remainingNanos = nanos - (System.nanoTime() - startTime);
 798                 parkOnCarrierThread(true, remainingNanos);
 799             }
 800         }
 801     }
 802 
 803     /**
 804      * Parks the current carrier thread up to the given waiting time or until
 805      * unparked or interrupted. If the virtual thread is interrupted then the
 806      * interrupted status will be propagated to the carrier thread.
 807      * @param timed true for a timed park, false for untimed
 808      * @param nanos the waiting time in nanoseconds
 809      */

1399     @JvmtiMountTransition
1400     private native void notifyJvmtiEnd();
1401 
1402     @IntrinsicCandidate
1403     @JvmtiMountTransition
1404     private native void notifyJvmtiMount(boolean hide);
1405 
1406     @IntrinsicCandidate
1407     @JvmtiMountTransition
1408     private native void notifyJvmtiUnmount(boolean hide);
1409 
1410     @IntrinsicCandidate
1411     private static native void notifyJvmtiDisableSuspend(boolean enter);
1412 
1413     private static native void registerNatives();
1414     static {
1415         registerNatives();
1416 
1417         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1418         var group = Thread.virtualThreadGroup();



































1419     }
1420 
1421     /**
1422      * Creates the default ForkJoinPool scheduler.

1423      */
1424     private static ForkJoinPool createDefaultScheduler() {
1425         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1426         int parallelism, maxPoolSize, minRunnable;
1427         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1428         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1429         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1430         if (parallelismValue != null) {
1431             parallelism = Integer.parseInt(parallelismValue);
1432         } else {
1433             parallelism = Runtime.getRuntime().availableProcessors();
1434         }
1435         if (maxPoolSizeValue != null) {
1436             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1437             parallelism = Integer.min(parallelism, maxPoolSize);
1438         } else {
1439             maxPoolSize = Integer.max(parallelism, 256);
1440         }
1441         if (minRunnableValue != null) {
1442             minRunnable = Integer.parseInt(minRunnableValue);
1443         } else {
1444             minRunnable = Integer.max(parallelism / 2, 1);
1445         }
1446         Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1447         boolean asyncMode = true; // FIFO
1448         return new ForkJoinPool(parallelism, factory, handler, asyncMode,
1449                      0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);


























































1450     }
1451 
1452     /**
1453      * Schedule a runnable task to run after a delay.
1454      */
1455     private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1456         if (scheduler instanceof ForkJoinPool pool) {
1457             return pool.schedule(command, delay, unit);
1458         } else {
1459             return DelayedTaskSchedulers.schedule(command, delay, unit);
1460         }
1461     }
1462 
1463     /**
1464      * Supports scheduling a runnable task to run after a delay. It uses a number
1465      * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1466      * work queue used. This class is used when using a custom scheduler.
1467      */
1468     private static class DelayedTaskSchedulers {
1469         private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();

1519                 assert changed;
1520                 vthread.unblock();
1521 
1522                 vthread = nextThread;
1523             }
1524         }
1525     }
1526 
1527     /**
1528      * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1529      * if necessary until a list of one or more threads becomes available.
1530      */
1531     private static native VirtualThread takeVirtualThreadListToUnblock();
1532 
1533     static {
1534         var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1535                 VirtualThread::unblockVirtualThreads);
1536         unblocker.setDaemon(true);
1537         unblocker.start();
1538     }
1539 }

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.lang.reflect.Constructor;
  30 import java.util.Locale;
  31 import java.util.Objects;
  32 import java.util.concurrent.CountDownLatch;

  33 import java.util.concurrent.Executors;
  34 import java.util.concurrent.ForkJoinPool;

  35 import java.util.concurrent.ForkJoinTask;
  36 import java.util.concurrent.Future;
  37 import java.util.concurrent.RejectedExecutionException;
  38 import java.util.concurrent.ScheduledExecutorService;
  39 import java.util.concurrent.ScheduledThreadPoolExecutor;
  40 import java.util.concurrent.TimeUnit;
  41 import jdk.internal.event.VirtualThreadEndEvent;
  42 import jdk.internal.event.VirtualThreadParkEvent;
  43 import jdk.internal.event.VirtualThreadStartEvent;
  44 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  45 import jdk.internal.invoke.MhUtil;
  46 import jdk.internal.misc.CarrierThread;
  47 import jdk.internal.misc.InnocuousThread;
  48 import jdk.internal.misc.Unsafe;
  49 import jdk.internal.vm.Continuation;
  50 import jdk.internal.vm.ContinuationScope;
  51 import jdk.internal.vm.StackableScope;
  52 import jdk.internal.vm.ThreadContainer;
  53 import jdk.internal.vm.ThreadContainers;
  54 import jdk.internal.vm.annotation.ChangesCurrentThread;
  55 import jdk.internal.vm.annotation.Hidden;
  56 import jdk.internal.vm.annotation.IntrinsicCandidate;
  57 import jdk.internal.vm.annotation.JvmtiHideEvents;
  58 import jdk.internal.vm.annotation.JvmtiMountTransition;
  59 import jdk.internal.vm.annotation.ReservedStackAccess;
  60 import sun.nio.ch.Interruptible;
  61 import static java.util.concurrent.TimeUnit.*;
  62 
  63 /**
  64  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  65  */
  66 final class VirtualThread extends BaseVirtualThread {
  67     private static final Unsafe U = Unsafe.getUnsafe();
  68     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  69 
  70     private static final ForkJoinPool BUILTIN_SCHEDULER;
  71     private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
  72     static {
  73         // experimental
  74         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
  75         if (propValue != null) {
  76             var builtinScheduler = createBuiltinDefaultScheduler(true);
  77             VirtualThreadScheduler defaultScheduler = builtinScheduler.externalView();
  78             for (String cn: propValue.split(",")) {
  79                 defaultScheduler = loadCustomScheduler(defaultScheduler, cn);
  80             }
  81             BUILTIN_SCHEDULER = builtinScheduler;
  82             DEFAULT_SCHEDULER = defaultScheduler;
  83         } else {
  84             var builtinScheduler = createBuiltinDefaultScheduler(false);
  85             BUILTIN_SCHEDULER = builtinScheduler;
  86             DEFAULT_SCHEDULER = builtinScheduler;
  87         }
  88     }
  89 
  90     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
  91     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  92     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  93     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
  94     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  95 
  96     // scheduler and continuation
  97     private final VirtualThreadScheduler scheduler;
  98     private final Continuation cont;
  99     private final VirtualThreadTask runContinuation;
 100 
 101     // virtual thread state, accessed by VM
 102     private volatile int state;
 103 
 104     /*
 105      * Virtual thread state transitions:
 106      *
 107      *      NEW -> STARTED         // Thread.start, schedule to run
 108      *  STARTED -> TERMINATED      // failed to start
 109      *  STARTED -> RUNNING         // first run
 110      *  RUNNING -> TERMINATED      // done
 111      *
 112      *  RUNNING -> PARKING         // Thread parking with LockSupport.park
 113      *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
 114      *  PARKING -> PINNED          // cont.yield failed, parked indefinitely on carrier
 115      *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
 116      *   PINNED -> RUNNING         // unparked, continue execution on same carrier
 117      * UNPARKED -> RUNNING         // continue execution after park
 118      *
 119      *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos

 194     private volatile boolean interruptibleWait;
 195 
 196     // timed-wait support
 197     private byte timedWaitSeqNo;
 198 
 199     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 200     private long timeout;
 201 
 202     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 203     private Future<?> timeoutTask;
 204 
 205     // carrier thread when mounted, accessed by VM
 206     private volatile Thread carrierThread;
 207 
 208     // termination object when joining, created lazily if needed
 209     private volatile CountDownLatch termination;
 210 
 211     /**
 212      * Returns the default scheduler.
 213      */
 214     static VirtualThreadScheduler defaultScheduler() {
 215         return DEFAULT_SCHEDULER;
 216     }
 217 
 218     /**
 219      * Returns true if using a custom default scheduler.
 220      */
 221     static boolean isCustomDefaultScheduler() {
 222         return DEFAULT_SCHEDULER != BUILTIN_SCHEDULER;
 223     }
 224 
 225     /**
 226      * Returns the continuation scope used for virtual threads.
 227      */
 228     static ContinuationScope continuationScope() {
 229         return VTHREAD_SCOPE;
 230     }
 231 
 232     /**
 233      * Return the scheduler for this thread.
 234      * @param revealBuiltin true to reveal the built-in scheduler, false to hide
 235      */
 236     VirtualThreadScheduler scheduler(boolean revealBuiltin) {
 237         if (scheduler instanceof BuiltinDefaultScheduler builtin && !revealBuiltin) {
 238             return builtin.externalView();
 239         } else {
 240             return scheduler;
 241         }
 242     }
 243 
 244     /**
 245      * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
 246      *
 247      * @param scheduler the scheduler or null for default scheduler
 248      * @param name thread name
 249      * @param characteristics characteristics
 250      * @param task the task to execute
 251      */
 252     VirtualThread(VirtualThreadScheduler scheduler,
 253                   String name,
 254                   int characteristics,
 255                   Runnable task) {
 256         super(name, characteristics, /*bound*/ false);
 257         Objects.requireNonNull(task);
 258 
 259         // use default scheduler if not provided
 260         if (scheduler == null) {
 261             scheduler = DEFAULT_SCHEDULER;





 262         }
 263 
 264         this.scheduler = scheduler;
 265         this.cont = new VThreadContinuation(this, task);
 266 
 267         if (scheduler == BUILTIN_SCHEDULER) {
 268             this.runContinuation = new BuiltinSchedulerTask(this);
 269         } else {
 270             this.runContinuation = new CustomSchedulerTask(this);
 271         }
 272     }
 273 
 274     /**
 275      * The task to execute when using the built-in scheduler.
 276      */
 277     static final class BuiltinSchedulerTask implements VirtualThreadTask {
 278         private final VirtualThread vthread;
 279         BuiltinSchedulerTask(VirtualThread vthread) {
 280             this.vthread = vthread;
 281         }
 282         @Override
 283         public Object attach(Object att) {
 284             throw new UnsupportedOperationException();
 285         }
 286         @Override
 287         public Object attachment() {
 288             throw new UnsupportedOperationException();
 289         }
 290         @Override
 291         public Thread thread() {
 292             return vthread;
 293         }
 294         @Override
 295         public void run() {
 296             vthread.runContinuation();;
 297         }
 298     }
 299 
 300     /**
 301      * The task to execute when using a custom scheduler.
 302      */
 303     static final class CustomSchedulerTask implements VirtualThreadTask {
 304         private static final VarHandle ATT =
 305                 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class);
 306         private final VirtualThread vthread;
 307         private volatile Object att;
 308         CustomSchedulerTask(VirtualThread vthread) {
 309             this.vthread = vthread;
 310         }
 311         @Override
 312         public Object attach(Object att) {
 313             return ATT.getAndSet(this, att);
 314         }
 315         @Override
 316         public Object attachment() {
 317             return att;
 318         }
 319         @Override
 320         public Thread thread() {
 321             return vthread;
 322         }
 323         @Override
 324         public void run() {
 325             vthread.runContinuation();;
 326         }
 327     }
 328 
 329     /**
 330      * The continuation that a virtual thread executes.
 331      */
 332     private static class VThreadContinuation extends Continuation {
 333         VThreadContinuation(VirtualThread vthread, Runnable task) {
 334             super(VTHREAD_SCOPE, wrap(vthread, task));
 335         }
 336         @Override
 337         protected void onPinned(Continuation.Pinned reason) {
 338         }
 339         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 340             return new Runnable() {
 341                 @Hidden
 342                 @JvmtiHideEvents
 343                 public void run() {
 344                     vthread.notifyJvmtiStart(); // notify JVMTI
 345                     try {
 346                         vthread.run(task);

 394             if (cont.isDone()) {
 395                 afterDone();
 396             } else {
 397                 afterYield();
 398             }
 399         }
 400     }
 401 
 402     /**
 403      * Cancel timeout task when continuing after timed-park or timed-wait.
 404      * The timeout task may be executing, or may have already completed.
 405      */
 406     private void cancelTimeoutTask() {
 407         if (timeoutTask != null) {
 408             timeoutTask.cancel(false);
 409             timeoutTask = null;
 410         }
 411     }
 412 
 413     /**
 414      * Submits the runContinuation task to the scheduler. For the built-in scheduler,
 415      * the task will be pushed to the local queue if possible, otherwise it will be
 416      * pushed to an external submission queue.

 417      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 418      * @throws RejectedExecutionException
 419      */
 420     private void submitRunContinuation(boolean retryOnOOME) {
 421         boolean done = false;
 422         while (!done) {
 423             try {
 424                 // Pin the continuation to prevent the virtual thread from unmounting
 425                 // when submitting a task. For the default scheduler this ensures that
 426                 // the carrier doesn't change when pushing a task. For other schedulers
 427                 // it avoids deadlock that could arise due to carriers and virtual
 428                 // threads contending for a lock.
 429                 if (currentThread().isVirtual()) {
 430                     Continuation.pin();
 431                     try {
 432                         scheduler.onContinue(runContinuation);
 433                     } finally {
 434                         Continuation.unpin();
 435                     }
 436                 } else {
 437                     scheduler.onContinue(runContinuation);
 438                 }
 439                 done = true;
 440             } catch (RejectedExecutionException ree) {
 441                 submitFailed(ree);
 442                 throw ree;
 443             } catch (OutOfMemoryError e) {
 444                 if (retryOnOOME) {
 445                     U.park(false, 100_000_000); // 100ms
 446                 } else {
 447                     throw e;
 448                 }
 449             }
 450         }
 451     }
 452 


















 453     /**
 454      * Submits the runContinuation task to the scheduler. For the default scheduler,
 455      * and calling it on a worker thread, the task will be pushed to the local queue,
 456      * otherwise it will be pushed to an external submission queue.
 457      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 458      * @throws RejectedExecutionException
 459      */
 460     private void submitRunContinuation() {
 461         submitRunContinuation(true);
 462     }
 463 
 464     /**
 465      * Invoked from a carrier thread to lazy submit the runContinuation task to the
 466      * carrier's local queue if the queue is empty. If not empty, or invoked by a thread
 467      * for a custom scheduler, then it just submits the task to the scheduler.
 468      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 469      * @throws RejectedExecutionException
 470      * @see ForkJoinPool#lazySubmit(ForkJoinTask)
 471      */
 472     private void lazySubmitRunContinuation() {
 473         assert !currentThread().isVirtual();
 474         if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {

 475             try {
 476                 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
 477             } catch (RejectedExecutionException ree) {
 478                 submitFailed(ree);
 479                 throw ree;
 480             } catch (OutOfMemoryError e) {
 481                 submitRunContinuation();
 482             }
 483         } else {
 484             submitRunContinuation();
 485         }
 486     }
 487 
 488     /**
 489      * Invoked from a carrier thread to externally submit the runContinuation task to the
 490      * scheduler. If invoked by a thread for a custom scheduler, then it just submits the
 491      * task to the scheduler.
 492      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 493      * @throws RejectedExecutionException
 494      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
 495      */
 496     private void externalSubmitRunContinuation() {
 497         assert !currentThread().isVirtual();
 498         if (currentThread() instanceof CarrierThread ct) {
 499             try {
 500                 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 501             } catch (RejectedExecutionException ree) {
 502                 submitFailed(ree);
 503                 throw ree;
 504             } catch (OutOfMemoryError e) {
 505                 submitRunContinuation();
 506             }
 507         } else {
 508             submitRunContinuation();
 509         }
 510     }
 511 
 512     /**
 513      * Invoked from Thread.start to externally submit the runContinuation task to the
 514      * scheduler. If this virtual thread is scheduled by the built-in scheduler,
 515      * and this method is called from a virtual thread scheduled by the built-in
 516      * scheduler, then it uses externalSubmit to ensure that the task is pushed to an
 517      * external submission queue rather than the local queue.
 518      * @throws RejectedExecutionException
 519      * @throws OutOfMemoryError
 520      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
 521      */
 522     private void externalSubmitRunContinuationOrThrow() {
 523         try {
 524             if (currentThread().isVirtual()) {
 525                 // Pin the continuation to prevent the virtual thread from unmounting
 526                 // when submitting a task. This avoids deadlock that could arise due to
 527                 // carriers and virtual threads contending for a lock.
 528                 Continuation.pin();
 529                 try {
 530                     if (scheduler == BUILTIN_SCHEDULER
 531                             && currentCarrierThread() instanceof CarrierThread ct) {
 532                         ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 533                     } else {
 534                         scheduler.onStart(runContinuation);
 535                     }
 536                 } finally {
 537                     Continuation.unpin();
 538                 }
 539             } else {
 540                 scheduler.onStart(runContinuation);
 541             }
 542         } catch (RejectedExecutionException ree) {
 543             submitFailed(ree);
 544             throw ree;
 545         }
 546     }
 547 
 548     /**
 549      * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
 550      */
 551     private void submitFailed(RejectedExecutionException ree) {
 552         var event = new VirtualThreadSubmitFailedEvent();
 553         if (event.isEnabled()) {
 554             event.javaThreadId = threadId();
 555             event.exceptionMessage = ree.getMessage();
 556             event.commit();
 557         }
 558     }
 559 
 560     /**
 561      * Runs a task in the context of this virtual thread.
 562      */
 563     private void run(Runnable task) {
 564         assert Thread.currentThread() == this && state == RUNNING;

 679                 long timeout = this.timeout;
 680                 assert timeout > 0;
 681                 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
 682                 setState(newState = TIMED_PARKED);
 683             }
 684 
 685             // may have been unparked while parking
 686             if (parkPermit && compareAndSetState(newState, UNPARKED)) {
 687                 // lazy submit if local queue is empty
 688                 lazySubmitRunContinuation();
 689             }
 690             return;
 691         }
 692 
 693         // Thread.yield
 694         if (s == YIELDING) {
 695             setState(YIELDED);
 696 
 697             // external submit if there are no tasks in the local task queue
 698             if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 699                 externalSubmitRunContinuation();
 700             } else {
 701                 submitRunContinuation();
 702             }
 703             return;
 704         }
 705 
 706         // blocking on monitorenter
 707         if (s == BLOCKING) {
 708             setState(BLOCKED);
 709 
 710             // may have been unblocked while blocking
 711             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 712                 // lazy submit if local queue is empty
 713                 lazySubmitRunContinuation();
 714             }
 715             return;
 716         }
 717 
 718         // Object.wait
 719         if (s == WAITING || s == TIMED_WAITING) {

 836     @Override
 837     public void run() {
 838         // do nothing
 839     }
 840 
 841     /**
 842      * Parks until unparked or interrupted. If already unparked then the parking
 843      * permit is consumed and this method completes immediately (meaning it doesn't
 844      * yield). It also completes immediately if the interrupted status is set.
 845      */
 846     @Override
 847     void park() {
 848         assert Thread.currentThread() == this;
 849 
 850         // complete immediately if parking permit available or interrupted
 851         if (getAndSetParkPermit(false) || interrupted)
 852             return;
 853 
 854         // park the thread
 855         boolean yielded = false;
 856         long eventStartTime = VirtualThreadParkEvent.eventStartTime();
 857         setState(PARKING);
 858         try {
 859             yielded = yieldContinuation();
 860         } catch (OutOfMemoryError e) {
 861             // park on carrier
 862         } finally {
 863             assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
 864             if (yielded) {
 865                 VirtualThreadParkEvent.offer(eventStartTime, Long.MIN_VALUE);
 866             } else {
 867                 assert state() == PARKING;
 868                 setState(RUNNING);
 869             }
 870         }
 871 
 872         // park on the carrier thread when pinned
 873         if (!yielded) {
 874             parkOnCarrierThread(false, 0);
 875         }
 876     }
 877 
 878     /**
 879      * Parks up to the given waiting time or until unparked or interrupted.
 880      * If already unparked then the parking permit is consumed and this method
 881      * completes immediately (meaning it doesn't yield). It also completes immediately
 882      * if the interrupted status is set or the waiting time is {@code <= 0}.
 883      *
 884      * @param nanos the maximum number of nanoseconds to wait.
 885      */
 886     @Override
 887     void parkNanos(long nanos) {
 888         assert Thread.currentThread() == this;
 889 
 890         // complete immediately if parking permit available or interrupted
 891         if (getAndSetParkPermit(false) || interrupted)
 892             return;
 893 
 894         // park the thread for the waiting time
 895         if (nanos > 0) {
 896             long startTime = System.nanoTime();
 897 
 898             // park the thread, afterYield will schedule the thread to unpark
 899             boolean yielded = false;
 900             long eventStartTime = VirtualThreadParkEvent.eventStartTime();
 901             timeout = nanos;
 902             setState(TIMED_PARKING);
 903             try {
 904                 yielded = yieldContinuation();
 905             } catch (OutOfMemoryError e) {
 906                 // park on carrier
 907             } finally {
 908                 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
 909                 if (yielded) {
 910                     VirtualThreadParkEvent.offer(eventStartTime, nanos);
 911                 } else {
 912                     assert state() == TIMED_PARKING;
 913                     setState(RUNNING);
 914                 }
 915             }
 916 
 917             // park on carrier thread for remaining time when pinned (or OOME)
 918             if (!yielded) {
 919                 long remainingNanos = nanos - (System.nanoTime() - startTime);
 920                 parkOnCarrierThread(true, remainingNanos);
 921             }
 922         }
 923     }
 924 
 925     /**
 926      * Parks the current carrier thread up to the given waiting time or until
 927      * unparked or interrupted. If the virtual thread is interrupted then the
 928      * interrupted status will be propagated to the carrier thread.
 929      * @param timed true for a timed park, false for untimed
 930      * @param nanos the waiting time in nanoseconds
 931      */

1521     @JvmtiMountTransition
1522     private native void notifyJvmtiEnd();
1523 
1524     @IntrinsicCandidate
1525     @JvmtiMountTransition
1526     private native void notifyJvmtiMount(boolean hide);
1527 
1528     @IntrinsicCandidate
1529     @JvmtiMountTransition
1530     private native void notifyJvmtiUnmount(boolean hide);
1531 
1532     @IntrinsicCandidate
1533     private static native void notifyJvmtiDisableSuspend(boolean enter);
1534 
1535     private static native void registerNatives();
1536     static {
1537         registerNatives();
1538 
1539         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1540         var group = Thread.virtualThreadGroup();
1541 
1542         // ensure event class is initialized
1543         try {
1544             MethodHandles.lookup().ensureInitialized(VirtualThreadParkEvent.class);
1545         } catch (IllegalAccessException e) {
1546             throw new ExceptionInInitializerError(e);
1547         }
1548     }
1549 
1550     /**
1551      * Loads a VirtualThreadScheduler with the given class name. The class must be public
1552      * in an exported package, with public one-arg or no-arg constructor, and be visible
1553      * to the system class loader.
1554      * @param delegate the scheduler that the custom scheduler may delegate to
1555      * @param cn the class name of the custom scheduler
1556      */
1557     private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) {
1558         try {
1559             Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
1560             VirtualThreadScheduler scheduler;
1561             try {
1562                 // 1-arg constructor
1563                 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class);
1564                 scheduler = (VirtualThreadScheduler) ctor.newInstance(delegate);
1565             } catch (NoSuchMethodException e) {
1566                 // 0-arg constructor
1567                 Constructor<?> ctor = clazz.getConstructor();
1568                 scheduler = (VirtualThreadScheduler) ctor.newInstance();
1569             }
1570             System.err.println("""
1571                 WARNING: Using custom default scheduler, this is an experimental feature!""");
1572             return scheduler;
1573         } catch (Exception ex) {
1574             throw new Error(ex);
1575         }
1576     }
1577 
1578     /**
1579      * Creates the built-in ForkJoinPool scheduler.
1580      * @param wrapped true if wrapped by a custom default scheduler
1581      */
1582     private static BuiltinDefaultScheduler createBuiltinDefaultScheduler(boolean wrapped) {

1583         int parallelism, maxPoolSize, minRunnable;
1584         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1585         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1586         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1587         if (parallelismValue != null) {
1588             parallelism = Integer.parseInt(parallelismValue);
1589         } else {
1590             parallelism = Runtime.getRuntime().availableProcessors();
1591         }
1592         if (maxPoolSizeValue != null) {
1593             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1594             parallelism = Integer.min(parallelism, maxPoolSize);
1595         } else {
1596             maxPoolSize = Integer.max(parallelism, 256);
1597         }
1598         if (minRunnableValue != null) {
1599             minRunnable = Integer.parseInt(minRunnableValue);
1600         } else {
1601             minRunnable = Integer.max(parallelism / 2, 1);
1602         }
1603         return new BuiltinDefaultScheduler(parallelism, maxPoolSize, minRunnable, wrapped);
1604     }
1605 
1606     /**
1607      * The built-in ForkJoinPool scheduler.
1608      */
1609     private static class BuiltinDefaultScheduler
1610             extends ForkJoinPool implements VirtualThreadScheduler {
1611 
1612         private static final StableValue<VirtualThreadScheduler> VIEW = StableValue.of();
1613 
1614         BuiltinDefaultScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) {
1615             ForkJoinWorkerThreadFactory factory = wrapped
1616                     ? ForkJoinPool.defaultForkJoinWorkerThreadFactory
1617                     : CarrierThread::new;
1618             Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1619             boolean asyncMode = true; // FIFO
1620             super(parallelism, factory, handler, asyncMode,
1621                     0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
1622         }
1623 
1624         private void adaptAndExecute(Runnable task) {
1625             execute(ForkJoinTask.adapt(task));
1626         }
1627 
1628         @Override
1629         public void onStart(VirtualThreadTask task) {
1630             adaptAndExecute(task);
1631         }
1632 
1633         @Override
1634         public void onContinue(VirtualThreadTask task) {
1635             adaptAndExecute(task);
1636         }
1637 
1638         /**
1639          * Wraps the scheduler to avoid leaking a direct reference.
1640          */
1641         VirtualThreadScheduler externalView() {
1642             BuiltinDefaultScheduler builtin = this;
1643             return VIEW.orElseSet(() -> {
1644                 return new VirtualThreadScheduler() {
1645                     private void execute(VirtualThreadTask task) {
1646                         var vthread = (VirtualThread) task.thread();
1647                         VirtualThreadScheduler scheduler = vthread.scheduler;
1648                         if (scheduler == this || scheduler == DEFAULT_SCHEDULER) {
1649                             builtin.adaptAndExecute(task);
1650                         } else {
1651                             throw new IllegalArgumentException();
1652                         }
1653                     }
1654                     @Override
1655                     public void onStart(VirtualThreadTask task) {
1656                         execute(task);
1657                     }
1658                     @Override
1659                     public void onContinue(VirtualThreadTask task) {
1660                         execute(task);
1661                     }
1662                 };
1663             });
1664         }
1665     }
1666 
1667     /**
1668      * Schedule a runnable task to run after a delay.
1669      */
1670     private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1671         if (scheduler instanceof ForkJoinPool pool) {
1672             return pool.schedule(command, delay, unit);
1673         } else {
1674             return DelayedTaskSchedulers.schedule(command, delay, unit);
1675         }
1676     }
1677 
1678     /**
1679      * Supports scheduling a runnable task to run after a delay. It uses a number
1680      * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1681      * work queue used. This class is used when using a custom scheduler.
1682      */
1683     private static class DelayedTaskSchedulers {
1684         private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();

1734                 assert changed;
1735                 vthread.unblock();
1736 
1737                 vthread = nextThread;
1738             }
1739         }
1740     }
1741 
1742     /**
1743      * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1744      * if necessary until a list of one or more threads becomes available.
1745      */
1746     private static native VirtualThread takeVirtualThreadListToUnblock();
1747 
1748     static {
1749         var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1750                 VirtualThread::unblockVirtualThreads);
1751         unblocker.setDaemon(true);
1752         unblocker.start();
1753     }
1754 }
< prev index next >