/*
 * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.lang;

import java.lang.ref.Reference;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import jdk.internal.event.ThreadSleepEvent;
import jdk.internal.event.VirtualThreadEndEvent;
import jdk.internal.event.VirtualThreadPinnedEvent;
import jdk.internal.event.VirtualThreadStartEvent;
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
import jdk.internal.misc.CarrierThread;
import jdk.internal.misc.InnocuousThread;
import jdk.internal.misc.Unsafe;
import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;
import jdk.internal.vm.StackableScope;
import jdk.internal.vm.ThreadContainer;
import jdk.internal.vm.ThreadContainers;
import jdk.internal.vm.annotation.ChangesCurrentThread;
import jdk.internal.vm.annotation.ForceInline;
import jdk.internal.vm.annotation.Hidden;
import jdk.internal.vm.annotation.JvmtiMountTransition;
import sun.nio.ch.Interruptible;
import sun.security.action.GetPropertyAction;
import static java.util.concurrent.TimeUnit.*;

/**
 * A thread that is scheduled by the Java virtual machine rather than the operating
 * system.
 *
 * <p> The thread's task runs inside a {@link Continuation}. A scheduler
 * ({@link Executor}) executes the {@code runContinuation} task on a platform
 * (carrier) thread; that task mounts this virtual thread, runs the continuation
 * until it yields or completes, and the virtual thread is then unmounted again.
 * The {@code state} field records where the thread is in that life cycle; see the
 * state/transition table below.
 */
final class VirtualThread extends BaseVirtualThread {
    private static final Unsafe U = Unsafe.getUnsafe();
    // the single continuation scope shared by all virtual threads
    private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
    // scheduler used when none is given and the creating thread is not a virtual thread
    private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
    // executes the delayed unpark tasks scheduled by parkNanos
    private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
    // 0 = no tracing, 1 = full stack trace, 2 = short stack trace (see tracePinningMode)
    private static final int TRACE_PINNING_MODE = tracePinningMode();

    // field offsets for the Unsafe based atomic accesses used by the wrappers below
    private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
    private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
    private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
    private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");

    // scheduler and continuation
    private final Executor scheduler;
    private final Continuation cont;
    // the task submitted to the scheduler, bound once to this::runContinuation
    private final Runnable runContinuation;

    // virtual thread state, accessed by VM
    private volatile int state;

    /*
     * Virtual thread state and transitions:
     *
     *      NEW -> STARTED         // Thread.start
     *  STARTED -> TERMINATED      // failed to start
     *  STARTED -> RUNNING         // first run
     *
     *  RUNNING -> PARKING         // Thread attempts to park
     *  PARKING -> PARKED          // cont.yield successful, thread is parked
     *  PARKING -> PINNED          // cont.yield failed, thread is pinned
     *
     *   PARKED -> RUNNABLE        // unpark or interrupted
     *   PINNED -> RUNNABLE        // unpark or interrupted
     *
     * RUNNABLE -> RUNNING         // continue execution
     *
     *  RUNNING -> YIELDING        // Thread.yield
     * YIELDING -> RUNNABLE        // yield successful
     * YIELDING -> RUNNING         // yield failed
     *
     *  RUNNING -> TERMINATED      // done
     */
    private static final int NEW      = 0;
    private static final int STARTED  = 1;
    private static final int RUNNABLE = 2;     // runnable-unmounted
    private static final int RUNNING  = 3;     // runnable-mounted
    private static final int PARKING  = 4;
    private static final int PARKED   = 5;     // unmounted
    private static final int PINNED   = 6;     // mounted
    private static final int YIELDING = 7;     // Thread.yield
    private static final int TERMINATED = 99;  // final state

    // can be suspended from scheduling when unmounted; the SUSPENDED bit is OR-ed
    // into RUNNABLE or PARKED by tryGetStackTrace while it walks the continuation,
    // and runContinuation does nothing when it observes such a state
    private static final int SUSPENDED = 1 << 8;
    private static final int RUNNABLE_SUSPENDED = (RUNNABLE | SUSPENDED);
    private static final int PARKED_SUSPENDED   = (PARKED | SUSPENDED);

    // parking permit
    private volatile boolean parkPermit;

    // carrier thread when mounted, accessed by VM
    private volatile Thread carrierThread;

    // termination object when joining, created lazily if needed
    private volatile CountDownLatch termination;

    /**
     * Returns the continuation scope used for virtual threads.
     */
    static ContinuationScope continuationScope() {
        return VTHREAD_SCOPE;
    }

    /**
     * Creates a new {@code VirtualThread} to run the given task with the given
     * scheduler. If the given scheduler is {@code null} and the current thread
     * is a platform thread then the newly created virtual thread will use the
     * default scheduler. If the given scheduler is {@code null} and the current
     * thread is a virtual thread then the current thread's scheduler is used.
     *
     * @param scheduler the scheduler or null
     * @param name thread name
     * @param characteristics characteristics
     * @param task the task to execute
     * @throws NullPointerException if {@code task} is null
     */
    VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
        super(name, characteristics, /*bound*/ false);
        Objects.requireNonNull(task);

        // choose scheduler if not specified
        if (scheduler == null) {
            Thread parent = Thread.currentThread();
            if (parent instanceof VirtualThread vparent) {
                scheduler = vparent.scheduler;
            } else {
                scheduler = DEFAULT_SCHEDULER;
            }
        }

        this.scheduler = scheduler;
        this.cont = new VThreadContinuation(this, task);
        this.runContinuation = this::runContinuation;
    }

    /**
     * The continuation that a virtual thread executes. Its body invokes
     * {@link VirtualThread#run(Runnable)} with the thread's task.
     */
    private static class VThreadContinuation extends Continuation {
        VThreadContinuation(VirtualThread vthread, Runnable task) {
            super(VTHREAD_SCOPE, () -> vthread.run(task));
        }

        /**
         * Prints a stack trace to {@code System.out} when pinning is being traced
         * ({@code TRACE_PINNING_MODE > 0}); mode 1 requests the full trace.
         */
        @Override
        protected void onPinned(Continuation.Pinned reason) {
            if (TRACE_PINNING_MODE > 0) {
                boolean printAll = (TRACE_PINNING_MODE == 1);
                PinnedThreadPrinter.printStackTrace(System.out, printAll);
            }
        }
    }

    /**
     * Runs or continues execution of the continuation on the current thread.
     * This is the task that is submitted to the scheduler. It returns without
     * doing anything if the state cannot be moved from STARTED or RUNNABLE to
     * RUNNING (for example because the thread is suspended or another submission
     * already won the race).
     *
     * @throws WrongThreadException if invoked on a virtual thread
     */
    private void runContinuation() {
        // the carrier must be a platform thread
        if (Thread.currentThread().isVirtual()) {
            throw new WrongThreadException();
        }

        // set state to RUNNING
        boolean firstRun;
        int initialState = state();
        if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
            // first run
            firstRun = true;
        } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
            // consume parking permit
            setParkPermit(false);
            firstRun = false;
        } else {
            // not runnable
            return;
        }

        // notify JVMTI before mount
        if (notifyJvmtiEvents) notifyJvmtiMountBegin(firstRun);

        try {
            cont.run();
        } finally {
            // cont.run() returns when the continuation has completed or yielded
            if (cont.isDone()) {
                afterTerminate(/*executed*/ true);
            } else {
                afterYield();
            }
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the default scheduler,
     * and calling it on a worker thread, the task will be pushed to the local queue,
     * otherwise it will be pushed to a submission queue.
     *
     * @throws RejectedExecutionException if the scheduler rejects the task; the
     *         failure is reported via {@link #submitFailed} before it is rethrown
     */
    private void submitRunContinuation() {
        try {
            scheduler.execute(runContinuation);
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        }
    }

    /**
     * Submits the runContinuation task to the given pool with a lazy submit.
     *
     * @param pool the pool to submit to
     * @throws RejectedExecutionException if the pool rejects the task; the
     *         failure is reported via {@link #submitFailed} before it is rethrown
     * @see ForkJoinPool#lazySubmit(ForkJoinTask)
     */
    private void lazySubmitRunContinuation(ForkJoinPool pool) {
        try {
            pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        }
    }

    /**
     * Submits the runContinuation task to the given pool as an external submit.
     *
     * @param pool the pool to submit to
     * @throws RejectedExecutionException if the pool rejects the task; the
     *         failure is reported via {@link #submitFailed} before it is rethrown
     * @see ForkJoinPool#externalSubmit(ForkJoinTask)
     */
    private void externalSubmitRunContinuation(ForkJoinPool pool) {
        try {
            pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        }
    }

    /**
     * If enabled, emits a JFR VirtualThreadSubmitFailedEvent carrying this
     * thread's id and the message of the given exception.
     *
     * @param ree the exception thrown by the scheduler
     */
    private void submitFailed(RejectedExecutionException ree) {
        var event = new VirtualThreadSubmitFailedEvent();
        if (event.isEnabled()) {
            event.javaThreadId = threadId();
            event.exceptionMessage = ree.getMessage();
            event.commit();
        }
    }

    /**
     * Runs a task in the context of this virtual thread. The virtual thread is
     * mounted on the current (carrier) thread before the task runs. It unmounts
     * from its carrier thread when the task completes. Any {@code Throwable}
     * thrown by the task is passed to {@code dispatchUncaughtException}. The
     * state is TERMINATED when this method returns.
     *
     * @param task the task to run
     */
    @ChangesCurrentThread
    private void run(Runnable task) {
        assert state == RUNNING;

        // first mount
        mount();
        if (notifyJvmtiEvents) notifyJvmtiMountEnd(true);

        // emit JFR event if enabled
        if (VirtualThreadStartEvent.isTurnedOn()) {
            var event = new VirtualThreadStartEvent();
            event.javaThreadId = threadId();
            event.commit();
        }

        Object bindings = scopedValueBindings();
        try {
            runWith(bindings, task);
        } catch (Throwable exc) {
            dispatchUncaughtException(exc);
        } finally {
            try {
                // pop any remaining scopes from the stack, this may block
                StackableScope.popAll();

                // emit JFR event if enabled
                if (VirtualThreadEndEvent.isTurnedOn()) {
                    var event = new VirtualThreadEndEvent();
                    event.javaThreadId = threadId();
                    event.commit();
                }

            } finally {
                // the nested finally guarantees the unmount and the final state
                // even if popping scopes or committing the event throws

                // last unmount
                if (notifyJvmtiEvents) notifyJvmtiUnmountBegin(true);
                unmount();

                // final state
                setState(TERMINATED);
            }
        }
    }

    /**
     * Runs {@code op} with {@code bindings} held in this frame: the bindings are
     * passed to {@code ensureMaterializedForStackWalk} before the task runs and
     * kept reachable until after it returns.
     * NOTE(review): presumably this frame is located by a stack walk to find the
     * scoped value bindings; confirm against the VM side.
     *
     * @param bindings the scoped value bindings, as returned by scopedValueBindings()
     * @param op the task to run
     */
    @Hidden
    @ForceInline
    private void runWith(Object bindings, Runnable op) {
        ensureMaterializedForStackWalk(bindings);
        op.run();
        Reference.reachabilityFence(bindings);
    }

    /**
     * Mounts this virtual thread onto the current platform thread. On
     * return, the current thread is the virtual thread.
     */
    @ChangesCurrentThread
    private void mount() {
        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupt status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            synchronized (interruptLock) {
                // need to recheck interrupt status
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread. The carrier's interrupt
     * status is cleared after the connection to it has been broken.
     */
    @ChangesCurrentThread
    private void unmount() {
        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        carrier.clearInterrupt();
    }

    /**
     * Sets the current thread to the current carrier thread. If JVMTI events are
     * enabled then JVMTI is asked to hide frames before the switch.
     * @return true if JVMTI was notified
     */
    @ChangesCurrentThread
    @JvmtiMountTransition
    private boolean switchToCarrierThread() {
        // read the flag once so the value returned matches what was actually done
        boolean notifyJvmti = notifyJvmtiEvents;
        if (notifyJvmti) {
            notifyJvmtiHideFrames(true);
        }
        Thread carrier = this.carrierThread;
        assert Thread.currentThread() == this
                && carrier == Thread.currentCarrierThread();
        carrier.setCurrentThread(carrier);
        return notifyJvmti;
    }

    /**
     * Sets the current thread to the given virtual thread.
     * If {@code notifyJvmti} is true then JVMTI is notified.
     *
     * @param vthread the virtual thread to switch back to
     * @param notifyJvmti the value returned by the matching switchToCarrierThread
     */
    @ChangesCurrentThread
    @JvmtiMountTransition
    private void switchToVirtualThread(VirtualThread vthread, boolean notifyJvmti) {
        Thread carrier = vthread.carrierThread;
        assert carrier == Thread.currentCarrierThread();
        carrier.setCurrentThread(vthread);
        if (notifyJvmti) {
            notifyJvmtiHideFrames(false);
        }
    }

    /**
     * Unmounts this virtual thread, invokes Continuation.yield, and re-mounts the
     * thread when continued. When enabled, JVMTI must be notified from this method.
     * @return true if the yield was successful
     */
    @ChangesCurrentThread
    private boolean yieldContinuation() {
        // unmount
        if (notifyJvmtiEvents) notifyJvmtiUnmountBegin(false);
        unmount();
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            // re-mount
            mount();
            if (notifyJvmtiEvents) notifyJvmtiMountEnd(false);
        }
    }

    /**
     * Invoked after the continuation yields. If parking then it sets the state
     * and also re-submits the task to continue if unparked while parking.
     * If yielding due to Thread.yield then it just submits the task to continue.
     */
    private void afterYield() {
        int s = state();
        assert (s == PARKING || s == YIELDING) && (carrierThread == null);

        if (s == PARKING) {
            setState(PARKED);

            // notify JVMTI that unmount has completed, thread is parked
            if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);

            // may have been unparked while parking
            if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
                // lazy submit to continue on the current thread as carrier if possible
                if (currentThread() instanceof CarrierThread ct) {
                    lazySubmitRunContinuation(ct.getPool());
                } else {
                    submitRunContinuation();
                }

            }
        } else if (s == YIELDING) {   // Thread.yield
            setState(RUNNABLE);

            // notify JVMTI that unmount has completed, thread is runnable
            if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation(ct.getPool());
            } else {
                submitRunContinuation();
            }
        }
    }

    /**
     * Invoked after the thread terminates (or start failed). This method
     * notifies anyone waiting for the thread to terminate.
     *
     * @param executed true if the thread executed, false if it failed to start
     */
    private void afterTerminate(boolean executed) {
        assert (state() == TERMINATED) && (carrierThread == null);

        if (executed) {
            if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(true);
        }

        // notify anyone waiting for this virtual thread to terminate
        CountDownLatch termination = this.termination;
        if (termination != null) {
            assert termination.getCount() == 1;
            termination.countDown();
        }

        if (executed) {
            // notify container if thread executed
            threadContainer().onExit(this);

            // clear references to thread locals
            clearReferences();
        }
    }

    /**
     * Schedules this {@code VirtualThread} to execute.
     *
     * @param container the thread container to bind this thread to
     * @throws IllegalStateException if the container is shutdown or closed
     * @throws IllegalThreadStateException if the thread has already been started
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void start(ThreadContainer container) {
        if (!compareAndSetState(NEW, STARTED)) {
            throw new IllegalThreadStateException("Already started");
        }

        // bind thread to container
        setThreadContainer(container);

        // start thread
        boolean started = false;
        // NOTE(review): onStart is outside the try block, so if it throws the
        // cleanup below does not run and the state remains STARTED - confirm
        // that this is intended
        container.onStart(this);  // may throw
        try {
            // scoped values may be inherited
            inheritScopedValueBindings(container);

            // submit task to run thread
            submitRunContinuation();
            started = true;
        } finally {
            if (!started) {
                setState(TERMINATED);
                container.onExit(this);
                afterTerminate(/*executed*/ false);
            }
        }
    }

    /**
     * Schedules this {@code VirtualThread} to execute in the root thread container.
     */
    @Override
    public void start() {
        start(ThreadContainers.root());
    }

    /**
     * Does nothing. The task is run by {@link #run(Runnable)}, which is invoked
     * from the continuation.
     */
    @Override
    public void run() {
        // do nothing
    }

    /**
     * Parks until unparked or interrupted. If already unparked then the parking
     * permit is consumed and this method completes immediately (meaning it doesn't
     * yield). It also completes immediately if the interrupt status is set.
     */
    @Override
    void park() {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        setState(PARKING);
        try {
            if (!yieldContinuation()) {
                // park on the carrier thread when pinned
                parkOnCarrierThread(false, 0);
            }
        } finally {
            assert (Thread.currentThread() == this) && (state() == RUNNING);
        }
    }

    /**
     * Parks up to the given waiting time or until unparked or interrupted.
     * If already unparked then the parking permit is consumed and this method
     * completes immediately (meaning it doesn't yield). It also completes immediately
     * if the interrupt status is set or the waiting time is {@code <= 0}.
     *
     * @param nanos the maximum number of nanoseconds to wait.
     */
    @Override
    void parkNanos(long nanos) {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread for the waiting time
        if (nanos > 0) {
            long startTime = System.nanoTime();

            boolean yielded;
            // the timed wake-up is scheduled before the state changes to PARKING
            Future<?> unparker = scheduleUnpark(this::unpark, nanos);
            setState(PARKING);
            try {
                yielded = yieldContinuation();
            } finally {
                assert (Thread.currentThread() == this)
                        && (state() == RUNNING || state() == PARKING);
                cancel(unparker);
            }

            // park on carrier thread for remaining time when pinned
            if (!yielded) {
                long deadline = startTime + nanos;
                // startTime + nanos may overflow; treat that as "no deadline"
                if (deadline < 0L)
                    deadline = Long.MAX_VALUE;
                parkOnCarrierThread(true, deadline - System.nanoTime());
            }
        }
    }

    /**
     * Parks the current carrier thread up to the given waiting time or until
     * unparked or interrupted. If the virtual thread is interrupted then the
     * interrupt status will be propagated to the carrier thread. The state is
     * PINNED while parked and RUNNING on return; the parking permit is consumed.
     * A timed park with a waiting time {@code <= 0} does not park.
     * @param timed true for a timed park, false for untimed
     * @param nanos the waiting time in nanoseconds
     */
    private void parkOnCarrierThread(boolean timed, long nanos) {
        assert state() == PARKING;

        var pinnedEvent = new VirtualThreadPinnedEvent();
        pinnedEvent.begin();

        setState(PINNED);
        try {
            // skip parking if unparked since the permit was last checked
            if (!parkPermit) {
                if (!timed) {
                    U.park(false, 0);
                } else if (nanos > 0) {
                    U.park(false, nanos);
                }
            }
        } finally {
            setState(RUNNING);
        }

        // consume parking permit
        setParkPermit(false);

        pinnedEvent.commit();
    }

    /**
     * Schedule an unpark task to run after a given delay.
     *
     * @param unparker the task to run
     * @param nanos the delay in nanoseconds
     * @return the future returned by the UNPARKER executor for the scheduled task
     */
    @ChangesCurrentThread
    private Future<?> scheduleUnpark(Runnable unparker, long nanos) {
        // need to switch to current carrier thread to avoid nested parking
        boolean notifyJvmti = switchToCarrierThread();
        try {
            return UNPARKER.schedule(unparker, nanos, NANOSECONDS);
        } finally {
            switchToVirtualThread(this, notifyJvmti);
        }
    }

    /**
     * Cancels a task if it has not completed. The task is cancelled without
     * interrupting it.
     *
     * @param future the task to cancel
     */
    @ChangesCurrentThread
    private void cancel(Future<?> future) {
        if (!future.isDone()) {
            // need to switch to current carrier thread to avoid nested parking
            boolean notifyJvmti = switchToCarrierThread();
            try {
                future.cancel(false);
            } finally {
                switchToVirtualThread(this, notifyJvmti);
            }
        }
    }

    /**
     * Re-enables this virtual thread for scheduling. If the virtual thread was
     * {@link #park() parked} then it will be unblocked, otherwise its next call
     * to {@code park} or {@linkplain #parkNanos(long) parkNanos} is guaranteed
     * not to block.
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    @ChangesCurrentThread
    void unpark() {
        Thread currentThread = Thread.currentThread();
        // only the caller that sets the permit (false -> true) goes on to wake
        // the thread; a thread unparking itself just leaves the permit set
        if (!getAndSetParkPermit(true) && currentThread != this) {
            int s = state();
            if (s == PARKED && compareAndSetState(PARKED, RUNNABLE)) {
                if (currentThread instanceof VirtualThread vthread) {
                    // the caller is itself a virtual thread, submit while
                    // temporarily switched to its carrier thread
                    boolean notifyJvmti = vthread.switchToCarrierThread();
                    try {
                        submitRunContinuation();
                    } finally {
                        switchToVirtualThread(vthread, notifyJvmti);
                    }
                } else {
                    submitRunContinuation();
                }
            } else if (s == PINNED) {
                // unpark carrier thread when pinned.
                synchronized (carrierThreadAccessLock()) {
                    Thread carrier = carrierThread;
                    if (carrier != null && state() == PINNED) {
                        U.unpark(carrier);
                    }
                }
            }
        }
    }

    /**
     * Attempts to yield the current virtual thread (Thread.yield). The state is
     * RUNNING on return, whether or not the yield was successful.
     */
    void tryYield() {
        assert Thread.currentThread() == this;
        setState(YIELDING);
        try {
            yieldContinuation();
        } finally {
            assert Thread.currentThread() == this;
            // still YIELDING here means the yield failed, see transition table
            if (state() != RUNNING) {
                assert state() == YIELDING;
                setState(RUNNING);
            }
        }
    }

    /**
     * Sleep the current virtual thread for the given sleep time. A negative
     * sleep time is ignored. When the JFR ThreadSleepEvent is turned on, the
     * sleep is recorded with the requested time.
     *
     * @param nanos the maximum number of nanoseconds to sleep
     * @throws InterruptedException if interrupted while sleeping
     */
    void sleepNanos(long nanos) throws InterruptedException {
        assert Thread.currentThread() == this;
        if (nanos >= 0) {
            if (ThreadSleepEvent.isTurnedOn()) {
                ThreadSleepEvent event = new ThreadSleepEvent();
                try {
                    event.time = nanos;
                    event.begin();
                    doSleepNanos(nanos);
                } finally {
                    event.commit();
                }
            } else {
                doSleepNanos(nanos);
            }
        }
    }

    /**
     * Sleep the current thread for the given sleep time (in nanoseconds). If
     * nanos is 0 then the thread will attempt to yield.
     *
     * @implNote This implementation parks the thread for the given sleeping time
     * and will therefore be observed in PARKED state during the sleep. Parking
     * will consume the parking permit so this method makes available the parking
     * permit after the sleep. This may be observed as a spurious, but benign,
     * wakeup when the thread subsequently attempts to park.
     *
     * @param nanos the sleep time in nanoseconds, must be {@code >= 0}
     * @throws InterruptedException if the interrupt status is set on entry or
     *         after a park returns; the interrupt status is cleared
     */
    private void doSleepNanos(long nanos) throws InterruptedException {
        assert nanos >= 0;
        if (getAndClearInterrupt())
            throw new InterruptedException();
        if (nanos == 0) {
            tryYield();
        } else {
            // park for the sleep time
            try {
                long remainingNanos = nanos;
                long startNanos = System.nanoTime();
                // parkNanos may return early, so loop until the time has elapsed
                while (remainingNanos > 0) {
                    parkNanos(remainingNanos);
                    if (getAndClearInterrupt()) {
                        throw new InterruptedException();
                    }
                    remainingNanos = nanos - (System.nanoTime() - startNanos);
                }
            } finally {
                // may have been unparked while sleeping
                setParkPermit(true);
            }
        }
    }

    /**
     * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate.
     * A timeout of {@code 0} means to wait forever.
     *
     * @param nanos the maximum number of nanoseconds to wait, 0 to wait forever
     * @throws InterruptedException if interrupted while waiting
     * @return true if the thread has terminated
     */
    boolean joinNanos(long nanos) throws InterruptedException {
        if (state() == TERMINATED)
            return true;

        // ensure termination object exists, then re-check state
        CountDownLatch termination = getTermination();
        if (state() == TERMINATED)
            return true;

        // wait for virtual thread to terminate
        if (nanos == 0) {
            termination.await();
        } else {
            boolean terminated = termination.await(nanos, NANOSECONDS);
            if (!terminated) {
                // waiting time elapsed
                return false;
            }
        }
        assert state() == TERMINATED;
        return true;
    }

    /**
     * Interrupts this virtual thread: sets the interrupt status, interrupts the
     * carrier thread if mounted, and unparks the thread. When invoked by another
     * thread, access is checked first, the update is made while holding
     * {@code interruptLock}, and the NIO blocker (if any) is notified.
     */
    @Override
    @SuppressWarnings("removal")
    public void interrupt() {
        if (Thread.currentThread() != this) {
            checkAccess();
            synchronized (interruptLock) {
                interrupted = true;
                Interruptible b = nioBlocker;
                if (b != null) {
                    b.interrupt(this);
                }

                // interrupt carrier thread if mounted
                Thread carrier = carrierThread;
                if (carrier != null) carrier.setInterrupt();
            }
        } else {
            // interrupting itself, so this thread is running and has a carrier
            interrupted = true;
            carrierThread.setInterrupt();
        }
        unpark();
    }

    /**
     * Returns the interrupt status of this virtual thread without clearing it.
     */
    @Override
    public boolean isInterrupted() {
        return interrupted;
    }

    /**
     * Clears the interrupt status of this virtual thread and of its carrier
     * thread. Must be invoked on this virtual thread.
     *
     * @return the interrupt status before it was cleared
     */
    @Override
    boolean getAndClearInterrupt() {
        assert Thread.currentThread() == this;
        synchronized (interruptLock) {
            boolean oldValue = interrupted;
            if (oldValue)
                interrupted = false;
            // the carrier's status is cleared unconditionally
            carrierThread.clearInterrupt();
            return oldValue;
        }
    }

    /**
     * Maps the internal state to a {@code Thread.State}. A thread in the RUNNING
     * state reports the state of its carrier thread while mounted.
     *
     * @throws InternalError if the internal state is not one of the known states
     */
    @Override
    Thread.State threadState() {
        switch (state()) {
            case NEW:
                return Thread.State.NEW;
            case STARTED:
                // return NEW if thread container not yet set
                if (threadContainer() == null) {
                    return Thread.State.NEW;
                } else {
                    return Thread.State.RUNNABLE;
                }
            case RUNNABLE:
            case RUNNABLE_SUSPENDED:
                // runnable, not mounted
                return Thread.State.RUNNABLE;
            case RUNNING:
                // if mounted then return state of carrier thread
                synchronized (carrierThreadAccessLock()) {
                    Thread carrierThread = this.carrierThread;
                    if (carrierThread != null) {
                        return carrierThread.threadState();
                    }
                }
                // runnable, mounted
                return Thread.State.RUNNABLE;
            case PARKING:
            case YIELDING:
                // runnable, mounted, not yet waiting
                return Thread.State.RUNNABLE;
            case PARKED:
            case PARKED_SUSPENDED:
            case PINNED:
                return Thread.State.WAITING;
            case TERMINATED:
                return Thread.State.TERMINATED;
            default:
                throw new InternalError();
        }
    }

    /**
     * Returns true if this thread has been started and has not terminated.
     */
    @Override
    boolean alive() {
        int s = state;
        return (s != NEW && s != TERMINATED);
    }

    /**
     * Returns true if this thread is in the TERMINATED state.
     */
    @Override
    boolean isTerminated() {
        return (state == TERMINATED);
    }

    /**
     * Returns the stack trace of this virtual thread. If mounted, the stack trace
     * is obtained from the superclass implementation, otherwise from the unmounted
     * continuation. The attempt is repeated, yielding the calling thread between
     * attempts, until a stack trace is obtained.
     */
    @Override
    StackTraceElement[] asyncGetStackTrace() {
        StackTraceElement[] stackTrace;
        do {
            stackTrace = (carrierThread != null)
                    ? super.asyncGetStackTrace()  // mounted
                    : tryGetStackTrace();         // unmounted
            if (stackTrace == null) {
                Thread.yield();
            }
        } while (stackTrace == null);
        return stackTrace;
    }

    /**
     * Returns the stack trace for this virtual thread if it is unmounted.
     * Returns null if the thread is in another state. The thread is marked
     * SUSPENDED while its continuation is examined so that it cannot be run
     * at the same time. A thread that has not run yet or has terminated has an
     * empty stack trace.
     */
    private StackTraceElement[] tryGetStackTrace() {
        int initialState = state();
        return switch (initialState) {
            case RUNNABLE, PARKED -> {
                int suspendedState = initialState | SUSPENDED;
                if (compareAndSetState(initialState, suspendedState)) {
                    try {
                        yield cont.getStackTrace();
                    } finally {
                        assert state == suspendedState;
                        setState(initialState);

                        // re-submit if runnable
                        // re-submit if unparked while suspended
                        if (initialState == RUNNABLE
                            || (parkPermit && compareAndSetState(PARKED, RUNNABLE))) {
                            try {
                                submitRunContinuation();
                            } catch (RejectedExecutionException ignore) { }
                        }
                    }
                }
                // lost the race to change the state, caller retries
                yield null;
            }
            case NEW, STARTED, TERMINATED ->  new StackTraceElement[0];  // empty stack
            default -> null;
        };
    }

    /**
     * Returns a string of the form {@code VirtualThread[#<id>,<name>]/<state>},
     * where {@code ,<name>} is omitted if the name is empty. When mounted,
     * {@code <state>} is the carrier thread's state followed by {@code @} and the
     * carrier thread's name; otherwise it is this thread's state. States are in
     * lower case.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("VirtualThread[#");
        sb.append(threadId());
        String name = getName();
        if (!name.isEmpty()) {
            sb.append(",");
            sb.append(name);
        }
        sb.append("]/");
        Thread carrier = carrierThread;
        if (carrier != null) {
            // include the carrier thread state and name when mounted
            synchronized (carrierThreadAccessLock()) {
                // re-read under the lock, may have been unmounted in the meantime
                carrier = carrierThread;
                if (carrier != null) {
                    String stateAsString = carrier.threadState().toString();
                    sb.append(stateAsString.toLowerCase(Locale.ROOT));
                    sb.append('@');
                    sb.append(carrier.getName());
                }
            }
        }
        // include virtual thread state when not mounted
        if (carrier == null) {
            String stateAsString = threadState().toString();
            sb.append(stateAsString.toLowerCase(Locale.ROOT));
        }
        return sb.toString();
    }

    /**
     * Returns the thread identifier narrowed to an {@code int}.
     */
    @Override
    public int hashCode() {
        return (int) threadId();
    }

    /**
     * Returns true only if {@code obj} is this object (identity comparison).
     */
    @Override
    public boolean equals(Object obj) {
        return obj == this;
    }

    /**
     * Returns the termination object, creating it if needed. If another thread
     * installs the termination object first then that object is returned.
     */
    private CountDownLatch getTermination() {
        CountDownLatch termination = this.termination;
        if (termination == null) {
            termination = new CountDownLatch(1);
            if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
                // lost the race, use the latch installed by the other thread
                termination = this.termination;
            }
        }
        return termination;
    }

    /**
     * Returns the lock object to synchronize on when accessing carrierThread.
     * The lock prevents carrierThread from being reset to null during unmount.
     */
    private Object carrierThreadAccessLock() {
        // return interruptLock as unmount has to coordinate with interrupt
        return interruptLock;
    }

    // -- wrappers for get/set of state, parking permit, and carrier thread --

    /** Returns the current state (volatile read). */
    private int state() {
        return state;  // volatile read
    }

    /** Sets the state (volatile write). */
    private void setState(int newValue) {
        state = newValue;  // volatile write
    }

    /**
     * Atomically sets the state to {@code newValue} if it is currently
     * {@code expectedValue}.
     *
     * @return true if the state was changed
     */
    private boolean compareAndSetState(int expectedValue, int newValue) {
        return U.compareAndSetInt(this, STATE, expectedValue, newValue);
    }

    /**
     * Sets the parking permit, skipping the volatile write if it already has
     * the given value.
     */
    private void setParkPermit(boolean newValue) {
        if (parkPermit != newValue) {
            parkPermit = newValue;
        }
    }

    /**
     * Sets the parking permit and returns its previous value. The atomic
     * get-and-set is skipped if the permit is observed to already have the
     * given value.
     */
    private boolean getAndSetParkPermit(boolean newValue) {
        if (parkPermit != newValue) {
            return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
        } else {
            return newValue;
        }
    }

    /**
     * Sets the carrier thread (volatile write), null when unmounting.
     */
    private void setCarrierThread(Thread carrier) {
        // NOTE(review): the release-store alternative below is disabled, a plain
        // volatile write is used instead
        // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
        this.carrierThread = carrier;
    }

    // -- JVM TI support --

    // when true, the mount/unmount/hide-frames notifications below are invoked
    private static volatile boolean notifyJvmtiEvents;  // set by VM

    @JvmtiMountTransition
    private native void notifyJvmtiMountBegin(boolean firstMount);

    @JvmtiMountTransition
    private native void notifyJvmtiMountEnd(boolean firstMount);

    @JvmtiMountTransition
    private native void notifyJvmtiUnmountBegin(boolean lastUnmount);

    @JvmtiMountTransition
    private native void notifyJvmtiUnmountEnd(boolean lastUnmount);

    @JvmtiMountTransition
    private native void notifyJvmtiHideFrames(boolean hide);

    private static native void registerNatives();
    static {
        registerNatives();
    }

    /**
     * Creates the default scheduler, a FIFO (async mode) {@code ForkJoinPool}
     * whose worker threads are {@code CarrierThread}s. It is configured by the
     * following system properties, each parsed with {@code Integer.parseInt}:
     * <ul>
     *   <li> {@code jdk.virtualThreadScheduler.parallelism}: defaults to the
     *        number of available processors
     *   <li> {@code jdk.virtualThreadScheduler.maxPoolSize}: defaults to the
     *        larger of the parallelism and 256; if set, the parallelism is capped
     *        at this value
     *   <li> {@code jdk.virtualThreadScheduler.minRunnable}: defaults to the
     *        larger of half the parallelism and 1
     * </ul>
     * Uncaught exceptions in worker threads are ignored by the handler installed
     * here.
     *
     * @throws NumberFormatException if a property is set to a value that is not
     *         a parsable integer
     */
    @SuppressWarnings("removal")
    private static ForkJoinPool createDefaultScheduler() {
        ForkJoinWorkerThreadFactory factory = pool -> {
            PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
            return AccessController.doPrivileged(pa);
        };
        PrivilegedAction<ForkJoinPool> pa = () -> {
            int parallelism, maxPoolSize, minRunnable;
            String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
            String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
            String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
            if (parallelismValue != null) {
                parallelism = Integer.parseInt(parallelismValue);
            } else {
                parallelism = Runtime.getRuntime().availableProcessors();
            }
            if (maxPoolSizeValue != null) {
                maxPoolSize = Integer.parseInt(maxPoolSizeValue);
                parallelism = Integer.min(parallelism, maxPoolSize);
            } else {
                maxPoolSize = Integer.max(parallelism, 256);
            }
            if (minRunnableValue != null) {
                minRunnable = Integer.parseInt(minRunnableValue);
            } else {
                minRunnable = Integer.max(parallelism / 2, 1);
            }
            Thread.UncaughtExceptionHandler handler = (t, e) -> { };
            boolean asyncMode = true; // FIFO
            // core pool size 0, saturate predicate always true, 30 second keep-alive
            return new ForkJoinPool(parallelism, factory, handler, asyncMode,
                         0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
        };
        return AccessController.doPrivileged(pa);
    }

    /**
     * Creates the ScheduledThreadPoolExecutor used for timed unpark. The pool
     * size is read from the {@code jdk.unparker.maxPoolSize} system property and
     * defaults to 1. Threads are created with {@code InnocuousThread.newThread}
     * and named "VirtualThread-unparker". Cancelled tasks are removed from the
     * work queue at time of cancellation.
     *
     * @throws NumberFormatException if the property is set to a value that is
     *         not a parsable integer
     */
    private static ScheduledExecutorService createDelayedTaskScheduler() {
        String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize");
        int poolSize;
        if (propValue != null) {
            poolSize = Integer.parseInt(propValue);
        } else {
            poolSize = 1;
        }
        ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
            Executors.newScheduledThreadPool(poolSize, task -> {
                return InnocuousThread.newThread("VirtualThread-unparker", task);
            });
        stpe.setRemoveOnCancelPolicy(true);
        return stpe;
    }

    /**
     * Reads the value of the jdk.tracePinnedThreads property to determine if stack
     * traces should be printed when a carrier thread is pinned when a virtual thread
     * attempts to park.
     *
     * @return 1 if the property is set to the empty string or "full" (ignoring
     *         case), 2 if it is set to "short" (ignoring case), otherwise 0
     */
    private static int tracePinningMode() {
        String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads");
        if (propValue != null) {
            if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue))
                return 1;
            if ("short".equalsIgnoreCase(propValue))
                return 2;
        }
        return 0;
    }
}