< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 

  27 import java.util.Locale;
  28 import java.util.Objects;
  29 import java.util.concurrent.CountDownLatch;
  30 import java.util.concurrent.Executor;
  31 import java.util.concurrent.Executors;
  32 import java.util.concurrent.ForkJoinPool;
  33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  34 import java.util.concurrent.ForkJoinTask;
  35 import java.util.concurrent.Future;
  36 import java.util.concurrent.RejectedExecutionException;
  37 import java.util.concurrent.ScheduledExecutorService;
  38 import java.util.concurrent.ScheduledThreadPoolExecutor;
  39 import java.util.concurrent.TimeUnit;

  40 import jdk.internal.event.VirtualThreadEndEvent;
  41 import jdk.internal.event.VirtualThreadStartEvent;
  42 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  43 import jdk.internal.misc.CarrierThread;
  44 import jdk.internal.misc.InnocuousThread;
  45 import jdk.internal.misc.Unsafe;
  46 import jdk.internal.vm.Continuation;
  47 import jdk.internal.vm.ContinuationScope;
  48 import jdk.internal.vm.StackableScope;
  49 import jdk.internal.vm.ThreadContainer;
  50 import jdk.internal.vm.ThreadContainers;
  51 import jdk.internal.vm.annotation.ChangesCurrentThread;
  52 import jdk.internal.vm.annotation.Hidden;
  53 import jdk.internal.vm.annotation.IntrinsicCandidate;
  54 import jdk.internal.vm.annotation.JvmtiHideEvents;
  55 import jdk.internal.vm.annotation.JvmtiMountTransition;
  56 import jdk.internal.vm.annotation.ReservedStackAccess;
  57 import sun.nio.ch.Interruptible;
  58 import static java.util.concurrent.TimeUnit.*;
  59 
  60 /**
  61  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  62  */
  63 final class VirtualThread extends BaseVirtualThread {
  64     private static final Unsafe U = Unsafe.getUnsafe();
  65     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  66     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();













  67 
  68     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
  69     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  70     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  71     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
  72     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  73 
  74     // scheduler and continuation
  75     private final Executor scheduler;
  76     private final Continuation cont;
  77     private final Runnable runContinuation;
  78 
  79     // virtual thread state, accessed by VM
  80     private volatile int state;
  81 
  82     /*
  83      * Virtual thread state transitions:
  84      *
  85      *      NEW -> STARTED         // Thread.start, schedule to run
  86      *  STARTED -> TERMINATED      // failed to start
  87      *  STARTED -> RUNNING         // first run
  88      *  RUNNING -> TERMINATED      // done
  89      *
  90      *  RUNNING -> PARKING         // Thread parking with LockSupport.park
  91      *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
  92      *  PARKING -> PINNED          // cont.yield failed, parked indefinitely on carrier
  93      *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
  94      *   PINNED -> RUNNING         // unparked, continue execution on same carrier
  95      * UNPARKED -> RUNNING         // continue execution after park

 151     private static final int TERMINATED = 99;  // final state
 152 
 153     // can be suspended from scheduling when unmounted
 154     private static final int SUSPENDED = 1 << 8;
 155 
 156     // parking permit made available by LockSupport.unpark
 157     private volatile boolean parkPermit;
 158 
 159     // blocking permit made available by unblocker thread when another thread exits monitor
 160     private volatile boolean blockPermit;
 161 
 162     // true when on the list of virtual threads waiting to be unblocked
 163     private volatile boolean onWaitingList;
 164 
 165     // next virtual thread on the list of virtual threads waiting to be unblocked
 166     private volatile VirtualThread next;
 167 
 168     // notified by Object.notify/notifyAll while waiting in Object.wait
 169     private volatile boolean notified;
 170 



 171     // timed-wait support
 172     private byte timedWaitSeqNo;
 173 
 174     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 175     private long timeout;
 176 
 177     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 178     private Future<?> timeoutTask;
 179 
 180     // carrier thread when mounted, accessed by VM
 181     private volatile Thread carrierThread;
 182 
 183     // termination object when joining, created lazily if needed
 184     private volatile CountDownLatch termination;
 185 
 186     /**
 187      * Returns the default scheduler.
          *
          * @return the {@code DEFAULT_SCHEDULER} pool, exposed as an {@code Executor}
 188      */
 189     static Executor defaultScheduler() {
 190         return DEFAULT_SCHEDULER;
 191     }
 192 







 193     /**
 194      * Returns the continuation scope used for virtual threads.
          *
          * @return the single static {@code ContinuationScope} that every
          *         {@code VThreadContinuation} is created with
 195      */
 196     static ContinuationScope continuationScope() {
 197         return VTHREAD_SCOPE;
 198     }
 199 
 200     /**
 201      * Creates a new {@code VirtualThread} to run the given task with the given
 202      * scheduler. If the given scheduler is {@code null} and the current thread
 203      * is a platform thread then the newly created virtual thread will use the
 204      * default scheduler. If the given scheduler is {@code null} and the current
 205      * thread is a virtual thread then the current thread's scheduler is used.








 206      *
 207      * @param scheduler the scheduler or null
 208      * @param name thread name
 209      * @param characteristics characteristics
 210      * @param task the task to execute
          * @throws NullPointerException if {@code task} is null
 211      */
 212     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {



 213         super(name, characteristics, /*bound*/ false);
 214         Objects.requireNonNull(task);
 215 
 216         // choose scheduler if not specified
 217         if (scheduler == null) {
 218             Thread parent = Thread.currentThread();
 219             if (parent instanceof VirtualThread vparent) {
                      // inherit the scheduler of the creating virtual thread
 220                 scheduler = vparent.scheduler;
 221             } else {
 222                 scheduler = DEFAULT_SCHEDULER;
 223             }
 224         }
 225 
 226         this.scheduler = scheduler;
              // the continuation wraps the task; runContinuation is the Runnable
              // that the submit methods hand to the scheduler
 227         this.cont = new VThreadContinuation(this, task);
 228         this.runContinuation = this::runContinuation;
 229     }
 230 
 231     /**
 232      * The continuation that a virtual thread executes.
 233      */
 234     private static class VThreadContinuation extends Continuation {
 235         VThreadContinuation(VirtualThread vthread, Runnable task) {
 236             super(VTHREAD_SCOPE, wrap(vthread, task));
 237         }
 238         @Override
 239         protected void onPinned(Continuation.Pinned reason) {
 240         }
 241         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 242             return new Runnable() {
 243                 @Hidden

 299                 afterYield();
 300             }
 301         }
 302     }
 303 
 304     /**
 305      * Cancel timeout task when continuing after timed-park or timed-wait.
 306      * The timeout task may be executing, or may have already completed.
          * Does nothing if no timeout task is recorded; otherwise the recorded
          * reference is cleared after the cancel request.
 307      */
 308     private void cancelTimeoutTask() {
 309         if (timeoutTask != null) {
                  // false: do not interrupt the timeout task if it is already running
 310             timeoutTask.cancel(false);
 311             timeoutTask = null;
 312         }
 313     }
 314 
 315     /**
 316      * Submits the runContinuation task to the scheduler. For the default scheduler,
 317      * and calling it on a worker thread, the task will be pushed to the local queue,
 318      * otherwise it will be pushed to an external submission queue.
 319      * @param scheduler the scheduler
 320      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 321      * @throws RejectedExecutionException if the scheduler rejects the task
          * @throws OutOfMemoryError if thrown while submitting and {@code retryOnOOME}
          *         is false
 322      */
 323     private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
 324         boolean done = false;
              // loops only when an OutOfMemoryError is being retried
 325         while (!done) {
 326             try {
 327                 // Pin the continuation to prevent the virtual thread from unmounting
 328                 // when submitting a task. For the default scheduler this ensures that
 329                 // the carrier doesn't change when pushing a task. For other schedulers
 330                 // it avoids deadlock that could arise due to carriers and virtual
 331                 // threads contending for a lock.
 332                 if (currentThread().isVirtual()) {
 333                     Continuation.pin();
 334                     try {
 335                         scheduler.execute(runContinuation);
 336                     } finally {
                              // always balance the pin, even if execute throws
 337                         Continuation.unpin();
 338                     }
 339                 } else {
 340                     scheduler.execute(runContinuation);
 341                 }
 342                 done = true;
 343             } catch (RejectedExecutionException ree) {
                      // report the failure via submitFailed, then propagate; no retry
 344                 submitFailed(ree);
 345                 throw ree;
 346             } catch (OutOfMemoryError e) {
 347                 if (retryOnOOME) {
                          // back off before the next attempt
 348                     U.park(false, 100_000_000); // 100ms
 349                 } else {
 350                     throw e;
 351                 }
 352             }
 353         }
 354     }
 355 
 356     /**
 357      * Submits the runContinuation task to the given scheduler as an external submit.
 358      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
          * The caller is expected to be running on a {@code CarrierThread} (asserted).
          * @param pool the pool to submit the task to
 359      * @throws RejectedExecutionException if the pool rejects the task
 360      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
 361      */
 362     private void externalSubmitRunContinuation(ForkJoinPool pool) {
 363         assert Thread.currentThread() instanceof CarrierThread;
 364         try {
 365             pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
 366         } catch (RejectedExecutionException ree) {
                  // report the failure via submitFailed, then propagate
 367             submitFailed(ree);
 368             throw ree;
 369         } catch (OutOfMemoryError e) {
                  // fall back to the general submit path, which parks and retries
                  // for as long as OutOfMemoryError keeps being thrown
 370             submitRunContinuation(pool, true);
 371         }
 372     }
 373 
 374     /**
 375      * Submits the runContinuation task to the scheduler. For the default scheduler,
 376      * and calling it on a worker thread, the task will be pushed to the local queue,
 377      * otherwise it will be pushed to an external submission queue.
 378      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 379      * @throws RejectedExecutionException if the scheduler rejects the task
 380      */
 381     private void submitRunContinuation() {
              // this thread's own scheduler; true = keep retrying on OutOfMemoryError
 382         submitRunContinuation(scheduler, true);
 383     }
 384 
 385     /**
 386      * Lazy submit the runContinuation task if invoked on a carrier thread and its local
 387      * queue is empty. If not empty, or invoked by another thread, then this method works
 388      * like submitRunContinuation and just submits the task to the scheduler.
 389      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 390      * @throws RejectedExecutionException if the task is rejected
 391      * @see ForkJoinPool#lazySubmit(ForkJoinTask)
 392      */
 393     private void lazySubmitRunContinuation() {
 394         if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                  // on a carrier thread with nothing queued locally: lazy submit to its pool
 395             ForkJoinPool pool = ct.getPool();

 396             try {
 397                 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));





















 398             } catch (RejectedExecutionException ree) {
                      // report the failure via submitFailed, then propagate
 399                 submitFailed(ree);
 400                 throw ree;
 401             } catch (OutOfMemoryError e) {
                      // fall back to the regular submit, which retries on OutOfMemoryError
 402                 submitRunContinuation();
 403             }
 404         } else {
 405             submitRunContinuation();
 406         }
 407     }
 408 
 409     /**
 410      * Submits the runContinuation task to the scheduler. For the default scheduler, and
 411      * calling it on a virtual thread that uses the default scheduler, the task will be
 412      * pushed to an external submission queue. This method may throw OutOfMemoryError.
 413      * @throws RejectedExecutionException if the task is rejected
 414      * @throws OutOfMemoryError if thrown while submitting; it is not retried here
 415      */
 416     private void externalSubmitRunContinuationOrThrow() {
 417         if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {
 418             try {
                      // OutOfMemoryError is deliberately not caught on this path
 419                 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 420             } catch (RejectedExecutionException ree) {
 421                 submitFailed(ree);
 422                 throw ree;
 423             }
 424         } else {
                  // false: propagate OutOfMemoryError instead of retrying
 425             submitRunContinuation(scheduler, false);
 426         }
 427     }
 428 
 429     /**
 430      * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
          * The event records this thread's id and the message of the given exception.
          *
          * @param ree the exception thrown when the submit was rejected
 431      */
 432     private void submitFailed(RejectedExecutionException ree) {
 433         var event = new VirtualThreadSubmitFailedEvent();
              // populate and commit only when the event type is enabled
 434         if (event.isEnabled()) {
 435             event.javaThreadId = threadId();
 436             event.exceptionMessage = ree.getMessage();
 437             event.commit();
 438         }
 439     }
 440 
 441     /**
 442      * Runs a task in the context of this virtual thread.
 443      */
 444     private void run(Runnable task) {
 445         assert Thread.currentThread() == this && state == RUNNING;

 560                 long timeout = this.timeout;
 561                 assert timeout > 0;
 562                 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
 563                 setState(newState = TIMED_PARKED);
 564             }
 565 
 566             // may have been unparked while parking
 567             if (parkPermit && compareAndSetState(newState, UNPARKED)) {
 568                 // lazy submit if local queue is empty
 569                 lazySubmitRunContinuation();
 570             }
 571             return;
 572         }
 573 
 574         // Thread.yield
 575         if (s == YIELDING) {
 576             setState(YIELDED);
 577 
 578             // external submit if there are no tasks in the local task queue
 579             if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 580                 externalSubmitRunContinuation(ct.getPool());
 581             } else {
 582                 submitRunContinuation();
 583             }
 584             return;
 585         }
 586 
 587         // blocking on monitorenter
 588         if (s == BLOCKING) {
 589             setState(BLOCKED);
 590 
 591             // may have been unblocked while blocking
 592             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 593                 // lazy submit if local queue is empty
 594                 lazySubmitRunContinuation();
 595             }
 596             return;
 597         }
 598 
 599         // Object.wait
 600         if (s == WAITING || s == TIMED_WAITING) {
 601             int newState;

 602             if (s == WAITING) {
 603                 setState(newState = WAIT);
 604             } else {
 605                 // For timed-wait, a timeout task is scheduled to execute. The timeout
 606                 // task will change the thread state to UNBLOCKED and submit the thread
 607                 // to the scheduler. A sequence number is used to ensure that the timeout
 608                 // task only unblocks the thread for this timed-wait. We synchronize with
 609                 // the timeout task to coordinate access to the sequence number and to
 610                 // ensure the timeout task doesn't execute until the thread has got to
 611                 // the TIMED_WAIT state.
 612                 long timeout = this.timeout;
 613                 assert timeout > 0;
 614                 synchronized (timedWaitLock()) {
 615                     byte seqNo = ++timedWaitSeqNo;
 616                     timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
 617                     setState(newState = TIMED_WAIT);
 618                 }
 619             }
 620 
 621             // may have been notified while in transition to wait state
 622             if (notified && compareAndSetState(newState, BLOCKED)) {
 623                 // may have even been unblocked already
 624                 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 625                     submitRunContinuation();
 626                 }
 627                 return;
 628             }
 629 
 630             // may have been interrupted while in transition to wait state
 631             if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
 632                 submitRunContinuation();
 633                 return;
 634             }
 635             return;
 636         }
 637 
 638         assert false;
 639     }
 640 
 641     /**
 642      * Invoked after the continuation completes.
          * Equivalent to calling {@code afterDone(true)}.
 643      */
 644     private void afterDone() {
 645         afterDone(true);
 646     }
 647 
 648     /**
 649      * Invoked after the continuation completes (or start failed). Sets the thread
 650      * state to TERMINATED and notifies anyone waiting for the thread to terminate.
 651      *

1147                 throw new InternalError();
1148         }
1149     }
1150 
1151     @Override
1152     boolean alive() {
              // read the volatile state once so both comparisons see the same value
1153         int s = state;
              // alive: no longer NEW and not yet TERMINATED
1154         return (s != NEW && s != TERMINATED);
1155     }
1156 
1157     @Override
1158     boolean isTerminated() {
              // TERMINATED is the final state of a virtual thread
1159         return (state == TERMINATED);
1160     }
1161 
1162     @Override
1163     StackTraceElement[] asyncGetStackTrace() {
              // Keep trying until a stack trace is obtained: a mounted thread is asked
              // via the superclass, an unmounted one via tryGetStackTrace(), which
              // returns null when the thread is mounted or in transition.
1164         StackTraceElement[] stackTrace;
1165         do {
1166             stackTrace = (carrierThread != null)
1167                     ? super.asyncGetStackTrace()  // mounted
1168                     : tryGetStackTrace();         // unmounted

1169             if (stackTrace == null) {
                      // no result this round; yield before retrying
1170                 Thread.yield();
1171             }
1172         } while (stackTrace == null);
1173         return stackTrace;
1174     }
1175 
1176     /**
1177      * Returns the stack trace for this virtual thread if it is unmounted.
1178      * Returns null if the thread is mounted or in transition.
          * While the stack of an unmounted thread is read, its state is temporarily
          * marked with the SUSPENDED bit; the original state is then restored and the
          * thread is resubmitted if it is, or has become, runnable.
          *
          * @return the stack trace (empty for a NEW, STARTED or TERMINATED thread), or
          *         null if the thread is mounted, in transition, or its state changed
          *         before it could be marked suspended


1179      */
1180     private StackTraceElement[] tryGetStackTrace() {
              // mask out SUSPENDED so the switch sees the underlying state
1181         int initialState = state() & ~SUSPENDED;
1182         switch (initialState) {
1183             case NEW, STARTED, TERMINATED -> {
1184                 return new StackTraceElement[0];  // unmounted, empty stack
1185             }
1186             case RUNNING, PINNED, TIMED_PINNED -> {
1187                 return null;   // mounted
1188             }
1189             case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
1190                 // unmounted, not runnable
1191             }
1192             case UNPARKED, UNBLOCKED, YIELDED -> {
1193                 // unmounted, runnable
1194             }
1195             case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
1196                 return null;  // in transition
1197             }
1198             default -> throw new InternalError("" + initialState);
1199         }
1200 
1201         // thread is unmounted, prevent it from continuing
1202         int suspendedState = initialState | SUSPENDED;
1203         if (!compareAndSetState(initialState, suspendedState)) {
                  // state changed (or was already suspended); the caller retries
1204             return null;
1205         }
1206 
1207         // get stack trace and restore state
1208         StackTraceElement[] stack;
1209         try {
1210             stack = cont.getStackTrace();
1211         } finally {
1212             assert state == suspendedState;
1213             setState(initialState);
1214         }
              // decide whether the thread must be handed back to the scheduler
1215         boolean resubmit = switch (initialState) {
1216             case UNPARKED, UNBLOCKED, YIELDED -> {
1217                 // resubmit as task may have run while suspended
1218                 yield true;
1219             }
1220             case PARKED, TIMED_PARKED -> {
1221                 // resubmit if unparked while suspended
1222                 yield parkPermit && compareAndSetState(initialState, UNPARKED);
1223             }
1224             case BLOCKED -> {
1225                 // resubmit if unblocked while suspended
1226                 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
1227             }
1228             case WAIT, TIMED_WAIT -> {
1229                 // resubmit if notified or interrupted while waiting (Object.wait)
1230                 // waitTimeoutExpired will retry if the timeout expired while suspended
1231                 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);





1232             }
1233             default -> throw new InternalError();
1234         };
1235         if (resubmit) {
1236             submitRunContinuation();
1237         }
1238         return stack;
1239     }
1240 
1241     @Override
1242     public String toString() {
1243         StringBuilder sb = new StringBuilder("VirtualThread[#");
1244         sb.append(threadId());
1245         String name = getName();
1246         if (!name.isEmpty()) {
1247             sb.append(",");
1248             sb.append(name);
1249         }
1250         sb.append("]/");
1251 
1252         // add the carrier state and thread name when mounted
1253         boolean mounted;
1254         if (Thread.currentThread() == this) {
1255             mounted = appendCarrierInfo(sb);
1256         } else {
1257             disableSuspendAndPreempt();
1258             try {

1398     @IntrinsicCandidate
1399     @JvmtiMountTransition
1400     private native void notifyJvmtiMount(boolean hide);
1401 
1402     @IntrinsicCandidate
1403     @JvmtiMountTransition
1404     private native void notifyJvmtiUnmount(boolean hide);
1405 
1406     @IntrinsicCandidate
1407     private static native void notifyJvmtiDisableSuspend(boolean enter);
1408 
1409     private static native void registerNatives();
1410     static {
              // NOTE(review): presumably binds the native methods declared in this
              // class; the native implementation is not visible here - confirm
1411         registerNatives();
1412 
1413         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
              // the returned group is not used; the call is made for its side effect
1414         var group = Thread.virtualThreadGroup();
1415     }
1416 
1417     /**
1418      * Creates the default ForkJoinPool scheduler.
          *
          * <p> The pool is configured from three optional system properties:
          * <ul>
          *   <li> {@code jdk.virtualThreadScheduler.parallelism}: defaults to the
          *        number of available processors </li>
          *   <li> {@code jdk.virtualThreadScheduler.maxPoolSize}: defaults to the
          *        larger of the parallelism and 256; when set, the parallelism is
          *        capped to this value </li>
          *   <li> {@code jdk.virtualThreadScheduler.minRunnable}: defaults to half
          *        the parallelism, but at least 1 </li>
          * </ul>
          *
          * @return a new FIFO-mode pool whose worker threads are {@code CarrierThread}s
          * @throws NumberFormatException if a property is set to a value that
          *         {@code Integer.parseInt} cannot parse


1419      */
1420     private static ForkJoinPool createDefaultScheduler() {
1421         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
























1422         int parallelism, maxPoolSize, minRunnable;
1423         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1424         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1425         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1426         if (parallelismValue != null) {
1427             parallelism = Integer.parseInt(parallelismValue);
1428         } else {
1429             parallelism = Runtime.getRuntime().availableProcessors();
1430         }
1431         if (maxPoolSizeValue != null) {
1432             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
                  // an explicit maximum also caps the parallelism
1433             parallelism = Integer.min(parallelism, maxPoolSize);
1434         } else {
1435             maxPoolSize = Integer.max(parallelism, 256);
1436         }
1437         if (minRunnableValue != null) {
1438             minRunnable = Integer.parseInt(minRunnableValue);
1439         } else {
1440             minRunnable = Integer.max(parallelism / 2, 1);
1441         }
              // this handler does nothing with exceptions reported to it
1442         Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1443         boolean asyncMode = true; // FIFO
              // remaining arguments: corePoolSize 0, maximum pool size, minimum
              // runnable, a saturate predicate that always returns true, and a
              // keep-alive time of 30 seconds
1444         return new ForkJoinPool(parallelism, factory, handler, asyncMode,
1445                      0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);












































1446     }
1447 
1448     /**
1449      * Schedule a runnable task to run after a delay.
          *
          * @param command the task to run
          * @param delay the delay before the task runs
          * @param unit the time unit of {@code delay}
          * @return the {@code Future} returned by the underlying scheduler, which the
          *         callers keep so that the task can be cancelled
1450      */
1451     private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1452         if (scheduler instanceof ForkJoinPool pool) {
                  // this thread's scheduler is a ForkJoinPool: use its own delayed scheduling
1453             return pool.schedule(command, delay, unit);
1454         } else {
                  // any other scheduler: use the shared DelayedTaskSchedulers
1455             return DelayedTaskSchedulers.schedule(command, delay, unit);
1456         }
1457     }
1458 
1459     /**
1460      * Supports scheduling a runnable task to run after a delay. It uses a number
1461      * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1462      * work queue used. This class is used when using a custom scheduler.
1463      */
1464     private static class DelayedTaskSchedulers {
1465         private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();

1515                 assert changed;
1516                 vthread.unblock();
1517 
1518                 vthread = nextThread;
1519             }
1520         }
1521     }
1522 
1523     /**
1524      * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1525      * if necessary until a list of one or more threads becomes available.
          *
          * @return the first virtual thread of the list
          *         (NOTE(review): presumably the rest of the list is reached through
          *         the {@code next} field - confirm against the native implementation)
1526      */
1527     private static native VirtualThread takeVirtualThreadListToUnblock();
1528 
1529     static {
              // Start a daemon InnocuousThread named "VirtualThread-unblocker" that
              // runs unblockVirtualThreads when this class is initialized.
1530         var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1531                 VirtualThread::unblockVirtualThreads);
1532         unblocker.setDaemon(true);
1533         unblocker.start();
1534     }
1535 }

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 
  27 import java.lang.reflect.Constructor;
  28 import java.util.Locale;
  29 import java.util.Objects;
  30 import java.util.concurrent.CountDownLatch;

  31 import java.util.concurrent.Executors;
  32 import java.util.concurrent.ForkJoinPool;

  33 import java.util.concurrent.ForkJoinTask;
  34 import java.util.concurrent.Future;
  35 import java.util.concurrent.RejectedExecutionException;
  36 import java.util.concurrent.ScheduledExecutorService;
  37 import java.util.concurrent.ScheduledThreadPoolExecutor;
  38 import java.util.concurrent.TimeUnit;
  39 import java.util.function.Supplier;
  40 import jdk.internal.event.VirtualThreadEndEvent;
  41 import jdk.internal.event.VirtualThreadStartEvent;
  42 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  43 import jdk.internal.misc.CarrierThread;
  44 import jdk.internal.misc.InnocuousThread;
  45 import jdk.internal.misc.Unsafe;
  46 import jdk.internal.vm.Continuation;
  47 import jdk.internal.vm.ContinuationScope;
  48 import jdk.internal.vm.StackableScope;
  49 import jdk.internal.vm.ThreadContainer;
  50 import jdk.internal.vm.ThreadContainers;
  51 import jdk.internal.vm.annotation.ChangesCurrentThread;
  52 import jdk.internal.vm.annotation.Hidden;
  53 import jdk.internal.vm.annotation.IntrinsicCandidate;
  54 import jdk.internal.vm.annotation.JvmtiHideEvents;
  55 import jdk.internal.vm.annotation.JvmtiMountTransition;
  56 import jdk.internal.vm.annotation.ReservedStackAccess;
  57 import sun.nio.ch.Interruptible;
  58 import static java.util.concurrent.TimeUnit.*;
  59 
  60 /**
  61  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  62  */
  63 final class VirtualThread extends BaseVirtualThread {
  64     private static final Unsafe U = Unsafe.getUnsafe();
  65     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  66 
  67     private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
  68     private static final boolean IS_CUSTOM_DEFAULT_SCHEDULER;
  69     static {
  70         // experimental
              // When the jdk.virtualThreadScheduler.implClass system property is set,
              // its value is passed to createCustomDefaultScheduler to create the
              // default scheduler; otherwise the ForkJoinPool based scheduler is used.
              // IS_CUSTOM_DEFAULT_SCHEDULER records which of the two was chosen.
  71         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
  72         if (propValue != null) {
  73             DEFAULT_SCHEDULER = createCustomDefaultScheduler(propValue);
  74             IS_CUSTOM_DEFAULT_SCHEDULER = true;
  75         } else {
  76             DEFAULT_SCHEDULER = createDefaultForkJoinPoolScheduler();
  77             IS_CUSTOM_DEFAULT_SCHEDULER = false;
  78         }
  79     }
  80 
  81     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
  82     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  83     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  84     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
  85     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  86 
  87     // scheduler and continuation
  88     private final VirtualThreadScheduler scheduler;
  89     private final Continuation cont;
  90     private final Runnable runContinuation;
  91 
  92     // virtual thread state, accessed by VM
  93     private volatile int state;
  94 
  95     /*
  96      * Virtual thread state transitions:
  97      *
  98      *      NEW -> STARTED         // Thread.start, schedule to run
  99      *  STARTED -> TERMINATED      // failed to start
 100      *  STARTED -> RUNNING         // first run
 101      *  RUNNING -> TERMINATED      // done
 102      *
 103      *  RUNNING -> PARKING         // Thread parking with LockSupport.park
 104      *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
 105      *  PARKING -> PINNED          // cont.yield failed, parked indefinitely on carrier
 106      *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
 107      *   PINNED -> RUNNING         // unparked, continue execution on same carrier
 108      * UNPARKED -> RUNNING         // continue execution after park

 164     private static final int TERMINATED = 99;  // final state
 165 
 166     // can be suspended from scheduling when unmounted; this bit is OR'ed into the state
 167     private static final int SUSPENDED = 1 << 8;
 168 
 169     // parking permit made available by LockSupport.unpark
 170     private volatile boolean parkPermit;
 171 
 172     // blocking permit made available by unblocker thread when another thread exits monitor
 173     private volatile boolean blockPermit;
 174 
 175     // true when on the list of virtual threads waiting to be unblocked
 176     private volatile boolean onWaitingList;
 177 
 178     // next virtual thread on the list of virtual threads waiting to be unblocked
 179     private volatile VirtualThread next;
 180 
 181     // notified by Object.notify/notifyAll while waiting in Object.wait
 182     private volatile boolean notified;
 183 
 184     // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
 185     private volatile boolean interruptableWait;
 186 
 187     // timed-wait support: sequence number, incremented when a timed-wait timeout task is scheduled
 188     private byte timedWaitSeqNo;
 189 
 190     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 191     private long timeout;
 192 
 193     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 194     private Future<?> timeoutTask;
 195 
 196     // carrier thread when mounted, accessed by VM
 197     private volatile Thread carrierThread;
 198 
 199     // termination object when joining, created lazily if needed
 200     private volatile CountDownLatch termination;
 201 
 202     /**
 203      * Returns the default scheduler, used by virtual threads created without a scheduler.
          * @return the default scheduler
 204      */
 205     static VirtualThreadScheduler defaultScheduler() {
 206         return DEFAULT_SCHEDULER;
 207     }
 208 
 209     /**
 210      * Returns true if using a custom default scheduler, that is, the default scheduler
          * was created from the class named by the jdk.virtualThreadScheduler.implClass
          * system property rather than being the built-in ForkJoinPool scheduler.
 211      */
 212     static boolean isCustomDefaultScheduler() {
 213         return IS_CUSTOM_DEFAULT_SCHEDULER;
 214     }
 215 
 216     /**
 217      * Returns the continuation scope used for virtual threads. The same scope is
          * passed to the continuation created for each virtual thread.
          * @return the continuation scope
 218      */
 219     static ContinuationScope continuationScope() {
 220         return VTHREAD_SCOPE;
 221     }
 222 
 223     /**
 224      * Returns this thread's scheduler, or a wrapper of it when hiding the built-in one.
 225      * @param revealBuiltin true to reveal the built-in default scheduler, false to hide
 226      */
 227     VirtualThreadScheduler scheduler(boolean revealBuiltin) {
 228         // the built-in scheduler is only handed out directly when asked to reveal it
 229         if (!revealBuiltin && scheduler instanceof BuiltinDefaultScheduler builtin) {
 230             return builtin.externalView();
 231         }
 232         return scheduler;
 233     }
 234 
 235     /**
 236      * Constructs a {@code VirtualThread} that runs {@code task} using {@code scheduler}.
 237      *
 238      * @param scheduler the scheduler, or null to use the default scheduler
 239      * @param name the thread name
 240      * @param characteristics the thread characteristics
 241      * @param task the task to run, must not be null
 242      */
 243     VirtualThread(VirtualThreadScheduler scheduler,
 244                   String name,
 245                   int characteristics,
 246                   Runnable task) {
 247         super(name, characteristics, /*bound*/ false);
 248         Objects.requireNonNull(task);
 249 
 250         // fall back to the default scheduler when none is provided
 251         VirtualThreadScheduler chosen = (scheduler != null)
 252                 ? scheduler
 253                 : DEFAULT_SCHEDULER;
 254 
 255         this.scheduler = chosen;
 256         this.cont = new VThreadContinuation(this, task);
 257         this.runContinuation = this::runContinuation;
 258     }
 259 
 260     /**
 261      * The continuation that a virtual thread executes.
 262      */
 263     private static class VThreadContinuation extends Continuation {
 264         VThreadContinuation(VirtualThread vthread, Runnable task) {
 265             super(VTHREAD_SCOPE, wrap(vthread, task));
 266         }
 267         @Override
 268         protected void onPinned(Continuation.Pinned reason) {
 269         }
 270         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 271             return new Runnable() {
 272                 @Hidden

 328                 afterYield();
 329             }
 330         }
 331     }
 332 
 333     /**
 334      * Cancels any timeout task (it may be running or done) after a timed-park/timed-wait.
 335      */
 336     private void cancelTimeoutTask() {
 337         Future<?> pending = timeoutTask;
 338         if (pending != null) {
 339             pending.cancel(false);
 340             timeoutTask = null;
 341         }
 342     }
 343 
 344     /**
 345      * Submits the runContinuation task to the scheduler. For the default scheduler,
 346      * and calling it on a worker thread, the task will be pushed to the local queue,
 347      * otherwise it will be pushed to an external submission queue.
 348      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown,
          *        parking for 100ms between attempts; false to propagate the error
 349      * @throws RejectedExecutionException if thrown by the scheduler; submitFailed is
          *         invoked with the exception before it is rethrown
 350      */
 351     private void submitRunContinuation(boolean retryOnOOME) {
 352         boolean done = false;
 353         while (!done) {
 354             try {
 355                 // Pin the continuation to prevent the virtual thread from unmounting
 356                 // when submitting a task. For the default scheduler this ensures that
 357                 // the carrier doesn't change when pushing a task. For other schedulers
 358                 // it avoids deadlock that could arise due to carriers and virtual
 359                 // threads contending for a lock.
 360                 if (currentThread().isVirtual()) {
 361                     Continuation.pin();
 362                     try {
 363                         scheduler.execute(this, runContinuation);
 364                     } finally {
 365                         Continuation.unpin();
 366                     }
 367                 } else {
 368                     scheduler.execute(this, runContinuation);
 369                 }
 370                 done = true;
 371             } catch (RejectedExecutionException ree) {
 372                 submitFailed(ree);
 373                 throw ree;
 374             } catch (OutOfMemoryError e) {
 375                 if (retryOnOOME) {
 376                     U.park(false, 100_000_000); // 100ms
 377                 } else {
 378                     throw e;
 379                 }
 380             }
 381         }
 382     }
 383 


















 384     /**
 385      * Submits the runContinuation task to the scheduler. For the default scheduler,
 386      * and calling it on a worker thread, the task will be pushed to the local queue,
 387      * otherwise it will be pushed to an external submission queue.
 388      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 389      * @throws RejectedExecutionException if thrown by the scheduler
 390      */
 391     private void submitRunContinuation() {
 392         submitRunContinuation(true);  // retryOnOOME
 393     }
 394 
 395     /**
 396      * Lazy submit the runContinuation task if invoked on a carrier thread and its local
 397      * queue is empty. If not empty, or invoked by another thread, then this method works
 398      * like submitRunContinuation and just submits the task to the scheduler.
          * The lazy submit is only attempted when this thread uses the default scheduler.
 399      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 400      * @throws RejectedExecutionException if thrown by the pool or scheduler
 401      * @see ForkJoinPool#lazySubmit(ForkJoinTask)
 402      */
 403     private void lazySubmitRunContinuation() {
 404         if (scheduler == DEFAULT_SCHEDULER
 405                 && currentCarrierThread() instanceof CarrierThread ct
 406                 && ct.getQueuedTaskCount() == 0) {
 407             try {
 408                 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
 409             } catch (RejectedExecutionException ree) {
 410                 submitFailed(ree);
 411                 throw ree;
 412             } catch (OutOfMemoryError e) {
 413                 submitRunContinuation();  // fall back to the submit that retries on OutOfMemoryError
 414             }
 415         } else {
 416             submitRunContinuation();
 417         }
 418     }
 419 
 420     /**
 421      * Submits the runContinuation task to the scheduler. For the default scheduler, and
 422      * calling it on a virtual thread that uses the default scheduler, the task will be
 423      * pushed to an external submission queue.
          * If OutOfMemoryError is thrown by the external submit then the task is submitted
          * with submitRunContinuation instead, which retries until it succeeds.
 424      * @throws RejectedExecutionException if thrown by the pool or scheduler
 425      */
 426     private void externalSubmitRunContinuation() {
 427         if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {
 428             try {
 429                 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 430             } catch (RejectedExecutionException ree) {
 431                 submitFailed(ree);
 432                 throw ree;
 433             } catch (OutOfMemoryError e) {
 434                 submitRunContinuation();  // fall back to the submit that retries on OutOfMemoryError
 435             }
 436         } else {
 437             submitRunContinuation();
 438         }
 439     }
 440 
 441     /**
 442      * Submits the runContinuation task to the scheduler. For the default scheduler, and
 443      * calling it on a virtual thread that uses the default scheduler, the task will be
 444      * pushed to an external submission queue. This method may throw OutOfMemoryError.
 445      * @throws RejectedExecutionException if thrown by the pool or scheduler
 446      * @throws OutOfMemoryError if thrown while submitting, the submit is not retried
 447      */
 448     private void externalSubmitRunContinuationOrThrow() {
 449         if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {
 450             try {
 451                 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 452             } catch (RejectedExecutionException ree) {
 453                 submitFailed(ree);
 454                 throw ree;
 455             }
 456         } else {
 457             submitRunContinuation(false);  // retryOnOOME=false, OutOfMemoryError propagates
 458         }
 459     }
 460 
 461     /**
 462      * Emits a JFR VirtualThreadSubmitFailedEvent, if that event is enabled.
 463      */
 464     private void submitFailed(RejectedExecutionException ree) {
 465         VirtualThreadSubmitFailedEvent failure = new VirtualThreadSubmitFailedEvent();
 466         if (failure.isEnabled()) {
 467             failure.exceptionMessage = ree.getMessage();
 468             failure.javaThreadId = threadId();
 469             failure.commit();
 470         }
 471     }
 472 
 473     /**
 474      * Runs a task in the context of this virtual thread.
 475      */
 476     private void run(Runnable task) {
 477         assert Thread.currentThread() == this && state == RUNNING;

 592                 long timeout = this.timeout;
 593                 assert timeout > 0;
 594                 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
 595                 setState(newState = TIMED_PARKED);
 596             }
 597 
 598             // may have been unparked while parking
 599             if (parkPermit && compareAndSetState(newState, UNPARKED)) {
 600                 // lazy submit if local queue is empty
 601                 lazySubmitRunContinuation();
 602             }
 603             return;
 604         }
 605 
 606         // Thread.yield
 607         if (s == YIELDING) {
 608             setState(YIELDED);
 609 
 610             // external submit if there are no tasks in the local task queue
 611             if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 612                 externalSubmitRunContinuation();
 613             } else {
 614                 submitRunContinuation();
 615             }
 616             return;
 617         }
 618 
 619         // blocking on monitorenter
 620         if (s == BLOCKING) {
 621             setState(BLOCKED);
 622 
 623             // may have been unblocked while blocking
 624             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 625                 // lazy submit if local queue is empty
 626                 lazySubmitRunContinuation();
 627             }
 628             return;
 629         }
 630 
 631         // Object.wait
 632         if (s == WAITING || s == TIMED_WAITING) {
 633             int newState;
 634             boolean interruptable = interruptableWait;
 635             if (s == WAITING) {
 636                 setState(newState = WAIT);
 637             } else {
 638                 // For timed-wait, a timeout task is scheduled to execute. The timeout
 639                 // task will change the thread state to UNBLOCKED and submit the thread
 640                 // to the scheduler. A sequence number is used to ensure that the timeout
 641                 // task only unblocks the thread for this timed-wait. We synchronize with
 642                 // the timeout task to coordinate access to the sequence number and to
 643                 // ensure the timeout task doesn't execute until the thread has got to
 644                 // the TIMED_WAIT state.
 645                 long timeout = this.timeout;
 646                 assert timeout > 0;
 647                 synchronized (timedWaitLock()) {
 648                     byte seqNo = ++timedWaitSeqNo;
 649                     timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
 650                     setState(newState = TIMED_WAIT);
 651                 }
 652             }
 653 
 654             // may have been notified while in transition to wait state
 655             if (notified && compareAndSetState(newState, BLOCKED)) {
 656                 // may have even been unblocked already
 657                 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 658                     submitRunContinuation();
 659                 }
 660                 return;
 661             }
 662 
 663             // may have been interrupted while in transition to wait state
 664             if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
 665                 submitRunContinuation();
 666                 return;
 667             }
 668             return;
 669         }
 670 
 671         assert false;
 672     }
 673 
 674     /**
 675      * Invoked after the continuation completes. Delegates to {@code afterDone(true)}.
 676      */
 677     private void afterDone() {
 678         afterDone(true);
 679     }
 680 
 681     /**
 682      * Invoked after the continuation completes (or start failed). Sets the thread
 683      * state to TERMINATED and notifies anyone waiting for the thread to terminate.
 684      *

1180                 throw new InternalError();
1181         }
1182     }
1183 
1184     @Override
1185     boolean alive() {
1186         int current = state;   // read the volatile state once
1187         return !(current == NEW || current == TERMINATED);
1188     }
1189 
1190     @Override
1191     boolean isTerminated() {
1192         return (state == TERMINATED);  // TERMINATED is the final state
1193     }
1194 
1195     @Override
1196     StackTraceElement[] asyncGetStackTrace() {
1197         // sample until a stack trace is obtained, yielding after each failed attempt
1198         while (true) {
1199             StackTraceElement[] trace = (carrierThread != null)
1200                 ? super.asyncGetStackTrace()                          // mounted
1201                 : supplyIfUnmounted(cont::getStackTrace,              // unmounted
1202                                     () -> new StackTraceElement[0]);
1203             if (trace != null) {
1204                 return trace;
1205             }
1206             Thread.yield();
1207         }
1208     }
1209 
1210     /**
1211      * Invokes a supplier to produce a non-null result if this virtual thread is not mounted.
          * While supplier1 runs, the SUSPENDED bit is set in the thread state to prevent the
          * thread from continuing; afterwards the state is restored and the thread is
          * resubmitted to the scheduler if it is, or became, runnable in the meantime.
1212      * @param supplier1 invoked if this virtual thread is alive and unmounted
1213      * @param supplier2 invoked if this virtual thread has not yet run (NEW or STARTED)
          *        or has terminated
1214      * @return the result; {@code null} if this virtual thread is mounted or in transition,
          *         or if its state could not be atomically changed to the suspended form
1215      */
1216     <T> T supplyIfUnmounted(Supplier<T> supplier1, Supplier<T> supplier2) {
1217         int initialState = state() & ~SUSPENDED;
1218         switch (initialState) {
1219             case NEW, STARTED, TERMINATED -> {
1220                 return supplier2.get();  // terminated or not started
1221             }
1222             case RUNNING, PINNED, TIMED_PINNED -> {
1223                 return null; // mounted
1224             }
1225             case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
1226                 // unmounted, not runnable
1227             }
1228             case UNPARKED, UNBLOCKED, YIELDED -> {
1229                 // unmounted, runnable
1230             }
1231             case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
1232                 return null; // in transition
1233             }
1234             default -> throw new InternalError("" + initialState);
1235         }
1236 
1237         // thread is unmounted, prevent it from continuing
1238         int suspendedState = initialState | SUSPENDED;
1239         if (!compareAndSetState(initialState, suspendedState)) {
1240             return null;
1241         }
1242 


1243         try {
1244             return supplier1.get();
1245         } finally {
1246             assert state == suspendedState;
1247             setState(initialState);
1248 
1249             boolean resubmit = switch (initialState) {
1250                 case UNPARKED, UNBLOCKED, YIELDED -> {
1251                     // resubmit as task may have run while suspended
1252                     yield true;
1253                 }
1254                 case PARKED, TIMED_PARKED -> {
1255                     // resubmit if unparked while suspended
1256                     yield parkPermit && compareAndSetState(initialState, UNPARKED);
1257                 }
1258                 case BLOCKED -> {
1259                     // resubmit if unblocked while suspended
1260                     yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
1261                 }
1262                 case WAIT, TIMED_WAIT -> {
1263                     // resubmit if notified or interrupted while waiting (Object.wait)
1264                     // waitTimeoutExpired will retry if the timeout expired while suspended
1265                     yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
1266                 }
1267                 default -> throw new InternalError();
1268             };
1269             if (resubmit) {
1270                 submitRunContinuation();
1271             }




1272         }
1273 
1274     }
1275 
1276     @Override
1277     public String toString() {
1278         StringBuilder sb = new StringBuilder("VirtualThread[#");
1279         sb.append(threadId());
1280         String name = getName();
1281         if (!name.isEmpty()) {
1282             sb.append(",");
1283             sb.append(name);
1284         }
1285         sb.append("]/");
1286 
1287         // add the carrier state and thread name when mounted
1288         boolean mounted;
1289         if (Thread.currentThread() == this) {
1290             mounted = appendCarrierInfo(sb);
1291         } else {
1292             disableSuspendAndPreempt();
1293             try {

1433     @IntrinsicCandidate
1434     @JvmtiMountTransition
1435     private native void notifyJvmtiMount(boolean hide);
1436 
1437     @IntrinsicCandidate
1438     @JvmtiMountTransition
1439     private native void notifyJvmtiUnmount(boolean hide);
1440 
1441     @IntrinsicCandidate
1442     private static native void notifyJvmtiDisableSuspend(boolean enter);
1443 
1444     private static native void registerNatives();
1445     static {
1446         registerNatives();  // invoked once, when this class is initialized
1447 
1448         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1449         var group = Thread.virtualThreadGroup();  // result unused, called for its side effect only
1450     }
1451 
1452     /**
1453      * Loads a VirtualThreadScheduler with the given class name to use as the
1454      * default scheduler. The class is public in an exported package, has a public
1455      * one-arg or no-arg constructor, and is visible to the system class loader.
          * The one-arg constructor is tried first; it is passed the external view of a
          * newly created built-in ForkJoinPool scheduler. A warning is printed to
          * System.err when the custom scheduler has been created.
          * @param cn the class name of the scheduler implementation
          * @return the custom scheduler
          * @throws Error if loading or instantiating the class fails with an exception,
          *         the exception is the cause
1456      */
1457     private static VirtualThreadScheduler createCustomDefaultScheduler(String cn) {
1458         try {
1459             Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
1460             VirtualThreadScheduler scheduler;
1461             try {
1462                 // 1-arg constructor
1463                 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class);
1464                 var builtin = createDefaultForkJoinPoolScheduler();
1465                 scheduler = (VirtualThreadScheduler) ctor.newInstance(builtin.externalView());
1466             } catch (NoSuchMethodException e) {
1467                 // 0-arg constructor
1468                 Constructor<?> ctor = clazz.getConstructor();
1469                 scheduler = (VirtualThreadScheduler) ctor.newInstance();
1470             }
1471             System.err.println("""
1472                 WARNING: Using custom default scheduler, this is an experimental feature!""");
1473             return scheduler;
1474         } catch (Exception ex) {
1475             throw new Error(ex);
1476         }
1477     }
1478 
1479     /**
1480      * Creates the built-in default scheduler, a ForkJoinPool sized from system properties.
1481      */
1482     private static BuiltinDefaultScheduler createDefaultForkJoinPoolScheduler() {
1483         // read all tuning properties up front, values not set are computed below
1484         String parallelismProp = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1485         String maxPoolSizeProp = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1486         String minRunnableProp = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1487         // parallelism defaults to the number of available processors
1488         int parallelism = (parallelismProp != null)
1489                 ? Integer.parseInt(parallelismProp)
1490                 : Runtime.getRuntime().availableProcessors();
1491         // an explicit maximum pool size caps parallelism, otherwise it is at least 256
1492         int maxPoolSize;
1493         if (maxPoolSizeProp == null) {
1494             maxPoolSize = Integer.max(parallelism, 256);
1495         } else {
1496             maxPoolSize = Integer.parseInt(maxPoolSizeProp);
1497             parallelism = Integer.min(parallelism, maxPoolSize);
1498         }
1499         // minRunnable defaults to half of parallelism, but never less than one
1500         int minRunnable = (minRunnableProp != null)
1501                 ? Integer.parseInt(minRunnableProp)
1502                 : Integer.max(parallelism / 2, 1);
1503         return new BuiltinDefaultScheduler(parallelism, maxPoolSize, minRunnable);
1504     }
1505 
1506     /**
1507      * The built-in default ForkJoinPool scheduler.
          * Tasks are adapted to ForkJoinTasks and executed by this pool, whose worker
          * threads are CarrierThreads.
1508      */
1509     private static class BuiltinDefaultScheduler
1510             extends ForkJoinPool implements VirtualThreadScheduler {
1511 
              // holds the wrapper returned by externalView
              // NOTE(review): the holder is static, so the wrapper captures whichever
              // instance externalView is first invoked on - confirm that is intended
1512         private static final StableValue<VirtualThreadScheduler> VIEW = StableValue.of();
1513 
1514         BuiltinDefaultScheduler(int parallelism, int maxPoolSize, int minRunnable) {
1515             ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1516             Thread.UncaughtExceptionHandler handler = (t, e) -> { };  // uncaught exceptions are ignored
1517             boolean asyncMode = true; // FIFO
                  // corePoolSize 0, saturate predicate always true, keep-alive 30 seconds
1518             super(parallelism, factory, handler, asyncMode,
1519                     0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
1520         }
1521 
1522         @Override
1523         public void execute(Thread vthread, Runnable task) {
1524             execute(ForkJoinTask.adapt(task));  // the vthread argument is not used
1525         }
1526 
1527         /**
1528          * Wraps the scheduler to avoid leaking a direct reference.
              * The wrapper is created on first use and kept in VIEW. Its execute method
              * throws NullPointerException if the thread is null, UnsupportedOperationException
              * if the thread is not a VirtualThread, and IllegalArgumentException if the
              * virtual thread's scheduler is neither the wrapper nor the default scheduler.
1529          */
1530         VirtualThreadScheduler externalView() {
1531             VirtualThreadScheduler builtin = this;
1532             return VIEW.orElseSet(() -> {
1533                 return new VirtualThreadScheduler() {
1534                     @Override
1535                     public void execute(Thread thread, Runnable task) {
1536                         Objects.requireNonNull(thread);
1537                         if (thread instanceof VirtualThread vthread) {
1538                             VirtualThreadScheduler scheduler = vthread.scheduler;
                                  // "this" is the wrapper here, not the BuiltinDefaultScheduler
1539                             if (scheduler == this || scheduler == DEFAULT_SCHEDULER) {
1540                                 builtin.execute(thread, task);
1541                             } else {
1542                                 throw new IllegalArgumentException();
1543                             }
1544                         } else {
1545                             throw new UnsupportedOperationException();
1546                         }
1547                     }
1548                 };
1549             });
1550         }
1551     }
1552 
1553     /**
1554      * Schedules a runnable task to run after the given delay.
1555      */
1556     private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1557         // a ForkJoinPool scheduler runs delayed tasks itself, any other scheduler
1558         // relies on DelayedTaskSchedulers
1559         return (scheduler instanceof ForkJoinPool fjp)
1560                 ? fjp.schedule(command, delay, unit)
1561                 : DelayedTaskSchedulers.schedule(command, delay, unit);
1562     }
1563 
1564     /**
1565      * Supports scheduling a runnable task to run after a delay. It uses a number
1566      * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1567      * work queue used. This class is used when using a custom scheduler.
1568      */
1569     private static class DelayedTaskSchedulers {
1570         private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();

1620                 assert changed;
1621                 vthread.unblock();
1622 
1623                 vthread = nextThread;
1624             }
1625         }
1626     }
1627 
1628     /**
1629      * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1630      * if necessary until a list of one or more threads becomes available.
1631      */
1632     private static native VirtualThread takeVirtualThreadListToUnblock();
1633 
1634     static {
              // create and start the daemon thread that runs unblockVirtualThreads
1635         var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1636                 VirtualThread::unblockVirtualThreads);
1637         unblocker.setDaemon(true);
1638         unblocker.start();
1639     }
1640 }
< prev index next >