1 /*
   2  * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 
  27 import java.lang.ref.Reference;
  28 import java.security.AccessController;
  29 import java.security.PrivilegedAction;
  30 import java.util.Locale;
  31 import java.util.Objects;
  32 import java.util.concurrent.CountDownLatch;
  33 import java.util.concurrent.Executor;
  34 import java.util.concurrent.Executors;
  35 import java.util.concurrent.ForkJoinPool;
  36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  37 import java.util.concurrent.ForkJoinTask;
  38 import java.util.concurrent.ForkJoinWorkerThread;
  39 import java.util.concurrent.Future;
  40 import java.util.concurrent.RejectedExecutionException;
  41 import java.util.concurrent.ScheduledExecutorService;
  42 import java.util.concurrent.ScheduledThreadPoolExecutor;
  43 import jdk.internal.event.ThreadSleepEvent;
  44 import jdk.internal.event.VirtualThreadEndEvent;
  45 import jdk.internal.event.VirtualThreadPinnedEvent;
  46 import jdk.internal.event.VirtualThreadStartEvent;
  47 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  48 import jdk.internal.misc.CarrierThread;
  49 import jdk.internal.misc.InnocuousThread;
  50 import jdk.internal.misc.Unsafe;
  51 import jdk.internal.vm.Continuation;
  52 import jdk.internal.vm.ContinuationScope;
  53 import jdk.internal.vm.StackableScope;
  54 import jdk.internal.vm.ThreadContainer;
  55 import jdk.internal.vm.ThreadContainers;
  56 import jdk.internal.vm.annotation.ChangesCurrentThread;
  57 import jdk.internal.vm.annotation.ForceInline;
  58 import jdk.internal.vm.annotation.Hidden;
  59 import jdk.internal.vm.annotation.JvmtiMountTransition;
  60 import sun.nio.ch.Interruptible;
  61 import sun.security.action.GetPropertyAction;
  62 import static java.util.concurrent.TimeUnit.*;
  63 
  64 /**
  65  * A thread that is scheduled by the Java virtual machine rather than the operating
  66  * system.
  67  */
  68 final class VirtualThread extends BaseVirtualThread {
  69     private static final Unsafe U = Unsafe.getUnsafe();
  70     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  71     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
  72     private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
  73     private static final int TRACE_PINNING_MODE = tracePinningMode();
  74 
  75     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
  76     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  77     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  78     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
  79 
  80     // scheduler and continuation
  81     private final Executor scheduler;
  82     private final Continuation cont;
  83     private final Runnable runContinuation;
  84 
  85     // virtual thread state, accessed by VM
  86     private volatile int state;
  87 
  88     /*
  89      * Virtual thread state and transitions:
  90      *
  91      *      NEW -> STARTED         // Thread.start
  92      *  STARTED -> TERMINATED      // failed to start
  93      *  STARTED -> RUNNING         // first run
  94      *
  95      *  RUNNING -> PARKING         // Thread attempts to park
  96      *  PARKING -> PARKED          // cont.yield successful, thread is parked
  97      *  PARKING -> PINNED          // cont.yield failed, thread is pinned
  98      *
  99      *   PARKED -> RUNNABLE        // unpark or interrupted
 100      *   PINNED -> RUNNABLE        // unpark or interrupted
 101      *
 102      * RUNNABLE -> RUNNING         // continue execution
 103      *
 104      *  RUNNING -> YIELDING        // Thread.yield
 105      * YIELDING -> RUNNABLE        // yield successful
 106      * YIELDING -> RUNNING         // yield failed
 107      *
 108      *  RUNNING -> TERMINATED      // done
 109      */
 110     private static final int NEW      = 0;
 111     private static final int STARTED  = 1;
 112     private static final int RUNNABLE = 2;     // runnable-unmounted
 113     private static final int RUNNING  = 3;     // runnable-mounted
 114     private static final int PARKING  = 4;
 115     private static final int PARKED   = 5;     // unmounted
 116     private static final int PINNED   = 6;     // mounted
 117     private static final int YIELDING = 7;     // Thread.yield
 118     private static final int TERMINATED = 99;  // final state
 119 
 120     // can be suspended from scheduling when unmounted
 121     private static final int SUSPENDED = 1 << 8;
 122     private static final int RUNNABLE_SUSPENDED = (RUNNABLE | SUSPENDED);
 123     private static final int PARKED_SUSPENDED   = (PARKED | SUSPENDED);
 124 
 125     // parking permit
 126     private volatile boolean parkPermit;
 127 
 128     // carrier thread when mounted, accessed by VM
 129     private volatile Thread carrierThread;
 130 
 131     // termination object when joining, created lazily if needed
 132     private volatile CountDownLatch termination;
 133 
 134     /**
 135      * Returns the continuation scope used for virtual threads.
 136      */
 137     static ContinuationScope continuationScope() {
 138         return VTHREAD_SCOPE;
 139     }
 140 
 141     /**
 142      * Creates a new {@code VirtualThread} to run the given task with the given
 143      * scheduler. If the given scheduler is {@code null} and the current thread
 144      * is a platform thread then the newly created virtual thread will use the
 145      * default scheduler. If given scheduler is {@code null} and the current
 146      * thread is a virtual thread then the current thread's scheduler is used.
 147      *
 148      * @param scheduler the scheduler or null
 149      * @param name thread name
 150      * @param characteristics characteristics
 151      * @param task the task to execute
 152      */
 153     VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
 154         super(name, characteristics, /*bound*/ false);
 155         Objects.requireNonNull(task);
 156 
 157         // choose scheduler if not specified
 158         if (scheduler == null) {
 159             Thread parent = Thread.currentThread();
 160             if (parent instanceof VirtualThread vparent) {
 161                 scheduler = vparent.scheduler;
 162             } else {
 163                 scheduler = DEFAULT_SCHEDULER;
 164             }
 165         }
 166 
 167         this.scheduler = scheduler;
 168         this.cont = new VThreadContinuation(this, task);
 169         this.runContinuation = this::runContinuation;
 170     }
 171 
 172     /**
 173      * The continuation that a virtual thread executes.
 174      */
 175     private static class VThreadContinuation extends Continuation {
 176         VThreadContinuation(VirtualThread vthread, Runnable task) {
 177             super(VTHREAD_SCOPE, () -> vthread.run(task));
 178         }
 179         @Override
 180         protected void onPinned(Continuation.Pinned reason) {
 181             if (TRACE_PINNING_MODE > 0) {
 182                 boolean printAll = (TRACE_PINNING_MODE == 1);
 183                 PinnedThreadPrinter.printStackTrace(System.out, printAll);
 184             }
 185         }
 186     }
 187 
 188     /**
 189      * Runs or continues execution of the continuation on the current thread.
 190      */
 191     private void runContinuation() {
 192         // the carrier must be a platform thread
 193         if (Thread.currentThread().isVirtual()) {
 194             throw new WrongThreadException();
 195         }
 196 
 197         // set state to RUNNING
 198         boolean firstRun;
 199         int initialState = state();
 200         if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
 201             // first run
 202             firstRun = true;
 203         } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
 204             // consume parking permit
 205             setParkPermit(false);
 206             firstRun = false;
 207         } else {
 208             // not runnable
 209             return;
 210         }
 211 
 212         // notify JVMTI before mount
 213         if (notifyJvmtiEvents) notifyJvmtiMountBegin(firstRun);
 214 
 215         try {
 216             cont.run();
 217         } finally {
 218             if (cont.isDone()) {
 219                 afterTerminate(/*executed*/ true);
 220             } else {
 221                 afterYield();
 222             }
 223         }
 224     }
 225 
 226     /**
 227      * Submits the runContinuation task to the scheduler. For the default scheduler,
 228      * and calling it on a worker thread, the task will be pushed to the local queue,
 229      * otherwise it will be pushed to a submission queue.
 230      *
 231      * @throws RejectedExecutionException
 232      */
 233     private void submitRunContinuation() {
 234         try {
 235             scheduler.execute(runContinuation);
 236         } catch (RejectedExecutionException ree) {
 237             submitFailed(ree);
 238             throw ree;
 239         }
 240     }
 241 
 242     /**
 243      * Submits the runContinuation task to the scheduler with a lazy submit.
 244      * @throws RejectedExecutionException
 245      * @see ForkJoinPool#lazySubmit(ForkJoinTask)
 246      */
 247     private void lazySubmitRunContinuation(ForkJoinPool pool) {
 248         try {
 249             pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
 250         } catch (RejectedExecutionException ree) {
 251             submitFailed(ree);
 252             throw ree;
 253         }
 254     }
 255 
 256     /**
 257      * Submits the runContinuation task to the scheduler as an external submit.
 258      * @throws RejectedExecutionException
 259      * @see ForkJoinPool#externalSubmit(ForkJoinTask)
 260      */
 261     private void externalSubmitRunContinuation(ForkJoinPool pool) {
 262         try {
 263             pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
 264         } catch (RejectedExecutionException ree) {
 265             submitFailed(ree);
 266             throw ree;
 267         }
 268     }
 269 
 270     /**
 271      * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
 272      */
 273     private void submitFailed(RejectedExecutionException ree) {
 274         var event = new VirtualThreadSubmitFailedEvent();
 275         if (event.isEnabled()) {
 276             event.javaThreadId = threadId();
 277             event.exceptionMessage = ree.getMessage();
 278             event.commit();
 279         }
 280     }
 281 
 282     /**
 283      * Runs a task in the context of this virtual thread. The virtual thread is
 284      * mounted on the current (carrier) thread before the task runs. It unmounts
 285      * from its carrier thread when the task completes.
 286      */
 287     @ChangesCurrentThread
 288     private void run(Runnable task) {
 289         assert state == RUNNING;
 290 
 291         // first mount
 292         mount();
 293         if (notifyJvmtiEvents) notifyJvmtiMountEnd(true);
 294 
 295         // emit JFR event if enabled
 296         if (VirtualThreadStartEvent.isTurnedOn()) {
 297             var event = new VirtualThreadStartEvent();
 298             event.javaThreadId = threadId();
 299             event.commit();
 300         }
 301 
 302         Object bindings = scopedValueBindings();
 303         try {
 304             runWith(bindings, task);
 305         } catch (Throwable exc) {
 306             dispatchUncaughtException(exc);
 307         } finally {
 308             try {
 309                 // pop any remaining scopes from the stack, this may block
 310                 StackableScope.popAll();
 311 
 312                 // emit JFR event if enabled
 313                 if (VirtualThreadEndEvent.isTurnedOn()) {
 314                     var event = new VirtualThreadEndEvent();
 315                     event.javaThreadId = threadId();
 316                     event.commit();
 317                 }
 318 
 319             } finally {
 320                 // last unmount
 321                 if (notifyJvmtiEvents) notifyJvmtiUnmountBegin(true);
 322                 unmount();
 323 
 324                 // final state
 325                 setState(TERMINATED);
 326             }
 327         }
 328     }
 329 
 330     @Hidden
 331     @ForceInline
 332     private void runWith(Object bindings, Runnable op) {
 333         ensureMaterializedForStackWalk(bindings);
 334         op.run();
 335         Reference.reachabilityFence(bindings);
 336     }
 337 
 338     /**
 339      * Mounts this virtual thread onto the current platform thread. On
 340      * return, the current thread is the virtual thread.
 341      */
 342     @ChangesCurrentThread
 343     private void mount() {
 344         // sets the carrier thread
 345         Thread carrier = Thread.currentCarrierThread();
 346         setCarrierThread(carrier);
 347 
 348         // sync up carrier thread interrupt status if needed
 349         if (interrupted) {
 350             carrier.setInterrupt();
 351         } else if (carrier.isInterrupted()) {
 352             synchronized (interruptLock) {
 353                 // need to recheck interrupt status
 354                 if (!interrupted) {
 355                     carrier.clearInterrupt();
 356                 }
 357             }
 358         }
 359 
 360         // set Thread.currentThread() to return this virtual thread
 361         carrier.setCurrentThread(this);
 362     }
 363 
 364     /**
 365      * Unmounts this virtual thread from the carrier. On return, the
 366      * current thread is the current platform thread.
 367      */
 368     @ChangesCurrentThread
 369     private void unmount() {
 370         // set Thread.currentThread() to return the platform thread
 371         Thread carrier = this.carrierThread;
 372         carrier.setCurrentThread(carrier);
 373 
 374         // break connection to carrier thread, synchronized with interrupt
 375         synchronized (interruptLock) {
 376             setCarrierThread(null);
 377         }
 378         carrier.clearInterrupt();
 379     }
 380 
 381     /**
 382      * Sets the current thread to the current carrier thread.
 383      * @return true if JVMTI was notified
 384      */
 385     @ChangesCurrentThread
 386     @JvmtiMountTransition
 387     private boolean switchToCarrierThread() {
 388         boolean notifyJvmti = notifyJvmtiEvents;
 389         if (notifyJvmti) {
 390             notifyJvmtiHideFrames(true);
 391         }
 392         Thread carrier = this.carrierThread;
 393         assert Thread.currentThread() == this
 394                 && carrier == Thread.currentCarrierThread();
 395         carrier.setCurrentThread(carrier);
 396         return notifyJvmti;
 397     }
 398 
 399     /**
 400      * Sets the current thread to the given virtual thread.
 401      * If {@code notifyJvmti} is true then JVMTI is notified.
 402      */
 403     @ChangesCurrentThread
 404     @JvmtiMountTransition
 405     private void switchToVirtualThread(VirtualThread vthread, boolean notifyJvmti) {
 406         Thread carrier = vthread.carrierThread;
 407         assert carrier == Thread.currentCarrierThread();
 408         carrier.setCurrentThread(vthread);
 409         if (notifyJvmti) {
 410             notifyJvmtiHideFrames(false);
 411         }
 412     }
 413 
 414     /**
 415      * Unmounts this virtual thread, invokes Continuation.yield, and re-mounts the
 416      * thread when continued. When enabled, JVMTI must be notified from this method.
 417      * @return true if the yield was successful
 418      */
 419     @ChangesCurrentThread
 420     private boolean yieldContinuation() {
 421         // unmount
 422         if (notifyJvmtiEvents) notifyJvmtiUnmountBegin(false);
 423         unmount();
 424         try {
 425             return Continuation.yield(VTHREAD_SCOPE);
 426         } finally {
 427             // re-mount
 428             mount();
 429             if (notifyJvmtiEvents) notifyJvmtiMountEnd(false);
 430         }
 431     }
 432 
 433     /**
 434      * Invoked after the continuation yields. If parking then it sets the state
 435      * and also re-submits the task to continue if unparked while parking.
 436      * If yielding due to Thread.yield then it just submits the task to continue.
 437      */
 438     private void afterYield() {
 439         int s = state();
 440         assert (s == PARKING || s == YIELDING) && (carrierThread == null);
 441 
 442         if (s == PARKING) {
 443             setState(PARKED);
 444 
 445             // notify JVMTI that unmount has completed, thread is parked
 446             if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);
 447 
 448             // may have been unparked while parking
 449             if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
 450                 // lazy submit to continue on the current thread as carrier if possible
 451                 if (currentThread() instanceof CarrierThread ct) {
 452                     lazySubmitRunContinuation(ct.getPool());
 453                 } else {
 454                     submitRunContinuation();
 455                 }
 456 
 457             }
 458         } else if (s == YIELDING) {   // Thread.yield
 459             setState(RUNNABLE);
 460 
 461             // notify JVMTI that unmount has completed, thread is runnable
 462             if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);
 463 
 464             // external submit if there are no tasks in the local task queue
 465             if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
 466                 externalSubmitRunContinuation(ct.getPool());
 467             } else {
 468                 submitRunContinuation();
 469             }
 470         }
 471     }
 472 
 473     /**
 474      * Invoked after the thread terminates (or start failed). This method
 475      * notifies anyone waiting for the thread to terminate.
 476      *
 477      * @param executed true if the thread executed, false if it failed to start
 478      */
 479     private void afterTerminate(boolean executed) {
 480         assert (state() == TERMINATED) && (carrierThread == null);
 481 
 482         if (executed) {
 483             if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(true);
 484         }
 485 
 486         // notify anyone waiting for this virtual thread to terminate
 487         CountDownLatch termination = this.termination;
 488         if (termination != null) {
 489             assert termination.getCount() == 1;
 490             termination.countDown();
 491         }
 492 
 493         if (executed) {
 494             // notify container if thread executed
 495             threadContainer().onExit(this);
 496 
 497             // clear references to thread locals
 498             clearReferences();
 499         }
 500     }
 501 
 502     /**
 503      * Schedules this {@code VirtualThread} to execute.
 504      *
 505      * @throws IllegalStateException if the container is shutdown or closed
 506      * @throws IllegalThreadStateException if the thread has already been started
 507      * @throws RejectedExecutionException if the scheduler cannot accept a task
 508      */
 509     @Override
 510     void start(ThreadContainer container) {
 511         if (!compareAndSetState(NEW, STARTED)) {
 512             throw new IllegalThreadStateException("Already started");
 513         }
 514 
 515         // bind thread to container
 516         setThreadContainer(container);
 517 
 518         // start thread
 519         boolean started = false;
 520         container.onStart(this); // may throw
 521         try {
 522             // scoped values may be inherited
 523             inheritScopedValueBindings(container);
 524 
 525             // submit task to run thread
 526             submitRunContinuation();
 527             started = true;
 528         } finally {
 529             if (!started) {
 530                 setState(TERMINATED);
 531                 container.onExit(this);
 532                 afterTerminate(/*executed*/ false);
 533             }
 534         }
 535     }
 536 
 537     @Override
 538     public void start() {
 539         start(ThreadContainers.root());
 540     }
 541 
 542     @Override
 543     public void run() {
 544         // do nothing
 545     }
 546 
 547     /**
 548      * Parks until unparked or interrupted. If already unparked then the parking
 549      * permit is consumed and this method completes immediately (meaning it doesn't
 550      * yield). It also completes immediately if the interrupt status is set.
 551      */
 552     @Override
 553     void park() {
 554         assert Thread.currentThread() == this;
 555 
 556         // complete immediately if parking permit available or interrupted
 557         if (getAndSetParkPermit(false) || interrupted)
 558             return;
 559 
 560         // park the thread
 561         setState(PARKING);
 562         try {
 563             if (!yieldContinuation()) {
 564                 // park on the carrier thread when pinned
 565                 parkOnCarrierThread(false, 0);
 566             }
 567         } finally {
 568             assert (Thread.currentThread() == this) && (state() == RUNNING);
 569         }
 570     }
 571 
 572     /**
 573      * Parks up to the given waiting time or until unparked or interrupted.
 574      * If already unparked then the parking permit is consumed and this method
 575      * completes immediately (meaning it doesn't yield). It also completes immediately
 576      * if the interrupt status is set or the waiting time is {@code <= 0}.
 577      *
 578      * @param nanos the maximum number of nanoseconds to wait.
 579      */
 580     @Override
 581     void parkNanos(long nanos) {
 582         assert Thread.currentThread() == this;
 583 
 584         // complete immediately if parking permit available or interrupted
 585         if (getAndSetParkPermit(false) || interrupted)
 586             return;
 587 
 588         // park the thread for the waiting time
 589         if (nanos > 0) {
 590             long startTime = System.nanoTime();
 591 
 592             boolean yielded;
 593             Future<?> unparker = scheduleUnpark(this::unpark, nanos);
 594             setState(PARKING);
 595             try {
 596                 yielded = yieldContinuation();
 597             } finally {
 598                 assert (Thread.currentThread() == this)
 599                         && (state() == RUNNING || state() == PARKING);
 600                 cancel(unparker);
 601             }
 602 
 603             // park on carrier thread for remaining time when pinned
 604             if (!yielded) {
 605                 long deadline = startTime + nanos;
 606                 if (deadline < 0L)
 607                     deadline = Long.MAX_VALUE;
 608                 parkOnCarrierThread(true, deadline - System.nanoTime());
 609             }
 610         }
 611     }
 612 
 613     /**
 614      * Parks the current carrier thread up to the given waiting time or until
 615      * unparked or interrupted. If the virtual thread is interrupted then the
 616      * interrupt status will be propagated to the carrier thread.
 617      * @param timed true for a timed park, false for untimed
 618      * @param nanos the waiting time in nanoseconds
 619      */
 620     private void parkOnCarrierThread(boolean timed, long nanos) {
 621         assert state() == PARKING;
 622 
 623         var pinnedEvent = new VirtualThreadPinnedEvent();
 624         pinnedEvent.begin();
 625 
 626         setState(PINNED);
 627         try {
 628             if (!parkPermit) {
 629                 if (!timed) {
 630                     U.park(false, 0);
 631                 } else if (nanos > 0) {
 632                     U.park(false, nanos);
 633                 }
 634             }
 635         } finally {
 636             setState(RUNNING);
 637         }
 638 
 639         // consume parking permit
 640         setParkPermit(false);
 641 
 642         pinnedEvent.commit();
 643     }
 644 
 645     /**
 646      * Schedule an unpark task to run after a given delay.
 647      */
 648     @ChangesCurrentThread
 649     private Future<?> scheduleUnpark(Runnable unparker, long nanos) {
 650         // need to switch to current carrier thread to avoid nested parking
 651         boolean notifyJvmti = switchToCarrierThread();
 652         try {
 653             return UNPARKER.schedule(unparker, nanos, NANOSECONDS);
 654         } finally {
 655             switchToVirtualThread(this, notifyJvmti);
 656         }
 657     }
 658 
 659     /**
 660      * Cancels a task if it has not completed.
 661      */
 662     @ChangesCurrentThread
 663     private void cancel(Future<?> future) {
 664         if (!future.isDone()) {
 665             // need to switch to current carrier thread to avoid nested parking
 666             boolean notifyJvmti = switchToCarrierThread();
 667             try {
 668                 future.cancel(false);
 669             } finally {
 670                 switchToVirtualThread(this, notifyJvmti);
 671             }
 672         }
 673     }
 674 
 675     /**
 676      * Re-enables this virtual thread for scheduling. If the virtual thread was
 677      * {@link #park() parked} then it will be unblocked, otherwise its next call
 678      * to {@code park} or {@linkplain #parkNanos(long) parkNanos} is guaranteed
 679      * not to block.
 680      * @throws RejectedExecutionException if the scheduler cannot accept a task
 681      */
 682     @Override
 683     @ChangesCurrentThread
 684     void unpark() {
 685         Thread currentThread = Thread.currentThread();
 686         if (!getAndSetParkPermit(true) && currentThread != this) {
 687             int s = state();
 688             if (s == PARKED && compareAndSetState(PARKED, RUNNABLE)) {
 689                 if (currentThread instanceof VirtualThread vthread) {
 690                     boolean notifyJvmti = vthread.switchToCarrierThread();
 691                     try {
 692                         submitRunContinuation();
 693                     } finally {
 694                         switchToVirtualThread(vthread, notifyJvmti);
 695                     }
 696                 } else {
 697                     submitRunContinuation();
 698                 }
 699             } else if (s == PINNED) {
 700                 // unpark carrier thread when pinned.
 701                 synchronized (carrierThreadAccessLock()) {
 702                     Thread carrier = carrierThread;
 703                     if (carrier != null && state() == PINNED) {
 704                         U.unpark(carrier);
 705                     }
 706                 }
 707             }
 708         }
 709     }
 710 
 711     /**
 712      * Attempts to yield the current virtual thread (Thread.yield).
 713      */
 714     void tryYield() {
 715         assert Thread.currentThread() == this;
 716         setState(YIELDING);
 717         try {
 718             yieldContinuation();
 719         } finally {
 720             assert Thread.currentThread() == this;
 721             if (state() != RUNNING) {
 722                 assert state() == YIELDING;
 723                 setState(RUNNING);
 724             }
 725         }
 726     }
 727 
 728     /**
 729      * Sleep the current virtual thread for the given sleep time.
 730      *
 731      * @param nanos the maximum number of nanoseconds to sleep
 732      * @throws InterruptedException if interrupted while sleeping
 733      */
 734     void sleepNanos(long nanos) throws InterruptedException {
 735         assert Thread.currentThread() == this;
 736         if (nanos >= 0) {
 737             if (ThreadSleepEvent.isTurnedOn()) {
 738                 ThreadSleepEvent event = new ThreadSleepEvent();
 739                 try {
 740                     event.time = nanos;
 741                     event.begin();
 742                     doSleepNanos(nanos);
 743                 } finally {
 744                     event.commit();
 745                 }
 746             } else {
 747                 doSleepNanos(nanos);
 748             }
 749         }
 750     }
 751 
 752     /**
 753      * Sleep the current thread for the given sleep time (in nanoseconds). If
 754      * nanos is 0 then the thread will attempt to yield.
 755      *
 756      * @implNote This implementation parks the thread for the given sleeping time
 757      * and will therefore be observed in PARKED state during the sleep. Parking
 758      * will consume the parking permit so this method makes available the parking
 759      * permit after the sleep. This may be observed as a spurious, but benign,
 760      * wakeup when the thread subsequently attempts to park.
 761      */
 762     private void doSleepNanos(long nanos) throws InterruptedException {
 763         assert nanos >= 0;
 764         if (getAndClearInterrupt())
 765             throw new InterruptedException();
 766         if (nanos == 0) {
 767             tryYield();
 768         } else {
 769             // park for the sleep time
 770             try {
 771                 long remainingNanos = nanos;
 772                 long startNanos = System.nanoTime();
 773                 while (remainingNanos > 0) {
 774                     parkNanos(remainingNanos);
 775                     if (getAndClearInterrupt()) {
 776                         throw new InterruptedException();
 777                     }
 778                     remainingNanos = nanos - (System.nanoTime() - startNanos);
 779                 }
 780             } finally {
 781                 // may have been unparked while sleeping
 782                 setParkPermit(true);
 783             }
 784         }
 785     }
 786 
 787     /**
 788      * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate.
 789      * A timeout of {@code 0} means to wait forever.
 790      *
 791      * @throws InterruptedException if interrupted while waiting
 792      * @return true if the thread has terminated
 793      */
 794     boolean joinNanos(long nanos) throws InterruptedException {
 795         if (state() == TERMINATED)
 796             return true;
 797 
 798         // ensure termination object exists, then re-check state
 799         CountDownLatch termination = getTermination();
 800         if (state() == TERMINATED)
 801             return true;
 802 
 803         // wait for virtual thread to terminate
 804         if (nanos == 0) {
 805             termination.await();
 806         } else {
 807             boolean terminated = termination.await(nanos, NANOSECONDS);
 808             if (!terminated) {
 809                 // waiting time elapsed
 810                 return false;
 811             }
 812         }
 813         assert state() == TERMINATED;
 814         return true;
 815     }
 816 
 817     @Override
 818     @SuppressWarnings("removal")
 819     public void interrupt() {
 820         if (Thread.currentThread() != this) {
 821             checkAccess();
 822             synchronized (interruptLock) {
 823                 interrupted = true;
 824                 Interruptible b = nioBlocker;
 825                 if (b != null) {
 826                     b.interrupt(this);
 827                 }
 828 
 829                 // interrupt carrier thread if mounted
 830                 Thread carrier = carrierThread;
 831                 if (carrier != null) carrier.setInterrupt();
 832             }
 833         } else {
 834             interrupted = true;
 835             carrierThread.setInterrupt();
 836         }
 837         unpark();
 838     }
 839 
    /**
     * Returns the current value of the {@code interrupted} flag. The flag is
     * read without taking {@code interruptLock} and is not modified.
     */
    @Override
    public boolean isInterrupted() {
        return interrupted;
    }
 844 
    /**
     * Clears the interrupt status of this virtual thread and of its carrier
     * thread, returning the virtual thread's previous status. Asserts that it
     * is called by this thread itself.
     *
     * @return true if the {@code interrupted} flag was set before the call
     */
    @Override
    boolean getAndClearInterrupt() {
        assert Thread.currentThread() == this;
        // same lock as interrupt() takes when called from another thread
        synchronized (interruptLock) {
            boolean oldValue = interrupted;
            // only write the flag when it actually needs to change
            if (oldValue)
                interrupted = false;
            // the carrier's status is cleared unconditionally; carrierThread is
            // dereferenced without a null check — presumably the caller is always
            // mounted here, TODO confirm
            carrierThread.clearInterrupt();
            return oldValue;
        }
    }
 856 
 857     @Override
 858     Thread.State threadState() {
 859         switch (state()) {
 860             case NEW:
 861                 return Thread.State.NEW;
 862             case STARTED:
 863                 // return NEW if thread container not yet set
 864                 if (threadContainer() == null) {
 865                     return Thread.State.NEW;
 866                 } else {
 867                     return Thread.State.RUNNABLE;
 868                 }
 869             case RUNNABLE:
 870             case RUNNABLE_SUSPENDED:
 871                 // runnable, not mounted
 872                 return Thread.State.RUNNABLE;
 873             case RUNNING:
 874                 // if mounted then return state of carrier thread
 875                 synchronized (carrierThreadAccessLock()) {
 876                     Thread carrierThread = this.carrierThread;
 877                     if (carrierThread != null) {
 878                         return carrierThread.threadState();
 879                     }
 880                 }
 881                 // runnable, mounted
 882                 return Thread.State.RUNNABLE;
 883             case PARKING:
 884             case YIELDING:
 885                 // runnable, mounted, not yet waiting
 886                 return Thread.State.RUNNABLE;
 887             case PARKED:
 888             case PARKED_SUSPENDED:
 889             case PINNED:
 890                 return Thread.State.WAITING;
 891             case TERMINATED:
 892                 return Thread.State.TERMINATED;
 893             default:
 894                 throw new InternalError();
 895         }
 896     }
 897 
 898     @Override
 899     boolean alive() {
 900         int s = state;
 901         return (s != NEW && s != TERMINATED);
 902     }
 903 
    /**
     * Returns true if this thread's state is TERMINATED.
     */
    @Override
    boolean isTerminated() {
        return (state == TERMINATED);
    }
 908 
 909     @Override
 910     StackTraceElement[] asyncGetStackTrace() {
 911         StackTraceElement[] stackTrace;
 912         do {
 913             stackTrace = (carrierThread != null)
 914                     ? super.asyncGetStackTrace()  // mounted
 915                     : tryGetStackTrace();         // unmounted
 916             if (stackTrace == null) {
 917                 Thread.yield();
 918             }
 919         } while (stackTrace == null);
 920         return stackTrace;
 921     }
 922 
    /**
     * Returns the stack trace for this virtual thread if it is unmounted.
     * Returns null if the thread is in another state.
     *
     * <p> For the RUNNABLE and PARKED states the thread is first moved to the
     * corresponding suspended state with a compare-and-set; if that fails,
     * null is returned so that the caller can retry. For NEW, STARTED and
     * TERMINATED an empty array is returned.
     *
     * @return the stack trace, an empty array if there is no stack to report,
     *         or null if the trace could not be taken in the current state
     */
    private StackTraceElement[] tryGetStackTrace() {
        int initialState = state();
        return switch (initialState) {
            case RUNNABLE, PARKED -> {
                // mark the state as suspended while the continuation's stack is
                // read — presumably this keeps the thread from being continued
                // concurrently; confirm against the code that runs the continuation
                int suspendedState = initialState | SUSPENDED;
                if (compareAndSetState(initialState, suspendedState)) {
                    try {
                        yield cont.getStackTrace();
                    } finally {
                        // restore the state observed on entry
                        assert state == suspendedState;
                        setState(initialState);

                        // re-submit if runnable
                        // re-submit if unparked while suspended
                        if (initialState == RUNNABLE
                            || (parkPermit && compareAndSetState(PARKED, RUNNABLE))) {
                            try {
                                submitRunContinuation();
                            } catch (RejectedExecutionException ignore) { }  // rejection is deliberately ignored
                        }
                    }
                }
                // lost the race to change the state; caller may retry
                yield null;
            }
            case NEW, STARTED, TERMINATED ->  new StackTraceElement[0];  // empty stack
            default -> null;
        };
    }
 955 
 956     @Override
 957     public String toString() {
 958         StringBuilder sb = new StringBuilder("VirtualThread[#");
 959         sb.append(threadId());
 960         String name = getName();
 961         if (!name.isEmpty()) {
 962             sb.append(",");
 963             sb.append(name);
 964         }
 965         sb.append("]/");
 966         Thread carrier = carrierThread;
 967         if (carrier != null) {
 968             // include the carrier thread state and name when mounted
 969             synchronized (carrierThreadAccessLock()) {
 970                 carrier = carrierThread;
 971                 if (carrier != null) {
 972                     String stateAsString = carrier.threadState().toString();
 973                     sb.append(stateAsString.toLowerCase(Locale.ROOT));
 974                     sb.append('@');
 975                     sb.append(carrier.getName());
 976                 }
 977             }
 978         }
 979         // include virtual thread state when not mounted
 980         if (carrier == null) {
 981             String stateAsString = threadState().toString();
 982             sb.append(stateAsString.toLowerCase(Locale.ROOT));
 983         }
 984         return sb.toString();
 985     }
 986 
 987     @Override
 988     public int hashCode() {
 989         return (int) threadId();
 990     }
 991 
 992     @Override
 993     public boolean equals(Object obj) {
 994         return obj == this;
 995     }
 996 
 997     /**
 998      * Returns the termination object, creating it if needed.
 999      */
1000     private CountDownLatch getTermination() {
1001         CountDownLatch termination = this.termination;
1002         if (termination == null) {
1003             termination = new CountDownLatch(1);
1004             if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
1005                 termination = this.termination;
1006             }
1007         }
1008         return termination;
1009     }
1010 
    /**
     * Returns the lock object to synchronize on when accessing carrierThread.
     * The lock prevents carrierThread from being reset to null during unmount.
     *
     * @return the lock object, which is {@code interruptLock}
     */
    private Object carrierThreadAccessLock() {
        // return interruptLock as unmount has to coordinate with interrupt
        return interruptLock;
    }
1019 
1020     // -- wrappers for get/set of state, parking permit, and carrier thread --
1021 
    /**
     * Returns the current value of the {@code state} field.
     */
    private int state() {
        return state;  // volatile read
    }
1025 
    /**
     * Unconditionally sets the {@code state} field to {@code newValue}.
     */
    private void setState(int newValue) {
        state = newValue;  // volatile write
    }
1029 
    /**
     * Atomically sets the state to {@code newValue} if it currently equals
     * {@code expectedValue}.
     *
     * @return the result of the underlying compare-and-set on the STATE field
     */
    private boolean compareAndSetState(int expectedValue, int newValue) {
        return U.compareAndSetInt(this, STATE, expectedValue, newValue);
    }
1033 
1034     private void setParkPermit(boolean newValue) {
1035         if (parkPermit != newValue) {
1036             parkPermit = newValue;
1037         }
1038     }
1039 
1040     private boolean getAndSetParkPermit(boolean newValue) {
1041         if (parkPermit != newValue) {
1042             return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
1043         } else {
1044             return newValue;
1045         }
1046     }
1047 
    /**
     * Sets the {@code carrierThread} field to {@code carrier} with a plain
     * field assignment.
     */
    private void setCarrierThread(Thread carrier) {
        // alternative using a release-mode write, currently disabled:
        // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
        this.carrierThread = carrier;
    }
1052 
1053     // -- JVM TI support --
1054 
    // Flag written by the VM rather than by Java code. Presumably it tells the
    // mount/unmount code whether the notifyJvmti* hooks below need to be
    // called — confirm against the callers.
    private static volatile boolean notifyJvmtiEvents;  // set by VM

    // Native hooks, each marked as a JVM TI mount transition. Judging by their
    // names they bracket the begin and end of a mount or unmount; the boolean
    // presumably flags the first mount / last unmount — TODO confirm against
    // the callers and the VM implementation.
    @JvmtiMountTransition
    private native void notifyJvmtiMountBegin(boolean firstMount);

    @JvmtiMountTransition
    private native void notifyJvmtiMountEnd(boolean firstMount);

    @JvmtiMountTransition
    private native void notifyJvmtiUnmountBegin(boolean lastUnmount);

    @JvmtiMountTransition
    private native void notifyJvmtiUnmountEnd(boolean lastUnmount);

    // NOTE(review): the name suggests this toggles hiding of frames from
    // JVM TI — verify against the VM side.
    @JvmtiMountTransition
    private native void notifyJvmtiHideFrames(boolean hide);

    // Called once from the static initializer below when the class is
    // initialized; presumably binds the native methods declared above.
    private static native void registerNatives();
    static {
        registerNatives();
    }
1076 
1077     /**
1078      * Creates the default scheduler.
1079      */
1080     @SuppressWarnings("removal")
1081     private static ForkJoinPool createDefaultScheduler() {
1082         ForkJoinWorkerThreadFactory factory = pool -> {
1083             PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
1084             return AccessController.doPrivileged(pa);
1085         };
1086         PrivilegedAction<ForkJoinPool> pa = () -> {
1087             int parallelism, maxPoolSize, minRunnable;
1088             String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1089             String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1090             String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1091             if (parallelismValue != null) {
1092                 parallelism = Integer.parseInt(parallelismValue);
1093             } else {
1094                 parallelism = Runtime.getRuntime().availableProcessors();
1095             }
1096             if (maxPoolSizeValue != null) {
1097                 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1098                 parallelism = Integer.min(parallelism, maxPoolSize);
1099             } else {
1100                 maxPoolSize = Integer.max(parallelism, 256);
1101             }
1102             if (minRunnableValue != null) {
1103                 minRunnable = Integer.parseInt(minRunnableValue);
1104             } else {
1105                 minRunnable = Integer.max(parallelism / 2, 1);
1106             }
1107             Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1108             boolean asyncMode = true; // FIFO
1109             return new ForkJoinPool(parallelism, factory, handler, asyncMode,
1110                          0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
1111         };
1112         return AccessController.doPrivileged(pa);
1113     }
1114 
1115     /**
1116      * Creates the ScheduledThreadPoolExecutor used for timed unpark.
1117      */
1118     private static ScheduledExecutorService createDelayedTaskScheduler() {
1119         String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize");
1120         int poolSize;
1121         if (propValue != null) {
1122             poolSize = Integer.parseInt(propValue);
1123         } else {
1124             poolSize = 1;
1125         }
1126         ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1127             Executors.newScheduledThreadPool(poolSize, task -> {
1128                 return InnocuousThread.newThread("VirtualThread-unparker", task);
1129             });
1130         stpe.setRemoveOnCancelPolicy(true);
1131         return stpe;
1132     }
1133 
1134     /**
1135      * Reads the value of the jdk.tracePinnedThreads property to determine if stack
1136      * traces should be printed when a carrier thread is pinned when a virtual thread
1137      * attempts to park.
1138      */
1139     private static int tracePinningMode() {
1140         String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads");
1141         if (propValue != null) {
1142             if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue))
1143                 return 1;
1144             if ("short".equalsIgnoreCase(propValue))
1145                 return 2;
1146         }
1147         return 0;
1148     }
1149 }