< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 



  27 import java.util.Locale;
  28 import java.util.Objects;
  29 import java.util.concurrent.Executor;
  30 import java.util.concurrent.Executors;
  31 import java.util.concurrent.ForkJoinPool;
  32 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  33 import java.util.concurrent.ForkJoinTask;
  34 import java.util.concurrent.Future;

  35 import java.util.concurrent.RejectedExecutionException;
  36 import java.util.concurrent.ScheduledExecutorService;

  37 import java.util.concurrent.ScheduledThreadPoolExecutor;


  38 import java.util.concurrent.TimeUnit;
  39 import jdk.internal.event.VirtualThreadEndEvent;
  40 import jdk.internal.event.VirtualThreadStartEvent;
  41 import jdk.internal.event.VirtualThreadSubmitFailedEvent;

  42 import jdk.internal.misc.CarrierThread;
  43 import jdk.internal.misc.InnocuousThread;
  44 import jdk.internal.misc.Unsafe;
  45 import jdk.internal.vm.Continuation;
  46 import jdk.internal.vm.ContinuationScope;
  47 import jdk.internal.vm.StackableScope;
  48 import jdk.internal.vm.ThreadContainer;
  49 import jdk.internal.vm.ThreadContainers;
  50 import jdk.internal.vm.annotation.ChangesCurrentThread;
  51 import jdk.internal.vm.annotation.Hidden;
  52 import jdk.internal.vm.annotation.IntrinsicCandidate;
  53 import jdk.internal.vm.annotation.JvmtiHideEvents;
  54 import jdk.internal.vm.annotation.JvmtiMountTransition;
  55 import jdk.internal.vm.annotation.ReservedStackAccess;
  56 import sun.nio.ch.Interruptible;
  57 import static java.util.concurrent.TimeUnit.*;
  58 
  59 /**
  60  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  61  */
  62 final class VirtualThread extends BaseVirtualThread {
  63     private static final Unsafe U = Unsafe.getUnsafe();  // used below for objectFieldOffset, park and unpark
         // scope handed to the Continuation constructor and returned by continuationScope()
  64     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
         // created once, at class initialization, by createDefaultScheduler()
  65     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();



























  66 
  67     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");  // Unsafe offset of the "state" field
         // offsets of further fields, each looked up by field name
         // NOTE(review): presumably used for atomic access to these fields; the accessors are outside this view
  68     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  69     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  70     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  71 
  72     // scheduler and continuation
  73     private final Executor scheduler;
  74     private final Continuation cont;
  75     private final Runnable runContinuation;
  76 
  77     // virtual thread state, accessed by VM
  78     private volatile int state;
  79 
  80     /*
  81      * Virtual thread state transitions:
  82      *
  83      *      NEW -> STARTED         // Thread.start, schedule to run
  84      *  STARTED -> TERMINATED      // failed to start
  85      *  STARTED -> RUNNING         // first run
  86      *  RUNNING -> TERMINATED      // done
  87      *
  88      *  RUNNING -> PARKING         // Thread parking with LockSupport.park
  89      *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
  90      *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
  91      * UNPARKED -> RUNNING         // continue execution after park
  92      *
  93      *  PARKING -> RUNNING         // cont.yield failed, need to park on carrier
  94      *  RUNNING -> PINNED          // park on carrier
  95      *   PINNED -> RUNNING         // unparked, continue execution on same carrier

 169 
 170     // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
 171     private volatile boolean interruptibleWait;
 172 
 173     // timed-wait support
 174     private byte timedWaitSeqNo;
 175 
 176     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 177     private long timeout;
 178 
 179     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 180     private Future<?> timeoutTask;
 181 
 182     // carrier thread when mounted, accessed by VM
 183     private volatile Thread carrierThread;
 184 
 185     // true to notifyAll after this virtual thread terminates
 186     private volatile boolean notifyAllAfterTerminate;
 187 
 188     /**
 189      * Returns the default scheduler.








 190      */
 191     static Executor defaultScheduler() {
             // the ForkJoinPool held in DEFAULT_SCHEDULER, exposed here as an Executor
 192         return DEFAULT_SCHEDULER;
 193     }
 194 
 195     /**
 196      * Returns the continuation scope used for virtual threads.
 197      */
 198     static ContinuationScope continuationScope() {
             // the single shared scope instance; the same object is returned on every call
 199         return VTHREAD_SCOPE;
 200     }
 201 
 202     /**
 203      * Creates a new {@code VirtualThread} to run the given task with the given
 204      * scheduler. If the given scheduler is {@code null} and the current thread
 205      * is a platform thread then the newly created virtual thread will use the
 206      * default scheduler. If given scheduler is {@code null} and the current
 207      * thread is a virtual thread then the current thread's scheduler is used.



 208      *
 209      * @param scheduler the scheduler or null

 210      * @param name thread name
 211      * @param characteristics characteristics
 212      * @param task the task to execute
 213      */
 214     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {




 215         super(name, characteristics, /*bound*/ false);
             // reject a null task up front; Objects.requireNonNull throws NullPointerException
 216         Objects.requireNonNull(task);
 217 
 218         // choose scheduler if not specified
 219         if (scheduler == null) {
 220             Thread parent = Thread.currentThread();
                 // a virtual-thread creator hands down its own scheduler; any other
                 // creator gets DEFAULT_SCHEDULER
 221             if (parent instanceof VirtualThread vparent) {
 222                 scheduler = vparent.scheduler;
 223             } else {
 224                 scheduler = DEFAULT_SCHEDULER;
 225             }
 226         }
 227 
 228         this.scheduler = scheduler;
             // the continuation is built from this thread and the task
 229         this.cont = new VThreadContinuation(this, task);
             // the Runnable that the submit methods of this class hand to the scheduler
 230         this.runContinuation = this::runContinuation;





























































 231     }
 232 
 233     /**
 234      * The continuation that a virtual thread executes.
 235      */
 236     private static class VThreadContinuation extends Continuation {
 237         VThreadContinuation(VirtualThread vthread, Runnable task) {
 238             super(VTHREAD_SCOPE, wrap(vthread, task));
 239         }
 240         @Override
 241         protected void onPinned(Continuation.Pinned reason) {
 242         }
 243         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 244             return new Runnable() {
 245                 @Hidden
 246                 @JvmtiHideEvents
 247                 public void run() {
 248                     vthread.endFirstTransition();
 249                     try {
 250                         vthread.run(task);

 298             if (cont.isDone()) {
 299                 afterDone();
 300             } else {
 301                 afterYield();
 302             }
 303         }
 304     }
 305 
 306     /**
 307      * Cancel timeout task when continuing after timed-park or timed-wait.
 308      * The timeout task may be executing, or may have already completed.
 309      */
 310     private void cancelTimeoutTask() {
             // nothing to do when no timer task is recorded
 311         if (timeoutTask != null) {
                 // cancel(false): a task that is already executing is not interrupted
 312             timeoutTask.cancel(false);
                 // clear the field so a later call finds nothing to cancel
 313             timeoutTask = null;
 314         }
 315     }
 316 
 317     /**
 318      * Submits the given task to the given executor. If the scheduler is a
 319      * ForkJoinPool then the task is first adapted to a ForkJoinTask.
 320      */
 321     private void submit(Executor executor, Runnable task) {
 322         if (executor instanceof ForkJoinPool pool) {
                 // ForkJoinPool: wrap the Runnable with ForkJoinTask.adapt and call submit
 323             pool.submit(ForkJoinTask.adapt(task));
 324         } else {
                 // any other Executor: hand over the Runnable unchanged
 325             executor.execute(task);
 326         }
 327     }
 328 
 329     /**
 330      * Submits the runContinuation task to the scheduler. For the default scheduler,
 331      * and calling it on a worker thread, the task will be pushed to the local queue,
 332      * otherwise it will be pushed to an external submission queue.
 333      * @param scheduler the scheduler
 334      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 335      * @throws RejectedExecutionException
 336      */
 337     private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
             // the loop ends after a successful submit; it repeats only when
             // OutOfMemoryError is caught and retryOnOOME is true
 338         boolean done = false;
 339         while (!done) {
 340             try {
 341                 // Pin the continuation to prevent the virtual thread from unmounting
 342                 // when submitting a task. For the default scheduler this ensures that
 343                 // the carrier doesn't change when pushing a task. For other schedulers
 344                 // it avoids deadlock that could arise due to carriers and virtual
 345                 // threads contending for a lock.
 346                 if (currentThread().isVirtual()) {
 347                     Continuation.pin();
 348                     try {
 349                         submit(scheduler, runContinuation);
 350                     } finally {
                             // always undo the pin, including when submit throws
 351                         Continuation.unpin();
 352                     }
 353                 } else {
                         // caller is not a virtual thread: submit without pinning
 354                     submit(scheduler, runContinuation);
 355                 }
 356                 done = true;
 357             } catch (RejectedExecutionException ree) {
                     // report the failure via submitFailed, then propagate; never retried
 358                 submitFailed(ree);
 359                 throw ree;
 360             } catch (OutOfMemoryError e) {
 361                 if (retryOnOOME) {
                         // wait before the next attempt
 362                     U.park(false, 100_000_000); // 100ms
 363                 } else {
 364                     throw e;
 365                 }
 366             }
 367         }
 368     }
 369 
 370     /**
 371      * Submits the runContinuation task to the given scheduler as an external submit.
 372      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 373      * @throws RejectedExecutionException
 374      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
 375      */
 376     private void externalSubmitRunContinuation(ForkJoinPool pool) {
             // checked only when assertions are enabled: the caller must be a CarrierThread
 377         assert Thread.currentThread() instanceof CarrierThread;
 378         try {
 379             pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
 380         } catch (RejectedExecutionException ree) {
                 // report the failure via submitFailed, then propagate
 381             submitFailed(ree);
 382             throw ree;
 383         } catch (OutOfMemoryError e) {
                 // fall back to the retrying submit path (retryOnOOME = true)
 384             submitRunContinuation(pool, true);
 385         }
 386     }
 387 
 388     /**
 389      * Submits the runContinuation task to the scheduler. For the default scheduler,
 390      * and calling it on a worker thread, the task will be pushed to the local queue,
 391      * otherwise it will be pushed to an external submission queue.
 392      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 393      * @throws RejectedExecutionException
 394      */
 395     private void submitRunContinuation() {
             // this thread's own scheduler, with retry on OutOfMemoryError enabled
 396         submitRunContinuation(scheduler, true);
 397     }
 398 
 399     /**
 400      * Lazy submit the runContinuation task if invoked on a carrier thread and its local
 401      * queue is empty. If not empty, or invoked by another thread, then this method works
 402      * like submitRunContinuation and just submits the task to the scheduler.
 403      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 404      * @throws RejectedExecutionException
 405      * @see ForkJoinPool#lazySubmit(ForkJoinTask)
 406      */
 407     private void lazySubmitRunContinuation() {

             // lazy path only when running on a CarrierThread that reports no queued tasks
 408         if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 409             ForkJoinPool pool = ct.getPool();
 410             try {
 411                 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
 412             } catch (RejectedExecutionException ree) {
                     // report the failure via submitFailed, then propagate
 413                 submitFailed(ree);
 414                 throw ree;
 415             } catch (OutOfMemoryError e) {
                     // fall back to the regular submit, which retries on OutOfMemoryError
 416                 submitRunContinuation();
 417             }
 418         } else {
                 // any other caller, or a non-empty queue: regular submit
 419             submitRunContinuation();
 420         }
 421     }
 422 
 423     /**
 424      * Submits the runContinuation task to the scheduler. For the default scheduler, and
 425      * calling it on a virtual thread that uses the default scheduler, the task will be
 426      * pushed to an external submission queue. This method may throw OutOfMemoryError.

 427      * @throws RejectedExecutionException
 428      * @throws OutOfMemoryError
 429      */
 430     private void externalSubmitRunContinuationOrThrow() {
             // external-submit path: this thread uses DEFAULT_SCHEDULER and the current
             // carrier is a CarrierThread
 431         if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {

 432             try {
 433                 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 434             } catch (RejectedExecutionException ree) {
                     // report the failure via submitFailed, then propagate
 435                 submitFailed(ree);
 436                 throw ree;


 437             }
                 // OutOfMemoryError is not caught on this path, so it reaches the caller
 438         } else {
                 // retryOnOOME = false: OutOfMemoryError is rethrown rather than retried
 439             submitRunContinuation(scheduler, false);
 440         }
 441     }
 442 
 443     /**
 444      * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
 445      */
 446     private void submitFailed(RejectedExecutionException ree) {
             // the event object is always created; it is filled in and committed only when enabled
 447         var event = new VirtualThreadSubmitFailedEvent();
 448         if (event.isEnabled()) {
 449             event.javaThreadId = threadId();
                 // only the exception's message is recorded
 450             event.exceptionMessage = ree.getMessage();
 451             event.commit();
 452         }
 453     }
 454 
 455     /**
 456      * Runs a task in the context of this virtual thread.
 457      */
 458     private void run(Runnable task) {
 459         assert Thread.currentThread() == this && state == RUNNING;

 576                 long timeout = this.timeout;
 577                 assert timeout > 0;
 578                 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
 579                 setState(newState = TIMED_PARKED);
 580             }
 581 
 582             // may have been unparked while parking
 583             if (parkPermit && compareAndSetState(newState, UNPARKED)) {
 584                 // lazy submit if local queue is empty
 585                 lazySubmitRunContinuation();
 586             }
 587             return;
 588         }
 589 
 590         // Thread.yield
 591         if (s == YIELDING) {
 592             setState(YIELDED);
 593 
 594             // external submit if there are no tasks in the local task queue
 595             if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 596                 externalSubmitRunContinuation(ct.getPool());
 597             } else {
 598                 submitRunContinuation();
 599             }
 600             return;
 601         }
 602 
 603         // blocking on monitorenter
 604         if (s == BLOCKING) {
 605             setState(BLOCKED);
 606 
 607             // may have been unblocked while blocking
 608             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 609                 // lazy submit if local queue is empty
 610                 lazySubmitRunContinuation();
 611             }
 612             return;
 613         }
 614 
 615         // Object.wait
 616         if (s == WAITING || s == TIMED_WAITING) {

 681                 notifyAll();
 682             }
 683         }
 684 
 685         // notify container
 686         if (notifyContainer) {
 687             threadContainer().remove(this);
 688         }
 689 
 690         // clear references to thread locals
 691         clearReferences();
 692     }
 693 
 694     /**
 695      * Schedules this {@code VirtualThread} to execute.
 696      *
 697      * @throws IllegalStateException if the container is shutdown or closed
 698      * @throws IllegalThreadStateException if the thread has already been started
 699      * @throws RejectedExecutionException if the scheduler cannot accept a task
 700      */
 701     @Override
 702     void start(ThreadContainer container) {
             // the NEW -> STARTED transition succeeds at most once; a second start fails here
 703         if (!compareAndSetState(NEW, STARTED)) {
 704             throw new IllegalThreadStateException("Already started");
 705         }
 706 
 707         // bind thread to container
 708         assert threadContainer() == null;
 709         setThreadContainer(container);
 710 
 711         // start thread
             // addedToContainer records whether container.add completed; started records
             // whether the submit completed
 712         boolean addedToContainer = false;
 713         boolean started = false;
 714         try {
 715             container.add(this);  // may throw
 716             addedToContainer = true;
 717 
 718             // scoped values may be inherited
 719             inheritScopedValueBindings(container);
 720 
 721             // submit task to run thread, using externalSubmit if possible
 722             externalSubmitRunContinuationOrThrow();


























 723             started = true;
 724         } finally {
                 // any exception above leaves started false; afterDone is then told
                 // whether the thread had been added to the container
 725             if (!started) {
 726                 afterDone(addedToContainer);
 727             }
 728         }
 729     }
 730 





 731     @Override
 732     public void start() {
             // start in the container returned by ThreadContainers.root()
 733         start(ThreadContainers.root());







 734     }
 735 
 736     @Override
 737     public void run() {
             // intentionally empty; the task given to the constructor is run through
             // the private run(Runnable) method instead
 738         // do nothing
 739     }
 740 
 741     /**
 742      * Invoked by Thread.join before a thread waits for this virtual thread to terminate.
 743      */
 744     void beforeJoin() {
             // volatile flag: requests notifyAll after this virtual thread terminates
 745         notifyAllAfterTerminate = true;
 746     }
 747 
 748     /**
 749      * Parks until unparked or interrupted. If already unparked then the parking
 750      * permit is consumed and this method completes immediately (meaning it doesn't
 751      * yield). It also completes immediately if the interrupted status is set.
 752      */
 753     @Override

 857      * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
 858      * Recording the event in the VM avoids having JFR event recorded in Java
 859      * with the same name, but different ID, to events recorded by the VM.
 860      */
 861     @Hidden
 862     private static native void postPinnedEvent(String op);
 863 
 864     /**
 865      * Re-enables this virtual thread for scheduling. If this virtual thread is parked
 866      * then its task is scheduled to continue, otherwise its next call to {@code park} or
 867      * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
 868      * @param lazySubmit to use lazySubmit if possible
 869      * @throws RejectedExecutionException if the scheduler cannot accept a task
 870      */
 871     private void unpark(boolean lazySubmit) {
             // set the permit; continue only if it was not already set and the caller is
             // not this thread itself
             // NOTE(review): assumes getAndSetParkPermit returns the previous value — confirm
 872         if (!getAndSetParkPermit(true) && currentThread() != this) {
 873             int s = state();
 874 
 875             // unparked while parked
                 // the CAS to UNPARKED decides which caller gets to submit the continuation
 876             if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
 877                 if (lazySubmit) {
 878                     lazySubmitRunContinuation();
 879                 } else {
 880                     submitRunContinuation();













 881                 }


 882                 return;
 883             }
 884 
 885             // unparked while parked when pinned
 886             if (s == PINNED || s == TIMED_PINNED) {
 887                 // unpark carrier thread when pinned
                     // disable/enable calls are paired through try/finally
 888                 disableSuspendAndPreempt();
 889                 try {
 890                     synchronized (carrierThreadAccessLock()) {
                             // carrier and state are read again under the lock; the carrier is
                             // unparked only if it is non-null and the state is still pinned
 891                         Thread carrier = carrierThread;
 892                         if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
 893                             U.unpark(carrier);
 894                         }
 895                     }
 896                 } finally {
 897                     enableSuspendAndPreempt();
 898                 }
 899                 return;
 900             }
 901         }
 902     }
 903 
 904     @Override
 905     void unpark() {
             // lazySubmit = false: a parked thread is resubmitted with submitRunContinuation
 906         unpark(false);
 907     }
 908 





 909     /**
 910      * Invoked by unblocker thread to unblock this virtual thread.
 911      */
 912     private void unblock() {
             // checked only when assertions are enabled: the caller must not be a virtual thread
 913         assert !Thread.currentThread().isVirtual();
             // the permit is set before the state is examined
 914         blockPermit = true;
             // resubmit only if this call wins the BLOCKED -> UNBLOCKED transition
 915         if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
 916             submitRunContinuation();
 917         }
 918     }
 919 
 920     /**
 921      * Invoked by FJP worker thread or STPE thread when park timeout expires.
 922      */
 923     private void parkTimeoutExpired() {
             // checked only when assertions are enabled: the caller must not be a virtual thread
 924         assert !VirtualThread.currentThread().isVirtual();
             // lazySubmit = true: unpark may use lazySubmitRunContinuation
 925         unpark(true);
 926     }
 927 
 928     /**

1308     @IntrinsicCandidate
1309     @JvmtiMountTransition
         // native method, no Java body
         // NOTE(review): 'mount' presumably selects mount versus unmount — confirm against the callers
1310     private native void startTransition(boolean mount);
1311 
1312     @IntrinsicCandidate
1313     @JvmtiMountTransition
         // native counterpart of startTransition with the same parameter
1314     private native void endTransition(boolean mount);
1315 
1316     @IntrinsicCandidate
         // static native method
         // NOTE(review): 'enter' presumably marks entering versus leaving a suspend-disabled section — confirm
1317     private static native void notifyJvmtiDisableSuspend(boolean enter);
1318 
1319     private static native void registerNatives();
1320     static {
             // the first statement of this initializer registers the native methods
1321         registerNatives();
1322 
1323         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
             // 'group' is not used afterwards; the call is made for its effect only
1324         var group = Thread.virtualThreadGroup();
1325     }
1326 
1327     /**
1328      * Creates the default ForkJoinPool scheduler.




























1329      */
1330     private static ForkJoinPool createDefaultScheduler() {
             // every worker of the pool is a CarrierThread
1331         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1332         int parallelism, maxPoolSize, minRunnable;
             // optional tuning via system properties; Integer.parseInt below is not guarded,
             // so a malformed value propagates as NumberFormatException
1333         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1334         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1335         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1336         if (parallelismValue != null) {
1337             parallelism = Integer.parseInt(parallelismValue);
1338         } else {
                 // default: one per available processor
1339             parallelism = Runtime.getRuntime().availableProcessors();
1340         }
1341         if (maxPoolSizeValue != null) {
1342             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
                 // an explicit maximum also caps the parallelism
1343             parallelism = Integer.min(parallelism, maxPoolSize);
1344         } else {
                 // default: at least 256, or the parallelism if that is larger
1345             maxPoolSize = Integer.max(parallelism, 256);
1346         }
1347         if (minRunnableValue != null) {
1348             minRunnable = Integer.parseInt(minRunnableValue);
1349         } else {
                 // default: half the parallelism, but never below 1
1350             minRunnable = Integer.max(parallelism / 2, 1);
1351         }
             // handler with an empty body: uncaught exceptions in workers are ignored here
1352         Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1353         boolean asyncMode = true; // FIFO
             // remaining arguments: core pool size 0, a saturate predicate that always
             // returns true, and a keep-alive of 30 seconds
1354         return new ForkJoinPool(parallelism, factory, handler, asyncMode,
1355                      0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);



























































































1356     }
1357 
1358     /**
1359      * Schedule a runnable task to run after a delay.
1360      */
1361     private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
             // a ForkJoinPool scheduler schedules the delayed task itself
1362         if (scheduler instanceof ForkJoinPool pool) {
1363             return pool.schedule(command, delay, unit);
1364         } else {
                 // any other scheduler: use the shared DelayedTaskSchedulers
1365             return DelayedTaskSchedulers.schedule(command, delay, unit);


1366         }
1367     }
1368 
1369     /**
1370      * Supports scheduling a runnable task to run after a delay. It uses a number
1371      * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1372      * work queue used. This class is used when using a custom scheduler.
1373      */
1374     private static class DelayedTaskSchedulers {
1375         private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();
1376 
1377         static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
                 // pick an executor from the low bits of the current thread's id
1378             long tid = Thread.currentThread().threadId();
                 // the mask relies on INSTANCE.length being a power of two
                 // NOTE(review): array creation is only partly visible here — confirm the length is never zero
1379             int index = (int) tid & (INSTANCE.length - 1);
1380             return INSTANCE[index].schedule(command, delay, unit);
1381         }
1382 
1383         private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
1384             String propName = "jdk.virtualThreadScheduler.timerQueues";
1385             String propValue = System.getProperty(propName);
1386             int queueCount;
1387             if (propValue != null) {
1388                 queueCount = Integer.parseInt(propValue);
1389                 if (queueCount != Integer.highestOneBit(queueCount)) {
1390                     throw new RuntimeException("Value of " + propName + " must be power of 2");
1391                 }
1392             } else {
1393                 int ncpus = Runtime.getRuntime().availableProcessors();
1394                 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.lang.reflect.Constructor;
  30 import java.util.Locale;
  31 import java.util.Objects;

  32 import java.util.concurrent.Executors;
  33 import java.util.concurrent.ForkJoinPool;

  34 import java.util.concurrent.ForkJoinTask;
  35 import java.util.concurrent.Future;
  36 import java.util.concurrent.LinkedTransferQueue;
  37 import java.util.concurrent.RejectedExecutionException;
  38 import java.util.concurrent.ScheduledExecutorService;
  39 import java.util.concurrent.ScheduledFuture;
  40 import java.util.concurrent.ScheduledThreadPoolExecutor;
  41 import java.util.concurrent.ThreadFactory;
  42 import java.util.concurrent.ThreadPoolExecutor;
  43 import java.util.concurrent.TimeUnit;
  44 import jdk.internal.event.VirtualThreadEndEvent;
  45 import jdk.internal.event.VirtualThreadStartEvent;
  46 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  47 import jdk.internal.invoke.MhUtil;
  48 import jdk.internal.misc.CarrierThread;
  49 import jdk.internal.misc.InnocuousThread;
  50 import jdk.internal.misc.Unsafe;
  51 import jdk.internal.vm.Continuation;
  52 import jdk.internal.vm.ContinuationScope;
  53 import jdk.internal.vm.StackableScope;
  54 import jdk.internal.vm.ThreadContainer;
  55 import jdk.internal.vm.ThreadContainers;
  56 import jdk.internal.vm.annotation.ChangesCurrentThread;
  57 import jdk.internal.vm.annotation.Hidden;
  58 import jdk.internal.vm.annotation.IntrinsicCandidate;
  59 import jdk.internal.vm.annotation.JvmtiHideEvents;
  60 import jdk.internal.vm.annotation.JvmtiMountTransition;
  61 import jdk.internal.vm.annotation.ReservedStackAccess;
  62 import sun.nio.ch.Interruptible;
  63 import static java.util.concurrent.TimeUnit.*;
  64 
  65 /**
  66  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  67  */
  68 final class VirtualThread extends BaseVirtualThread {
  69     private static final Unsafe U = Unsafe.getUnsafe();  // used below for objectFieldOffset lookups
         // scope returned by continuationScope()
  70     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  71 
         // the four constants below are assigned in the static initializer that follows
  72     private static final VirtualThreadScheduler BUILTIN_SCHEDULER;
  73     private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
  74     private static final VirtualThreadScheduler EXTERNAL_VIEW;
  75     private static final boolean USE_STPE;
  76     static {
  77         // experimental
  78         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
  79         if (propValue != null) {
                 // property set: the scheduler named by the property is loaded, given the
                 // external view of the built-in scheduler, and becomes the default
                 // NOTE(review): the meaning of the boolean passed to createBuiltinScheduler
                 // (true here, false in the other branch) is not visible in this view — confirm
  80             VirtualThreadScheduler builtinScheduler = createBuiltinScheduler(true);
  81             VirtualThreadScheduler externalView = createExternalView(builtinScheduler);
  82             VirtualThreadScheduler defaultScheduler = loadCustomScheduler(externalView, propValue);
  83             BUILTIN_SCHEDULER = builtinScheduler;
  84             DEFAULT_SCHEDULER = defaultScheduler;
  85             EXTERNAL_VIEW = externalView;
  86         } else {
                 // property not set: the built-in scheduler is also the default scheduler
  87             var builtinScheduler = createBuiltinScheduler(false);
  88             BUILTIN_SCHEDULER = builtinScheduler;
  89             DEFAULT_SCHEDULER = builtinScheduler;
  90             EXTERNAL_VIEW = createExternalView(builtinScheduler);
  91         }
  92         propValue = System.getProperty("jdk.virtualThreadScheduler.useSTPE");
  93         if (propValue != null) {
                 // explicit setting, parsed with Boolean.parseBoolean
  94             USE_STPE = Boolean.parseBoolean(propValue);
  95         } else {
                 // default: true only when fewer than 4 processors are available
  96             USE_STPE = Runtime.getRuntime().availableProcessors() < 4;
  97         }
  98     }
  99 
 100     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");  // Unsafe offset of the "state" field
         // offsets of further fields, each looked up by field name
         // NOTE(review): presumably used for atomic access to these fields; the accessors are outside this view
 101     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
 102     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
 103     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
 104 
 105     // scheduler and continuation
 106     private final VirtualThreadScheduler scheduler;
 107     private final Continuation cont;
 108     private final VThreadTask runContinuation;
 109 
 110     // virtual thread state, accessed by VM
 111     private volatile int state;
 112 
 113     /*
 114      * Virtual thread state transitions:
 115      *
 116      *      NEW -> STARTED         // Thread.start, schedule to run
 117      *  STARTED -> TERMINATED      // failed to start
 118      *  STARTED -> RUNNING         // first run
 119      *  RUNNING -> TERMINATED      // done
 120      *
 121      *  RUNNING -> PARKING         // Thread parking with LockSupport.park
 122      *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
 123      *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
 124      * UNPARKED -> RUNNING         // continue execution after park
 125      *
 126      *  PARKING -> RUNNING         // cont.yield failed, need to park on carrier
 127      *  RUNNING -> PINNED          // park on carrier
 128      *   PINNED -> RUNNING         // unparked, continue execution on same carrier

 202 
 203     // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
 204     private volatile boolean interruptibleWait;
 205 
 206     // timed-wait support
 207     private byte timedWaitSeqNo;
 208 
 209     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 210     private long timeout;
 211 
 212     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 213     private Future<?> timeoutTask;
 214 
 215     // carrier thread when mounted, accessed by VM
 216     private volatile Thread carrierThread;
 217 
 218     // true to notifyAll after this virtual thread terminates
 219     private volatile boolean notifyAllAfterTerminate;
 220 
 221     /**
 222      * Returns the built-in scheduler, or the external view of it for untrusted callers.
 223      * @param trusted true if caller is trusted, false if not trusted
          * @return {@code BUILTIN_SCHEDULER} when {@code trusted} is true, otherwise
          *         {@code EXTERNAL_VIEW}
 224      */
 225     static VirtualThreadScheduler builtinScheduler(boolean trusted) {
 226         return trusted ? BUILTIN_SCHEDULER : EXTERNAL_VIEW;
 227     }
 228 
 229     /**
 230      * Returns the default scheduler, usually the same as the built-in scheduler.
          * @return the value of {@code DEFAULT_SCHEDULER}, which is the scheduler that the
          *         constructor uses when it is given a null scheduler
 231      */
 232     static VirtualThreadScheduler defaultScheduler() {
 233         return DEFAULT_SCHEDULER;
 234     }
 235 
 236     /**
 237      * Returns the continuation scope used for virtual threads.
          * @return {@code VTHREAD_SCOPE}, the scope that each virtual thread's continuation
          *         is created with
 238      */
 239     static ContinuationScope continuationScope() {
 240         return VTHREAD_SCOPE;
 241     }
 242 
 243     /**
 244      * Returns the task to start/continue this virtual thread.
 245      */
 246     VirtualThreadTask virtualThreadTask() {
 247         return runContinuation;
 248     }
 249 
 250     /**
 251      * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
 252      *
 253      * @param scheduler the scheduler or null for default scheduler
 254      * @param preferredCarrier the preferred carrier or null
 255      * @param name thread name
 256      * @param characteristics characteristics
 257      * @param task the task to execute
          * @throws NullPointerException if {@code task} is null
          * @throws UnsupportedOperationException if {@code scheduler} is the external view
          *         of the built-in scheduler
 258      */
 259     VirtualThread(VirtualThreadScheduler scheduler,
 260                   Thread preferredCarrier,
 261                   String name,
 262                   int characteristics,
 263                   Runnable task) {
 264         super(name, characteristics, /*bound*/ false);
 265         Objects.requireNonNull(task);
 266 
 267         // use default scheduler if not provided
 268         if (scheduler == null) {
 269             scheduler = DEFAULT_SCHEDULER;
 270         } else if (scheduler == EXTERNAL_VIEW) {
                  // the external view cannot be selected explicitly as a thread's scheduler
 271             throw new UnsupportedOperationException();



 272         }

 273         this.scheduler = scheduler;
 274         this.cont = new VThreadContinuation(this, task);
 275 
              // the preferred carrier is only recorded when the scheduler is not the
              // built-in scheduler; the plain VThreadTask does not keep it
 276         if (scheduler == BUILTIN_SCHEDULER) {
 277             this.runContinuation = new VThreadTask(this);
 278         } else {
 279             this.runContinuation = new CustomVThreadTask(this, preferredCarrier);
 280         }
 281     }
 282 
 283     /**
 284      * The task to start/continue a virtual thread.
          *
          * <p> This base implementation only supports {@link #thread()} and {@link #run()}.
          * The preferred carrier and attachment operations throw
          * {@code UnsupportedOperationException}; {@code CustomVThreadTask} overrides them.
 285      */
 286     static non-sealed class VThreadTask implements VirtualThreadTask {
 287         private final VirtualThread vthread;
 288         VThreadTask(VirtualThread vthread) {
 289             this.vthread = vthread;
 290         }
              // returns the virtual thread that this task starts/continues
 291         @Override
 292         public final Thread thread() {
 293             return vthread;
 294         }
              // runs the virtual thread's continuation
 295         @Override
 296         public final void run() {
 297             vthread.runContinuation();
 298         }
              // not supported by the base task
 299         @Override
 300         public Thread preferredCarrier() {
 301             throw new UnsupportedOperationException();
 302         }
              // not supported by the base task
 303         @Override
 304         public Object attach(Object att) {
 305             throw new UnsupportedOperationException();
 306         }
              // not supported by the base task
 307         @Override
 308         public Object attachment() {
 309             throw new UnsupportedOperationException();
 310         }
 311     }
 312 
 313     /**
 314      * The task to start/continue a virtual thread when using a custom scheduler.
          *
          * <p> In addition to the base task, it records the preferred carrier given at
          * construction and holds a single attachment object that can be swapped atomically.
 315      */
 316     static final class CustomVThreadTask extends VThreadTask {
              // VarHandle for the "att" field, used by attach to do an atomic get-and-set
 317         private static final VarHandle ATT =
 318                 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class);
 319         private final Thread preferredCarrier;
              // the current attachment, null until attach is called with a non-null value
 320         private volatile Object att;
 321         CustomVThreadTask(VirtualThread vthread, Thread preferredCarrier) {
 322             super(vthread);
 323             this.preferredCarrier = preferredCarrier;
 324         }
              // returns the preferred carrier given to the constructor, may be null
 325         @Override
 326         public Thread preferredCarrier() {
 327             return preferredCarrier;
 328         }
              // atomically replaces the attachment and returns the previous one
 329         @Override
 330         public Object attach(Object att) {
 331             return ATT.getAndSet(this, att);
 332         }
              // volatile read of the current attachment
 333         @Override
 334         public Object attachment() {
 335             return att;
 336         }
 337     }
 338 
 339     /**
 340      * The continuation that a virtual thread executes.
 341      */
 342     private static class VThreadContinuation extends Continuation {
 343         VThreadContinuation(VirtualThread vthread, Runnable task) {
 344             super(VTHREAD_SCOPE, wrap(vthread, task));
 345         }
 346         @Override
 347         protected void onPinned(Continuation.Pinned reason) {
 348         }
 349         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 350             return new Runnable() {
 351                 @Hidden
 352                 @JvmtiHideEvents
 353                 public void run() {
 354                     vthread.endFirstTransition();
 355                     try {
 356                         vthread.run(task);

 404             if (cont.isDone()) {
 405                 afterDone();
 406             } else {
 407                 afterYield();
 408             }
 409         }
 410     }
 411 
 412     /**
 413      * Cancel timeout task when continuing after timed-park or timed-wait.
 414      * The timeout task may be executing, or may have already completed.
          * Does nothing if there is no timeout task. The task is cancelled without
          * interrupting it if it is already running, and the reference to it is cleared.
 415      */
 416     private void cancelTimeoutTask() {
 417         if (timeoutTask != null) {
                  // false => do not interrupt the task if it is in progress
 418             timeoutTask.cancel(false);
 419             timeoutTask = null;
 420         }
 421     }
 422 
 423     /**
 424      * Submits the runContinuation task to the scheduler. For the built-in scheduler,
 425      * the task will be pushed to the local queue if possible, otherwise it will be
 426      * pushed to an external submission queue.
          *
          * <p> The task is handed to the scheduler's {@code onContinue} method. When the
          * current thread is a virtual thread, its continuation is pinned for the duration
          * of that call.
          *
 427      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 428      * @throws RejectedExecutionException if the scheduler rejects the task; a submit
          *         failed event is recorded (if enabled) before the exception is rethrown
          * @throws OutOfMemoryError if thrown by the submit and {@code retryOnOOME} is false
 429      */
 430     private void submitRunContinuation(boolean retryOnOOME) {
 431         boolean done = false;
 432         while (!done) {
 433             try {
 434                 // Pin the continuation to prevent the virtual thread from unmounting
 435                 // when submitting a task. For the default scheduler this ensures that
 436                 // the carrier doesn't change when pushing a task. For other schedulers
 437                 // it avoids deadlock that could arise due to carriers and virtual
 438                 // threads contending for a lock.
 439                 if (currentThread().isVirtual()) {
 440                     Continuation.pin();
 441                     try {
 442                         scheduler.onContinue(runContinuation);
 443                     } finally {
 444                         Continuation.unpin();
 445                     }
 446                 } else {
 447                     scheduler.onContinue(runContinuation);
 448                 }
 449                 done = true;
 450             } catch (RejectedExecutionException ree) {
                      // record the failure, then propagate to the caller
 451                 submitFailed(ree);
 452                 throw ree;
 453             } catch (OutOfMemoryError e) {
 454                 if (retryOnOOME) {
                          // back off before the next attempt of the loop
 455                     U.park(false, 100_000_000); // 100ms
 456                 } else {
 457                     throw e;
 458                 }
 459             }
 460         }
 461     }
 462 


















 463     /**
 464      * Submits the runContinuation task to the scheduler. For the default scheduler,
 465      * and calling it on a worker thread, the task will be pushed to the local queue,
 466      * otherwise it will be pushed to an external submission queue.
 467      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 468      * @throws RejectedExecutionException if the scheduler rejects the task
 469      */
 470     private void submitRunContinuation() {
              // true => keep retrying on OutOfMemoryError
 471         submitRunContinuation(true);
 472     }
 473 
 474     /**
 475      * Invoked from a carrier thread to lazy submit the runContinuation task to the
 476      * carrier's local queue if the queue is empty. If not empty, or invoked by a thread
 477      * for a custom scheduler, then it just submits the task to the scheduler.
 478      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 479      * @throws RejectedExecutionException if the pool or scheduler rejects the task
 480      * @see ForkJoinPool#lazySubmit(ForkJoinTask)
 481      */
 482     private void lazySubmitRunContinuation() {
 483         assert !currentThread().isVirtual();
              // lazy submit only when on a CarrierThread that has no queued tasks
 484         if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {

 485             try {
 486                 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
 487             } catch (RejectedExecutionException ree) {
 488                 submitFailed(ree);
 489                 throw ree;
 490             } catch (OutOfMemoryError e) {
                      // fall back to the regular submit, which retries on OutOfMemoryError
 491                 submitRunContinuation();
 492             }
 493         } else {
 494             submitRunContinuation();
 495         }
 496     }
 497 
 498     /**
 499      * Invoked from a carrier thread to externally submit the runContinuation task to the
 500      * scheduler. If invoked by a thread for a custom scheduler, then it just submits the
 501      * task to the scheduler.
 502      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 503      * @throws RejectedExecutionException if the pool or scheduler rejects the task
 504      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
 505      */
 506     private void externalSubmitRunContinuation() {
 507         assert !currentThread().isVirtual();
              // external submit only when the current thread is a CarrierThread
 508         if (currentThread() instanceof CarrierThread ct) {
 509             try {
 510                 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 511             } catch (RejectedExecutionException ree) {
 512                 submitFailed(ree);
 513                 throw ree;
 514             } catch (OutOfMemoryError e) {
                      // fall back to the regular submit, which retries on OutOfMemoryError
 515                 submitRunContinuation();
 516             }
 517         } else {
 518             submitRunContinuation();
 519         }
 520     }
 521 
 522     /**
 523      * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
          * The event records this thread's identifier and the exception message.
          * @param ree the exception thrown by the failed submit
 524      */
 525     private void submitFailed(RejectedExecutionException ree) {
 526         var event = new VirtualThreadSubmitFailedEvent();
 527         if (event.isEnabled()) {
 528             event.javaThreadId = threadId();
 529             event.exceptionMessage = ree.getMessage();
 530             event.commit();
 531         }
 532     }
 533 
 534     /**
 535      * Runs a task in the context of this virtual thread.
 536      */
 537     private void run(Runnable task) {
 538         assert Thread.currentThread() == this && state == RUNNING;

 655                 long timeout = this.timeout;
 656                 assert timeout > 0;
 657                 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
 658                 setState(newState = TIMED_PARKED);
 659             }
 660 
 661             // may have been unparked while parking
 662             if (parkPermit && compareAndSetState(newState, UNPARKED)) {
 663                 // lazy submit if local queue is empty
 664                 lazySubmitRunContinuation();
 665             }
 666             return;
 667         }
 668 
 669         // Thread.yield
 670         if (s == YIELDING) {
 671             setState(YIELDED);
 672 
 673             // external submit if there are no tasks in the local task queue
 674             if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 675                 externalSubmitRunContinuation();
 676             } else {
 677                 submitRunContinuation();
 678             }
 679             return;
 680         }
 681 
 682         // blocking on monitorenter
 683         if (s == BLOCKING) {
 684             setState(BLOCKED);
 685 
 686             // may have been unblocked while blocking
 687             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 688                 // lazy submit if local queue is empty
 689                 lazySubmitRunContinuation();
 690             }
 691             return;
 692         }
 693 
 694         // Object.wait
 695         if (s == WAITING || s == TIMED_WAITING) {

 760                 notifyAll();
 761             }
 762         }
 763 
 764         // notify container
 765         if (notifyContainer) {
 766             threadContainer().remove(this);
 767         }
 768 
 769         // clear references to thread locals
 770         clearReferences();
 771     }
 772 
 773     /**
 774      * Schedules this {@code VirtualThread} to execute.
 775      *
          * @param container the thread container to bind this thread to and add it to
          * @param lazy when started from a virtual thread whose carrier is a
          *        {@code CarrierThread} and this thread uses the built-in scheduler, true
          *        to use {@code lazySubmit} and false to use {@code externalSubmit};
          *        ignored in all other cases, where the scheduler's {@code onStart} is used
 776      * @throws IllegalStateException if the container is shutdown or closed
 777      * @throws IllegalThreadStateException if the thread has already been started
 778      * @throws RejectedExecutionException if the scheduler cannot accept a task
 779      */
 780     private void start(ThreadContainer container, boolean lazy) {

              // only one caller can move the state from NEW to STARTED
 781         if (!compareAndSetState(NEW, STARTED)) {
 782             throw new IllegalThreadStateException("Already started");
 783         }
 784 
 785         // bind thread to container
 786         assert threadContainer() == null;
 787         setThreadContainer(container);
 788 
 789         // start thread
 790         boolean addedToContainer = false;
 791         boolean started = false;
 792         try {
 793             container.add(this);  // may throw
 794             addedToContainer = true;
 795 
 796             // scoped values may be inherited
 797             inheritScopedValueBindings(container);
 798 
 799             // submit task to schedule
 800             try {
 801                 if (currentThread().isVirtual()) {
                          // pin the current virtual thread while submitting
 802                     Continuation.pin();
 803                     try {
 804                         if (scheduler == BUILTIN_SCHEDULER
 805                                 && currentCarrierThread() instanceof CarrierThread ct) {
                                  // submit directly to the carrier's pool
 806                             ForkJoinPool pool = ct.getPool();
 807                             ForkJoinTask<?> task = ForkJoinTask.adapt(runContinuation);
 808                             if (lazy) {
 809                                 pool.lazySubmit(task);
 810                             } else {
 811                                 pool.externalSubmit(task);
 812                             }
 813                         } else {
 814                             scheduler.onStart(runContinuation);
 815                         }
 816                     } finally {
 817                         Continuation.unpin();
 818                     }
 819                 } else {
 820                     scheduler.onStart(runContinuation);
 821                 }
 822             } catch (RejectedExecutionException ree) {
 823                 submitFailed(ree);
 824                 throw ree;
 825             }
 826 
 827             started = true;
 828         } finally {
                  // failed to start: run the termination path, telling it whether the
                  // thread had been added to the container
 829             if (!started) {
 830                 afterDone(addedToContainer);
 831             }
 832         }
 833     }
 834 
 835     @Override
 836     void start(ThreadContainer container) {
              // start in the given container, using a non-lazy submit
 837         start(container, false);
 838     }
 839 
 840     @Override
 841     public void start() {
              // start in the root container, using a non-lazy submit
 842         start(ThreadContainers.root(), false);
 843     }
 844 
 845     /**
 846      * Schedules this thread to begin execution without guarantee that it will execute.
          * The thread is started in the root container with the lazy option set.
 847      */
 848     void lazyStart() {
 849         start(ThreadContainers.root(), true);
 850     }
 851 
 852     @Override
 853     public void run() {
 854         // do nothing; the task given to the constructor is run by run(Runnable),
              // which is invoked from the continuation rather than through this method
 855     }
 856 
 857     /**
 858      * Invoked by Thread.join before a thread waits for this virtual thread to terminate.
          * Sets the flag that requests a notifyAll after this virtual thread terminates.
 859      */
 860     void beforeJoin() {
 861         notifyAllAfterTerminate = true;
 862     }
 863 
 864     /**
 865      * Parks until unparked or interrupted. If already unparked then the parking
 866      * permit is consumed and this method completes immediately (meaning it doesn't
 867      * yield). It also completes immediately if the interrupted status is set.
 868      */
 869     @Override

 973      * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
 974      * Recording the event in the VM avoids having JFR event recorded in Java
 975      * with the same name, but different ID, to events recorded by the VM.
 976      */
 977     @Hidden
 978     private static native void postPinnedEvent(String op);
 979 
 980     /**
 981      * Re-enables this virtual thread for scheduling. If this virtual thread is parked
 982      * then its task is scheduled to continue, otherwise its next call to {@code park} or
 983      * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
 984      * @param lazySubmit true to use lazySubmit if possible, that is, when the caller is
          *        a virtual thread mounted on a {@code CarrierThread} and this thread uses
          *        the built-in scheduler
 985      * @throws RejectedExecutionException if the scheduler cannot accept a task
 986      */
 987     private void unpark(boolean lazySubmit) {
              // nothing to do if the permit was already available or a thread unparks itself
 988         if (!getAndSetParkPermit(true) && currentThread() != this) {
 989             int s = state();
 990 
 991             // unparked while parked
 992             if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
 993 
 994                 if (lazySubmit && currentThread().isVirtual()) {
                          // pin the current virtual thread while submitting to the carrier's pool
 995                     Continuation.pin();
 996                     try {
 997                         if (scheduler == BUILTIN_SCHEDULER
 998                                 && currentCarrierThread() instanceof CarrierThread ct) {
 999                             ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
1000                             return;
1001                         }
1002                     } catch (RejectedExecutionException ree) {
1003                         submitFailed(ree);
1004                         throw ree;
1005                     } catch (OutOfMemoryError e) {
1006                         // fall-through to submitRunContinuation below, which retries
1007                     } finally {
1008                         Continuation.unpin();
1009                     }
1010                 }
1011 
                      // regular submit: lazy submit not requested, not possible, or failed with OOME
1012                 submitRunContinuation();
1013                 return;
1014             }
1015 
1016             // unparked while parked when pinned
1017             if (s == PINNED || s == TIMED_PINNED) {
1018                 // unpark carrier thread when pinned
1019                 disableSuspendAndPreempt();
1020                 try {
1021                     synchronized (carrierThreadAccessLock()) {
                              // re-read the carrier and state under the lock before unparking
1022                         Thread carrier = carrierThread;
1023                         if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
1024                             U.unpark(carrier);
1025                         }
1026                     }
1027                 } finally {
1028                     enableSuspendAndPreempt();
1029                 }
1030                 return;
1031             }
1032         }
1033     }
1034 
1035     @Override
1036     void unpark() {
              // unpark without requesting a lazy submit
1037         unpark(false);
1038     }
1039 
1040     @Override
1041     void lazyUnpark() {
              // unpark, using lazySubmit if possible
1042         unpark(true);
1043     }
1044 
1045     /**
1046      * Invoked by unblocker thread to unblock this virtual thread.
          * Sets the block permit and, if this thread is in the BLOCKED state, changes the
          * state to UNBLOCKED and submits the task to continue. Must not be called by a
          * virtual thread.
1047      */
1048     private void unblock() {
1049         assert !Thread.currentThread().isVirtual();
              // set the permit before examining the state
1050         blockPermit = true;
              // only the thread that wins the BLOCKED -> UNBLOCKED transition submits the task
1051         if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
1052             submitRunContinuation();
1053         }
1054     }
1055 
1056     /**
1057      * Invoked by FJP worker thread or STPE thread when park timeout expires.
          * Unparks this virtual thread, requesting a lazy submit if possible. Must not be
          * called by a virtual thread.
1058      */
1059     private void parkTimeoutExpired() {
1060         assert !VirtualThread.currentThread().isVirtual();
1061         unpark(true);
1062     }
1063 
1064     /**

1444     @IntrinsicCandidate
1445     @JvmtiMountTransition
1446     private native void startTransition(boolean mount);
1447 
1448     @IntrinsicCandidate
1449     @JvmtiMountTransition
          // NOTE(review): native method implemented by the VM; presumably marks the end of
          // a mount (mount == true) or unmount (mount == false) transition -- confirm
          // against the VM implementation.
1450     private native void endTransition(boolean mount);
1451 
1452     @IntrinsicCandidate
          // NOTE(review): native method implemented by the VM; presumably notifies JVMTI
          // that the current thread enters (enter == true) or leaves (enter == false) a
          // region where suspend is disabled -- confirm against the VM implementation.
1453     private static native void notifyJvmtiDisableSuspend(boolean enter);
1454 
1455     private static native void registerNatives();
1456     static {
              // invoked once from this static initializer, before anything else in it
1457         registerNatives();
1458 
1459         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
              // (the local is otherwise unused; the call is made for its side effect)
1460         var group = Thread.virtualThreadGroup();
1461     }
1462 
1463     /**
1464      * Loads a VirtualThreadScheduler with the given class name. The class must be public
1465      * in an exported package, with public one-arg or no-arg constructor, and be visible
1466      * to the system class loader.
          *
          * <p> The one-arg constructor taking a {@code VirtualThreadScheduler} is tried first
          * and is passed {@code delegate}. The no-arg constructor is used only when there is
          * no such one-arg constructor. A warning is printed to {@code System.err} whichever
          * constructor is used.
          *
1467      * @param delegate the scheduler that the custom scheduler may delegate to
1468      * @param cn the class name of the custom scheduler
          * @return the newly created custom scheduler
          * @throws Error if the class cannot be loaded or instantiated, the cause is the
          *         exception that was thrown
1469      */
1470     private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) {
1471         VirtualThreadScheduler scheduler;
1472         try {
1473             Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
1474             // 1-arg constructor
1475             try {
1476                 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class);
                      // assign rather than return so that the warning below is also printed
                      // when the 1-arg constructor is used
1477                 scheduler = (VirtualThreadScheduler) ctor.newInstance(delegate);
1478             } catch (NoSuchMethodException e) {
1479                 // 0-arg constructor
1480                 Constructor<?> ctor = clazz.getConstructor();
1481                 scheduler = (VirtualThreadScheduler) ctor.newInstance();
1482             }
1483         } catch (Exception ex) {
1484             throw new Error(ex);
1485         }
1486         System.err.println("WARNING: Using custom default scheduler, this is an experimental feature!");
1487         return scheduler;
1488     }
1489 
1490     /**
1491      * Creates the built-in scheduler. This is a ForkJoinPool based scheduler unless the
          * system property {@code jdk.virtualThreadScheduler.useTPE} is "true", in which
          * case a ThreadPoolExecutor based scheduler is created.
          *
          * <p> The following system properties are read, each parsed as an integer:
          * <ul>
          *   <li> {@code jdk.virtualThreadScheduler.parallelism}, defaults to the number of
          *        available processors
          *   <li> {@code jdk.virtualThreadScheduler.maxPoolSize}, defaults to the larger of
          *        parallelism and 256; when set, parallelism is capped at this value
          *   <li> {@code jdk.virtualThreadScheduler.minRunnable}, defaults to the larger of
          *        half of parallelism and 1
          * </ul>
          *
1492      * @param wrapped true if wrapped by a custom default scheduler; not used when the
          *        ThreadPoolExecutor based scheduler is created
          * @return the new scheduler
          * @throws NumberFormatException if a property value cannot be parsed as an integer
1493      */
1494     private static VirtualThreadScheduler createBuiltinScheduler(boolean wrapped) {

1495         int parallelism, maxPoolSize, minRunnable;
1496         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1497         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1498         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1499         if (parallelismValue != null) {
1500             parallelism = Integer.parseInt(parallelismValue);
1501         } else {
1502             parallelism = Runtime.getRuntime().availableProcessors();
1503         }
1504         if (maxPoolSizeValue != null) {
1505             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
                  // parallelism cannot exceed the configured maximum pool size
1506             parallelism = Integer.min(parallelism, maxPoolSize);
1507         } else {
1508             maxPoolSize = Integer.max(parallelism, 256);
1509         }
1510         if (minRunnableValue != null) {
1511             minRunnable = Integer.parseInt(minRunnableValue);
1512         } else {
1513             minRunnable = Integer.max(parallelism / 2, 1);
1514         }
1515         if (Boolean.getBoolean("jdk.virtualThreadScheduler.useTPE")) {
                  // only parallelism is used here, as the fixed size of the thread pool
1516             return new BuiltinThreadPoolExecutorScheduler(parallelism);
1517         } else {
1518             return new BuiltinForkJoinPoolScheduler(parallelism, maxPoolSize, minRunnable, wrapped);
1519         }
1520     }
1521 
1522     /**
1523      * The built-in ForkJoinPool scheduler.
          * Tasks to start or continue a virtual thread are adapted to ForkJoinTasks and
          * executed by this pool.
1524      */
1525     private static class BuiltinForkJoinPoolScheduler
1526             extends ForkJoinPool implements VirtualThreadScheduler {
1527 
1528         BuiltinForkJoinPoolScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) {
                  // workers are CarrierThreads unless wrapped, in which case the default
                  // ForkJoinPool worker thread factory is used
1529             ForkJoinWorkerThreadFactory factory = wrapped
1530                     ? ForkJoinPool.defaultForkJoinWorkerThreadFactory
1531                     : CarrierThread::new;
                  // uncaught exceptions in worker threads are ignored
1532             Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1533             boolean asyncMode = true; // FIFO
                  // core pool size 0, saturate predicate always true, 30 second keep-alive
1534             super(parallelism, factory, handler, asyncMode,
1535                     0, maxPoolSize, minRunnable, pool -> true, 30L, SECONDS);
1536         }
1537 
1538         @Override
1539         public void onStart(VirtualThreadTask task) {
1540             execute(ForkJoinTask.adapt(task));
1541         }
1542 
1543         @Override
1544         public void onContinue(VirtualThreadTask task) {
1545             execute(ForkJoinTask.adapt(task));
1546         }
1547 
              // delayed tasks are scheduled with the ForkJoinPool's own schedule method
1548         @Override
1549         public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
1550             return super.schedule(task, delay, unit);
1551         }
1552     }
1553 
1554     /**
1555      * Built-in ThreadPoolExecutor scheduler.
          * A thread pool with the core and maximum pool size both set to the given size,
          * daemon threads created by InnocuousThread, and a LinkedTransferQueue as the
          * work queue.
1556      */
1557     private static class BuiltinThreadPoolExecutorScheduler
1558             extends ThreadPoolExecutor implements VirtualThreadScheduler {
1559 
1560         BuiltinThreadPoolExecutorScheduler(int maxPoolSize) {
1561             ThreadFactory factory = task -> {
1562                 Thread t = InnocuousThread.newThread(task);
1563                 t.setDaemon(true);
1564                 return t;
1565             };
                  // core size == max size, keep-alive time of 0
1566             super(maxPoolSize, maxPoolSize,
1567                     0L, SECONDS,
1568                     new LinkedTransferQueue<>(),
1569                     factory);
1570         }
1571 
1572         @Override
1573         public void onStart(VirtualThreadTask task) {
1574             execute(task);
1575         }
1576 
1577         @Override
1578         public void onContinue(VirtualThreadTask task) {
1579             execute(task);
1580         }
1581     }
1582 
1583     /**
1584      * Wraps the scheduler to avoid leaking a direct reference to built-in scheduler.
          * The returned scheduler forwards {@code onStart} and {@code onContinue} to the
          * delegate after checking the task, and reports the delegate's string form.
          * @param delegate the scheduler to wrap
          * @return a new scheduler that delegates to {@code delegate}
1585      */
1586     static VirtualThreadScheduler createExternalView(VirtualThreadScheduler delegate) {
1587         return new VirtualThreadScheduler() {
                  // rejects a task if its virtual thread's scheduler is neither this view
                  // nor the default scheduler
1588             private void check(VirtualThreadTask task) {
1589                 var vthread = (VirtualThread) task.thread();
1590                 VirtualThreadScheduler scheduler = vthread.scheduler;
1591                 if (scheduler != this && scheduler != DEFAULT_SCHEDULER) {
1592                     throw new IllegalArgumentException();
1593                 }
1594             }
1595             @Override
1596             public void onStart(VirtualThreadTask task) {
1597                 check(task);
1598                 delegate.onStart(task);
1599             }
1600             @Override
1601             public void onContinue(VirtualThreadTask task) {
1602                 check(task);
1603                 delegate.onContinue(task);
1604             }
1605             @Override
1606             public String toString() {
1607                 return delegate.toString();
1608             }
1609         };
1610     }
1611 
1612     /**
1613      * Schedule a runnable task to run after a delay.
          * The task is scheduled with {@code DelayedTaskSchedulers} when {@code USE_STPE}
          * is true, otherwise with this virtual thread's scheduler.
          * @param command the task to run
          * @param delay the delay before the task runs
          * @param unit the time unit of {@code delay}
          * @return a Future for the scheduled task, which can be used to cancel it
1614      */
1615     private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1616         if (USE_STPE) {


1617             return DelayedTaskSchedulers.schedule(command, delay, unit);
1618         } else {
1619             return scheduler.schedule(command, delay, unit);
1620         }
1621     }
1622 
1623     /**
1624      * Supports scheduling a runnable task to run after a delay. It uses a number
1625      * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1626      * work queue used. This class is used when using a custom scheduler.
1627      */
1628     static class DelayedTaskSchedulers {
1629         private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();
1630 
1631         static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
                  // select one of the schedulers using the low bits of the current thread's
                  // identifier; the mask assumes that INSTANCE.length is a power of 2
1632             long tid = Thread.currentThread().threadId();
1633             int index = (int) tid & (INSTANCE.length - 1);
1634             return INSTANCE[index].schedule(command, delay, unit);
1635         }
1636 
1637         private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
1638             String propName = "jdk.virtualThreadScheduler.timerQueues";
1639             String propValue = System.getProperty(propName);
1640             int queueCount;
1641             if (propValue != null) {
1642                 queueCount = Integer.parseInt(propValue);
1643                 if (queueCount != Integer.highestOneBit(queueCount)) {
1644                     throw new RuntimeException("Value of " + propName + " must be power of 2");
1645                 }
1646             } else {
1647                 int ncpus = Runtime.getRuntime().availableProcessors();
1648                 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
< prev index next >