< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 



  27 import java.util.Locale;
  28 import java.util.Objects;
  29 import java.util.concurrent.CountDownLatch;
  30 import java.util.concurrent.Executor;
  31 import java.util.concurrent.Executors;
  32 import java.util.concurrent.ForkJoinPool;
  33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  34 import java.util.concurrent.ForkJoinTask;
  35 import java.util.concurrent.Future;
  36 import java.util.concurrent.RejectedExecutionException;
  37 import java.util.concurrent.ScheduledExecutorService;
  38 import java.util.concurrent.ScheduledThreadPoolExecutor;
  39 import java.util.concurrent.TimeUnit;
  40 import jdk.internal.event.VirtualThreadEndEvent;
  41 import jdk.internal.event.VirtualThreadStartEvent;
  42 import jdk.internal.event.VirtualThreadSubmitFailedEvent;

  43 import jdk.internal.misc.CarrierThread;
  44 import jdk.internal.misc.InnocuousThread;
  45 import jdk.internal.misc.Unsafe;
  46 import jdk.internal.vm.Continuation;
  47 import jdk.internal.vm.ContinuationScope;
  48 import jdk.internal.vm.StackableScope;
  49 import jdk.internal.vm.ThreadContainer;
  50 import jdk.internal.vm.ThreadContainers;
  51 import jdk.internal.vm.annotation.ChangesCurrentThread;
  52 import jdk.internal.vm.annotation.Hidden;
  53 import jdk.internal.vm.annotation.IntrinsicCandidate;
  54 import jdk.internal.vm.annotation.JvmtiHideEvents;
  55 import jdk.internal.vm.annotation.JvmtiMountTransition;
  56 import jdk.internal.vm.annotation.ReservedStackAccess;
  57 import sun.nio.ch.Interruptible;
  58 import static java.util.concurrent.TimeUnit.*;
  59 
  60 /**
  61  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  62  */
  63 final class VirtualThread extends BaseVirtualThread {
  64     private static final Unsafe U = Unsafe.getUnsafe();
  65     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  66     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();













  67 
  68     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
  69     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  70     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  71     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
  72     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  73 
  74     // scheduler and continuation
  75     private final Executor scheduler;
  76     private final Continuation cont;
  77     private final Runnable runContinuation;
  78 
  79     // virtual thread state, accessed by VM
  80     private volatile int state;
  81 
  82     /*
  83      * Virtual thread state transitions:
  84      *
  85      *      NEW -> STARTED         // Thread.start, schedule to run
  86      *  STARTED -> TERMINATED      // failed to start

 151     private static final int TERMINATED = 99;  // final state
 152 
 153     // can be suspended from scheduling when unmounted
 154     private static final int SUSPENDED = 1 << 8;
 155 
 156     // parking permit made available by LockSupport.unpark
 157     private volatile boolean parkPermit;
 158 
 159     // blocking permit made available by unblocker thread when another thread exits monitor
 160     private volatile boolean blockPermit;
 161 
 162     // true when on the list of virtual threads waiting to be unblocked
 163     private volatile boolean onWaitingList;
 164 
 165     // next virtual thread on the list of virtual threads waiting to be unblocked
 166     private volatile VirtualThread next;
 167 
 168     // notified by Object.notify/notifyAll while waiting in Object.wait
 169     private volatile boolean notified;
 170 



 171     // timed-wait support
 172     private byte timedWaitSeqNo;
 173 
 174     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 175     private long timeout;
 176 
 177     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 178     private Future<?> timeoutTask;
 179 
 180     // carrier thread when mounted, accessed by VM
 181     private volatile Thread carrierThread;
 182 
 183     // termination object when joining, created lazily if needed
 184     private volatile CountDownLatch termination;
 185 
 186     /**
 187      * Returns the default scheduler.
 188      */
 189     static Executor defaultScheduler() {
 190         return DEFAULT_SCHEDULER;
 191     }
 192 







 193     /**
 194      * Returns the continuation scope used for virtual threads.
 195      */
 196     static ContinuationScope continuationScope() {
 197         return VTHREAD_SCOPE;
 198     }
 199 
 200     /**
 201      * Creates a new {@code VirtualThread} to run the given task with the given
 202      * scheduler. If the given scheduler is {@code null} and the current thread
 203      * is a platform thread then the newly created virtual thread will use the
 204      * default scheduler. If given scheduler is {@code null} and the current
 205      * thread is a virtual thread then the current thread's scheduler is used.



 206      *
 207      * @param scheduler the scheduler or null
 208      * @param name thread name
 209      * @param characteristics characteristics
 210      * @param task the task to execute
 211      */
 212     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
 213         super(name, characteristics, /*bound*/ false);
 214         Objects.requireNonNull(task);
 215 
 216         // choose scheduler if not specified
 217         if (scheduler == null) {
 218             Thread parent = Thread.currentThread();
 219             if (parent instanceof VirtualThread vparent) {
 220                 scheduler = vparent.scheduler;
 221             } else {
 222                 scheduler = DEFAULT_SCHEDULER;
 223             }
 224         }
 225 
 226         this.scheduler = scheduler;
 227         this.cont = new VThreadContinuation(this, task);
 228         this.runContinuation = this::runContinuation;




 229     }
 230 
 231     /**
 232      * The continuation that a virtual thread executes.
 233      */
 234     private static class VThreadContinuation extends Continuation {
 235         VThreadContinuation(VirtualThread vthread, Runnable task) {
 236             super(VTHREAD_SCOPE, wrap(vthread, task));
 237         }
 238         @Override
 239         protected void onPinned(Continuation.Pinned reason) {
 240         }
 241         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 242             return new Runnable() {
 243                 @Hidden
 244                 @JvmtiHideEvents
 245                 public void run() {
 246                     vthread.notifyJvmtiStart(); // notify JVMTI
 247                     try {
 248                         vthread.run(task);
 249                     } finally {
 250                         vthread.notifyJvmtiEnd(); // notify JVMTI
 251                     }
 252                 }
 253             };
 254         }
 255     }
 256 













































 257     /**
 258      * Runs or continues execution on the current thread. The virtual thread is mounted
 259      * on the current thread before the task runs or continues. It unmounts when the
 260      * task completes or yields.
 261      */
 262     @ChangesCurrentThread // allow mount/unmount to be inlined
 263     private void runContinuation() {
 264         // the carrier must be a platform thread
 265         if (Thread.currentThread().isVirtual()) {
 266             throw new WrongThreadException();
 267         }
 268 
 269         // set state to RUNNING
 270         int initialState = state();
 271         if (initialState == STARTED || initialState == UNPARKED
 272                 || initialState == UNBLOCKED || initialState == YIELDED) {
 273             // newly started or continue after parking/blocking/Thread.yield
 274             if (!compareAndSetState(initialState, RUNNING)) {
 275                 return;
 276             }

 295             unmount();
 296             if (cont.isDone()) {
 297                 afterDone();
 298             } else {
 299                 afterYield();
 300             }
 301         }
 302     }
 303 
 304     /**
 305      * Cancel timeout task when continuing after timed-park or timed-wait.
 306      * The timeout task may be executing, or may have already completed.
 307      */
 308     private void cancelTimeoutTask() {
 309         if (timeoutTask != null) {
 310             timeoutTask.cancel(false);
 311             timeoutTask = null;
 312         }
 313     }
 314 












 315     /**
 316      * Submits the runContinuation task to the scheduler. For the default scheduler,
 317      * and calling it on a worker thread, the task will be pushed to the local queue,
 318      * otherwise it will be pushed to an external submission queue.
 319      * @param scheduler the scheduler
 320      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 321      * @throws RejectedExecutionException
 322      */
 323     private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
 324         boolean done = false;
 325         while (!done) {
 326             try {
 327                 // Pin the continuation to prevent the virtual thread from unmounting
 328                 // when submitting a task. For the default scheduler this ensures that
 329                 // the carrier doesn't change when pushing a task. For other schedulers
 330                 // it avoids deadlock that could arise due to carriers and virtual
 331                 // threads contending for a lock.
 332                 if (currentThread().isVirtual()) {
 333                     Continuation.pin();
 334                     try {
 335                         scheduler.execute(runContinuation);
 336                     } finally {
 337                         Continuation.unpin();
 338                     }
 339                 } else {
 340                     scheduler.execute(runContinuation);
 341                 }
 342                 done = true;
 343             } catch (RejectedExecutionException ree) {
 344                 submitFailed(ree);
 345                 throw ree;
 346             } catch (OutOfMemoryError e) {
 347                 if (retryOnOOME) {
 348                     U.park(false, 100_000_000); // 100ms
 349                 } else {
 350                     throw e;
 351                 }
 352             }
 353         }
 354     }
 355 
 356     /**
 357      * Submits the runContinuation task to the given scheduler as an external submit.
 358      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 359      * @throws RejectedExecutionException
 360      * @see ForkJoinPool#externalSubmit(ForkJoinTask)

 582                 submitRunContinuation();
 583             }
 584             return;
 585         }
 586 
 587         // blocking on monitorenter
 588         if (s == BLOCKING) {
 589             setState(BLOCKED);
 590 
 591             // may have been unblocked while blocking
 592             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 593                 // lazy submit if local queue is empty
 594                 lazySubmitRunContinuation();
 595             }
 596             return;
 597         }
 598 
 599         // Object.wait
 600         if (s == WAITING || s == TIMED_WAITING) {
 601             int newState;

 602             if (s == WAITING) {
 603                 setState(newState = WAIT);
 604             } else {
 605                 // For timed-wait, a timeout task is scheduled to execute. The timeout
 606                 // task will change the thread state to UNBLOCKED and submit the thread
 607                 // to the scheduler. A sequence number is used to ensure that the timeout
 608                 // task only unblocks the thread for this timed-wait. We synchronize with
 609                 // the timeout task to coordinate access to the sequence number and to
 610                 // ensure the timeout task doesn't execute until the thread has got to
 611                 // the TIMED_WAIT state.
 612                 long timeout = this.timeout;
 613                 assert timeout > 0;
 614                 synchronized (timedWaitLock()) {
 615                     byte seqNo = ++timedWaitSeqNo;
 616                     timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
 617                     setState(newState = TIMED_WAIT);
 618                 }
 619             }
 620 
 621             // may have been notified while in transition to wait state
 622             if (notified && compareAndSetState(newState, BLOCKED)) {
 623                 // may have even been unblocked already
 624                 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 625                     submitRunContinuation();
 626                 }
 627                 return;
 628             }
 629 
 630             // may have been interrupted while in transition to wait state
 631             if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
 632                 submitRunContinuation();
 633                 return;
 634             }
 635             return;
 636         }
 637 
 638         assert false;
 639     }
 640 
 641     /**
 642      * Invoked after the continuation completes.
 643      */
 644     private void afterDone() {
 645         afterDone(true);
 646     }
 647 
 648     /**
 649      * Invoked after the continuation completes (or start failed). Sets the thread
 650      * state to TERMINATED and notifies anyone waiting for the thread to terminate.
 651      *

1397 
1398     @IntrinsicCandidate
1399     @JvmtiMountTransition
1400     private native void notifyJvmtiMount(boolean hide);
1401 
1402     @IntrinsicCandidate
1403     @JvmtiMountTransition
1404     private native void notifyJvmtiUnmount(boolean hide);
1405 
1406     @IntrinsicCandidate
1407     private static native void notifyJvmtiDisableSuspend(boolean enter);
1408 
1409     private static native void registerNatives();
1410     static {
1411         registerNatives();
1412 
1413         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1414         var group = Thread.virtualThreadGroup();
1415     }
1416 


















1417     /**
1418      * Creates the default ForkJoinPool scheduler.
1419      */
1420     private static ForkJoinPool createDefaultScheduler() {
1421         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1422         int parallelism, maxPoolSize, minRunnable;
1423         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1424         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1425         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1426         if (parallelismValue != null) {
1427             parallelism = Integer.parseInt(parallelismValue);
1428         } else {
1429             parallelism = Runtime.getRuntime().availableProcessors();
1430         }
1431         if (maxPoolSizeValue != null) {
1432             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1433             parallelism = Integer.min(parallelism, maxPoolSize);
1434         } else {
1435             maxPoolSize = Integer.max(parallelism, 256);
1436         }
1437         if (minRunnableValue != null) {
1438             minRunnable = Integer.parseInt(minRunnableValue);
1439         } else {
1440             minRunnable = Integer.max(parallelism / 2, 1);

1515                 assert changed;
1516                 vthread.unblock();
1517 
1518                 vthread = nextThread;
1519             }
1520         }
1521     }
1522 
1523     /**
1524      * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1525      * if necessary until a list of one or more threads becomes available.
1526      */
1527     private static native VirtualThread takeVirtualThreadListToUnblock();
1528 
1529     static {
1530         var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1531                 VirtualThread::unblockVirtualThreads);
1532         unblocker.setDaemon(true);
1533         unblocker.start();
1534     }
1535 }

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.lang.reflect.Constructor;
  30 import java.util.Locale;
  31 import java.util.Objects;
  32 import java.util.concurrent.CountDownLatch;
  33 import java.util.concurrent.Executor;
  34 import java.util.concurrent.Executors;
  35 import java.util.concurrent.ForkJoinPool;
  36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  37 import java.util.concurrent.ForkJoinTask;
  38 import java.util.concurrent.Future;
  39 import java.util.concurrent.RejectedExecutionException;
  40 import java.util.concurrent.ScheduledExecutorService;
  41 import java.util.concurrent.ScheduledThreadPoolExecutor;
  42 import java.util.concurrent.TimeUnit;
  43 import jdk.internal.event.VirtualThreadEndEvent;
  44 import jdk.internal.event.VirtualThreadStartEvent;
  45 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  46 import jdk.internal.invoke.MhUtil;
  47 import jdk.internal.misc.CarrierThread;
  48 import jdk.internal.misc.InnocuousThread;
  49 import jdk.internal.misc.Unsafe;
  50 import jdk.internal.vm.Continuation;
  51 import jdk.internal.vm.ContinuationScope;
  52 import jdk.internal.vm.StackableScope;
  53 import jdk.internal.vm.ThreadContainer;
  54 import jdk.internal.vm.ThreadContainers;
  55 import jdk.internal.vm.annotation.ChangesCurrentThread;
  56 import jdk.internal.vm.annotation.Hidden;
  57 import jdk.internal.vm.annotation.IntrinsicCandidate;
  58 import jdk.internal.vm.annotation.JvmtiHideEvents;
  59 import jdk.internal.vm.annotation.JvmtiMountTransition;
  60 import jdk.internal.vm.annotation.ReservedStackAccess;
  61 import sun.nio.ch.Interruptible;
  62 import static java.util.concurrent.TimeUnit.*;
  63 
  64 /**
  65  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  66  */
  67 final class VirtualThread extends BaseVirtualThread {
  68     private static final Unsafe U = Unsafe.getUnsafe();
  69     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  70 
  71     private static final Executor DEFAULT_SCHEDULER;
  72     private static final boolean USE_CUSTOM_RUNNER;
  73     static {
  74         // experimental
  75         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
  76         if (propValue != null) {
  77             DEFAULT_SCHEDULER = createCustomDefaultScheduler(propValue);
  78             USE_CUSTOM_RUNNER = true;
  79         } else {
  80             DEFAULT_SCHEDULER = createDefaultForkJoinPoolScheduler();
  81             USE_CUSTOM_RUNNER = false;
  82         }
  83     }
  84 
  85     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
  86     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  87     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  88     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
  89     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  90 
  91     // scheduler and continuation
  92     private final Executor scheduler;
  93     private final Continuation cont;
  94     private final Runnable runContinuation;
  95 
  96     // virtual thread state, accessed by VM
  97     private volatile int state;
  98 
  99     /*
 100      * Virtual thread state transitions:
 101      *
 102      *      NEW -> STARTED         // Thread.start, schedule to run
 103      *  STARTED -> TERMINATED      // failed to start

 168     private static final int TERMINATED = 99;  // final state
 169 
 170     // can be suspended from scheduling when unmounted
 171     private static final int SUSPENDED = 1 << 8;
 172 
 173     // parking permit made available by LockSupport.unpark
 174     private volatile boolean parkPermit;
 175 
 176     // blocking permit made available by unblocker thread when another thread exits monitor
 177     private volatile boolean blockPermit;
 178 
 179     // true when on the list of virtual threads waiting to be unblocked
 180     private volatile boolean onWaitingList;
 181 
 182     // next virtual thread on the list of virtual threads waiting to be unblocked
 183     private volatile VirtualThread next;
 184 
 185     // notified by Object.notify/notifyAll while waiting in Object.wait
 186     private volatile boolean notified;
 187 
 188     // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
 189     private volatile boolean interruptableWait;
 190 
 191     // timed-wait support
 192     private byte timedWaitSeqNo;
 193 
 194     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 195     private long timeout;
 196 
 197     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 198     private Future<?> timeoutTask;
 199 
 200     // carrier thread when mounted, accessed by VM
 201     private volatile Thread carrierThread;
 202 
 203     // termination object when joining, created lazily if needed
 204     private volatile CountDownLatch termination;
 205 
 206     /**
 207      * Returns the default scheduler.
 208      */
 209     static Executor defaultScheduler() {
 210         return DEFAULT_SCHEDULER;
 211     }
 212 
 213     /**
 214      * Returns true if using a custom default scheduler.
 215      */
 216     static boolean isCustomDefaultScheduler() {
 217         return USE_CUSTOM_RUNNER;
 218     }
 219 
 220     /**
 221      * Returns the continuation scope used for virtual threads.
 222      */
 223     static ContinuationScope continuationScope() {
 224         return VTHREAD_SCOPE;
 225     }
 226 
 227     /**
 228      * Return the scheduler for this thread.
 229      */
 230     Executor scheduler() {
 231         return scheduler;
 232     }
 233 
 234     /**
 235      * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
 236      *
 237      * @param scheduler the scheduler or null for default scheduler
 238      * @param name thread name
 239      * @param characteristics characteristics
 240      * @param task the task to execute
 241      */
 242     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
 243         super(name, characteristics, /*bound*/ false);
 244         Objects.requireNonNull(task);
 245 
 246         // use default scheduler if not provided
 247         if (scheduler == null) {
 248             scheduler = DEFAULT_SCHEDULER;





 249         }
 250 
 251         this.scheduler = scheduler;
 252         this.cont = new VThreadContinuation(this, task);
 253         if (USE_CUSTOM_RUNNER || (scheduler != DEFAULT_SCHEDULER)) {
 254             this.runContinuation = new CustomRunner(this);
 255         } else {
 256             this.runContinuation = this::runContinuation;
 257         }
 258     }
 259 
 260     /**
 261      * The continuation that a virtual thread executes.
 262      */
 263     private static class VThreadContinuation extends Continuation {
 264         VThreadContinuation(VirtualThread vthread, Runnable task) {
 265             super(VTHREAD_SCOPE, wrap(vthread, task));
 266         }
 267         @Override
 268         protected void onPinned(Continuation.Pinned reason) {
 269         }
 270         private static Runnable wrap(VirtualThread vthread, Runnable task) {
 271             return new Runnable() {
 272                 @Hidden
 273                 @JvmtiHideEvents
 274                 public void run() {
 275                     vthread.notifyJvmtiStart(); // notify JVMTI
 276                     try {
 277                         vthread.run(task);
 278                     } finally {
 279                         vthread.notifyJvmtiEnd(); // notify JVMTI
 280                     }
 281                 }
 282             };
 283         }
 284     }
 285 
 286     /**
 287      * The task to execute when using a custom scheduler.
 288      */
 289     private static class CustomRunner implements VirtualThreadTask {
 290         private static final VarHandle ATTACHMENT =
 291                 MhUtil.findVarHandle(MethodHandles.lookup(), "attachment", Object.class);
 292         private final VirtualThread vthread;
 293         private volatile Object attachment;
 294         CustomRunner(VirtualThread vthread) {
 295             this.vthread = vthread;
 296         }
 297         @Override
 298         public void run() {
 299             vthread.runContinuation();
 300         }
 301         @Override
 302         public Thread thread() {
 303             return vthread;
 304         }
 305         @Override
 306         public Object attach(Object ob) {
 307             return ATTACHMENT.getAndSet(this, ob);
 308         }
 309         @Override
 310         public Object attachment() {
 311             return attachment;
 312         }
 313         @Override
 314         public String toString() {
 315             return vthread.toString();
 316         }
 317     }
 318 
 319     /**
 320      * Returns the object attached to the virtual thread's task.
 321      */
 322     Object currentTaskAttachment() {
 323         assert Thread.currentThread() == this;
 324         if (runContinuation instanceof CustomRunner runner) {
 325             return runner.attachment();
 326         } else {
 327             return null;
 328         }
 329     }
 330 
 331     /**
 332      * Runs or continues execution on the current thread. The virtual thread is mounted
 333      * on the current thread before the task runs or continues. It unmounts when the
 334      * task completes or yields.
 335      */
 336     @ChangesCurrentThread // allow mount/unmount to be inlined
 337     private void runContinuation() {
 338         // the carrier must be a platform thread
 339         if (Thread.currentThread().isVirtual()) {
 340             throw new WrongThreadException();
 341         }
 342 
 343         // set state to RUNNING
 344         int initialState = state();
 345         if (initialState == STARTED || initialState == UNPARKED
 346                 || initialState == UNBLOCKED || initialState == YIELDED) {
 347             // newly started or continue after parking/blocking/Thread.yield
 348             if (!compareAndSetState(initialState, RUNNING)) {
 349                 return;
 350             }

 369             unmount();
 370             if (cont.isDone()) {
 371                 afterDone();
 372             } else {
 373                 afterYield();
 374             }
 375         }
 376     }
 377 
 378     /**
 379      * Cancel timeout task when continuing after timed-park or timed-wait.
 380      * The timeout task may be executing, or may have already completed.
 381      */
 382     private void cancelTimeoutTask() {
 383         if (timeoutTask != null) {
 384             timeoutTask.cancel(false);
 385             timeoutTask = null;
 386         }
 387     }
 388 
 389     /**
 390      * Submits the given task to the given executor. If the scheduler is a
 391      * ForkJoinPool then the task is first adapted to a ForkJoinTask.
 392      */
 393     private void submit(Executor executor, Runnable task) {
 394         if (executor instanceof ForkJoinPool pool) {
 395             pool.submit(ForkJoinTask.adapt(task));
 396         } else {
 397             executor.execute(task);
 398         }
 399     }
 400 
 401     /**
 402      * Submits the runContinuation task to the scheduler. For the default scheduler,
 403      * and calling it on a worker thread, the task will be pushed to the local queue,
 404      * otherwise it will be pushed to an external submission queue.
 405      * @param scheduler the scheduler
 406      * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
 407      * @throws RejectedExecutionException
 408      */
 409     private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
 410         boolean done = false;
 411         while (!done) {
 412             try {
 413                 // Pin the continuation to prevent the virtual thread from unmounting
 414                 // when submitting a task. For the default scheduler this ensures that
 415                 // the carrier doesn't change when pushing a task. For other schedulers
 416                 // it avoids deadlock that could arise due to carriers and virtual
 417                 // threads contending for a lock.
 418                 if (currentThread().isVirtual()) {
 419                     Continuation.pin();
 420                     try {
 421                         submit(scheduler, runContinuation);
 422                     } finally {
 423                         Continuation.unpin();
 424                     }
 425                 } else {
 426                     submit(scheduler, runContinuation);
 427                 }
 428                 done = true;
 429             } catch (RejectedExecutionException ree) {
 430                 submitFailed(ree);
 431                 throw ree;
 432             } catch (OutOfMemoryError e) {
 433                 if (retryOnOOME) {
 434                     U.park(false, 100_000_000); // 100ms
 435                 } else {
 436                     throw e;
 437                 }
 438             }
 439         }
 440     }
 441 
 442     /**
 443      * Submits the runContinuation task to the given scheduler as an external submit.
 444      * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
 445      * @throws RejectedExecutionException
 446      * @see ForkJoinPool#externalSubmit(ForkJoinTask)

 668                 submitRunContinuation();
 669             }
 670             return;
 671         }
 672 
 673         // blocking on monitorenter
 674         if (s == BLOCKING) {
 675             setState(BLOCKED);
 676 
 677             // may have been unblocked while blocking
 678             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 679                 // lazy submit if local queue is empty
 680                 lazySubmitRunContinuation();
 681             }
 682             return;
 683         }
 684 
 685         // Object.wait
 686         if (s == WAITING || s == TIMED_WAITING) {
 687             int newState;
 688             boolean interruptable = interruptableWait;
 689             if (s == WAITING) {
 690                 setState(newState = WAIT);
 691             } else {
 692                 // For timed-wait, a timeout task is scheduled to execute. The timeout
 693                 // task will change the thread state to UNBLOCKED and submit the thread
 694                 // to the scheduler. A sequence number is used to ensure that the timeout
 695                 // task only unblocks the thread for this timed-wait. We synchronize with
 696                 // the timeout task to coordinate access to the sequence number and to
 697                 // ensure the timeout task doesn't execute until the thread has got to
 698                 // the TIMED_WAIT state.
 699                 long timeout = this.timeout;
 700                 assert timeout > 0;
 701                 synchronized (timedWaitLock()) {
 702                     byte seqNo = ++timedWaitSeqNo;
 703                     timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
 704                     setState(newState = TIMED_WAIT);
 705                 }
 706             }
 707 
 708             // may have been notified while in transition to wait state
 709             if (notified && compareAndSetState(newState, BLOCKED)) {
 710                 // may have even been unblocked already
 711                 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 712                     submitRunContinuation();
 713                 }
 714                 return;
 715             }
 716 
 717             // may have been interrupted while in transition to wait state
 718             if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
 719                 submitRunContinuation();
 720                 return;
 721             }
 722             return;
 723         }
 724 
 725         assert false;
 726     }
 727 
 728     /**
 729      * Invoked after the continuation completes.
 730      */
 731     private void afterDone() {
 732         afterDone(true);
 733     }
 734 
 735     /**
 736      * Invoked after the continuation completes (or start failed). Sets the thread
 737      * state to TERMINATED and notifies anyone waiting for the thread to terminate.
 738      *

1484 
1485     @IntrinsicCandidate
1486     @JvmtiMountTransition
1487     private native void notifyJvmtiMount(boolean hide);
1488 
1489     @IntrinsicCandidate
1490     @JvmtiMountTransition
1491     private native void notifyJvmtiUnmount(boolean hide);
1492 
1493     @IntrinsicCandidate
1494     private static native void notifyJvmtiDisableSuspend(boolean enter);
1495 
1496     private static native void registerNatives();
1497     static {
1498         registerNatives();
1499 
1500         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1501         var group = Thread.virtualThreadGroup();
1502     }
1503 
1504     /**
1505      * Loads a java.util.concurrent.Executor with the given class name to use at the
1506      * default scheduler. The class is public in an exported package, has a public
1507      * no-arg constructor, and is visible to the system class loader.
1508      */
1509     private static Executor createCustomDefaultScheduler(String cn) {
1510         try {
1511             Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
1512             Constructor<?> ctor = clazz.getConstructor();
1513             var scheduler = (Executor) ctor.newInstance();
1514             System.err.println("""
1515                 WARNING: Using custom default scheduler, this is an experimental feature!""");
1516             return scheduler;
1517         } catch (Exception ex) {
1518             throw new Error(ex);
1519         }
1520     }
1521 
1522     /**
1523      * Creates the default ForkJoinPool scheduler.
1524      */
1525     private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
1526         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1527         int parallelism, maxPoolSize, minRunnable;
1528         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1529         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1530         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1531         if (parallelismValue != null) {
1532             parallelism = Integer.parseInt(parallelismValue);
1533         } else {
1534             parallelism = Runtime.getRuntime().availableProcessors();
1535         }
1536         if (maxPoolSizeValue != null) {
1537             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1538             parallelism = Integer.min(parallelism, maxPoolSize);
1539         } else {
1540             maxPoolSize = Integer.max(parallelism, 256);
1541         }
1542         if (minRunnableValue != null) {
1543             minRunnable = Integer.parseInt(minRunnableValue);
1544         } else {
1545             minRunnable = Integer.max(parallelism / 2, 1);

1620                 assert changed;
1621                 vthread.unblock();
1622 
1623                 vthread = nextThread;
1624             }
1625         }
1626     }
1627 
1628     /**
1629      * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1630      * if necessary until a list of one or more threads becomes available.
1631      */
1632     private static native VirtualThread takeVirtualThreadListToUnblock();
1633 
1634     static {
1635         var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1636                 VirtualThread::unblockVirtualThreads);
1637         unblocker.setDaemon(true);
1638         unblocker.start();
1639     }
1640 }
< prev index next >