1 /* 2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.security.AccessController; 28 import java.security.PrivilegedAction; 29 import java.util.Locale; 30 import java.util.Objects; 31 import java.util.concurrent.Callable; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.Executors; 35 import java.util.concurrent.ForkJoinPool; 36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 37 import java.util.concurrent.ForkJoinTask; 38 import java.util.concurrent.ForkJoinWorkerThread; 39 import java.util.concurrent.Future; 40 import java.util.concurrent.RejectedExecutionException; 41 import java.util.concurrent.ScheduledExecutorService; 42 import java.util.concurrent.ScheduledThreadPoolExecutor; 43 import jdk.internal.event.VirtualThreadEndEvent; 44 import jdk.internal.event.VirtualThreadPinnedEvent; 45 import jdk.internal.event.VirtualThreadStartEvent; 46 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 47 import jdk.internal.misc.CarrierThread; 48 import jdk.internal.misc.InnocuousThread; 49 import jdk.internal.misc.Unsafe; 50 import jdk.internal.vm.Continuation; 51 import jdk.internal.vm.ContinuationScope; 52 import jdk.internal.vm.StackableScope; 53 import jdk.internal.vm.ThreadContainer; 54 import jdk.internal.vm.ThreadContainers; 55 import jdk.internal.vm.annotation.ChangesCurrentThread; 56 import jdk.internal.vm.annotation.Hidden; 57 import jdk.internal.vm.annotation.IntrinsicCandidate; 58 import jdk.internal.vm.annotation.JvmtiMountTransition; 59 import jdk.internal.vm.annotation.ReservedStackAccess; 60 import sun.nio.ch.Interruptible; 61 import sun.security.action.GetPropertyAction; 62 import static java.util.concurrent.TimeUnit.*; 63 64 /** 65 * A thread that is scheduled by the Java virtual machine rather than the operating 66 * system. 67 */ 68 final class VirtualThread extends BaseVirtualThread { 69 private static final Unsafe U = Unsafe.getUnsafe(); 70 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 71 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler(); 72 private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler(); 73 private static final int TRACE_PINNING_MODE = tracePinningMode(); 74 75 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 76 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 77 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 78 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 79 80 // scheduler and continuation 81 private final Executor scheduler; 82 private final Continuation cont; 83 private final Runnable runContinuation; 84 85 // virtual thread state, accessed by VM 86 private volatile int state; 87 88 /* 89 * Virtual thread state transitions: 90 * 91 * NEW -> STARTED // Thread.start, schedule to run 92 * STARTED -> TERMINATED // failed to start 93 * STARTED -> RUNNING // first run 94 * RUNNING -> TERMINATED // done 95 * 96 * RUNNING -> PARKING // Thread parking with LockSupport.park 97 * PARKING -> PARKED // cont.yield successful, parked indefinitely 98 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier 99 * PARKED -> RUNNABLE // unparked, schedule to continue 100 * PINNED -> RUNNING // unparked, continue execution on same carrier 101 * 102 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 103 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 104 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier 105 * TIMED_PARKED -> RUNNABLE // unparked, schedule to continue 106 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 107 * 108 * RUNNABLE -> RUNNING // continue execution 109 * 110 * RUNNING -> YIELDING // Thread.yield 111 * YIELDING -> RUNNABLE // yield successful 112 * YIELDING -> RUNNING // yield failed 113 */ 114 private static final int NEW = 0; 115 private static final int STARTED = 1; 116 private static final int RUNNABLE = 2; // runnable-unmounted 117 private static final int RUNNING = 3; // runnable-mounted 118 119 // untimed parking 120 private static final int PARKING = 4; 121 private static final int PARKED = 5; // unmounted 122 private static final int PINNED = 6; // mounted 123 124 // timed parking 125 private static final int TIMED_PARKING = 7; 126 private static final int TIMED_PARKED = 8; 127 private static final int TIMED_PINNED = 9; 128 129 private static final int YIELDING = 10; // Thread.yield 130 131 private static final int TERMINATED = 99; // final state 132 133 // can be suspended from scheduling when unmounted 134 private static final int SUSPENDED = 1 << 8; 135 136 // parking permit 137 private volatile boolean parkPermit; 138 139 // carrier thread when mounted, accessed by VM 140 private volatile Thread carrierThread; 141 142 // termination object when joining, created lazily if needed 143 private volatile CountDownLatch termination; 144 145 /** 146 * Returns the continuation scope used for virtual threads. 147 */ 148 static ContinuationScope continuationScope() { 149 return VTHREAD_SCOPE; 150 } 151 152 /** 153 * Creates a new {@code VirtualThread} to run the given task with the given 154 * scheduler. If the given scheduler is {@code null} and the current thread 155 * is a platform thread then the newly created virtual thread will use the 156 * default scheduler. If given scheduler is {@code null} and the current 157 * thread is a virtual thread then the current thread's scheduler is used. 158 * 159 * @param scheduler the scheduler or null 160 * @param name thread name 161 * @param characteristics characteristics 162 * @param task the task to execute 163 */ 164 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 165 super(name, characteristics, /*bound*/ false); 166 Objects.requireNonNull(task); 167 168 // choose scheduler if not specified 169 if (scheduler == null) { 170 Thread parent = Thread.currentThread(); 171 if (parent instanceof VirtualThread vparent) { 172 scheduler = vparent.scheduler; 173 } else { 174 scheduler = DEFAULT_SCHEDULER; 175 } 176 } 177 178 this.scheduler = scheduler; 179 this.cont = new VThreadContinuation(this, task); 180 this.runContinuation = this::runContinuation; 181 } 182 183 /** 184 * The continuation that a virtual thread executes. 185 */ 186 private static class VThreadContinuation extends Continuation { 187 VThreadContinuation(VirtualThread vthread, Runnable task) { 188 super(VTHREAD_SCOPE, wrap(vthread, task)); 189 } 190 @Override 191 protected void onPinned(Continuation.Pinned reason) { 192 if (TRACE_PINNING_MODE > 0) { 193 boolean printAll = (TRACE_PINNING_MODE == 1); 194 PinnedThreadPrinter.printStackTrace(System.out, printAll); 195 } 196 } 197 private static Runnable wrap(VirtualThread vthread, Runnable task) { 198 return new Runnable() { 199 @Hidden 200 public void run() { 201 vthread.run(task); 202 } 203 }; 204 } 205 } 206 207 /** 208 * Runs or continues execution on the current thread. The virtual thread is mounted 209 * on the current thread before the task runs or continues. It unmounts when the 210 * task completes or yields. 211 */ 212 @ChangesCurrentThread 213 private void runContinuation() { 214 // the carrier must be a platform thread 215 if (Thread.currentThread().isVirtual()) { 216 throw new WrongThreadException(); 217 } 218 219 // set state to RUNNING 220 int initialState = state(); 221 if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) { 222 // first run 223 } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) { 224 // consume parking permit 225 setParkPermit(false); 226 } else { 227 // not runnable 228 return; 229 } 230 231 mount(); 232 try { 233 cont.run(); 234 } finally { 235 unmount(); 236 if (cont.isDone()) { 237 afterDone(); 238 } else { 239 afterYield(); 240 } 241 } 242 } 243 244 /** 245 * Submits the runContinuation task to the scheduler. For the default scheduler, 246 * and calling it on a worker thread, the task will be pushed to the local queue, 247 * otherwise it will be pushed to a submission queue. 248 * 249 * @throws RejectedExecutionException 250 */ 251 private void submitRunContinuation() { 252 try { 253 scheduler.execute(runContinuation); 254 } catch (RejectedExecutionException ree) { 255 submitFailed(ree); 256 throw ree; 257 } 258 } 259 260 /** 261 * Submits the runContinuation task to the scheduler with a lazy submit. 262 * @throws RejectedExecutionException 263 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 264 */ 265 private void lazySubmitRunContinuation(ForkJoinPool pool) { 266 try { 267 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 268 } catch (RejectedExecutionException ree) { 269 submitFailed(ree); 270 throw ree; 271 } 272 } 273 274 /** 275 * Submits the runContinuation task to the scheduler as an external submit. 276 * @throws RejectedExecutionException 277 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 278 */ 279 private void externalSubmitRunContinuation(ForkJoinPool pool) { 280 try { 281 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 282 } catch (RejectedExecutionException ree) { 283 submitFailed(ree); 284 throw ree; 285 } 286 } 287 288 /** 289 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 290 */ 291 private void submitFailed(RejectedExecutionException ree) { 292 var event = new VirtualThreadSubmitFailedEvent(); 293 if (event.isEnabled()) { 294 event.javaThreadId = threadId(); 295 event.exceptionMessage = ree.getMessage(); 296 event.commit(); 297 } 298 } 299 300 /** 301 * Runs a task in the context of this virtual thread. 302 */ 303 private void run(Runnable task) { 304 assert Thread.currentThread() == this && state == RUNNING; 305 306 // notify JVMTI, may post VirtualThreadStart event 307 notifyJvmtiStart(); 308 309 // emit JFR event if enabled 310 if (VirtualThreadStartEvent.isTurnedOn()) { 311 var event = new VirtualThreadStartEvent(); 312 event.javaThreadId = threadId(); 313 event.commit(); 314 } 315 316 Object bindings = Thread.scopedValueBindings(); 317 try { 318 runWith(bindings, task); 319 } catch (Throwable exc) { 320 dispatchUncaughtException(exc); 321 } finally { 322 try { 323 // pop any remaining scopes from the stack, this may block 324 StackableScope.popAll(); 325 326 // emit JFR event if enabled 327 if (VirtualThreadEndEvent.isTurnedOn()) { 328 var event = new VirtualThreadEndEvent(); 329 event.javaThreadId = threadId(); 330 event.commit(); 331 } 332 333 } finally { 334 // notify JVMTI, may post VirtualThreadEnd event 335 notifyJvmtiEnd(); 336 } 337 } 338 } 339 340 /** 341 * Mounts this virtual thread onto the current platform thread. On 342 * return, the current thread is the virtual thread. 343 */ 344 @ChangesCurrentThread 345 @ReservedStackAccess 346 private void mount() { 347 // notify JVMTI before mount 348 notifyJvmtiMount(/*hide*/true); 349 350 // sets the carrier thread 351 Thread carrier = Thread.currentCarrierThread(); 352 setCarrierThread(carrier); 353 354 // sync up carrier thread interrupt status if needed 355 if (interrupted) { 356 carrier.setInterrupt(); 357 } else if (carrier.isInterrupted()) { 358 synchronized (interruptLock) { 359 // need to recheck interrupt status 360 if (!interrupted) { 361 carrier.clearInterrupt(); 362 } 363 } 364 } 365 366 // set Thread.currentThread() to return this virtual thread 367 carrier.setCurrentThread(this); 368 } 369 370 /** 371 * Unmounts this virtual thread from the carrier. On return, the 372 * current thread is the current platform thread. 373 */ 374 @ChangesCurrentThread 375 @ReservedStackAccess 376 private void unmount() { 377 // set Thread.currentThread() to return the platform thread 378 Thread carrier = this.carrierThread; 379 carrier.setCurrentThread(carrier); 380 381 // break connection to carrier thread, synchronized with interrupt 382 synchronized (interruptLock) { 383 setCarrierThread(null); 384 } 385 carrier.clearInterrupt(); 386 387 // notify JVMTI after unmount 388 notifyJvmtiUnmount(/*hide*/false); 389 } 390 391 /** 392 * Sets the current thread to the current carrier thread. 393 */ 394 @ChangesCurrentThread 395 @JvmtiMountTransition 396 private void switchToCarrierThread() { 397 notifyJvmtiHideFrames(true); 398 Thread carrier = this.carrierThread; 399 assert Thread.currentThread() == this 400 && carrier == Thread.currentCarrierThread(); 401 carrier.setCurrentThread(carrier); 402 } 403 404 /** 405 * Sets the current thread to the given virtual thread. 406 */ 407 @ChangesCurrentThread 408 @JvmtiMountTransition 409 private void switchToVirtualThread(VirtualThread vthread) { 410 Thread carrier = vthread.carrierThread; 411 assert carrier == Thread.currentCarrierThread(); 412 carrier.setCurrentThread(vthread); 413 notifyJvmtiHideFrames(false); 414 } 415 416 /** 417 * Executes the given value returning task on the current carrier thread. 418 */ 419 @ChangesCurrentThread 420 <V> V executeOnCarrierThread(Callable<V> task) throws Exception { 421 assert Thread.currentThread() == this; 422 switchToCarrierThread(); 423 try { 424 return task.call(); 425 } finally { 426 switchToVirtualThread(this); 427 } 428 } 429 430 /** 431 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 432 * the continuation continues. 433 */ 434 @Hidden 435 private boolean yieldContinuation() { 436 notifyJvmtiUnmount(/*hide*/true); 437 try { 438 return Continuation.yield(VTHREAD_SCOPE); 439 } finally { 440 notifyJvmtiMount(/*hide*/false); 441 } 442 } 443 444 /** 445 * Invoked after the continuation yields. If parking then it sets the state 446 * and also re-submits the task to continue if unparked while parking. 447 * If yielding due to Thread.yield then it just submits the task to continue. 448 */ 449 private void afterYield() { 450 assert carrierThread == null; 451 452 int s = state(); 453 454 // LockSupport.park/parkNanos 455 if (s == PARKING || s == TIMED_PARKING) { 456 int newState = (s == PARKING) ? PARKED : TIMED_PARKED; 457 setState(newState); 458 459 // may have been unparked while parking 460 if (parkPermit && compareAndSetState(newState, RUNNABLE)) { 461 // lazy submit to continue on the current thread as carrier if possible 462 if (currentThread() instanceof CarrierThread ct) { 463 lazySubmitRunContinuation(ct.getPool()); 464 } else { 465 submitRunContinuation(); 466 } 467 468 } 469 return; 470 } 471 472 // Thread.yield 473 if (s == YIELDING) { 474 setState(RUNNABLE); 475 476 // external submit if there are no tasks in the local task queue 477 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 478 externalSubmitRunContinuation(ct.getPool()); 479 } else { 480 submitRunContinuation(); 481 } 482 return; 483 } 484 485 assert false; 486 } 487 488 /** 489 * Invoked after the continuation completes. 490 */ 491 private void afterDone() { 492 afterDone(true); 493 } 494 495 /** 496 * Invoked after the continuation completes (or start failed). Sets the thread 497 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 498 * 499 * @param notifyContainer true if its container should be notified 500 */ 501 private void afterDone(boolean notifyContainer) { 502 assert carrierThread == null; 503 setState(TERMINATED); 504 505 // notify anyone waiting for this virtual thread to terminate 506 CountDownLatch termination = this.termination; 507 if (termination != null) { 508 assert termination.getCount() == 1; 509 termination.countDown(); 510 } 511 512 // notify container 513 if (notifyContainer) { 514 threadContainer().onExit(this); 515 } 516 517 // clear references to thread locals 518 clearReferences(); 519 } 520 521 /** 522 * Schedules this {@code VirtualThread} to execute. 523 * 524 * @throws IllegalStateException if the container is shutdown or closed 525 * @throws IllegalThreadStateException if the thread has already been started 526 * @throws RejectedExecutionException if the scheduler cannot accept a task 527 */ 528 @Override 529 void start(ThreadContainer container) { 530 if (!compareAndSetState(NEW, STARTED)) { 531 throw new IllegalThreadStateException("Already started"); 532 } 533 534 // bind thread to container 535 assert threadContainer() == null; 536 setThreadContainer(container); 537 538 // start thread 539 boolean addedToContainer = false; 540 boolean started = false; 541 try { 542 container.onStart(this); // may throw 543 addedToContainer = true; 544 545 // scoped values may be inherited 546 inheritScopedValueBindings(container); 547 548 // submit task to run thread 549 submitRunContinuation(); 550 started = true; 551 } finally { 552 if (!started) { 553 afterDone(addedToContainer); 554 } 555 } 556 } 557 558 @Override 559 public void start() { 560 start(ThreadContainers.root()); 561 } 562 563 @Override 564 public void run() { 565 // do nothing 566 } 567 568 /** 569 * Parks until unparked or interrupted. If already unparked then the parking 570 * permit is consumed and this method completes immediately (meaning it doesn't 571 * yield). It also completes immediately if the interrupt status is set. 572 */ 573 @Override 574 void park() { 575 assert Thread.currentThread() == this; 576 577 // complete immediately if parking permit available or interrupted 578 if (getAndSetParkPermit(false) || interrupted) 579 return; 580 581 // park the thread 582 boolean yielded = false; 583 setState(PARKING); 584 try { 585 yielded = yieldContinuation(); // may throw 586 } finally { 587 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 588 if (!yielded) { 589 assert state() == PARKING; 590 setState(RUNNING); 591 } 592 } 593 594 // park on the carrier thread when pinned 595 if (!yielded) { 596 parkOnCarrierThread(false, 0); 597 } 598 } 599 600 /** 601 * Parks up to the given waiting time or until unparked or interrupted. 602 * If already unparked then the parking permit is consumed and this method 603 * completes immediately (meaning it doesn't yield). It also completes immediately 604 * if the interrupt status is set or the waiting time is {@code <= 0}. 605 * 606 * @param nanos the maximum number of nanoseconds to wait. 607 */ 608 @Override 609 void parkNanos(long nanos) { 610 assert Thread.currentThread() == this; 611 612 // complete immediately if parking permit available or interrupted 613 if (getAndSetParkPermit(false) || interrupted) 614 return; 615 616 // park the thread for the waiting time 617 if (nanos > 0) { 618 long startTime = System.nanoTime(); 619 620 boolean yielded = false; 621 Future<?> unparker = scheduleUnpark(this::unpark, nanos); 622 setState(TIMED_PARKING); 623 try { 624 yielded = yieldContinuation(); // may throw 625 } finally { 626 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 627 if (!yielded) { 628 assert state() == TIMED_PARKING; 629 setState(RUNNING); 630 } 631 cancel(unparker); 632 } 633 634 // park on carrier thread for remaining time when pinned 635 if (!yielded) { 636 long remainingNanos = nanos - (System.nanoTime() - startTime); 637 parkOnCarrierThread(true, remainingNanos); 638 } 639 } 640 } 641 642 /** 643 * Parks the current carrier thread up to the given waiting time or until 644 * unparked or interrupted. If the virtual thread is interrupted then the 645 * interrupt status will be propagated to the carrier thread. 646 * @param timed true for a timed park, false for untimed 647 * @param nanos the waiting time in nanoseconds 648 */ 649 private void parkOnCarrierThread(boolean timed, long nanos) { 650 assert state() == RUNNING; 651 652 VirtualThreadPinnedEvent event; 653 try { 654 event = new VirtualThreadPinnedEvent(); 655 event.begin(); 656 } catch (OutOfMemoryError e) { 657 event = null; 658 } 659 660 setState(timed ? TIMED_PINNED : PINNED); 661 try { 662 if (!parkPermit) { 663 if (!timed) { 664 U.park(false, 0); 665 } else if (nanos > 0) { 666 U.park(false, nanos); 667 } 668 } 669 } finally { 670 setState(RUNNING); 671 } 672 673 // consume parking permit 674 setParkPermit(false); 675 676 if (event != null) { 677 try { 678 event.commit(); 679 } catch (OutOfMemoryError e) { 680 // ignore 681 } 682 } 683 } 684 685 /** 686 * Schedule an unpark task to run after a given delay. 687 */ 688 @ChangesCurrentThread 689 private Future<?> scheduleUnpark(Runnable unparker, long nanos) { 690 // need to switch to current carrier thread to avoid nested parking 691 switchToCarrierThread(); 692 try { 693 return UNPARKER.schedule(unparker, nanos, NANOSECONDS); 694 } finally { 695 switchToVirtualThread(this); 696 } 697 } 698 699 /** 700 * Cancels a task if it has not completed. 701 */ 702 @ChangesCurrentThread 703 private void cancel(Future<?> future) { 704 if (!future.isDone()) { 705 // need to switch to current carrier thread to avoid nested parking 706 switchToCarrierThread(); 707 try { 708 future.cancel(false); 709 } finally { 710 switchToVirtualThread(this); 711 } 712 } 713 } 714 715 /** 716 * Re-enables this virtual thread for scheduling. If the virtual thread was 717 * {@link #park() parked} then it will be unblocked, otherwise its next call 718 * to {@code park} or {@linkplain #parkNanos(long) parkNanos} is guaranteed 719 * not to block. 720 * @throws RejectedExecutionException if the scheduler cannot accept a task 721 */ 722 @Override 723 @ChangesCurrentThread 724 void unpark() { 725 Thread currentThread = Thread.currentThread(); 726 if (!getAndSetParkPermit(true) && currentThread != this) { 727 int s = state(); 728 boolean parked = (s == PARKED) || (s == TIMED_PARKED); 729 if (parked && compareAndSetState(s, RUNNABLE)) { 730 if (currentThread instanceof VirtualThread vthread) { 731 vthread.switchToCarrierThread(); 732 try { 733 submitRunContinuation(); 734 } finally { 735 switchToVirtualThread(vthread); 736 } 737 } else { 738 submitRunContinuation(); 739 } 740 } else if ((s == PINNED) || (s == TIMED_PINNED)) { 741 // unpark carrier thread when pinned. 742 synchronized (carrierThreadAccessLock()) { 743 Thread carrier = carrierThread; 744 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 745 U.unpark(carrier); 746 } 747 } 748 } 749 } 750 } 751 752 /** 753 * Attempts to yield the current virtual thread (Thread.yield). 754 */ 755 void tryYield() { 756 assert Thread.currentThread() == this; 757 setState(YIELDING); 758 boolean yielded = false; 759 try { 760 yielded = yieldContinuation(); // may throw 761 } finally { 762 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 763 if (!yielded) { 764 assert state() == YIELDING; 765 setState(RUNNING); 766 } 767 } 768 } 769 770 /** 771 * Sleep the current thread for the given sleep time (in nanoseconds). If 772 * nanos is 0 then the thread will attempt to yield. 773 * 774 * @implNote This implementation parks the thread for the given sleeping time 775 * and will therefore be observed in PARKED state during the sleep. Parking 776 * will consume the parking permit so this method makes available the parking 777 * permit after the sleep. This may be observed as a spurious, but benign, 778 * wakeup when the thread subsequently attempts to park. 779 * 780 * @param nanos the maximum number of nanoseconds to sleep 781 * @throws InterruptedException if interrupted while sleeping 782 */ 783 void sleepNanos(long nanos) throws InterruptedException { 784 assert Thread.currentThread() == this && nanos >= 0; 785 if (getAndClearInterrupt()) 786 throw new InterruptedException(); 787 if (nanos == 0) { 788 tryYield(); 789 } else { 790 // park for the sleep time 791 try { 792 long remainingNanos = nanos; 793 long startNanos = System.nanoTime(); 794 while (remainingNanos > 0) { 795 parkNanos(remainingNanos); 796 if (getAndClearInterrupt()) { 797 throw new InterruptedException(); 798 } 799 remainingNanos = nanos - (System.nanoTime() - startNanos); 800 } 801 } finally { 802 // may have been unparked while sleeping 803 setParkPermit(true); 804 } 805 } 806 } 807 808 /** 809 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 810 * A timeout of {@code 0} means to wait forever. 811 * 812 * @throws InterruptedException if interrupted while waiting 813 * @return true if the thread has terminated 814 */ 815 boolean joinNanos(long nanos) throws InterruptedException { 816 if (state() == TERMINATED) 817 return true; 818 819 // ensure termination object exists, then re-check state 820 CountDownLatch termination = getTermination(); 821 if (state() == TERMINATED) 822 return true; 823 824 // wait for virtual thread to terminate 825 if (nanos == 0) { 826 termination.await(); 827 } else { 828 boolean terminated = termination.await(nanos, NANOSECONDS); 829 if (!terminated) { 830 // waiting time elapsed 831 return false; 832 } 833 } 834 assert state() == TERMINATED; 835 return true; 836 } 837 838 @Override 839 @SuppressWarnings("removal") 840 public void interrupt() { 841 if (Thread.currentThread() != this) { 842 checkAccess(); 843 synchronized (interruptLock) { 844 interrupted = true; 845 Interruptible b = nioBlocker; 846 if (b != null) { 847 b.interrupt(this); 848 } 849 850 // interrupt carrier thread if mounted 851 Thread carrier = carrierThread; 852 if (carrier != null) carrier.setInterrupt(); 853 } 854 } else { 855 interrupted = true; 856 carrierThread.setInterrupt(); 857 } 858 unpark(); 859 } 860 861 @Override 862 public boolean isInterrupted() { 863 return interrupted; 864 } 865 866 @Override 867 boolean getAndClearInterrupt() { 868 assert Thread.currentThread() == this; 869 boolean oldValue = interrupted; 870 if (oldValue) { 871 synchronized (interruptLock) { 872 interrupted = false; 873 carrierThread.clearInterrupt(); 874 } 875 } 876 return oldValue; 877 } 878 879 @Override 880 Thread.State threadState() { 881 int s = state(); 882 switch (s & ~SUSPENDED) { 883 case NEW: 884 return Thread.State.NEW; 885 case STARTED: 886 // return NEW if thread container not yet set 887 if (threadContainer() == null) { 888 return Thread.State.NEW; 889 } else { 890 return Thread.State.RUNNABLE; 891 } 892 case RUNNABLE: 893 // runnable, not mounted 894 return Thread.State.RUNNABLE; 895 case RUNNING: 896 // if mounted then return state of carrier thread 897 synchronized (carrierThreadAccessLock()) { 898 Thread carrierThread = this.carrierThread; 899 if (carrierThread != null) { 900 return carrierThread.threadState(); 901 } 902 } 903 // runnable, mounted 904 return Thread.State.RUNNABLE; 905 case PARKING: 906 case TIMED_PARKING: 907 case YIELDING: 908 // runnable, mounted, not yet waiting 909 return Thread.State.RUNNABLE; 910 case PARKED: 911 case PINNED: 912 return State.WAITING; 913 case TIMED_PARKED: 914 case TIMED_PINNED: 915 return State.TIMED_WAITING; 916 case TERMINATED: 917 return Thread.State.TERMINATED; 918 default: 919 throw new InternalError(); 920 } 921 } 922 923 @Override 924 boolean alive() { 925 int s = state; 926 return (s != NEW && s != TERMINATED); 927 } 928 929 @Override 930 boolean isTerminated() { 931 return (state == TERMINATED); 932 } 933 934 @Override 935 StackTraceElement[] asyncGetStackTrace() { 936 StackTraceElement[] stackTrace; 937 do { 938 stackTrace = (carrierThread != null) 939 ? super.asyncGetStackTrace() // mounted 940 : tryGetStackTrace(); // unmounted 941 if (stackTrace == null) { 942 Thread.yield(); 943 } 944 } while (stackTrace == null); 945 return stackTrace; 946 } 947 948 /** 949 * Returns the stack trace for this virtual thread if it is unmounted. 950 * Returns null if the thread is in another state. 951 */ 952 private StackTraceElement[] tryGetStackTrace() { 953 int initialState = state(); 954 return switch (initialState) { 955 case RUNNABLE, PARKED, TIMED_PARKED -> { 956 int suspendedState = initialState | SUSPENDED; 957 if (compareAndSetState(initialState, suspendedState)) { 958 try { 959 yield cont.getStackTrace(); 960 } finally { 961 assert state == suspendedState; 962 setState(initialState); 963 964 // re-submit if runnable 965 // re-submit if unparked while suspended 966 if (initialState == RUNNABLE 967 || (parkPermit && compareAndSetState(initialState, RUNNABLE))) { 968 try { 969 submitRunContinuation(); 970 } catch (RejectedExecutionException ignore) { } 971 } 972 } 973 } 974 yield null; 975 } 976 case NEW, STARTED, TERMINATED -> new StackTraceElement[0]; // empty stack 977 default -> null; 978 }; 979 } 980 981 @Override 982 public String toString() { 983 StringBuilder sb = new StringBuilder("VirtualThread[#"); 984 sb.append(threadId()); 985 String name = getName(); 986 if (!name.isEmpty()) { 987 sb.append(","); 988 sb.append(name); 989 } 990 sb.append("]/"); 991 Thread carrier = carrierThread; 992 if (carrier != null) { 993 // include the carrier thread state and name when mounted 994 synchronized (carrierThreadAccessLock()) { 995 carrier = carrierThread; 996 if (carrier != null) { 997 String stateAsString = carrier.threadState().toString(); 998 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 999 sb.append('@'); 1000 sb.append(carrier.getName()); 1001 } 1002 } 1003 } 1004 // include virtual thread state when not mounted 1005 if (carrier == null) { 1006 String stateAsString = threadState().toString(); 1007 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1008 } 1009 return sb.toString(); 1010 } 1011 1012 @Override 1013 public int hashCode() { 1014 return (int) threadId(); 1015 } 1016 1017 @Override 1018 public boolean equals(Object obj) { 1019 return obj == this; 1020 } 1021 1022 /** 1023 * Returns the termination object, creating it if needed. 1024 */ 1025 private CountDownLatch getTermination() { 1026 CountDownLatch termination = this.termination; 1027 if (termination == null) { 1028 termination = new CountDownLatch(1); 1029 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1030 termination = this.termination; 1031 } 1032 } 1033 return termination; 1034 } 1035 1036 /** 1037 * Returns the lock object to synchronize on when accessing carrierThread. 1038 * The lock prevents carrierThread from being reset to null during unmount. 1039 */ 1040 private Object carrierThreadAccessLock() { 1041 // return interruptLock as unmount has to coordinate with interrupt 1042 return interruptLock; 1043 } 1044 1045 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1046 1047 private int state() { 1048 return state; // volatile read 1049 } 1050 1051 private void setState(int newValue) { 1052 state = newValue; // volatile write 1053 } 1054 1055 private boolean compareAndSetState(int expectedValue, int newValue) { 1056 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1057 } 1058 1059 private void setParkPermit(boolean newValue) { 1060 if (parkPermit != newValue) { 1061 parkPermit = newValue; 1062 } 1063 } 1064 1065 private boolean getAndSetParkPermit(boolean newValue) { 1066 if (parkPermit != newValue) { 1067 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1068 } else { 1069 return newValue; 1070 } 1071 } 1072 1073 private void setCarrierThread(Thread carrier) { 1074 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1075 this.carrierThread = carrier; 1076 } 1077 1078 // -- JVM TI support -- 1079 1080 @IntrinsicCandidate 1081 @JvmtiMountTransition 1082 private native void notifyJvmtiStart(); 1083 1084 @IntrinsicCandidate 1085 @JvmtiMountTransition 1086 private native void notifyJvmtiEnd(); 1087 1088 @IntrinsicCandidate 1089 @JvmtiMountTransition 1090 private native void notifyJvmtiMount(boolean hide); 1091 1092 @IntrinsicCandidate 1093 @JvmtiMountTransition 1094 private native void notifyJvmtiUnmount(boolean hide); 1095 1096 @IntrinsicCandidate 1097 @JvmtiMountTransition 1098 private native void notifyJvmtiHideFrames(boolean hide); 1099 1100 private static native void registerNatives(); 1101 static { 1102 registerNatives(); 1103 } 1104 1105 /** 1106 * Creates the default scheduler. 1107 */ 1108 @SuppressWarnings("removal") 1109 private static ForkJoinPool createDefaultScheduler() { 1110 ForkJoinWorkerThreadFactory factory = pool -> { 1111 PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool); 1112 return AccessController.doPrivileged(pa); 1113 }; 1114 PrivilegedAction<ForkJoinPool> pa = () -> { 1115 int parallelism, maxPoolSize, minRunnable; 1116 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1117 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1118 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1119 if (parallelismValue != null) { 1120 parallelism = Integer.parseInt(parallelismValue); 1121 } else { 1122 parallelism = Runtime.getRuntime().availableProcessors(); 1123 } 1124 if (maxPoolSizeValue != null) { 1125 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1126 parallelism = Integer.min(parallelism, maxPoolSize); 1127 } else { 1128 maxPoolSize = Integer.max(parallelism, 256); 1129 } 1130 if (minRunnableValue != null) { 1131 minRunnable = Integer.parseInt(minRunnableValue); 1132 } else { 1133 minRunnable = Integer.max(parallelism / 2, 1); 1134 } 1135 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1136 boolean asyncMode = true; // FIFO 1137 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1138 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1139 }; 1140 return AccessController.doPrivileged(pa); 1141 } 1142 1143 /** 1144 * Creates the ScheduledThreadPoolExecutor used for timed unpark. 1145 */ 1146 private static ScheduledExecutorService createDelayedTaskScheduler() { 1147 String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize"); 1148 int poolSize; 1149 if (propValue != null) { 1150 poolSize = Integer.parseInt(propValue); 1151 } else { 1152 poolSize = 1; 1153 } 1154 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1155 Executors.newScheduledThreadPool(poolSize, task -> { 1156 return InnocuousThread.newThread("VirtualThread-unparker", task); 1157 }); 1158 stpe.setRemoveOnCancelPolicy(true); 1159 return stpe; 1160 } 1161 1162 /** 1163 * Reads the value of the jdk.tracePinnedThreads property to determine if stack 1164 * traces should be printed when a carrier thread is pinned when a virtual thread 1165 * attempts to park. 1166 */ 1167 private static int tracePinningMode() { 1168 String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads"); 1169 if (propValue != null) { 1170 if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue)) 1171 return 1; 1172 if ("short".equalsIgnoreCase(propValue)) 1173 return 2; 1174 } 1175 return 0; 1176 } 1177 }