< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 



  27 import java.util.Locale;
  28 import java.util.Objects;
  29 import java.util.concurrent.Executor;
  30 import java.util.concurrent.Executors;
  31 import java.util.concurrent.ForkJoinPool;
  32 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  33 import java.util.concurrent.ForkJoinTask;
  34 import java.util.concurrent.Future;

  35 import java.util.concurrent.RejectedExecutionException;
  36 import java.util.concurrent.ScheduledExecutorService;

  37 import java.util.concurrent.ScheduledThreadPoolExecutor;


  38 import java.util.concurrent.TimeUnit;
  39 import jdk.internal.event.VirtualThreadEndEvent;
  40 import jdk.internal.event.VirtualThreadStartEvent;
  41 import jdk.internal.event.VirtualThreadSubmitFailedEvent;

  42 import jdk.internal.misc.CarrierThread;
  43 import jdk.internal.misc.InnocuousThread;
  44 import jdk.internal.misc.Unsafe;
  45 import jdk.internal.vm.Continuation;
  46 import jdk.internal.vm.ContinuationScope;
  47 import jdk.internal.vm.StackableScope;
  48 import jdk.internal.vm.ThreadContainer;
  49 import jdk.internal.vm.ThreadContainers;
  50 import jdk.internal.vm.annotation.ChangesCurrentThread;
  51 import jdk.internal.vm.annotation.Hidden;
  52 import jdk.internal.vm.annotation.IntrinsicCandidate;
  53 import jdk.internal.vm.annotation.JvmtiHideEvents;
  54 import jdk.internal.vm.annotation.JvmtiMountTransition;
  55 import jdk.internal.vm.annotation.ReservedStackAccess;
  56 import sun.nio.ch.Interruptible;
  57 import static java.util.concurrent.TimeUnit.*;
  58 
  59 /**
  60  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  61  */
  62 final class VirtualThread extends BaseVirtualThread {
  63     private static final Unsafe U = Unsafe.getUnsafe();  // used in this class for objectFieldOffset lookups and for park/unpark
  64     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");  // scope passed to each VThreadContinuation
  65     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();  // built once, during class initialization






















  66 
  67     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");  // Unsafe offsets of the named instance fields (this line and the next three)
  68     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  69     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  70     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  71 
  72     // scheduler and continuation
  73     private final Executor scheduler;  // executor that runContinuation is submitted to; chosen in the constructor
  74     private final Continuation cont;  // VThreadContinuation wrapping the task; created in the constructor
  75     private final Runnable runContinuation;  // bound to this::runContinuation; this is the task handed to the scheduler
  76 
  77     // virtual thread state, accessed by VM
  78     private volatile int state;  // changed with compareAndSetState(...), e.g. NEW -> STARTED in start(ThreadContainer)
  79 
  80     /*
  81      * Virtual thread state transitions:
  82      *
  83      *      NEW -> STARTED         // Thread.start, schedule to run
  84      *  STARTED -> TERMINATED      // failed to start
  85      *  STARTED -> RUNNING         // first run
  86      *  RUNNING -> TERMINATED      // done
  87      *
  88      *  RUNNING -> PARKING         // Thread parking with LockSupport.park
  89      *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
  90      *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
  91      * UNPARKED -> RUNNING         // continue execution after park
  92      *
  93      *  PARKING -> RUNNING         // cont.yield failed, need to park on carrier
  94      *  RUNNING -> PINNED          // park on carrier
  95      *   PINNED -> RUNNING         // unparked, continue execution on same carrier

 169 
 170     // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
 171     private volatile boolean interruptibleWait;
 172 
 173     // timed-wait support
 174     private byte timedWaitSeqNo;  // NOTE(review): presumably a sequence number for timed-wait timeouts; its users are outside this excerpt
 175 
 176     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 177     private long timeout;  // passed to schedule(...) with NANOSECONDS when arming the park timeout
 178 
 179     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 180     private Future<?> timeoutTask;  // assigned from schedule(...); cancelled and set to null by cancelTimeoutTask()
 181 
 182     // carrier thread when mounted, accessed by VM
 183     private volatile Thread carrierThread;  // unpark(boolean) reads it under carrierThreadAccessLock() to unpark the carrier when pinned
 184 
 185     // true to notifyAll after this virtual thread terminates
 186     private volatile boolean notifyAllAfterTerminate;  // set to true by beforeJoin()
 187 
 188     /**
 189      * Returns the default scheduler.
          * This is the ForkJoinPool held in DEFAULT_SCHEDULER; the constructor falls back
          * to the same instance when no scheduler is supplied and the creating thread is
          * not a virtual thread.
          *
          * @return the default scheduler
 190      */
 191     static Executor defaultScheduler() {







 192         return DEFAULT_SCHEDULER;
 193     }
 194 
 195     /**
 196      * Returns the continuation scope used for virtual threads.
          *
          * @return VTHREAD_SCOPE, the scope that every VThreadContinuation is created with
 197      */
 198     static ContinuationScope continuationScope() {
 199         return VTHREAD_SCOPE;
 200     }
 201 
 202     /**
 203      * Creates a new {@code VirtualThread} to run the given task with the given
 204      * scheduler. If the given scheduler is {@code null} and the current thread
 205      * is a platform thread then the newly created virtual thread will use the
 206      * default scheduler. If given scheduler is {@code null} and the current
 207      * thread is a virtual thread then the current thread's scheduler is used.
          * The constructor only wires up state: it records the scheduler, wraps the
          * task in a VThreadContinuation and binds the runContinuation task. Nothing
          * is submitted to the scheduler here; that happens in start.
 208      *
 209      * @param scheduler the scheduler or null

 210      * @param name thread name
 211      * @param characteristics characteristics
 212      * @param task the task to execute
          * @throws NullPointerException if task is null (checked after the superclass
          *         constructor has run)
 213      */
 214     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {




 215         super(name, characteristics, /*bound*/ false);
 216         Objects.requireNonNull(task);
 217 
 218         // choose scheduler if not specified
 219         if (scheduler == null) {
 220             Thread parent = Thread.currentThread();
 221             if (parent instanceof VirtualThread vparent) {
 222                 scheduler = vparent.scheduler;  // inherit the creating virtual thread's scheduler
 223             } else {
 224                 scheduler = DEFAULT_SCHEDULER;
 225             }
 226         }
 227 
 228         this.scheduler = scheduler;
 229         this.cont = new VThreadContinuation(this, task);
 230         this.runContinuation = this::runContinuation;  // method reference bound once and reused for every submit





























































 231     }
 232 
 233     /**
 234      * The continuation that a virtual thread executes.
 235      */
 236     private static class VThreadContinuation extends Continuation {
 237         VThreadContinuation(VirtualThread vthread, Runnable task) {
 238             super(VTHREAD_SCOPE, wrap(vthread, task));
 239         }
 240         @Override
 241         protected void onPinned(Continuation.Pinned reason) {
 242         }
 243         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 244             return new Runnable() {
 245                 @Hidden
 246                 @JvmtiHideEvents
 247                 public void run() {
 248                     vthread.endFirstTransition();
 249                     try {
 250                         vthread.run(task);

 298             if (cont.isDone()) {
 299                 afterDone();
 300             } else {
 301                 afterYield();
 302             }
 303         }
 304     }
 305 
 306     /**
 307      * Cancel timeout task when continuing after timed-park or timed-wait.
 308      * The timeout task may be executing, or may have already completed.
          * The task is cancelled with {@code cancel(false)}, so a task that is already
          * running is not interrupted, and the field is then cleared. Does nothing when
          * no timeout task is recorded.
 309      */
 310     private void cancelTimeoutTask() {
 311         if (timeoutTask != null) {
 312             timeoutTask.cancel(false);  // mayInterruptIfRunning == false
 313             timeoutTask = null;
 314         }
 315     }
 316 
 317     /**
 318      * Submits the given task to the given executor. If the scheduler is a
 319      * ForkJoinPool then the task is first adapted to a ForkJoinTask.
          * No exception is caught here; the callers in this file deal with
          * RejectedExecutionException and OutOfMemoryError themselves.
          *
          * @param executor the executor to submit the task to
          * @param task the task to run
 320      */
 321     private void submit(Executor executor, Runnable task) {
 322         if (executor instanceof ForkJoinPool pool) {
 323             pool.submit(ForkJoinTask.adapt(task));  // the returned ForkJoinTask is not retained
 324         } else {
 325             executor.execute(task);
 326         }
 327     }
 328 
 329     /**
 330      * Submits the runContinuation task to the scheduler. For the default scheduler,
 331      * and calling it on a worker thread, the task will be pushed to the local queue,
 332      * otherwise it will be pushed to an external submission queue.
          * The submit runs in a loop that repeats only after an OutOfMemoryError when
          * {@code retryOnOOME} is true; each retry is preceded by a 100ms park of the
          * calling thread. When called on a virtual thread the continuation is pinned
          * for the duration of the submit.
 333      * @param scheduler the scheduler
 334      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 335      * @throws RejectedExecutionException
          *         if the scheduler rejects the task; submitFailed is invoked before rethrowing
          * @throws OutOfMemoryError if thrown while submitting and {@code retryOnOOME} is false
 336      */
 337     private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
 338         boolean done = false;
 339         while (!done) {
 340             try {
 341                 // Pin the continuation to prevent the virtual thread from unmounting
 342                 // when submitting a task. For the default scheduler this ensures that
 343                 // the carrier doesn't change when pushing a task. For other schedulers
 344                 // it avoids deadlock that could arise due to carriers and virtual
 345                 // threads contending for a lock.
 346                 if (currentThread().isVirtual()) {
 347                     Continuation.pin();
 348                     try {
 349                         submit(scheduler, runContinuation);
 350                     } finally {
 351                         Continuation.unpin();  // always unpin, including when the submit throws
 352                     }
 353                 } else {
 354                     submit(scheduler, runContinuation);
 355                 }
 356                 done = true;
 357             } catch (RejectedExecutionException ree) {
 358                 submitFailed(ree);  // emit the JFR event (if enabled) before propagating
 359                 throw ree;
 360             } catch (OutOfMemoryError e) {
 361                 if (retryOnOOME) {
 362                     U.park(false, 100_000_000); // 100ms
 363                 } else {
 364                     throw e;
 365                 }
 366             }
 367         }
 368     }
 369 
 370     /**
 371      * Submits the runContinuation task to the given scheduler as an external submit.
 372      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
          * The retry path is {@code submitRunContinuation(pool, true)}, i.e. a regular
          * submit rather than another external submit. Must be called on a CarrierThread
          * (checked by assertion only).
          * @param pool the ForkJoinPool to submit to
 373      * @throws RejectedExecutionException
 374      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
 375      */
 376     private void externalSubmitRunContinuation(ForkJoinPool pool) {
 377         assert Thread.currentThread() instanceof CarrierThread;
 378         try {
 379             pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
 380         } catch (RejectedExecutionException ree) {
 381             submitFailed(ree);
 382             throw ree;
 383         } catch (OutOfMemoryError e) {
 384             submitRunContinuation(pool, true);  // fall back to the retrying submit loop
 385         }
 386     }
 387 
 388     /**
 389      * Submits the runContinuation task to the scheduler. For the default scheduler,
 390      * and calling it on a worker thread, the task will be pushed to the local queue,
 391      * otherwise it will be pushed to an external submission queue.
 392      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
          * Convenience overload: uses this thread's own scheduler with retryOnOOME == true.
 393      * @throws RejectedExecutionException
 394      */
 395     private void submitRunContinuation() {
 396         submitRunContinuation(scheduler, true);
 397     }
 398 
 399     /**
 400      * Lazy submit the runContinuation task if invoked on a carrier thread and its local
 401      * queue is empty. If not empty, or invoked by another thread, then this method works
 402      * like submitRunContinuation and just submits the task to the scheduler.
 403      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
          * The lazy submit goes to the pool of the current carrier thread; the fallback
          * paths use this virtual thread's scheduler field.
 404      * @throws RejectedExecutionException
 405      * @see ForkJoinPool#lazySubmit(ForkJoinTask)
 406      */
 407     private void lazySubmitRunContinuation() {

 408         if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 409             ForkJoinPool pool = ct.getPool();
 410             try {
 411                 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
 412             } catch (RejectedExecutionException ree) {
 413                 submitFailed(ree);
 414                 throw ree;
 415             } catch (OutOfMemoryError e) {
 416                 submitRunContinuation();  // retry through the regular, retrying submit
 417             }
 418         } else {
 419             submitRunContinuation();
 420         }
 421     }
 422 
 423     /**
 424      * Submits the runContinuation task to the scheduler. For the default scheduler, and
 425      * calling it on a virtual thread that uses the default scheduler, the task will be
 426      * pushed to an external submission queue. This method may throw OutOfMemoryError.
          * Unlike the other submit methods, nothing here retries after an OutOfMemoryError:
          * the external-submit branch does not catch it and the fallback passes
          * retryOnOOME == false.
 427      * @throws RejectedExecutionException
 428      * @throws OutOfMemoryError
 429      */
 430     private void externalSubmitRunContinuationOrThrow() {
 431         if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {

 432             try {
 433                 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 434             } catch (RejectedExecutionException ree) {
 435                 submitFailed(ree);
 436                 throw ree;


 437             }
 438         } else {
 439             submitRunContinuation(scheduler, false);  // false: let OutOfMemoryError propagate
 440         }
 441     }
 442 
 443     /**
 444      * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
          * The event records this thread's id and the exception's message. The exception
          * itself is not rethrown here; callers rethrow it after this method returns.
          *
          * @param ree the rejection reported by the scheduler
 445      */
 446     private void submitFailed(RejectedExecutionException ree) {
 447         var event = new VirtualThreadSubmitFailedEvent();
 448         if (event.isEnabled()) {
 449             event.javaThreadId = threadId();
 450             event.exceptionMessage = ree.getMessage();
 451             event.commit();
 452         }
 453     }
 454 
 455     /**
 456      * Runs a task in the context of this virtual thread.
 457      */
 458     private void run(Runnable task) {
 459         assert Thread.currentThread() == this && state == RUNNING;

 576                 long timeout = this.timeout;
 577                 assert timeout > 0;
 578                 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
 579                 setState(newState = TIMED_PARKED);
 580             }
 581 
 582             // may have been unparked while parking
 583             if (parkPermit && compareAndSetState(newState, UNPARKED)) {
 584                 // lazy submit if local queue is empty
 585                 lazySubmitRunContinuation();
 586             }
 587             return;
 588         }
 589 
 590         // Thread.yield
 591         if (s == YIELDING) {
 592             setState(YIELDED);
 593 
 594             // external submit if there are no tasks in the local task queue
 595             if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 596                 externalSubmitRunContinuation(ct.getPool());
 597             } else {
 598                 submitRunContinuation();
 599             }
 600             return;
 601         }
 602 
 603         // blocking on monitorenter
 604         if (s == BLOCKING) {
 605             setState(BLOCKED);
 606 
 607             // may have been unblocked while blocking
 608             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 609                 // lazy submit if local queue is empty
 610                 lazySubmitRunContinuation();
 611             }
 612             return;
 613         }
 614 
 615         // Object.wait
 616         if (s == WAITING || s == TIMED_WAITING) {

 681                 notifyAll();
 682             }
 683         }
 684 
 685         // notify container
 686         if (notifyContainer) {
 687             threadContainer().remove(this);
 688         }
 689 
 690         // clear references to thread locals
 691         clearReferences();
 692     }
 693 
 694     /**
 695      * Schedules this {@code VirtualThread} to execute.
          * The NEW -> STARTED transition is made with a compare-and-set, so only one
          * caller can get past the first check. If adding to the container or submitting
          * to the scheduler throws, {@code afterDone(addedToContainer)} runs before the
          * exception propagates.
 696      *
          * @param container the container to bind this thread to and add it to
 697      * @throws IllegalStateException if the container is shutdown or closed
 698      * @throws IllegalThreadStateException if the thread has already been started
 699      * @throws RejectedExecutionException if the scheduler cannot accept a task
 700      */
 701     @Override
 702     void start(ThreadContainer container) {
 703         if (!compareAndSetState(NEW, STARTED)) {
 704             throw new IllegalThreadStateException("Already started");
 705         }
 706 
 707         // bind thread to container
 708         assert threadContainer() == null;
 709         setThreadContainer(container);
 710 
 711         // start thread
 712         boolean addedToContainer = false;
 713         boolean started = false;
 714         try {
 715             container.add(this);  // may throw
 716             addedToContainer = true;
 717 
 718             // scoped values may be inherited
 719             inheritScopedValueBindings(container);
 720 
 721             // submit task to run thread, using externalSubmit if possible
 722             externalSubmitRunContinuationOrThrow();  // does not retry on OutOfMemoryError


























 723             started = true;
 724         } finally {
 725             if (!started) {
 726                 afterDone(addedToContainer);  // failed to start; NOTE(review): presumably the STARTED -> TERMINATED path, confirm in afterDone
 727             }
 728         }
 729     }
 730 





 731     @Override
 732     public void start() {
 733         start(ThreadContainers.root());  // no explicit container: start in the root thread container







 734     }
 735 
 736     @Override
 737     public void run() {
 738         // do nothing
             // (the task is passed to VThreadContinuation in the constructor rather than run from here)
 739     }
 740 
 741     /**
 742      * Invoked by Thread.join before a thread waits for this virtual thread to terminate.
          * Sets the volatile notifyAllAfterTerminate flag; it is never reset in this method.
 743      */
 744     void beforeJoin() {
 745         notifyAllAfterTerminate = true;
 746     }
 747 
 748     /**
 749      * Parks until unparked or interrupted. If already unparked then the parking
 750      * permit is consumed and this method completes immediately (meaning it doesn't
 751      * yield). It also completes immediately if the interrupted status is set.
 752      */
 753     @Override

 857      * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
 858      * Recording the event in the VM avoids having JFR event recorded in Java
 859      * with the same name, but different ID, to events recorded by the VM.
 860      */
 861     @Hidden
 862     private static native void postPinnedEvent(String op);
 863 
 864     /**
 865      * Re-enables this virtual thread for scheduling. If this virtual thread is parked
 866      * then its task is scheduled to continue, otherwise its next call to {@code park} or
 867      * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
          * The park permit is always set. The rest of the method runs only when the permit
          * was not already set and the caller is not this virtual thread itself.
 868      * @param lazySubmit to use lazySubmit if possible
 869      * @throws RejectedExecutionException if the scheduler cannot accept a task
 870      */
 871     private void unpark(boolean lazySubmit) {
 872         if (!getAndSetParkPermit(true) && currentThread() != this) {
 873             int s = state();
 874 
 875             // unparked while parked
 876             if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
 877                 if (lazySubmit) {
 878                     lazySubmitRunContinuation();
 879                 } else {
 880                     submitRunContinuation();













 881                 }


 882                 return;
 883             }
 884 
 885             // unparked while parked when pinned
 886             if (s == PINNED || s == TIMED_PINNED) {
 887                 // unpark carrier thread when pinned
 888                 disableSuspendAndPreempt();
 889                 try {
 890                     synchronized (carrierThreadAccessLock()) {
 891                         Thread carrier = carrierThread;
 892                         if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {  // re-read state under the lock
 893                             U.unpark(carrier);
 894                         }
 895                     }
 896                 } finally {
 897                     enableSuspendAndPreempt();
 898                 }
 899                 return;
 900             }
 901         }
 902     }
 903 
 904     @Override
 905     void unpark() {
 906         unpark(false);  // false: use submitRunContinuation rather than the lazy submit
 907     }
 908 





 909     /**
 910      * Invoked by unblocker thread to unblock this virtual thread.
          * Sets blockPermit and, if this thread is in state BLOCKED, moves it to UNBLOCKED
          * and submits its runContinuation task. The caller must not be a virtual thread
          * (checked by assertion only).
 911      */
 912     private void unblock() {
 913         assert !Thread.currentThread().isVirtual();
 914         blockPermit = true;
 915         if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {  // submit only if this call wins the CAS
 916             submitRunContinuation();
 917         }
 918     }
 919 
 920     /**
 921      * Invoked by FJP worker thread or STPE thread when park timeout expires.
          * Delegates to {@code unpark(true)}, i.e. the lazy submit is used when possible.
 922      */
 923     private void parkTimeoutExpired() {
 924         assert !VirtualThread.currentThread().isVirtual();
 925         unpark(true);
 926     }
 927 
 928     /**

1308     @IntrinsicCandidate
1309     @JvmtiMountTransition
1310     private native void startTransition(boolean mount);  // NOTE(review): presumably mount == true for a mount, false for an unmount; confirm against the VM side
1311 
1312     @IntrinsicCandidate
1313     @JvmtiMountTransition
1314     private native void endTransition(boolean mount);  // NOTE(review): presumably the counterpart of startTransition; confirm
1315 
1316     @IntrinsicCandidate
1317     private static native void notifyJvmtiDisableSuspend(boolean enter);  // NOTE(review): the meaning of 'enter' is not visible in this excerpt
1318 
1319     private static native void registerNatives();  // called once from the static initializer below
1320     static {
1321         registerNatives();
1322 
1323         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1324         var group = Thread.virtualThreadGroup();  // the local is never read; the call is made only for its effect
1325     }
1326 
1327     /**
1328      * Creates the default ForkJoinPool scheduler.
          * The pool is sized from three system properties, each optional:
          * <ul>
          *   <li>{@code jdk.virtualThreadScheduler.parallelism}: defaults to the number of
          *       available processors</li>
          *   <li>{@code jdk.virtualThreadScheduler.maxPoolSize}: defaults to
          *       {@code max(parallelism, 256)}; when set, parallelism is capped to it</li>
          *   <li>{@code jdk.virtualThreadScheduler.minRunnable}: defaults to
          *       {@code max(parallelism / 2, 1)}</li>
          * </ul>
          * Worker threads are CarrierThread instances, the pool runs in async (FIFO) mode
          * with a core pool size of 0 and a 30 second keep-alive, the saturation predicate
          * always returns true, and the uncaught exception handler does nothing.
          *
          * @return the new pool
          * @throws NumberFormatException if a property is set but is not a parsable int
















1329      */
1330     private static ForkJoinPool createDefaultScheduler() {
1331         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1332         int parallelism, maxPoolSize, minRunnable;
1333         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1334         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1335         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1336         if (parallelismValue != null) {
1337             parallelism = Integer.parseInt(parallelismValue);
1338         } else {
1339             parallelism = Runtime.getRuntime().availableProcessors();
1340         }
1341         if (maxPoolSizeValue != null) {
1342             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1343             parallelism = Integer.min(parallelism, maxPoolSize);  // an explicit max pool size also caps parallelism
1344         } else {
1345             maxPoolSize = Integer.max(parallelism, 256);
1346         }
1347         if (minRunnableValue != null) {
1348             minRunnable = Integer.parseInt(minRunnableValue);
1349         } else {
1350             minRunnable = Integer.max(parallelism / 2, 1);  // computed after parallelism may have been capped above
1351         }
1352         Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1353         boolean asyncMode = true; // FIFO
1354         return new ForkJoinPool(parallelism, factory, handler, asyncMode,
1355                      0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);



























































































1356     }
1357 
1358     /**
1359      * Schedule a runnable task to run after a delay.
          * When this thread's scheduler is a ForkJoinPool the pool's own schedule method
          * is used; otherwise the task goes to DelayedTaskSchedulers.
          *
          * @param command the task to run
          * @param delay the delay before the task runs
          * @param unit the time unit of the delay
          * @return a Future for the scheduled task, usable to cancel it
1360      */
1361     private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1362         if (scheduler instanceof ForkJoinPool pool) {
1363             return pool.schedule(command, delay, unit);
1364         } else {
1365             return DelayedTaskSchedulers.schedule(command, delay, unit);


1366         }
1367     }
1368 
1369     /**
1370      * Supports scheduling a runnable task to run after a delay. It uses a number
1371      * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1372      * work queue used. This class is used when using a custom scheduler.
1373      */
1374     private static class DelayedTaskSchedulers {
1375         private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();
1376 
1377         static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1378             long tid = Thread.currentThread().threadId();
1379             int index = (int) tid & (INSTANCE.length - 1);
1380             return INSTANCE[index].schedule(command, delay, unit);
1381         }
1382 
1383         private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
1384             String propName = "jdk.virtualThreadScheduler.timerQueues";
1385             String propValue = System.getProperty(propName);
1386             int queueCount;
1387             if (propValue != null) {
1388                 queueCount = Integer.parseInt(propValue);
1389                 if (queueCount != Integer.highestOneBit(queueCount)) {
1390                     throw new RuntimeException("Value of " + propName + " must be power of 2");
1391                 }
1392             } else {
1393                 int ncpus = Runtime.getRuntime().availableProcessors();
1394                 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.lang.reflect.Constructor;
  30 import java.util.Locale;
  31 import java.util.Objects;

  32 import java.util.concurrent.Executors;
  33 import java.util.concurrent.ForkJoinPool;

  34 import java.util.concurrent.ForkJoinTask;
  35 import java.util.concurrent.Future;
  36 import java.util.concurrent.LinkedTransferQueue;
  37 import java.util.concurrent.RejectedExecutionException;
  38 import java.util.concurrent.ScheduledExecutorService;
  39 import java.util.concurrent.ScheduledFuture;
  40 import java.util.concurrent.ScheduledThreadPoolExecutor;
  41 import java.util.concurrent.ThreadFactory;
  42 import java.util.concurrent.ThreadPoolExecutor;
  43 import java.util.concurrent.TimeUnit;
  44 import jdk.internal.event.VirtualThreadEndEvent;
  45 import jdk.internal.event.VirtualThreadStartEvent;
  46 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  47 import jdk.internal.invoke.MhUtil;
  48 import jdk.internal.misc.CarrierThread;
  49 import jdk.internal.misc.InnocuousThread;
  50 import jdk.internal.misc.Unsafe;
  51 import jdk.internal.vm.Continuation;
  52 import jdk.internal.vm.ContinuationScope;
  53 import jdk.internal.vm.StackableScope;
  54 import jdk.internal.vm.ThreadContainer;
  55 import jdk.internal.vm.ThreadContainers;
  56 import jdk.internal.vm.annotation.ChangesCurrentThread;
  57 import jdk.internal.vm.annotation.Hidden;
  58 import jdk.internal.vm.annotation.IntrinsicCandidate;
  59 import jdk.internal.vm.annotation.JvmtiHideEvents;
  60 import jdk.internal.vm.annotation.JvmtiMountTransition;
  61 import jdk.internal.vm.annotation.ReservedStackAccess;
  62 import sun.nio.ch.Interruptible;
  63 import static java.util.concurrent.TimeUnit.*;
  64 
  65 /**
  66  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  67  */
  68 final class VirtualThread extends BaseVirtualThread {
  69     private static final Unsafe U = Unsafe.getUnsafe();  // used below for objectFieldOffset lookups
  70     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");  // returned by continuationScope()
  71 
  72     private static final VirtualThreadScheduler BUILTIN_SCHEDULER;  // result of createBuiltinScheduler(...); returned to trusted callers of builtinScheduler
  73     private static final VirtualThreadScheduler DEFAULT_SCHEDULER;  // the built-in scheduler, or a custom one when the implClass property is set
  74     private static final VirtualThreadScheduler EXTERNAL_VIEW;  // createExternalView(builtin); returned to untrusted callers of builtinScheduler
  75     private static final boolean USE_STPE;  // value of the jdk.virtualThreadScheduler.useSTPE boolean property
  76     static {
  77         // experimental
  78         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
  79         if (propValue != null) {
                 // custom scheduler configured: it is loaded with the external view of the built-in scheduler
                 // NOTE(review): the meaning of the boolean passed to createBuiltinScheduler is not visible in this excerpt
  80             VirtualThreadScheduler builtinScheduler = createBuiltinScheduler(true);
  81             VirtualThreadScheduler externalView = createExternalView(builtinScheduler);
  82             VirtualThreadScheduler defaultScheduler = loadCustomScheduler(externalView, propValue);
  83             BUILTIN_SCHEDULER = builtinScheduler;
  84             DEFAULT_SCHEDULER = defaultScheduler;
  85             EXTERNAL_VIEW = externalView;
  86         } else {
                 // no custom scheduler: the built-in scheduler is also the default scheduler
  87             var builtinScheduler = createBuiltinScheduler(false);
  88             BUILTIN_SCHEDULER = builtinScheduler;
  89             DEFAULT_SCHEDULER = builtinScheduler;
  90             EXTERNAL_VIEW = createExternalView(builtinScheduler);
  91         }
  92         USE_STPE = Boolean.getBoolean("jdk.virtualThreadScheduler.useSTPE");
  93     }
  94 
  95     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");  // Unsafe offsets of the named instance fields (this line and the next three)
  96     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  97     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  98     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  99 
 100     // scheduler and continuation
 101     private final VirtualThreadScheduler scheduler;  // NOTE(review): assigned outside this excerpt, presumably in the constructor
 102     private final Continuation cont;
 103     private final VThreadTask runContinuation;  // returned by virtualThreadTask()
 104 
 105     // virtual thread state, accessed by VM
 106     private volatile int state;  // the STATE constant above holds this field's Unsafe offset
 107 
 108     /*
 109      * Virtual thread state transitions:
 110      *
 111      *      NEW -> STARTED         // Thread.start, schedule to run
 112      *  STARTED -> TERMINATED      // failed to start
 113      *  STARTED -> RUNNING         // first run
 114      *  RUNNING -> TERMINATED      // done
 115      *
 116      *  RUNNING -> PARKING         // Thread parking with LockSupport.park
 117      *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
 118      *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
 119      * UNPARKED -> RUNNING         // continue execution after park
 120      *
 121      *  PARKING -> RUNNING         // cont.yield failed, need to park on carrier
 122      *  RUNNING -> PINNED          // park on carrier
 123      *   PINNED -> RUNNING         // unparked, continue execution on same carrier

 197 
 198     // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
 199     private volatile boolean interruptibleWait;
 200 
 201     // timed-wait support
 202     private byte timedWaitSeqNo;
 203 
 204     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 205     private long timeout;
 206 
 207     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 208     private Future<?> timeoutTask;
 209 
 210     // carrier thread when mounted, accessed by VM
 211     private volatile Thread carrierThread;
 212 
 213     // true to notifyAll after this virtual thread terminates
 214     private volatile boolean notifyAllAfterTerminate;
 215 
 216     /**
 217      * Return the built-in scheduler.
 218      * @param trusted true if caller is trusted, false if not trusted
 219      */
 220     static VirtualThreadScheduler builtinScheduler(boolean trusted) {
 221         return trusted ? BUILTIN_SCHEDULER : EXTERNAL_VIEW;
 222     }
 223 
 224     /**
 225      * Returns the default scheduler, usually the same as the built-in scheduler.
 226      */
 227     static VirtualThreadScheduler defaultScheduler() {
 228         return DEFAULT_SCHEDULER;
 229     }
 230 
 231     /**
 232      * Returns the continuation scope used for virtual threads.
 233      */
 234     static ContinuationScope continuationScope() {
 235         return VTHREAD_SCOPE;
 236     }
 237 
 238     /**
 239      * Returns the task to start/continue this virtual thread.
 240      */
 241     VirtualThreadTask virtualThreadTask() {
 242         return runContinuation;
 243     }
 244 
 245     /**
 246      * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
 247      *
 248      * @param scheduler the scheduler or null for default scheduler
 249      * @param preferredCarrier the preferred carrier or null
 250      * @param name thread name
 251      * @param characteristics characteristics
 252      * @param task the task to execute
 253      */
 254     VirtualThread(VirtualThreadScheduler scheduler,
 255                   Thread preferredCarrier,
 256                   String name,
 257                   int characteristics,
 258                   Runnable task) {
 259         super(name, characteristics, /*bound*/ false);
 260         Objects.requireNonNull(task);
 261 
 262         // use default scheduler if not provided
 263         if (scheduler == null) {
 264             scheduler = DEFAULT_SCHEDULER;
 265         } else if (scheduler == EXTERNAL_VIEW) {
 266             throw new UnsupportedOperationException();



 267         }

 268         this.scheduler = scheduler;
 269         this.cont = new VThreadContinuation(this, task);
 270 
 271         if (scheduler == BUILTIN_SCHEDULER) {
 272             this.runContinuation = new VThreadTask(this);
 273         } else {
 274             this.runContinuation = new CustomVThreadTask(this, preferredCarrier);
 275         }
 276     }
 277 
 278     /**
 279      * The task to start/continue a virtual thread.
 280      */
 281     static non-sealed class VThreadTask implements VirtualThreadTask {
 282         private final VirtualThread vthread;
 283         VThreadTask(VirtualThread vthread) {
 284             this.vthread = vthread;
 285         }
 286         @Override
 287         public final Thread thread() {
 288             return vthread;
 289         }
 290         @Override
 291         public final void run() {
 292             vthread.runContinuation();;
 293         }
 294         @Override
 295         public Thread preferredCarrier() {
 296             throw new UnsupportedOperationException();
 297         }
 298         @Override
 299         public Object attach(Object att) {
 300             throw new UnsupportedOperationException();
 301         }
 302         @Override
 303         public Object attachment() {
 304             throw new UnsupportedOperationException();
 305         }
 306     }
 307 
 308     /**
 309      * The task to start/continue a virtual thread when using a custom scheduler.
 310      */
 311     static final class CustomVThreadTask extends VThreadTask {
 312         private static final VarHandle ATT =
 313                 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class);
 314         private final Thread preferredCarrier;
 315         private volatile Object att;
 316         CustomVThreadTask(VirtualThread vthread, Thread preferredCarrier) {
 317             super(vthread);
 318             this.preferredCarrier = preferredCarrier;
 319         }
 320         @Override
 321         public Thread preferredCarrier() {
 322             return preferredCarrier;
 323         }
 324         @Override
 325         public Object attach(Object att) {
 326             return ATT.getAndSet(this, att);
 327         }
 328         @Override
 329         public Object attachment() {
 330             return att;
 331         }
 332     }
 333 
 334     /**
 335      * The continuation that a virtual thread executes.
 336      */
 337     private static class VThreadContinuation extends Continuation {
 338         VThreadContinuation(VirtualThread vthread, Runnable task) {
 339             super(VTHREAD_SCOPE, wrap(vthread, task));
 340         }
 341         @Override
 342         protected void onPinned(Continuation.Pinned reason) {
 343         }
 344         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 345             return new Runnable() {
 346                 @Hidden
 347                 @JvmtiHideEvents
 348                 public void run() {
 349                     vthread.endFirstTransition();
 350                     try {
 351                         vthread.run(task);

 399             if (cont.isDone()) {
 400                 afterDone();
 401             } else {
 402                 afterYield();
 403             }
 404         }
 405     }
 406 
 407     /**
 408      * Cancel timeout task when continuing after timed-park or timed-wait.
 409      * The timeout task may be executing, or may have already completed.
 410      */
 411     private void cancelTimeoutTask() {
 412         if (timeoutTask != null) {
 413             timeoutTask.cancel(false);
 414             timeoutTask = null;
 415         }
 416     }
 417 
 418     /**
 419      * Submits the runContinuation task to the scheduler. For the built-in scheduler,
 420      * the task will be pushed to the local queue if possible, otherwise it will be
 421      * pushed to an external submission queue.













 422      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 423      * @throws RejectedExecutionException
 424      */
 425     private void submitRunContinuation(boolean retryOnOOME) {
 426         boolean done = false;
 427         while (!done) {
 428             try {
 429                 // Pin the continuation to prevent the virtual thread from unmounting
 430                 // when submitting a task. For the default scheduler this ensures that
 431                 // the carrier doesn't change when pushing a task. For other schedulers
 432                 // it avoids deadlock that could arise due to carriers and virtual
 433                 // threads contending for a lock.
 434                 if (currentThread().isVirtual()) {
 435                     Continuation.pin();
 436                     try {
 437                         scheduler.onContinue(runContinuation);
 438                     } finally {
 439                         Continuation.unpin();
 440                     }
 441                 } else {
 442                     scheduler.onContinue(runContinuation);
 443                 }
 444                 done = true;
 445             } catch (RejectedExecutionException ree) {
 446                 submitFailed(ree);
 447                 throw ree;
 448             } catch (OutOfMemoryError e) {
 449                 if (retryOnOOME) {
 450                     U.park(false, 100_000_000); // 100ms
 451                 } else {
 452                     throw e;
 453                 }
 454             }
 455         }
 456     }
 457 


















 458     /**
 459      * Submits the runContinuation task to the scheduler. For the default scheduler,
 460      * and calling it on a worker thread, the task will be pushed to the local queue,
 461      * otherwise it will be pushed to an external submission queue.
 462      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 463      * @throws RejectedExecutionException
 464      */
 465     private void submitRunContinuation() {
 466         submitRunContinuation(true);
 467     }
 468 
 469     /**
 470      * Invoked from a carrier thread to lazy submit the runContinuation task to the
 471      * carrier's local queue if the queue is empty. If not empty, or invoked by a thread
 472      * for a custom scheduler, then it just submits the task to the scheduler.
 473      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 474      * @throws RejectedExecutionException
 475      * @see ForkJoinPool#lazySubmit(ForkJoinTask)
 476      */
 477     private void lazySubmitRunContinuation() {
 478         assert !currentThread().isVirtual();
 479         if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {

 480             try {
 481                 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
 482             } catch (RejectedExecutionException ree) {
 483                 submitFailed(ree);
 484                 throw ree;
 485             } catch (OutOfMemoryError e) {
 486                 submitRunContinuation();
 487             }
 488         } else {
 489             submitRunContinuation();
 490         }
 491     }
 492 
 493     /**
 494      * Invoked from a carrier thread to externally submit the runContinuation task to the
 495      * scheduler. If invoked by a thread for a custom scheduler, then it just submits the
 496      * task to the scheduler.
 497      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 498      * @throws RejectedExecutionException
 499      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
 500      */
 501     private void externalSubmitRunContinuation() {
 502         assert !currentThread().isVirtual();
 503         if (currentThread() instanceof CarrierThread ct) {
 504             try {
 505                 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
 506             } catch (RejectedExecutionException ree) {
 507                 submitFailed(ree);
 508                 throw ree;
 509             } catch (OutOfMemoryError e) {
 510                 submitRunContinuation();
 511             }
 512         } else {
 513             submitRunContinuation();
 514         }
 515     }
 516 
 517     /**
 518      * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
 519      */
 520     private void submitFailed(RejectedExecutionException ree) {
 521         var event = new VirtualThreadSubmitFailedEvent();
 522         if (event.isEnabled()) {
 523             event.javaThreadId = threadId();
 524             event.exceptionMessage = ree.getMessage();
 525             event.commit();
 526         }
 527     }
 528 
 529     /**
 530      * Runs a task in the context of this virtual thread.
 531      */
 532     private void run(Runnable task) {
 533         assert Thread.currentThread() == this && state == RUNNING;

 650                 long timeout = this.timeout;
 651                 assert timeout > 0;
 652                 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
 653                 setState(newState = TIMED_PARKED);
 654             }
 655 
 656             // may have been unparked while parking
 657             if (parkPermit && compareAndSetState(newState, UNPARKED)) {
 658                 // lazy submit if local queue is empty
 659                 lazySubmitRunContinuation();
 660             }
 661             return;
 662         }
 663 
 664         // Thread.yield
 665         if (s == YIELDING) {
 666             setState(YIELDED);
 667 
 668             // external submit if there are no tasks in the local task queue
 669             if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 670                 externalSubmitRunContinuation();
 671             } else {
 672                 submitRunContinuation();
 673             }
 674             return;
 675         }
 676 
 677         // blocking on monitorenter
 678         if (s == BLOCKING) {
 679             setState(BLOCKED);
 680 
 681             // may have been unblocked while blocking
 682             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 683                 // lazy submit if local queue is empty
 684                 lazySubmitRunContinuation();
 685             }
 686             return;
 687         }
 688 
 689         // Object.wait
 690         if (s == WAITING || s == TIMED_WAITING) {

 755                 notifyAll();
 756             }
 757         }
 758 
 759         // notify container
 760         if (notifyContainer) {
 761             threadContainer().remove(this);
 762         }
 763 
 764         // clear references to thread locals
 765         clearReferences();
 766     }
 767 
 768     /**
 769      * Schedules this {@code VirtualThread} to execute.
 770      *
 771      * @throws IllegalStateException if the container is shutdown or closed
 772      * @throws IllegalThreadStateException if the thread has already been started
 773      * @throws RejectedExecutionException if the scheduler cannot accept a task
 774      */
 775     private void start(ThreadContainer container, boolean lazy) {

 776         if (!compareAndSetState(NEW, STARTED)) {
 777             throw new IllegalThreadStateException("Already started");
 778         }
 779 
 780         // bind thread to container
 781         assert threadContainer() == null;
 782         setThreadContainer(container);
 783 
 784         // start thread
 785         boolean addedToContainer = false;
 786         boolean started = false;
 787         try {
 788             container.add(this);  // may throw
 789             addedToContainer = true;
 790 
 791             // scoped values may be inherited
 792             inheritScopedValueBindings(container);
 793 
 794             // submit task to schedule
 795             try {
 796                 if (currentThread().isVirtual()) {
 797                     Continuation.pin();
 798                     try {
 799                         if (scheduler == BUILTIN_SCHEDULER
 800                                 && currentCarrierThread() instanceof CarrierThread ct) {
 801                             ForkJoinPool pool = ct.getPool();
 802                             ForkJoinTask<?> task = ForkJoinTask.adapt(runContinuation);
 803                             if (lazy) {
 804                                 pool.lazySubmit(task);
 805                             } else {
 806                                 pool.externalSubmit(task);
 807                             }
 808                         } else {
 809                             scheduler.onStart(runContinuation);
 810                         }
 811                     } finally {
 812                         Continuation.unpin();
 813                     }
 814                 } else {
 815                     scheduler.onStart(runContinuation);
 816                 }
 817             } catch (RejectedExecutionException ree) {
 818                 submitFailed(ree);
 819                 throw ree;
 820             }
 821 
 822             started = true;
 823         } finally {
 824             if (!started) {
 825                 afterDone(addedToContainer);
 826             }
 827         }
 828     }
 829 
 830     @Override
 831     void start(ThreadContainer container) {
 832         start(container, false);
 833     }
 834 
 835     @Override
 836     public void start() {
 837         start(ThreadContainers.root(), false);
 838     }
 839 
 840     /**
 841      * Schedules this thread to begin execution without guarantee that it will execute.
 842      */
 843     void lazyStart() {
 844         start(ThreadContainers.root(), true);
 845     }
 846 
 847     @Override
 848     public void run() {
 849         // do nothing
 850     }
 851 
 852     /**
 853      * Invoked by Thread.join before a thread waits for this virtual thread to terminate.
 854      */
 855     void beforeJoin() {
 856         notifyAllAfterTerminate = true;
 857     }
 858 
 859     /**
 860      * Parks until unparked or interrupted. If already unparked then the parking
 861      * permit is consumed and this method completes immediately (meaning it doesn't
 862      * yield). It also completes immediately if the interrupted status is set.
 863      */
 864     @Override

 968      * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
 969      * Recording the event in the VM avoids having JFR event recorded in Java
 970      * with the same name, but different ID, to events recorded by the VM.
 971      */
 972     @Hidden
 973     private static native void postPinnedEvent(String op);
 974 
 975     /**
 976      * Re-enables this virtual thread for scheduling. If this virtual thread is parked
 977      * then its task is scheduled to continue, otherwise its next call to {@code park} or
 978      * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
 979      * @param lazySubmit to use lazySubmit if possible
 980      * @throws RejectedExecutionException if the scheduler cannot accept a task
 981      */
 982     private void unpark(boolean lazySubmit) {
 983         if (!getAndSetParkPermit(true) && currentThread() != this) {
 984             int s = state();
 985 
 986             // unparked while parked
 987             if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
 988 
 989                 if (lazySubmit && currentThread().isVirtual()) {
 990                     Continuation.pin();
 991                     try {
 992                         if (scheduler == BUILTIN_SCHEDULER
 993                                 && currentCarrierThread() instanceof CarrierThread ct) {
 994                             ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
 995                             return;
 996                         }
 997                     } catch (RejectedExecutionException ree) {
 998                         submitFailed(ree);
 999                         throw ree;
1000                     } catch (OutOfMemoryError e) {
1001                         // fall-though
1002                     } finally {
1003                         Continuation.unpin();
1004                     }
1005                 }
1006 
1007                 submitRunContinuation();
1008                 return;
1009             }
1010 
1011             // unparked while parked when pinned
1012             if (s == PINNED || s == TIMED_PINNED) {
1013                 // unpark carrier thread when pinned
1014                 disableSuspendAndPreempt();
1015                 try {
1016                     synchronized (carrierThreadAccessLock()) {
1017                         Thread carrier = carrierThread;
1018                         if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
1019                             U.unpark(carrier);
1020                         }
1021                     }
1022                 } finally {
1023                     enableSuspendAndPreempt();
1024                 }
1025                 return;
1026             }
1027         }
1028     }
1029 
1030     @Override
1031     void unpark() {
1032         unpark(false);
1033     }
1034 
1035     @Override
1036     void lazyUnpark() {
1037         unpark(true);
1038     }
1039 
1040     /**
1041      * Invoked by unblocker thread to unblock this virtual thread.
1042      */
1043     private void unblock() {
1044         assert !Thread.currentThread().isVirtual();
1045         blockPermit = true;
1046         if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
1047             submitRunContinuation();
1048         }
1049     }
1050 
1051     /**
1052      * Invoked by FJP worker thread or STPE thread when park timeout expires.
1053      */
1054     private void parkTimeoutExpired() {
1055         assert !VirtualThread.currentThread().isVirtual();
1056         unpark(true);
1057     }
1058 
1059     /**

1439     @IntrinsicCandidate
1440     @JvmtiMountTransition
1441     private native void startTransition(boolean mount);
1442 
1443     @IntrinsicCandidate
1444     @JvmtiMountTransition
1445     private native void endTransition(boolean mount);
1446 
1447     @IntrinsicCandidate
1448     private static native void notifyJvmtiDisableSuspend(boolean enter);
1449 
1450     private static native void registerNatives();
1451     static {
1452         registerNatives();
1453 
1454         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1455         var group = Thread.virtualThreadGroup();
1456     }
1457 
1458     /**
1459      * Loads a VirtualThreadScheduler with the given class name. The class must be public
1460      * in an exported package, with public one-arg or no-arg constructor, and be visible
1461      * to the system class loader.
1462      * @param delegate the scheduler that the custom scheduler may delegate to
1463      * @param cn the class name of the custom scheduler
1464      */
1465     private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) {
1466         VirtualThreadScheduler scheduler;
1467         try {
1468             Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
1469             // 1-arg constructor
1470             try {
1471                 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class);
1472                 return (VirtualThreadScheduler) ctor.newInstance(delegate);
1473             } catch (NoSuchMethodException e) {
1474                 // 0-arg constructor
1475                 Constructor<?> ctor = clazz.getConstructor();
1476                 scheduler = (VirtualThreadScheduler) ctor.newInstance();
1477             }
1478         } catch (Exception ex) {
1479             throw new Error(ex);
1480         }
1481         System.err.println("WARNING: Using custom default scheduler, this is an experimental feature!");
1482         return scheduler;
1483     }
1484 
1485     /**
1486      * Creates the built-in ForkJoinPool scheduler.
1487      * @param wrapped true if wrapped by a custom default scheduler
1488      */
1489     private static VirtualThreadScheduler createBuiltinScheduler(boolean wrapped) {

1490         int parallelism, maxPoolSize, minRunnable;
1491         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1492         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1493         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1494         if (parallelismValue != null) {
1495             parallelism = Integer.parseInt(parallelismValue);
1496         } else {
1497             parallelism = Runtime.getRuntime().availableProcessors();
1498         }
1499         if (maxPoolSizeValue != null) {
1500             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1501             parallelism = Integer.min(parallelism, maxPoolSize);
1502         } else {
1503             maxPoolSize = Integer.max(parallelism, 256);
1504         }
1505         if (minRunnableValue != null) {
1506             minRunnable = Integer.parseInt(minRunnableValue);
1507         } else {
1508             minRunnable = Integer.max(parallelism / 2, 1);
1509         }
1510         if (Boolean.getBoolean("jdk.virtualThreadScheduler.useTPE")) {
1511             return new BuiltinThreadPoolExecutorScheduler(parallelism);
1512         } else {
1513             return new BuiltinForkJoinPoolScheduler(parallelism, maxPoolSize, minRunnable, wrapped);
1514         }
1515     }
1516 
1517     /**
1518      * The built-in ForkJoinPool scheduler.
1519      */
1520     private static class BuiltinForkJoinPoolScheduler
1521             extends ForkJoinPool implements VirtualThreadScheduler {
1522 
1523         BuiltinForkJoinPoolScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) {
1524             ForkJoinWorkerThreadFactory factory = wrapped
1525                     ? ForkJoinPool.defaultForkJoinWorkerThreadFactory
1526                     : CarrierThread::new;
1527             Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1528             boolean asyncMode = true; // FIFO
1529             super(parallelism, factory, handler, asyncMode,
1530                     0, maxPoolSize, minRunnable, pool -> true, 30L, SECONDS);
1531         }
1532 
1533         @Override
1534         public void onStart(VirtualThreadTask task) {
1535             execute(ForkJoinTask.adapt(task));
1536         }
1537 
1538         @Override
1539         public void onContinue(VirtualThreadTask task) {
1540             execute(ForkJoinTask.adapt(task));
1541         }
1542 
1543         @Override
1544         public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
1545             return super.schedule(task, delay, unit);
1546         }
1547     }
1548 
1549     /**
1550      * Built-in ThreadPoolExecutor scheduler.
1551      */
1552     private static class BuiltinThreadPoolExecutorScheduler
1553             extends ThreadPoolExecutor implements VirtualThreadScheduler {
1554 
1555         BuiltinThreadPoolExecutorScheduler(int maxPoolSize) {
1556             ThreadFactory factory = task -> {
1557                 Thread t = InnocuousThread.newThread(task);
1558                 t.setDaemon(true);
1559                 return t;
1560             };
1561             super(maxPoolSize, maxPoolSize,
1562                     0L, SECONDS,
1563                     new LinkedTransferQueue<>(),
1564                     factory);
1565         }
1566 
1567         @Override
1568         public void onStart(VirtualThreadTask task) {
1569             execute(task);
1570         }
1571 
1572         @Override
1573         public void onContinue(VirtualThreadTask task) {
1574             execute(task);
1575         }
1576     }
1577 
1578     /**
1579      * Wraps the scheduler to avoid leaking a direct reference to built-in scheduler.
1580      */
1581     static VirtualThreadScheduler createExternalView(VirtualThreadScheduler delegate) {
1582         return new VirtualThreadScheduler() {
1583             private void check(VirtualThreadTask task) {
1584                 var vthread = (VirtualThread) task.thread();
1585                 VirtualThreadScheduler scheduler = vthread.scheduler;
1586                 if (scheduler != this && scheduler != DEFAULT_SCHEDULER) {
1587                     throw new IllegalArgumentException();
1588                 }
1589             }
1590             @Override
1591             public void onStart(VirtualThreadTask task) {
1592                 check(task);
1593                 delegate.onStart(task);
1594             }
1595             @Override
1596             public void onContinue(VirtualThreadTask task) {
1597                 check(task);
1598                 delegate.onContinue(task);
1599             }
1600             @Override
1601             public String toString() {
1602                 return delegate.toString();
1603             }
1604         };
1605     }
1606 
1607     /**
1608      * Schedule a runnable task to run after a delay.
1609      */
1610     private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1611         if (USE_STPE) {


1612             return DelayedTaskSchedulers.schedule(command, delay, unit);
1613         } else {
1614             return scheduler.schedule(command, delay, unit);
1615         }
1616     }
1617 
1618     /**
1619      * Supports scheduling a runnable task to run after a delay. It uses a number
1620      * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1621      * work queue used. This class is used when using a custom scheduler.
1622      */
1623     static class DelayedTaskSchedulers {
1624         private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();
1625 
1626         static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1627             long tid = Thread.currentThread().threadId();
1628             int index = (int) tid & (INSTANCE.length - 1);
1629             return INSTANCE[index].schedule(command, delay, unit);
1630         }
1631 
1632         private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
1633             String propName = "jdk.virtualThreadScheduler.timerQueues";
1634             String propValue = System.getProperty(propName);
1635             int queueCount;
1636             if (propValue != null) {
1637                 queueCount = Integer.parseInt(propValue);
1638                 if (queueCount != Integer.highestOneBit(queueCount)) {
1639                     throw new RuntimeException("Value of " + propName + " must be power of 2");
1640                 }
1641             } else {
1642                 int ncpus = Runtime.getRuntime().availableProcessors();
1643                 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
< prev index next >