< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 



  27 import java.util.Locale;
  28 import java.util.Objects;
  29 import java.util.concurrent.CountDownLatch;
  30 import java.util.concurrent.Executor;
  31 import java.util.concurrent.Executors;
  32 import java.util.concurrent.ForkJoinPool;
  33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  34 import java.util.concurrent.ForkJoinTask;
  35 import java.util.concurrent.Future;
  36 import java.util.concurrent.RejectedExecutionException;
  37 import java.util.concurrent.ScheduledExecutorService;
  38 import java.util.concurrent.ScheduledThreadPoolExecutor;
  39 import java.util.concurrent.TimeUnit;
  40 import jdk.internal.event.VirtualThreadEndEvent;
  41 import jdk.internal.event.VirtualThreadStartEvent;
  42 import jdk.internal.event.VirtualThreadSubmitFailedEvent;

  43 import jdk.internal.misc.CarrierThread;
  44 import jdk.internal.misc.InnocuousThread;
  45 import jdk.internal.misc.Unsafe;
  46 import jdk.internal.vm.Continuation;
  47 import jdk.internal.vm.ContinuationScope;
  48 import jdk.internal.vm.StackableScope;
  49 import jdk.internal.vm.ThreadContainer;
  50 import jdk.internal.vm.ThreadContainers;
  51 import jdk.internal.vm.annotation.ChangesCurrentThread;
  52 import jdk.internal.vm.annotation.Hidden;
  53 import jdk.internal.vm.annotation.IntrinsicCandidate;
  54 import jdk.internal.vm.annotation.JvmtiHideEvents;
  55 import jdk.internal.vm.annotation.JvmtiMountTransition;
  56 import jdk.internal.vm.annotation.ReservedStackAccess;
  57 import sun.nio.ch.Interruptible;
  58 import static java.util.concurrent.TimeUnit.*;
  59 
  60 /**
  61  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  62  */
  63 final class VirtualThread extends BaseVirtualThread {
  64     private static final Unsafe U = Unsafe.getUnsafe();
  65     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  66     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();













  67 
  68     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
  69     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  70     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  71     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
  72     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  73 
  74     // scheduler and continuation
  75     private final Executor scheduler;
  76     private final Continuation cont;
  77     private final Runnable runContinuation;
  78 
  79     // virtual thread state, accessed by VM
  80     private volatile int state;
  81 
  82     /*
  83      * Virtual thread state transitions:
  84      *
  85      *      NEW -> STARTED         // Thread.start, schedule to run
  86      *  STARTED -> TERMINATED      // failed to start

 151     private static final int TERMINATED = 99;  // final state
 152 
 153     // can be suspended from scheduling when unmounted
 154     private static final int SUSPENDED = 1 << 8;
 155 
 156     // parking permit made available by LockSupport.unpark
 157     private volatile boolean parkPermit;
 158 
 159     // blocking permit made available by unblocker thread when another thread exits monitor
 160     private volatile boolean blockPermit;
 161 
 162     // true when on the list of virtual threads waiting to be unblocked
 163     private volatile boolean onWaitingList;
 164 
 165     // next virtual thread on the list of virtual threads waiting to be unblocked
 166     private volatile VirtualThread next;
 167 
 168     // notified by Object.notify/notifyAll while waiting in Object.wait
 169     private volatile boolean notified;
 170 



 171     // timed-wait support
 172     private byte timedWaitSeqNo;
 173 
 174     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 175     private long timeout;
 176 
 177     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 178     private Future<?> timeoutTask;
 179 
 180     // carrier thread when mounted, accessed by VM
 181     private volatile Thread carrierThread;
 182 
 183     // termination object when joining, created lazily if needed
 184     private volatile CountDownLatch termination;
 185 
 186     /**
 187      * Returns the default scheduler.
 188      */
 189     static Executor defaultScheduler() {
 190         return DEFAULT_SCHEDULER;
 191     }
 192 
 193     /**
 194      * Returns the continuation scope used for virtual threads.
 195      */
 196     static ContinuationScope continuationScope() {
 197         return VTHREAD_SCOPE;
 198     }
 199 
 200     /**
 201      * Creates a new {@code VirtualThread} to run the given task with the given
 202      * scheduler. If the given scheduler is {@code null} and the current thread
 203      * is a platform thread then the newly created virtual thread will use the
 204      * default scheduler. If given scheduler is {@code null} and the current
 205      * thread is a virtual thread then the current thread's scheduler is used.



 206      *
 207      * @param scheduler the scheduler or null
 208      * @param name thread name
 209      * @param characteristics characteristics
 210      * @param task the task to execute
 211      */
 212     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
 213         super(name, characteristics, /*bound*/ false);
 214         Objects.requireNonNull(task);
 215 
 216         // choose scheduler if not specified
 217         if (scheduler == null) {
 218             Thread parent = Thread.currentThread();
 219             if (parent instanceof VirtualThread vparent) {
 220                 scheduler = vparent.scheduler;
 221             } else {
 222                 scheduler = DEFAULT_SCHEDULER;
 223             }
 224         }
 225 
 226         this.scheduler = scheduler;
 227         this.cont = new VThreadContinuation(this, task);
 228         this.runContinuation = this::runContinuation;




 229     }
 230 
 231     /**
 232      * The continuation that a virtual thread executes.
 233      */
 234     private static class VThreadContinuation extends Continuation {
 235         VThreadContinuation(VirtualThread vthread, Runnable task) {
 236             super(VTHREAD_SCOPE, wrap(vthread, task));
 237         }
 238         @Override
 239         protected void onPinned(Continuation.Pinned reason) {
 240         }
 241         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 242             return new Runnable() {
 243                 @Hidden
 244                 @JvmtiHideEvents
 245                 public void run() {
 246                     vthread.notifyJvmtiStart(); // notify JVMTI
 247                     try {
 248                         vthread.run(task);
 249                     } finally {
 250                         vthread.notifyJvmtiEnd(); // notify JVMTI
 251                     }
 252                 }
 253             };
 254         }
 255     }
 256 













































 257     /**
 258      * Runs or continues execution on the current thread. The virtual thread is mounted
 259      * on the current thread before the task runs or continues. It unmounts when the
 260      * task completes or yields.
 261      */
 262     @ChangesCurrentThread // allow mount/unmount to be inlined
 263     private void runContinuation() {
 264         // the carrier must be a platform thread
 265         if (Thread.currentThread().isVirtual()) {
 266             throw new WrongThreadException();
 267         }
 268 
 269         // set state to RUNNING
 270         int initialState = state();
 271         if (initialState == STARTED || initialState == UNPARKED
 272                 || initialState == UNBLOCKED || initialState == YIELDED) {
 273             // newly started or continue after parking/blocking/Thread.yield
 274             if (!compareAndSetState(initialState, RUNNING)) {
 275                 return;
 276             }

 295             unmount();
 296             if (cont.isDone()) {
 297                 afterDone();
 298             } else {
 299                 afterYield();
 300             }
 301         }
 302     }
 303 
 304     /**
 305      * Cancel timeout task when continuing after timed-park or timed-wait.
 306      * The timeout task may be executing, or may have already completed.
 307      */
 308     private void cancelTimeoutTask() {
 309         if (timeoutTask != null) {
 310             timeoutTask.cancel(false);
 311             timeoutTask = null;
 312         }
 313     }
 314 












 315     /**
 316      * Submits the runContinuation task to the scheduler. For the default scheduler,
 317      * and calling it on a worker thread, the task will be pushed to the local queue,
 318      * otherwise it will be pushed to an external submission queue.
 319      * @param scheduler the scheduler
 320      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 321      * @throws RejectedExecutionException
 322      */
 323     private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
 324         boolean done = false;
 325         while (!done) {
 326             try {
 327                 // Pin the continuation to prevent the virtual thread from unmounting
 328                 // when submitting a task. For the default scheduler this ensures that
 329                 // the carrier doesn't change when pushing a task. For other schedulers
 330                 // it avoids deadlock that could arise due to carriers and virtual
 331                 // threads contending for a lock.
 332                 if (currentThread().isVirtual()) {
 333                     Continuation.pin();
 334                     try {
 335                         scheduler.execute(runContinuation);
 336                     } finally {
 337                         Continuation.unpin();
 338                     }
 339                 } else {
 340                     scheduler.execute(runContinuation);
 341                 }
 342                 done = true;
 343             } catch (RejectedExecutionException ree) {
 344                 submitFailed(ree);
 345                 throw ree;
 346             } catch (OutOfMemoryError e) {
 347                 if (retryOnOOME) {
 348                     U.park(false, 100_000_000); // 100ms
 349                 } else {
 350                     throw e;
 351                 }
 352             }
 353         }
 354     }
 355 
 356     /**
 357      * Submits the runContinuation task to the given scheduler as an external submit.
 358      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 359      * @throws RejectedExecutionException
 360      * @see ForkJoinPool#externalSubmit(ForkJoinTask)

 582                 submitRunContinuation();
 583             }
 584             return;
 585         }
 586 
 587         // blocking on monitorenter
 588         if (s == BLOCKING) {
 589             setState(BLOCKED);
 590 
 591             // may have been unblocked while blocking
 592             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 593                 // lazy submit if local queue is empty
 594                 lazySubmitRunContinuation();
 595             }
 596             return;
 597         }
 598 
 599         // Object.wait
 600         if (s == WAITING || s == TIMED_WAITING) {
 601             int newState;

 602             if (s == WAITING) {
 603                 setState(newState = WAIT);
 604             } else {
 605                 // For timed-wait, a timeout task is scheduled to execute. The timeout
 606                 // task will change the thread state to UNBLOCKED and submit the thread
 607                 // to the scheduler. A sequence number is used to ensure that the timeout
 608                 // task only unblocks the thread for this timed-wait. We synchronize with
 609                 // the timeout task to coordinate access to the sequence number and to
 610                 // ensure the timeout task doesn't execute until the thread has got to
 611                 // the TIMED_WAIT state.
 612                 long timeout = this.timeout;
 613                 assert timeout > 0;
 614                 synchronized (timedWaitLock()) {
 615                     byte seqNo = ++timedWaitSeqNo;
 616                     timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
 617                     setState(newState = TIMED_WAIT);
 618                 }
 619             }
 620 
 621             // may have been notified while in transition to wait state
 622             if (notified && compareAndSetState(newState, BLOCKED)) {
 623                 // may have even been unblocked already
 624                 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 625                     submitRunContinuation();
 626                 }
 627                 return;
 628             }
 629 
 630             // may have been interrupted while in transition to wait state
 631             if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
 632                 submitRunContinuation();
 633                 return;
 634             }
 635             return;
 636         }
 637 
 638         assert false;
 639     }
 640 
 641     /**
 642      * Invoked after the continuation completes.
 643      */
 644     private void afterDone() {
 645         afterDone(true);
 646     }
 647 
 648     /**
 649      * Invoked after the continuation completes (or start failed). Sets the thread
 650      * state to TERMINATED and notifies anyone waiting for the thread to terminate.
 651      *

1397 
1398     @IntrinsicCandidate
1399     @JvmtiMountTransition
1400     private native void notifyJvmtiMount(boolean hide);
1401 
1402     @IntrinsicCandidate
1403     @JvmtiMountTransition
1404     private native void notifyJvmtiUnmount(boolean hide);
1405 
1406     @IntrinsicCandidate
1407     private static native void notifyJvmtiDisableSuspend(boolean enter);
1408 
1409     private static native void registerNatives();
1410     static {
1411         registerNatives();
1412 
1413         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1414         var group = Thread.virtualThreadGroup();
1415     }
1416 


















1417     /**
1418      * Creates the default ForkJoinPool scheduler.
1419      */
1420     private static ForkJoinPool createDefaultScheduler() {
1421         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1422         int parallelism, maxPoolSize, minRunnable;
1423         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1424         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1425         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1426         if (parallelismValue != null) {
1427             parallelism = Integer.parseInt(parallelismValue);
1428         } else {
1429             parallelism = Runtime.getRuntime().availableProcessors();
1430         }
1431         if (maxPoolSizeValue != null) {
1432             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1433             parallelism = Integer.min(parallelism, maxPoolSize);
1434         } else {
1435             maxPoolSize = Integer.max(parallelism, 256);
1436         }
1437         if (minRunnableValue != null) {
1438             minRunnable = Integer.parseInt(minRunnableValue);
1439         } else {
1440             minRunnable = Integer.max(parallelism / 2, 1);

1515                 assert changed;
1516                 vthread.unblock();
1517 
1518                 vthread = nextThread;
1519             }
1520         }
1521     }
1522 
1523     /**
1524      * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1525      * if necessary until a list of one or more threads becomes available.
1526      */
1527     private static native VirtualThread takeVirtualThreadListToUnblock();
1528 
1529     static {
1530         var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1531                 VirtualThread::unblockVirtualThreads);
1532         unblocker.setDaemon(true);
1533         unblocker.start();
1534     }
1535 }

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.lang.reflect.Constructor;
  30 import java.util.Locale;
  31 import java.util.Objects;
  32 import java.util.concurrent.CountDownLatch;
  33 import java.util.concurrent.Executor;
  34 import java.util.concurrent.Executors;
  35 import java.util.concurrent.ForkJoinPool;
  36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  37 import java.util.concurrent.ForkJoinTask;
  38 import java.util.concurrent.Future;
  39 import java.util.concurrent.RejectedExecutionException;
  40 import java.util.concurrent.ScheduledExecutorService;
  41 import java.util.concurrent.ScheduledThreadPoolExecutor;
  42 import java.util.concurrent.TimeUnit;
  43 import jdk.internal.event.VirtualThreadEndEvent;
  44 import jdk.internal.event.VirtualThreadStartEvent;
  45 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  46 import jdk.internal.invoke.MhUtil;
  47 import jdk.internal.misc.CarrierThread;
  48 import jdk.internal.misc.InnocuousThread;
  49 import jdk.internal.misc.Unsafe;
  50 import jdk.internal.vm.Continuation;
  51 import jdk.internal.vm.ContinuationScope;
  52 import jdk.internal.vm.StackableScope;
  53 import jdk.internal.vm.ThreadContainer;
  54 import jdk.internal.vm.ThreadContainers;
  55 import jdk.internal.vm.annotation.ChangesCurrentThread;
  56 import jdk.internal.vm.annotation.Hidden;
  57 import jdk.internal.vm.annotation.IntrinsicCandidate;
  58 import jdk.internal.vm.annotation.JvmtiHideEvents;
  59 import jdk.internal.vm.annotation.JvmtiMountTransition;
  60 import jdk.internal.vm.annotation.ReservedStackAccess;
  61 import sun.nio.ch.Interruptible;
  62 import static java.util.concurrent.TimeUnit.*;
  63 
  64 /**
  65  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  66  */
  67 final class VirtualThread extends BaseVirtualThread {
  68     private static final Unsafe U = Unsafe.getUnsafe();
  69     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  70 
  71     private static final Executor DEFAULT_SCHEDULER;
  72     private static final boolean USE_CUSTOM_RUNNER;
  73     static {
  74         // experimental
  75         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
  76         if (propValue != null) {
  77             DEFAULT_SCHEDULER = createCustomDefaultScheduler(propValue);
  78             USE_CUSTOM_RUNNER = true;
  79         } else {
  80             DEFAULT_SCHEDULER = createDefaultForkJoinPoolScheduler();
  81             USE_CUSTOM_RUNNER = false;
  82         }
  83     }
  84 
  85     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
  86     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  87     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  88     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
  89     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  90 
  91     // scheduler and continuation
  92     private final Executor scheduler;
  93     private final Continuation cont;
  94     private final Runnable runContinuation;
  95 
  96     // virtual thread state, accessed by VM
  97     private volatile int state;
  98 
  99     /*
 100      * Virtual thread state transitions:
 101      *
 102      *      NEW -> STARTED         // Thread.start, schedule to run
 103      *  STARTED -> TERMINATED      // failed to start

 168     private static final int TERMINATED = 99;  // final state
 169 
 170     // can be suspended from scheduling when unmounted
 171     private static final int SUSPENDED = 1 << 8;
 172 
 173     // parking permit made available by LockSupport.unpark
 174     private volatile boolean parkPermit;
 175 
 176     // blocking permit made available by unblocker thread when another thread exits monitor
 177     private volatile boolean blockPermit;
 178 
 179     // true when on the list of virtual threads waiting to be unblocked
 180     private volatile boolean onWaitingList;
 181 
 182     // next virtual thread on the list of virtual threads waiting to be unblocked
 183     private volatile VirtualThread next;
 184 
 185     // notified by Object.notify/notifyAll while waiting in Object.wait
 186     private volatile boolean notified;
 187 
 188     // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
 189     private volatile boolean interruptableWait;
 190 
 191     // timed-wait support
 192     private byte timedWaitSeqNo;
 193 
 194     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 195     private long timeout;
 196 
 197     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 198     private Future<?> timeoutTask;
 199 
 200     // carrier thread when mounted, accessed by VM
 201     private volatile Thread carrierThread;
 202 
 203     // termination object when joining, created lazily if needed
 204     private volatile CountDownLatch termination;
 205 
 206     /**
 207      * Returns the default scheduler.
 208      */
 209     static Executor defaultScheduler() {
 210         return DEFAULT_SCHEDULER;
 211     }
 212 
 213     /**
 214      * Returns the continuation scope used for virtual threads.
 215      */
 216     static ContinuationScope continuationScope() {
 217         return VTHREAD_SCOPE;
 218     }
 219 
 220     /**
 221      * Return the scheduler for this thread.
 222      */
 223     Executor scheduler() {
 224         return scheduler;
 225     }
 226 
 227     /**
 228      * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
 229      *
 230      * @param scheduler the scheduler or null for default scheduler
 231      * @param name thread name
 232      * @param characteristics characteristics
 233      * @param task the task to execute
 234      */
 235     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
 236         super(name, characteristics, /*bound*/ false);
 237         Objects.requireNonNull(task);
 238 
 239         // use default scheduler if not provided
 240         if (scheduler == null) {
 241             scheduler = DEFAULT_SCHEDULER;





 242         }
 243 
 244         this.scheduler = scheduler;
 245         this.cont = new VThreadContinuation(this, task);
 246         if (USE_CUSTOM_RUNNER || (scheduler != DEFAULT_SCHEDULER)) {
 247             this.runContinuation = new CustomRunner(this);
 248         } else {
 249             this.runContinuation = this::runContinuation;
 250         }
 251     }
 252 
 253     /**
 254      * The continuation that a virtual thread executes.
 255      */
 256     private static class VThreadContinuation extends Continuation {
 257         VThreadContinuation(VirtualThread vthread, Runnable task) {
 258             super(VTHREAD_SCOPE, wrap(vthread, task));
 259         }
 260         @Override
 261         protected void onPinned(Continuation.Pinned reason) {
 262         }
 263         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 264             return new Runnable() {
 265                 @Hidden
 266                 @JvmtiHideEvents
 267                 public void run() {
 268                     vthread.notifyJvmtiStart(); // notify JVMTI
 269                     try {
 270                         vthread.run(task);
 271                     } finally {
 272                         vthread.notifyJvmtiEnd(); // notify JVMTI
 273                     }
 274                 }
 275             };
 276         }
 277     }
 278 
 279     /**
 280      * The task to execute when using a custom scheduler.
 281      */
 282     private static class CustomRunner implements VirtualThreadTask {
 283         private static final VarHandle ATTACHMENT =
 284                 MhUtil.findVarHandle(MethodHandles.lookup(), "attachment", Object.class);
 285         private final VirtualThread vthread;
 286         private volatile Object attachment;
 287         CustomRunner(VirtualThread vthread) {
 288             this.vthread = vthread;
 289         }
 290         @Override
 291         public void run() {
 292             vthread.runContinuation();
 293         }
 294         @Override
 295         public Thread thread() {
 296             return vthread;
 297         }
 298         @Override
 299         public Object attach(Object ob) {
 300             return ATTACHMENT.getAndSet(this, ob);
 301         }
 302         @Override
 303         public Object attachment() {
 304             return attachment;
 305         }
 306         @Override
 307         public String toString() {
 308             return vthread.toString();
 309         }
 310     }
 311 
 312     /**
 313      * Returns the object attached to the virtual thread's task.
 314      */
 315     Object currentTaskAttachment() {
 316         assert Thread.currentThread() == this;
 317         if (runContinuation instanceof CustomRunner runner) {
 318             return runner.attachment();
 319         } else {
 320             return null;
 321         }
 322     }
 323 
 324     /**
 325      * Runs or continues execution on the current thread. The virtual thread is mounted
 326      * on the current thread before the task runs or continues. It unmounts when the
 327      * task completes or yields.
 328      */
 329     @ChangesCurrentThread // allow mount/unmount to be inlined
 330     private void runContinuation() {
 331         // the carrier must be a platform thread
 332         if (Thread.currentThread().isVirtual()) {
 333             throw new WrongThreadException();
 334         }
 335 
 336         // set state to RUNNING
 337         int initialState = state();
 338         if (initialState == STARTED || initialState == UNPARKED
 339                 || initialState == UNBLOCKED || initialState == YIELDED) {
 340             // newly started or continue after parking/blocking/Thread.yield
 341             if (!compareAndSetState(initialState, RUNNING)) {
 342                 return;
 343             }

 362             unmount();
 363             if (cont.isDone()) {
 364                 afterDone();
 365             } else {
 366                 afterYield();
 367             }
 368         }
 369     }
 370 
 371     /**
 372      * Cancel timeout task when continuing after timed-park or timed-wait.
 373      * The timeout task may be executing, or may have already completed.
 374      */
 375     private void cancelTimeoutTask() {
 376         if (timeoutTask != null) {
 377             timeoutTask.cancel(false);
 378             timeoutTask = null;
 379         }
 380     }
 381 
 382     /**
 383      * Submits the given task to the given executor. If the scheduler is a
 384      * ForkJoinPool then the task is first adapted to a ForkJoinTask.
 385      */
 386     private void submit(Executor executor, Runnable task) {
 387         if (executor instanceof ForkJoinPool pool) {
 388             pool.submit(ForkJoinTask.adapt(task));
 389         } else {
 390             executor.execute(task);
 391         }
 392     }
 393 
 394     /**
 395      * Submits the runContinuation task to the scheduler. For the default scheduler,
 396      * and calling it on a worker thread, the task will be pushed to the local queue,
 397      * otherwise it will be pushed to an external submission queue.
 398      * @param scheduler the scheduler
 399      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 400      * @throws RejectedExecutionException
 401      */
 402     private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
 403         boolean done = false;
 404         while (!done) {
 405             try {
 406                 // Pin the continuation to prevent the virtual thread from unmounting
 407                 // when submitting a task. For the default scheduler this ensures that
 408                 // the carrier doesn't change when pushing a task. For other schedulers
 409                 // it avoids deadlock that could arise due to carriers and virtual
 410                 // threads contending for a lock.
 411                 if (currentThread().isVirtual()) {
 412                     Continuation.pin();
 413                     try {
 414                         submit(scheduler, runContinuation);
 415                     } finally {
 416                         Continuation.unpin();
 417                     }
 418                 } else {
 419                     submit(scheduler, runContinuation);
 420                 }
 421                 done = true;
 422             } catch (RejectedExecutionException ree) {
 423                 submitFailed(ree);
 424                 throw ree;
 425             } catch (OutOfMemoryError e) {
 426                 if (retryOnOOME) {
 427                     U.park(false, 100_000_000); // 100ms
 428                 } else {
 429                     throw e;
 430                 }
 431             }
 432         }
 433     }
 434 
 435     /**
 436      * Submits the runContinuation task to the given scheduler as an external submit.
 437      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 438      * @throws RejectedExecutionException
 439      * @see ForkJoinPool#externalSubmit(ForkJoinTask)

 661                 submitRunContinuation();
 662             }
 663             return;
 664         }
 665 
 666         // blocking on monitorenter
 667         if (s == BLOCKING) {
 668             setState(BLOCKED);
 669 
 670             // may have been unblocked while blocking
 671             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 672                 // lazy submit if local queue is empty
 673                 lazySubmitRunContinuation();
 674             }
 675             return;
 676         }
 677 
 678         // Object.wait
 679         if (s == WAITING || s == TIMED_WAITING) {
 680             int newState;
 681             boolean interruptable = interruptableWait;
 682             if (s == WAITING) {
 683                 setState(newState = WAIT);
 684             } else {
 685                 // For timed-wait, a timeout task is scheduled to execute. The timeout
 686                 // task will change the thread state to UNBLOCKED and submit the thread
 687                 // to the scheduler. A sequence number is used to ensure that the timeout
 688                 // task only unblocks the thread for this timed-wait. We synchronize with
 689                 // the timeout task to coordinate access to the sequence number and to
 690                 // ensure the timeout task doesn't execute until the thread has got to
 691                 // the TIMED_WAIT state.
 692                 long timeout = this.timeout;
 693                 assert timeout > 0;
 694                 synchronized (timedWaitLock()) {
 695                     byte seqNo = ++timedWaitSeqNo;
 696                     timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
 697                     setState(newState = TIMED_WAIT);
 698                 }
 699             }
 700 
 701             // may have been notified while in transition to wait state
 702             if (notified && compareAndSetState(newState, BLOCKED)) {
 703                 // may have even been unblocked already
 704                 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 705                     submitRunContinuation();
 706                 }
 707                 return;
 708             }
 709 
 710             // may have been interrupted while in transition to wait state
 711             if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
 712                 submitRunContinuation();
 713                 return;
 714             }
 715             return;
 716         }
 717 
 718         assert false;
 719     }
 720 
 721     /**
 722      * Invoked after the continuation completes.
 723      */
 724     private void afterDone() {
 725         afterDone(true);
 726     }
 727 
 728     /**
 729      * Invoked after the continuation completes (or start failed). Sets the thread
 730      * state to TERMINATED and notifies anyone waiting for the thread to terminate.
 731      *

1477 
1478     @IntrinsicCandidate
1479     @JvmtiMountTransition
1480     private native void notifyJvmtiMount(boolean hide);
1481 
1482     @IntrinsicCandidate
1483     @JvmtiMountTransition
1484     private native void notifyJvmtiUnmount(boolean hide);
1485 
1486     @IntrinsicCandidate
1487     private static native void notifyJvmtiDisableSuspend(boolean enter);
1488 
1489     private static native void registerNatives();
1490     static {
1491         registerNatives();
1492 
1493         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1494         var group = Thread.virtualThreadGroup();
1495     }
1496 
1497     /**
1498      * Loads a java.util.concurrent.Executor with the given class name to use at the
1499      * default scheduler. The class is public in an exported package, has a public
1500      * no-arg constructor, and is visible to the system class loader.
1501      */
1502     private static Executor createCustomDefaultScheduler(String cn) {
1503         try {
1504             Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
1505             Constructor<?> ctor = clazz.getConstructor();
1506             var scheduler = (Executor) ctor.newInstance();
1507             System.err.println("""
1508                 WARNING: Using custom default scheduler, this is an experimental feature!""");
1509             return scheduler;
1510         } catch (Exception ex) {
1511             throw new Error(ex);
1512         }
1513     }
1514 
1515     /**
1516      * Creates the default ForkJoinPool scheduler.
1517      */
1518     private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
1519         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1520         int parallelism, maxPoolSize, minRunnable;
1521         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1522         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1523         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1524         if (parallelismValue != null) {
1525             parallelism = Integer.parseInt(parallelismValue);
1526         } else {
1527             parallelism = Runtime.getRuntime().availableProcessors();
1528         }
1529         if (maxPoolSizeValue != null) {
1530             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1531             parallelism = Integer.min(parallelism, maxPoolSize);
1532         } else {
1533             maxPoolSize = Integer.max(parallelism, 256);
1534         }
1535         if (minRunnableValue != null) {
1536             minRunnable = Integer.parseInt(minRunnableValue);
1537         } else {
1538             minRunnable = Integer.max(parallelism / 2, 1);

1613                 assert changed;
1614                 vthread.unblock();
1615 
1616                 vthread = nextThread;
1617             }
1618         }
1619     }
1620 
1621     /**
1622      * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1623      * if necessary until a list of one or more threads becomes available.
1624      */
1625     private static native VirtualThread takeVirtualThreadListToUnblock();
1626 
1627     static {
1628         var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1629                 VirtualThread::unblockVirtualThreads);
1630         unblocker.setDaemon(true);
1631         unblocker.start();
1632     }
1633 }
< prev index next >