1 /* 2 * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.lang.ref.Reference; 28 import java.security.AccessController; 29 import java.security.PrivilegedAction; 30 import java.util.Locale; 31 import java.util.Objects; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.Executors; 35 import java.util.concurrent.ForkJoinPool; 36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 37 import java.util.concurrent.ForkJoinTask; 38 import java.util.concurrent.ForkJoinWorkerThread; 39 import java.util.concurrent.Future; 40 import java.util.concurrent.RejectedExecutionException; 41 import java.util.concurrent.ScheduledExecutorService; 42 import java.util.concurrent.ScheduledThreadPoolExecutor; 43 import jdk.internal.event.ThreadSleepEvent; 44 import jdk.internal.event.VirtualThreadEndEvent; 45 import jdk.internal.event.VirtualThreadPinnedEvent; 46 import jdk.internal.event.VirtualThreadStartEvent; 47 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 48 import jdk.internal.misc.CarrierThread; 49 import jdk.internal.misc.InnocuousThread; 50 import jdk.internal.misc.Unsafe; 51 import jdk.internal.vm.Continuation; 52 import jdk.internal.vm.ContinuationScope; 53 import jdk.internal.vm.StackableScope; 54 import jdk.internal.vm.ThreadContainer; 55 import jdk.internal.vm.ThreadContainers; 56 import jdk.internal.vm.annotation.ChangesCurrentThread; 57 import jdk.internal.vm.annotation.ForceInline; 58 import jdk.internal.vm.annotation.Hidden; 59 import jdk.internal.vm.annotation.IntrinsicCandidate; 60 import jdk.internal.vm.annotation.JvmtiMountTransition; 61 import sun.nio.ch.Interruptible; 62 import sun.security.action.GetPropertyAction; 63 import static java.util.concurrent.TimeUnit.*; 64 65 /** 66 * A thread that is scheduled by the Java virtual machine rather than the operating 67 * system. 68 */ 69 final class VirtualThread extends BaseVirtualThread { 70 private static final Unsafe U = Unsafe.getUnsafe(); 71 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 72 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler(); 73 private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler(); 74 private static final int TRACE_PINNING_MODE = tracePinningMode(); 75 76 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 77 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 78 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 79 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 80 81 // scheduler and continuation 82 private final Executor scheduler; 83 private final Continuation cont; 84 private final Runnable runContinuation; 85 86 // virtual thread state, accessed by VM 87 private volatile int state; 88 89 /* 90 * Virtual thread state and transitions: 91 * 92 * NEW -> STARTED // Thread.start 93 * STARTED -> TERMINATED // failed to start 94 * STARTED -> RUNNING // first run 95 * 96 * RUNNING -> PARKING // Thread attempts to park 97 * PARKING -> PARKED // cont.yield successful, thread is parked 98 * PARKING -> PINNED // cont.yield failed, thread is pinned 99 * 100 * PARKED -> RUNNABLE // unpark or interrupted 101 * PINNED -> RUNNABLE // unpark or interrupted 102 * 103 * RUNNABLE -> RUNNING // continue execution 104 * 105 * RUNNING -> YIELDING // Thread.yield 106 * YIELDING -> RUNNABLE // yield successful 107 * YIELDING -> RUNNING // yield failed 108 * 109 * RUNNING -> TERMINATED // done 110 */ 111 private static final int NEW = 0; 112 private static final int STARTED = 1; 113 private static final int RUNNABLE = 2; // runnable-unmounted 114 private static final int RUNNING = 3; // runnable-mounted 115 private static final int PARKING = 4; 116 private static final int PARKED = 5; // unmounted 117 private static final int PINNED = 6; // mounted 118 private static final int YIELDING = 7; // Thread.yield 119 private static final int TERMINATED = 99; // final state 120 121 // can be suspended from scheduling when unmounted 122 private static final int SUSPENDED = 1 << 8; 123 private static final int RUNNABLE_SUSPENDED = (RUNNABLE | SUSPENDED); 124 private static final int PARKED_SUSPENDED = (PARKED | SUSPENDED); 125 126 // parking permit 127 private volatile boolean parkPermit; 128 129 // carrier thread when mounted, accessed by VM 130 private volatile Thread carrierThread; 131 132 // termination object when joining, created lazily if needed 133 private volatile CountDownLatch termination; 134 135 /** 136 * Returns the continuation scope used for virtual threads. 137 */ 138 static ContinuationScope continuationScope() { 139 return VTHREAD_SCOPE; 140 } 141 142 /** 143 * Creates a new {@code VirtualThread} to run the given task with the given 144 * scheduler. If the given scheduler is {@code null} and the current thread 145 * is a platform thread then the newly created virtual thread will use the 146 * default scheduler. If given scheduler is {@code null} and the current 147 * thread is a virtual thread then the current thread's scheduler is used. 148 * 149 * @param scheduler the scheduler or null 150 * @param name thread name 151 * @param characteristics characteristics 152 * @param task the task to execute 153 */ 154 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 155 super(name, characteristics, /*bound*/ false); 156 Objects.requireNonNull(task); 157 158 // choose scheduler if not specified 159 if (scheduler == null) { 160 Thread parent = Thread.currentThread(); 161 if (parent instanceof VirtualThread vparent) { 162 scheduler = vparent.scheduler; 163 } else { 164 scheduler = DEFAULT_SCHEDULER; 165 } 166 } 167 168 this.scheduler = scheduler; 169 this.cont = new VThreadContinuation(this, task); 170 this.runContinuation = this::runContinuation; 171 } 172 173 /** 174 * The continuation that a virtual thread executes. 175 */ 176 private static class VThreadContinuation extends Continuation { 177 VThreadContinuation(VirtualThread vthread, Runnable task) { 178 super(VTHREAD_SCOPE, () -> vthread.run(task)); 179 } 180 @Override 181 protected void onPinned(Continuation.Pinned reason) { 182 if (TRACE_PINNING_MODE > 0) { 183 boolean printAll = (TRACE_PINNING_MODE == 1); 184 PinnedThreadPrinter.printStackTrace(System.out, printAll); 185 } 186 } 187 } 188 189 /** 190 * Runs or continues execution of the continuation on the current thread. 191 */ 192 private void runContinuation() { 193 // the carrier must be a platform thread 194 if (Thread.currentThread().isVirtual()) { 195 throw new WrongThreadException(); 196 } 197 198 // set state to RUNNING 199 boolean firstRun; 200 int initialState = state(); 201 if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) { 202 // first run 203 firstRun = true; 204 } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) { 205 // consume parking permit 206 setParkPermit(false); 207 firstRun = false; 208 } else { 209 // not runnable 210 return; 211 } 212 213 // notify JVMTI before mount 214 notifyJvmtiMount(true, firstRun); 215 216 try { 217 cont.run(); 218 } finally { 219 if (cont.isDone()) { 220 afterTerminate(/*executed*/ true); 221 } else { 222 afterYield(); 223 } 224 } 225 } 226 227 /** 228 * Submits the runContinuation task to the scheduler. For the default scheduler, 229 * and calling it on a worker thread, the task will be pushed to the local queue, 230 * otherwise it will be pushed to a submission queue. 231 * 232 * @throws RejectedExecutionException 233 */ 234 private void submitRunContinuation() { 235 try { 236 scheduler.execute(runContinuation); 237 } catch (RejectedExecutionException ree) { 238 submitFailed(ree); 239 throw ree; 240 } 241 } 242 243 /** 244 * Submits the runContinuation task to the scheduler with a lazy submit. 245 * @throws RejectedExecutionException 246 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 247 */ 248 private void lazySubmitRunContinuation(ForkJoinPool pool) { 249 try { 250 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 251 } catch (RejectedExecutionException ree) { 252 submitFailed(ree); 253 throw ree; 254 } 255 } 256 257 /** 258 * Submits the runContinuation task to the scheduler as an external submit. 259 * @throws RejectedExecutionException 260 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 261 */ 262 private void externalSubmitRunContinuation(ForkJoinPool pool) { 263 try { 264 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 265 } catch (RejectedExecutionException ree) { 266 submitFailed(ree); 267 throw ree; 268 } 269 } 270 271 /** 272 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 273 */ 274 private void submitFailed(RejectedExecutionException ree) { 275 var event = new VirtualThreadSubmitFailedEvent(); 276 if (event.isEnabled()) { 277 event.javaThreadId = threadId(); 278 event.exceptionMessage = ree.getMessage(); 279 event.commit(); 280 } 281 } 282 283 /** 284 * Runs a task in the context of this virtual thread. The virtual thread is 285 * mounted on the current (carrier) thread before the task runs. It unmounts 286 * from its carrier thread when the task completes. 287 */ 288 @ChangesCurrentThread 289 private void run(Runnable task) { 290 assert state == RUNNING; 291 292 // first mount 293 mount(); 294 notifyJvmtiMount(false, true); 295 296 // emit JFR event if enabled 297 if (VirtualThreadStartEvent.isTurnedOn()) { 298 var event = new VirtualThreadStartEvent(); 299 event.javaThreadId = threadId(); 300 event.commit(); 301 } 302 303 Object bindings = scopedValueBindings(); 304 try { 305 runWith(bindings, task); 306 } catch (Throwable exc) { 307 dispatchUncaughtException(exc); 308 } finally { 309 try { 310 // pop any remaining scopes from the stack, this may block 311 StackableScope.popAll(); 312 313 // emit JFR event if enabled 314 if (VirtualThreadEndEvent.isTurnedOn()) { 315 var event = new VirtualThreadEndEvent(); 316 event.javaThreadId = threadId(); 317 event.commit(); 318 } 319 320 } finally { 321 // last unmount 322 notifyJvmtiUnmount(true, true); 323 unmount(); 324 325 // final state 326 setState(TERMINATED); 327 } 328 } 329 } 330 331 @Hidden 332 @ForceInline 333 private void runWith(Object bindings, Runnable op) { 334 ensureMaterializedForStackWalk(bindings); 335 op.run(); 336 Reference.reachabilityFence(bindings); 337 } 338 339 /** 340 * Mounts this virtual thread onto the current platform thread. On 341 * return, the current thread is the virtual thread. 342 */ 343 @ChangesCurrentThread 344 private void mount() { 345 // sets the carrier thread 346 Thread carrier = Thread.currentCarrierThread(); 347 setCarrierThread(carrier); 348 349 // sync up carrier thread interrupt status if needed 350 if (interrupted) { 351 carrier.setInterrupt(); 352 } else if (carrier.isInterrupted()) { 353 synchronized (interruptLock) { 354 // need to recheck interrupt status 355 if (!interrupted) { 356 carrier.clearInterrupt(); 357 } 358 } 359 } 360 361 // set Thread.currentThread() to return this virtual thread 362 carrier.setCurrentThread(this); 363 } 364 365 /** 366 * Unmounts this virtual thread from the carrier. On return, the 367 * current thread is the current platform thread. 368 */ 369 @ChangesCurrentThread 370 private void unmount() { 371 // set Thread.currentThread() to return the platform thread 372 Thread carrier = this.carrierThread; 373 carrier.setCurrentThread(carrier); 374 375 // break connection to carrier thread, synchronized with interrupt 376 synchronized (interruptLock) { 377 setCarrierThread(null); 378 } 379 carrier.clearInterrupt(); 380 } 381 382 /** 383 * Sets the current thread to the current carrier thread. 384 */ 385 @ChangesCurrentThread 386 @JvmtiMountTransition 387 private void switchToCarrierThread() { 388 notifyJvmtiHideFrames(true); 389 Thread carrier = this.carrierThread; 390 assert Thread.currentThread() == this 391 && carrier == Thread.currentCarrierThread(); 392 carrier.setCurrentThread(carrier); 393 } 394 395 /** 396 * Sets the current thread to the given virtual thread. 397 */ 398 @ChangesCurrentThread 399 @JvmtiMountTransition 400 private void switchToVirtualThread(VirtualThread vthread) { 401 Thread carrier = vthread.carrierThread; 402 assert carrier == Thread.currentCarrierThread(); 403 carrier.setCurrentThread(vthread); 404 notifyJvmtiHideFrames(false); 405 } 406 407 /** 408 * Unmounts this virtual thread, invokes Continuation.yield, and re-mounts the 409 * thread when continued. When enabled, JVMTI must be notified from this method. 410 * @return true if the yield was successful 411 */ 412 @ChangesCurrentThread 413 private boolean yieldContinuation() { 414 // unmount 415 notifyJvmtiUnmount(true, false); 416 unmount(); 417 try { 418 return Continuation.yield(VTHREAD_SCOPE); 419 } finally { 420 // re-mount 421 mount(); 422 notifyJvmtiMount(false, false); 423 } 424 } 425 426 /** 427 * Invoked after the continuation yields. If parking then it sets the state 428 * and also re-submits the task to continue if unparked while parking. 429 * If yielding due to Thread.yield then it just submits the task to continue. 430 */ 431 private void afterYield() { 432 int s = state(); 433 assert (s == PARKING || s == YIELDING) && (carrierThread == null); 434 435 if (s == PARKING) { 436 setState(PARKED); 437 438 // notify JVMTI that unmount has completed, thread is parked 439 notifyJvmtiUnmount(false, false); 440 441 // may have been unparked while parking 442 if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) { 443 // lazy submit to continue on the current thread as carrier if possible 444 if (currentThread() instanceof CarrierThread ct) { 445 lazySubmitRunContinuation(ct.getPool()); 446 } else { 447 submitRunContinuation(); 448 } 449 450 } 451 } else if (s == YIELDING) { // Thread.yield 452 setState(RUNNABLE); 453 454 // notify JVMTI that unmount has completed, thread is runnable 455 notifyJvmtiUnmount(false, false); 456 457 // external submit if there are no tasks in the local task queue 458 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 459 externalSubmitRunContinuation(ct.getPool()); 460 } else { 461 submitRunContinuation(); 462 } 463 } 464 } 465 466 /** 467 * Invoked after the thread terminates (or start failed). This method 468 * notifies anyone waiting for the thread to terminate. 469 * 470 * @param executed true if the thread executed, false if it failed to start 471 */ 472 private void afterTerminate(boolean executed) { 473 assert (state() == TERMINATED) && (carrierThread == null); 474 475 if (executed) { 476 notifyJvmtiUnmount(false, true); 477 } 478 479 // notify anyone waiting for this virtual thread to terminate 480 CountDownLatch termination = this.termination; 481 if (termination != null) { 482 assert termination.getCount() == 1; 483 termination.countDown(); 484 } 485 486 if (executed) { 487 // notify container if thread executed 488 threadContainer().onExit(this); 489 490 // clear references to thread locals 491 clearReferences(); 492 } 493 } 494 495 /** 496 * Schedules this {@code VirtualThread} to execute. 497 * 498 * @throws IllegalStateException if the container is shutdown or closed 499 * @throws IllegalThreadStateException if the thread has already been started 500 * @throws RejectedExecutionException if the scheduler cannot accept a task 501 */ 502 @Override 503 void start(ThreadContainer container) { 504 if (!compareAndSetState(NEW, STARTED)) { 505 throw new IllegalThreadStateException("Already started"); 506 } 507 508 // bind thread to container 509 setThreadContainer(container); 510 511 // start thread 512 boolean started = false; 513 container.onStart(this); // may throw 514 try { 515 // scoped values may be inherited 516 inheritScopedValueBindings(container); 517 518 // submit task to run thread 519 submitRunContinuation(); 520 started = true; 521 } finally { 522 if (!started) { 523 setState(TERMINATED); 524 container.onExit(this); 525 afterTerminate(/*executed*/ false); 526 } 527 } 528 } 529 530 @Override 531 public void start() { 532 start(ThreadContainers.root()); 533 } 534 535 @Override 536 public void run() { 537 // do nothing 538 } 539 540 /** 541 * Parks until unparked or interrupted. If already unparked then the parking 542 * permit is consumed and this method completes immediately (meaning it doesn't 543 * yield). It also completes immediately if the interrupt status is set. 544 */ 545 @Override 546 void park() { 547 assert Thread.currentThread() == this; 548 549 // complete immediately if parking permit available or interrupted 550 if (getAndSetParkPermit(false) || interrupted) 551 return; 552 553 // park the thread 554 setState(PARKING); 555 try { 556 if (!yieldContinuation()) { 557 // park on the carrier thread when pinned 558 parkOnCarrierThread(false, 0); 559 } 560 } finally { 561 assert (Thread.currentThread() == this) && (state() == RUNNING); 562 } 563 } 564 565 /** 566 * Parks up to the given waiting time or until unparked or interrupted. 567 * If already unparked then the parking permit is consumed and this method 568 * completes immediately (meaning it doesn't yield). It also completes immediately 569 * if the interrupt status is set or the waiting time is {@code <= 0}. 570 * 571 * @param nanos the maximum number of nanoseconds to wait. 572 */ 573 @Override 574 void parkNanos(long nanos) { 575 assert Thread.currentThread() == this; 576 577 // complete immediately if parking permit available or interrupted 578 if (getAndSetParkPermit(false) || interrupted) 579 return; 580 581 // park the thread for the waiting time 582 if (nanos > 0) { 583 long startTime = System.nanoTime(); 584 585 boolean yielded; 586 Future<?> unparker = scheduleUnpark(this::unpark, nanos); 587 setState(PARKING); 588 try { 589 yielded = yieldContinuation(); 590 } finally { 591 assert (Thread.currentThread() == this) 592 && (state() == RUNNING || state() == PARKING); 593 cancel(unparker); 594 } 595 596 // park on carrier thread for remaining time when pinned 597 if (!yielded) { 598 long deadline = startTime + nanos; 599 if (deadline < 0L) 600 deadline = Long.MAX_VALUE; 601 parkOnCarrierThread(true, deadline - System.nanoTime()); 602 } 603 } 604 } 605 606 /** 607 * Parks the current carrier thread up to the given waiting time or until 608 * unparked or interrupted. If the virtual thread is interrupted then the 609 * interrupt status will be propagated to the carrier thread. 610 * @param timed true for a timed park, false for untimed 611 * @param nanos the waiting time in nanoseconds 612 */ 613 private void parkOnCarrierThread(boolean timed, long nanos) { 614 assert state() == PARKING; 615 616 var pinnedEvent = new VirtualThreadPinnedEvent(); 617 pinnedEvent.begin(); 618 619 setState(PINNED); 620 try { 621 if (!parkPermit) { 622 if (!timed) { 623 U.park(false, 0); 624 } else if (nanos > 0) { 625 U.park(false, nanos); 626 } 627 } 628 } finally { 629 setState(RUNNING); 630 } 631 632 // consume parking permit 633 setParkPermit(false); 634 635 pinnedEvent.commit(); 636 } 637 638 /** 639 * Schedule an unpark task to run after a given delay. 640 */ 641 @ChangesCurrentThread 642 private Future<?> scheduleUnpark(Runnable unparker, long nanos) { 643 // need to switch to current carrier thread to avoid nested parking 644 switchToCarrierThread(); 645 try { 646 return UNPARKER.schedule(unparker, nanos, NANOSECONDS); 647 } finally { 648 switchToVirtualThread(this); 649 } 650 } 651 652 /** 653 * Cancels a task if it has not completed. 654 */ 655 @ChangesCurrentThread 656 private void cancel(Future<?> future) { 657 if (!future.isDone()) { 658 // need to switch to current carrier thread to avoid nested parking 659 switchToCarrierThread(); 660 try { 661 future.cancel(false); 662 } finally { 663 switchToVirtualThread(this); 664 } 665 } 666 } 667 668 /** 669 * Re-enables this virtual thread for scheduling. If the virtual thread was 670 * {@link #park() parked} then it will be unblocked, otherwise its next call 671 * to {@code park} or {@linkplain #parkNanos(long) parkNanos} is guaranteed 672 * not to block. 673 * @throws RejectedExecutionException if the scheduler cannot accept a task 674 */ 675 @Override 676 @ChangesCurrentThread 677 void unpark() { 678 Thread currentThread = Thread.currentThread(); 679 if (!getAndSetParkPermit(true) && currentThread != this) { 680 int s = state(); 681 if (s == PARKED && compareAndSetState(PARKED, RUNNABLE)) { 682 if (currentThread instanceof VirtualThread vthread) { 683 vthread.switchToCarrierThread(); 684 try { 685 submitRunContinuation(); 686 } finally { 687 switchToVirtualThread(vthread); 688 } 689 } else { 690 submitRunContinuation(); 691 } 692 } else if (s == PINNED) { 693 // unpark carrier thread when pinned. 694 synchronized (carrierThreadAccessLock()) { 695 Thread carrier = carrierThread; 696 if (carrier != null && state() == PINNED) { 697 U.unpark(carrier); 698 } 699 } 700 } 701 } 702 } 703 704 /** 705 * Attempts to yield the current virtual thread (Thread.yield). 706 */ 707 void tryYield() { 708 assert Thread.currentThread() == this; 709 setState(YIELDING); 710 try { 711 yieldContinuation(); 712 } finally { 713 assert Thread.currentThread() == this; 714 if (state() != RUNNING) { 715 assert state() == YIELDING; 716 setState(RUNNING); 717 } 718 } 719 } 720 721 /** 722 * Sleep the current virtual thread for the given sleep time. 723 * 724 * @param nanos the maximum number of nanoseconds to sleep 725 * @throws InterruptedException if interrupted while sleeping 726 */ 727 void sleepNanos(long nanos) throws InterruptedException { 728 assert Thread.currentThread() == this; 729 if (nanos >= 0) { 730 if (ThreadSleepEvent.isTurnedOn()) { 731 ThreadSleepEvent event = new ThreadSleepEvent(); 732 try { 733 event.time = nanos; 734 event.begin(); 735 doSleepNanos(nanos); 736 } finally { 737 event.commit(); 738 } 739 } else { 740 doSleepNanos(nanos); 741 } 742 } 743 } 744 745 /** 746 * Sleep the current thread for the given sleep time (in nanoseconds). If 747 * nanos is 0 then the thread will attempt to yield. 748 * 749 * @implNote This implementation parks the thread for the given sleeping time 750 * and will therefore be observed in PARKED state during the sleep. Parking 751 * will consume the parking permit so this method makes available the parking 752 * permit after the sleep. This may be observed as a spurious, but benign, 753 * wakeup when the thread subsequently attempts to park. 754 */ 755 private void doSleepNanos(long nanos) throws InterruptedException { 756 assert nanos >= 0; 757 if (getAndClearInterrupt()) 758 throw new InterruptedException(); 759 if (nanos == 0) { 760 tryYield(); 761 } else { 762 // park for the sleep time 763 try { 764 long remainingNanos = nanos; 765 long startNanos = System.nanoTime(); 766 while (remainingNanos > 0) { 767 parkNanos(remainingNanos); 768 if (getAndClearInterrupt()) { 769 throw new InterruptedException(); 770 } 771 remainingNanos = nanos - (System.nanoTime() - startNanos); 772 } 773 } finally { 774 // may have been unparked while sleeping 775 setParkPermit(true); 776 } 777 } 778 } 779 780 /** 781 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 782 * A timeout of {@code 0} means to wait forever. 783 * 784 * @throws InterruptedException if interrupted while waiting 785 * @return true if the thread has terminated 786 */ 787 boolean joinNanos(long nanos) throws InterruptedException { 788 if (state() == TERMINATED) 789 return true; 790 791 // ensure termination object exists, then re-check state 792 CountDownLatch termination = getTermination(); 793 if (state() == TERMINATED) 794 return true; 795 796 // wait for virtual thread to terminate 797 if (nanos == 0) { 798 termination.await(); 799 } else { 800 boolean terminated = termination.await(nanos, NANOSECONDS); 801 if (!terminated) { 802 // waiting time elapsed 803 return false; 804 } 805 } 806 assert state() == TERMINATED; 807 return true; 808 } 809 810 @Override 811 @SuppressWarnings("removal") 812 public void interrupt() { 813 if (Thread.currentThread() != this) { 814 checkAccess(); 815 synchronized (interruptLock) { 816 interrupted = true; 817 Interruptible b = nioBlocker; 818 if (b != null) { 819 b.interrupt(this); 820 } 821 822 // interrupt carrier thread if mounted 823 Thread carrier = carrierThread; 824 if (carrier != null) carrier.setInterrupt(); 825 } 826 } else { 827 interrupted = true; 828 carrierThread.setInterrupt(); 829 } 830 unpark(); 831 } 832 833 @Override 834 public boolean isInterrupted() { 835 return interrupted; 836 } 837 838 @Override 839 boolean getAndClearInterrupt() { 840 assert Thread.currentThread() == this; 841 synchronized (interruptLock) { 842 boolean oldValue = interrupted; 843 if (oldValue) 844 interrupted = false; 845 carrierThread.clearInterrupt(); 846 return oldValue; 847 } 848 } 849 850 @Override 851 Thread.State threadState() { 852 switch (state()) { 853 case NEW: 854 return Thread.State.NEW; 855 case STARTED: 856 // return NEW if thread container not yet set 857 if (threadContainer() == null) { 858 return Thread.State.NEW; 859 } else { 860 return Thread.State.RUNNABLE; 861 } 862 case RUNNABLE: 863 case RUNNABLE_SUSPENDED: 864 // runnable, not mounted 865 return Thread.State.RUNNABLE; 866 case RUNNING: 867 // if mounted then return state of carrier thread 868 synchronized (carrierThreadAccessLock()) { 869 Thread carrierThread = this.carrierThread; 870 if (carrierThread != null) { 871 return carrierThread.threadState(); 872 } 873 } 874 // runnable, mounted 875 return Thread.State.RUNNABLE; 876 case PARKING: 877 case YIELDING: 878 // runnable, mounted, not yet waiting 879 return Thread.State.RUNNABLE; 880 case PARKED: 881 case PARKED_SUSPENDED: 882 case PINNED: 883 return Thread.State.WAITING; 884 case TERMINATED: 885 return Thread.State.TERMINATED; 886 default: 887 throw new InternalError(); 888 } 889 } 890 891 @Override 892 boolean alive() { 893 int s = state; 894 return (s != NEW && s != TERMINATED); 895 } 896 897 @Override 898 boolean isTerminated() { 899 return (state == TERMINATED); 900 } 901 902 @Override 903 StackTraceElement[] asyncGetStackTrace() { 904 StackTraceElement[] stackTrace; 905 do { 906 stackTrace = (carrierThread != null) 907 ? super.asyncGetStackTrace() // mounted 908 : tryGetStackTrace(); // unmounted 909 if (stackTrace == null) { 910 Thread.yield(); 911 } 912 } while (stackTrace == null); 913 return stackTrace; 914 } 915 916 /** 917 * Returns the stack trace for this virtual thread if it is unmounted. 918 * Returns null if the thread is in another state. 919 */ 920 private StackTraceElement[] tryGetStackTrace() { 921 int initialState = state(); 922 return switch (initialState) { 923 case RUNNABLE, PARKED -> { 924 int suspendedState = initialState | SUSPENDED; 925 if (compareAndSetState(initialState, suspendedState)) { 926 try { 927 yield cont.getStackTrace(); 928 } finally { 929 assert state == suspendedState; 930 setState(initialState); 931 932 // re-submit if runnable 933 // re-submit if unparked while suspended 934 if (initialState == RUNNABLE 935 || (parkPermit && compareAndSetState(PARKED, RUNNABLE))) { 936 try { 937 submitRunContinuation(); 938 } catch (RejectedExecutionException ignore) { } 939 } 940 } 941 } 942 yield null; 943 } 944 case NEW, STARTED, TERMINATED -> new StackTraceElement[0]; // empty stack 945 default -> null; 946 }; 947 } 948 949 @Override 950 public String toString() { 951 StringBuilder sb = new StringBuilder("VirtualThread[#"); 952 sb.append(threadId()); 953 String name = getName(); 954 if (!name.isEmpty()) { 955 sb.append(","); 956 sb.append(name); 957 } 958 sb.append("]/"); 959 Thread carrier = carrierThread; 960 if (carrier != null) { 961 // include the carrier thread state and name when mounted 962 synchronized (carrierThreadAccessLock()) { 963 carrier = carrierThread; 964 if (carrier != null) { 965 String stateAsString = carrier.threadState().toString(); 966 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 967 sb.append('@'); 968 sb.append(carrier.getName()); 969 } 970 } 971 } 972 // include virtual thread state when not mounted 973 if (carrier == null) { 974 String stateAsString = threadState().toString(); 975 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 976 } 977 return sb.toString(); 978 } 979 980 @Override 981 public int hashCode() { 982 return (int) threadId(); 983 } 984 985 @Override 986 public boolean equals(Object obj) { 987 return obj == this; 988 } 989 990 /** 991 * Returns the termination object, creating it if needed. 992 */ 993 private CountDownLatch getTermination() { 994 CountDownLatch termination = this.termination; 995 if (termination == null) { 996 termination = new CountDownLatch(1); 997 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 998 termination = this.termination; 999 } 1000 } 1001 return termination; 1002 } 1003 1004 /** 1005 * Returns the lock object to synchronize on when accessing carrierThread. 1006 * The lock prevents carrierThread from being reset to null during unmount. 1007 */ 1008 private Object carrierThreadAccessLock() { 1009 // return interruptLock as unmount has to coordinate with interrupt 1010 return interruptLock; 1011 } 1012 1013 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1014 1015 private int state() { 1016 return state; // volatile read 1017 } 1018 1019 private void setState(int newValue) { 1020 state = newValue; // volatile write 1021 } 1022 1023 private boolean compareAndSetState(int expectedValue, int newValue) { 1024 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1025 } 1026 1027 private void setParkPermit(boolean newValue) { 1028 if (parkPermit != newValue) { 1029 parkPermit = newValue; 1030 } 1031 } 1032 1033 private boolean getAndSetParkPermit(boolean newValue) { 1034 if (parkPermit != newValue) { 1035 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1036 } else { 1037 return newValue; 1038 } 1039 } 1040 1041 private void setCarrierThread(Thread carrier) { 1042 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1043 this.carrierThread = carrier; 1044 } 1045 1046 // -- JVM TI support -- 1047 1048 @IntrinsicCandidate 1049 @JvmtiMountTransition 1050 private native void notifyJvmtiMount(boolean hide, boolean firstMount); 1051 1052 @IntrinsicCandidate 1053 @JvmtiMountTransition 1054 private native void notifyJvmtiUnmount(boolean hide, boolean lastUnmount); 1055 1056 @IntrinsicCandidate 1057 @JvmtiMountTransition 1058 private native void notifyJvmtiHideFrames(boolean hide); 1059 1060 private static native void registerNatives(); 1061 static { 1062 registerNatives(); 1063 } 1064 1065 /** 1066 * Creates the default scheduler. 1067 */ 1068 @SuppressWarnings("removal") 1069 private static ForkJoinPool createDefaultScheduler() { 1070 ForkJoinWorkerThreadFactory factory = pool -> { 1071 PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool); 1072 return AccessController.doPrivileged(pa); 1073 }; 1074 PrivilegedAction<ForkJoinPool> pa = () -> { 1075 int parallelism, maxPoolSize, minRunnable; 1076 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1077 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1078 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1079 if (parallelismValue != null) { 1080 parallelism = Integer.parseInt(parallelismValue); 1081 } else { 1082 parallelism = Runtime.getRuntime().availableProcessors(); 1083 } 1084 if (maxPoolSizeValue != null) { 1085 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1086 parallelism = Integer.min(parallelism, maxPoolSize); 1087 } else { 1088 maxPoolSize = Integer.max(parallelism, 256); 1089 } 1090 if (minRunnableValue != null) { 1091 minRunnable = Integer.parseInt(minRunnableValue); 1092 } else { 1093 minRunnable = Integer.max(parallelism / 2, 1); 1094 } 1095 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1096 boolean asyncMode = true; // FIFO 1097 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1098 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1099 }; 1100 return AccessController.doPrivileged(pa); 1101 } 1102 1103 /** 1104 * Creates the ScheduledThreadPoolExecutor used for timed unpark. 1105 */ 1106 private static ScheduledExecutorService createDelayedTaskScheduler() { 1107 String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize"); 1108 int poolSize; 1109 if (propValue != null) { 1110 poolSize = Integer.parseInt(propValue); 1111 } else { 1112 poolSize = 1; 1113 } 1114 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1115 Executors.newScheduledThreadPool(poolSize, task -> { 1116 return InnocuousThread.newThread("VirtualThread-unparker", task); 1117 }); 1118 stpe.setRemoveOnCancelPolicy(true); 1119 return stpe; 1120 } 1121 1122 /** 1123 * Reads the value of the jdk.tracePinnedThreads property to determine if stack 1124 * traces should be printed when a carrier thread is pinned when a virtual thread 1125 * attempts to park. 1126 */ 1127 private static int tracePinningMode() { 1128 String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads"); 1129 if (propValue != null) { 1130 if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue)) 1131 return 1; 1132 if ("short".equalsIgnoreCase(propValue)) 1133 return 2; 1134 } 1135 return 0; 1136 } 1137 }