/*
 * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.lang;

import java.lang.ref.Reference;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import jdk.internal.event.ThreadSleepEvent;
import jdk.internal.event.VirtualThreadEndEvent;
import jdk.internal.event.VirtualThreadPinnedEvent;
import jdk.internal.event.VirtualThreadStartEvent;
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
import jdk.internal.misc.CarrierThread;
import jdk.internal.misc.InnocuousThread;
import jdk.internal.misc.Unsafe;
import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;
import jdk.internal.vm.StackableScope;
import jdk.internal.vm.ThreadContainer;
import jdk.internal.vm.ThreadContainers;
import jdk.internal.vm.annotation.ChangesCurrentThread;
import jdk.internal.vm.annotation.ForceInline;
import jdk.internal.vm.annotation.Hidden;
import jdk.internal.vm.annotation.JvmtiMountTransition;
import sun.nio.ch.Interruptible;
import sun.security.action.GetPropertyAction;
import static java.util.concurrent.TimeUnit.*;

/**
 * A thread that is scheduled by the Java virtual machine rather than the operating
 * system.
 *
 * <p> The thread's task runs inside a {@link Continuation}. A {@code Runnable}
 * that runs or continues the continuation is submitted to an {@link Executor}
 * (the scheduler). While the continuation executes, the virtual thread is
 * <em>mounted</em> on the platform thread that runs it (its <em>carrier</em>);
 * when it parks or yields successfully it is <em>unmounted</em> and the
 * {@code Runnable} is re-submitted once the thread becomes runnable again.
 */
final class VirtualThread extends BaseVirtualThread {
    private static final Unsafe U = Unsafe.getUnsafe();
    private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
    // scheduler used when none is specified and the creating thread is not virtual
    private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
    // runs the delayed unpark tasks scheduled by timed parks (see scheduleUnpark)
    private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
    // 0 = tracing off, 1 = full stack trace, 2 = short stack trace (see tracePinningMode)
    private static final int TRACE_PINNING_MODE = tracePinningMode();

    // field offsets for use with Unsafe
    private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
    private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
    private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
    private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");

    // scheduler and continuation
    private final Executor scheduler;
    private final Continuation cont;
    // the task submitted to the scheduler; bound to this::runContinuation in the constructor
    private final Runnable runContinuation;

    // virtual thread state, accessed by VM
    private volatile int state;

    /*
     * Virtual thread state and transitions:
     *
     *      NEW -> STARTED         // Thread.start
     *  STARTED -> TERMINATED      // failed to start
     *  STARTED -> RUNNING         // first run
     *
     *  RUNNING -> PARKING         // Thread attempts to park
     *  PARKING -> PARKED          // cont.yield successful, thread is parked
     *  PARKING -> PINNED          // cont.yield failed, thread is pinned
     *
     *   PARKED -> RUNNABLE        // unpark or interrupted
     *   PINNED -> RUNNABLE        // unpark or interrupted
     *
     * RUNNABLE -> RUNNING         // continue execution
     *
     *  RUNNING -> YIELDING        // Thread.yield
     * YIELDING -> RUNNABLE        // yield successful
     * YIELDING -> RUNNING         // yield failed
     *
     *  RUNNING -> TERMINATED      // done
     */
    private static final int NEW      = 0;
    private static final int STARTED  = 1;
    private static final int RUNNABLE = 2;     // runnable-unmounted
    private static final int RUNNING  = 3;     // runnable-mounted
    private static final int PARKING  = 4;
    private static final int PARKED   = 5;     // unmounted
    private static final int PINNED   = 6;     // mounted
    private static final int YIELDING = 7;     // Thread.yield
    private static final int TERMINATED = 99;  // final state

    // can be suspended from scheduling when unmounted
    // (the SUSPENDED bit is OR-ed into RUNNABLE or PARKED, see tryGetStackTrace)
    private static final int SUSPENDED = 1 << 8;
    private static final int RUNNABLE_SUSPENDED = (RUNNABLE | SUSPENDED);
    private static final int PARKED_SUSPENDED   = (PARKED | SUSPENDED);

    // parking permit
    private volatile boolean parkPermit;

    // carrier thread when mounted, accessed by VM
    private volatile Thread carrierThread;

    // termination object when joining, created lazily if needed
    private volatile CountDownLatch termination;

    /**
     * Returns the continuation scope used for virtual threads.
     */
    static ContinuationScope continuationScope() {
        return VTHREAD_SCOPE;
    }

    /**
     * Creates a new {@code VirtualThread} to run the given task with the given
     * scheduler. If the given scheduler is {@code null} and the current thread
     * is a platform thread then the newly created virtual thread will use the
     * default scheduler. If given scheduler is {@code null} and the current
     * thread is a virtual thread then the current thread's scheduler is used.
     *
     * @param scheduler the scheduler or null
     * @param name thread name
     * @param characteristics characteristics
     * @param task the task to execute
     * @throws NullPointerException if {@code task} is {@code null}
     */
    VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
        super(name, characteristics, /*bound*/ false);
        Objects.requireNonNull(task);

        // choose scheduler if not specified
        if (scheduler == null) {
            Thread parent = Thread.currentThread();
            if (parent instanceof VirtualThread vparent) {
                scheduler = vparent.scheduler;
            } else {
                scheduler = DEFAULT_SCHEDULER;
            }
        }

        this.scheduler = scheduler;
        this.cont = new VThreadContinuation(this, task);
        this.runContinuation = this::runContinuation;
    }

    /**
     * The continuation that a virtual thread executes. Its entry point invokes
     * {@link VirtualThread#run(Runnable)} on the owning virtual thread.
     */
    private static class VThreadContinuation extends Continuation {
        VThreadContinuation(VirtualThread vthread, Runnable task) {
            super(VTHREAD_SCOPE, () -> vthread.run(task));
        }

        /**
         * Prints a stack trace to {@code System.out} when tracing of pinned
         * threads is enabled ({@code TRACE_PINNING_MODE > 0}). The
         * {@code printAll} flag passed to the printer is true only in mode 1.
         */
        @Override
        protected void onPinned(Continuation.Pinned reason) {
            if (TRACE_PINNING_MODE > 0) {
                boolean printAll = (TRACE_PINNING_MODE == 1);
                PinnedThreadPrinter.printStackTrace(System.out, printAll);
            }
        }
    }

    /**
     * Runs or continues execution of the continuation on the current thread.
     *
     * <p> The state is moved to RUNNING from either STARTED (first run) or
     * RUNNABLE (continue). If the state is anything else, or the
     * compare-and-set fails, this method returns without running the
     * continuation. After the continuation returns, {@code afterTerminate} or
     * {@code afterYield} is invoked depending on whether the continuation is
     * done.
     *
     * @throws WrongThreadException if the current thread is a virtual thread
     */
    private void runContinuation() {
        // the carrier must be a platform thread
        if (Thread.currentThread().isVirtual()) {
            throw new WrongThreadException();
        }

        // set state to RUNNING
        boolean firstRun;
        int initialState = state();
        if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
            // first run
            firstRun = true;
        } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
            // consume parking permit
            setParkPermit(false);
            firstRun = false;
        } else {
            // not runnable
            return;
        }

        // notify JVMTI before mount
        if (notifyJvmtiEvents) notifyJvmtiMountBegin(firstRun);

        try {
            cont.run();
        } finally {
            // cont.run() returns when the continuation has either completed or yielded
            if (cont.isDone()) {
                afterTerminate(/*executed*/ true);
            } else {
                afterYield();
            }
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the default scheduler,
     * and calling it on a worker thread, the task will be pushed to the local queue,
     * otherwise it will be pushed to a submission queue.
     *
     * <p> If the scheduler rejects the task then {@link #submitFailed} is
     * invoked before the exception is re-thrown.
     *
     * @throws RejectedExecutionException if the scheduler rejects the task
     */
    private void submitRunContinuation() {
        try {
            scheduler.execute(runContinuation);
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        }
    }

    /**
     * Submits the runContinuation task to the scheduler with a lazy submit.
     *
     * @param pool the pool to submit the task to
     * @throws RejectedExecutionException if the pool rejects the task
     * @see ForkJoinPool#lazySubmit(ForkJoinTask)
     */
    private void lazySubmitRunContinuation(ForkJoinPool pool) {
        try {
            pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        }
    }

    /**
     * Submits the runContinuation task to the scheduler as an external submit.
     *
     * @param pool the pool to submit the task to
     * @throws RejectedExecutionException if the pool rejects the task
     * @see ForkJoinPool#externalSubmit(ForkJoinTask)
     */
    private void externalSubmitRunContinuation(ForkJoinPool pool) {
        try {
            pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        }
    }

    /**
     * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. The event records
     * this thread's identifier and the message of the given exception.
     *
     * @param ree the exception thrown by the failed submit
     */
    private void submitFailed(RejectedExecutionException ree) {
        var event = new VirtualThreadSubmitFailedEvent();
        if (event.isEnabled()) {
            event.javaThreadId = threadId();
            event.exceptionMessage = ree.getMessage();
            event.commit();
        }
    }

    /**
     * Runs a task in the context of this virtual thread. The virtual thread is
     * mounted on the current (carrier) thread before the task runs. It unmounts
     * from its carrier thread when the task completes.
     *
     * <p> Any {@code Throwable} thrown by the task is passed to
     * {@code dispatchUncaughtException}. The state is set to TERMINATED as the
     * last step, after the final unmount.
     *
     * @param task the task to run
     */
    @ChangesCurrentThread
    private void run(Runnable task) {
        assert state == RUNNING;
        // read once so that the mount/unmount notifications in this method are paired
        boolean notifyJvmti = notifyJvmtiEvents;

        // first mount
        mount();
        if (notifyJvmti) notifyJvmtiMountEnd(true);

        // emit JFR event if enabled
        if (VirtualThreadStartEvent.isTurnedOn()) {
            var event = new VirtualThreadStartEvent();
            event.javaThreadId = threadId();
            event.commit();
        }

        Object bindings = scopedValueBindings();
        try {
            runWith(bindings, task);
        } catch (Throwable exc) {
            dispatchUncaughtException(exc);
        } finally {
            try {
                // pop any remaining scopes from the stack, this may block
                StackableScope.popAll();

                // emit JFR event if enabled
                if (VirtualThreadEndEvent.isTurnedOn()) {
                    var event = new VirtualThreadEndEvent();
                    event.javaThreadId = threadId();
                    event.commit();
                }

            } finally {
                // last unmount
                if (notifyJvmti) notifyJvmtiUnmountBegin(true);
                unmount();

                // final state
                setState(TERMINATED);
            }
        }
    }

    /**
     * Runs the given operation, keeping {@code bindings} reachable until the
     * operation returns normally (via {@link Reference#reachabilityFence}).
     * NOTE(review): {@code ensureMaterializedForStackWalk} is declared outside
     * this file; presumably it makes the bindings object discoverable by a stack
     * walk of this frame -- confirm against its declaration.
     *
     * @param bindings the scoped value bindings in effect for the operation
     * @param op the operation to run
     */
    @Hidden
    @ForceInline
    private void runWith(Object bindings, Runnable op) {
        ensureMaterializedForStackWalk(bindings);
        op.run();
        Reference.reachabilityFence(bindings);
    }

    /**
     * Mounts this virtual thread onto the current platform thread. On
     * return, the current thread is the virtual thread.
     */
    @ChangesCurrentThread
    private void mount() {
        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupt status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            synchronized (interruptLock) {
                // need to recheck interrupt status
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread.
     */
    @ChangesCurrentThread
    private void unmount() {
        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        // the carrier's interrupt status may have been set on behalf of this
        // virtual thread while it was mounted (see mount and interrupt)
        carrier.clearInterrupt();
    }

    /**
     * Sets the current thread to the current carrier thread.
     * If JVMTI events are enabled then JVMTI is told to hide frames first.
     *
     * @return true if JVMTI was notified
     */
    @ChangesCurrentThread
    @JvmtiMountTransition
    private boolean switchToCarrierThread() {
        boolean notifyJvmti = notifyJvmtiEvents;
        if (notifyJvmti) {
            notifyJvmtiHideFrames(true);
        }
        Thread carrier = this.carrierThread;
        assert Thread.currentThread() == this
                && carrier == Thread.currentCarrierThread();
        carrier.setCurrentThread(carrier);
        return notifyJvmti;
    }

    /**
     * Sets the current thread to the given virtual thread.
     * If {@code notifyJvmti} is true then JVMTI is notified.
     *
     * @param vthread the virtual thread to make current; it must be mounted on
     *        the current carrier thread
     * @param notifyJvmti the value returned by the matching
     *        {@link #switchToCarrierThread()} call
     */
    @ChangesCurrentThread
    @JvmtiMountTransition
    private void switchToVirtualThread(VirtualThread vthread, boolean notifyJvmti) {
        Thread carrier = vthread.carrierThread;
        assert carrier == Thread.currentCarrierThread();
        carrier.setCurrentThread(vthread);
        if (notifyJvmti) {
            notifyJvmtiHideFrames(false);
        }
    }

    /**
     * Unmounts this virtual thread, invokes Continuation.yield, and re-mounts the
     * thread when continued. When enabled, JVMTI must be notified from this method.
     *
     * @return true if the yield was successful
     */
    @ChangesCurrentThread
    private boolean yieldContinuation() {
        boolean notifyJvmti = notifyJvmtiEvents;

        // unmount
        if (notifyJvmti) notifyJvmtiUnmountBegin(false);
        unmount();
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            // re-mount
            mount();
            if (notifyJvmti) notifyJvmtiMountEnd(false);
        }
    }

    /**
     * Invoked after the continuation yields. If parking then it sets the state
     * and also re-submits the task to continue if unparked while parking.
     * If yielding due to Thread.yield then it just submits the task to continue.
     */
    private void afterYield() {
        int s = state();
        assert (s == PARKING || s == YIELDING) && (carrierThread == null);

        if (s == PARKING) {
            setState(PARKED);

            // notify JVMTI that unmount has completed, thread is parked
            if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);

            // may have been unparked while parking
            if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
                // lazy submit to continue on the current thread as carrier if possible
                if (currentThread() instanceof CarrierThread ct) {
                    lazySubmitRunContinuation(ct.getPool());
                } else {
                    submitRunContinuation();
                }

            }
        } else if (s == YIELDING) {   // Thread.yield
            setState(RUNNABLE);

            // notify JVMTI that unmount has completed, thread is runnable
            if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation(ct.getPool());
            } else {
                submitRunContinuation();
            }
        }
    }

    /**
     * Invoked after the thread terminates (or start failed). This method
     * notifies anyone waiting for the thread to terminate.
     *
     * @param executed true if the thread executed, false if it failed to start
     */
    private void afterTerminate(boolean executed) {
        assert (state() == TERMINATED) && (carrierThread == null);

        if (executed) {
            if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(true);
        }

        // notify anyone waiting for this virtual thread to terminate
        // (the latch only exists if joinNanos created it via getTermination)
        CountDownLatch termination = this.termination;
        if (termination != null) {
            assert termination.getCount() == 1;
            termination.countDown();
        }

        if (executed) {
            // notify container if thread executed
            threadContainer().onExit(this);

            // clear references to thread locals
            clearReferences();
        }
    }

    /**
     * Schedules this {@code VirtualThread} to execute.
     *
     * <p> If inheriting scoped value bindings or submitting the task fails, the
     * state is set to TERMINATED, the container is notified of the exit, and
     * waiters are released before the exception propagates.
     *
     * @param container the thread container to bind this thread to
     * @throws IllegalStateException if the container is shutdown or closed
     * @throws IllegalThreadStateException if the thread has already been started
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void start(ThreadContainer container) {
        if (!compareAndSetState(NEW, STARTED)) {
            throw new IllegalThreadStateException("Already started");
        }

        // bind thread to container
        setThreadContainer(container);

        // start thread
        boolean started = false;
        container.onStart(this); // may throw
        try {
            // scoped values may be inherited
            inheritScopedValueBindings(container);

            // submit task to run thread
            submitRunContinuation();
            started = true;
        } finally {
            if (!started) {
                setState(TERMINATED);
                container.onExit(this);
                afterTerminate(/*executed*/ false);
            }
        }
    }

    /**
     * Schedules this virtual thread to execute in the container returned by
     * {@code ThreadContainers.root()}.
     */
    @Override
    public void start() {
        start(ThreadContainers.root());
    }

    /**
     * Does nothing. The task is executed by {@link #run(Runnable)}, which is
     * invoked from the continuation, not by this method.
     */
    @Override
    public void run() {
        // do nothing
    }

    /**
     * Parks until unparked or interrupted. If already unparked then the parking
     * permit is consumed and this method completes immediately (meaning it doesn't
     * yield). It also completes immediately if the interrupt status is set.
     */
    @Override
    void park() {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        setState(PARKING);
        try {
            if (!yieldContinuation()) {
                // park on the carrier thread when pinned
                parkOnCarrierThread(false, 0);
            }
        } finally {
            assert (Thread.currentThread() == this) && (state() == RUNNING);
        }
    }

    /**
     * Parks up to the given waiting time or until unparked or interrupted.
     * If already unparked then the parking permit is consumed and this method
     * completes immediately (meaning it doesn't yield). It also completes immediately
     * if the interrupt status is set or the waiting time is {@code <= 0}.
     *
     * @param nanos the maximum number of nanoseconds to wait.
     */
    @Override
    void parkNanos(long nanos) {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread for the waiting time
        if (nanos > 0) {
            long startTime = System.nanoTime();

            boolean yielded;
            // schedule the timed wakeup before yielding; it is cancelled on return
            Future<?> unparker = scheduleUnpark(this::unpark, nanos);
            setState(PARKING);
            try {
                yielded = yieldContinuation();
            } finally {
                assert (Thread.currentThread() == this)
                        && (state() == RUNNING || state() == PARKING);
                cancel(unparker);
            }

            // park on carrier thread for remaining time when pinned
            if (!yielded) {
                long deadline = startTime + nanos;
                // startTime + nanos may overflow; clamp a negative result
                if (deadline < 0L)
                    deadline = Long.MAX_VALUE;
                parkOnCarrierThread(true, deadline - System.nanoTime());
            }
        }
    }

    /**
     * Parks the current carrier thread up to the given waiting time or until
     * unparked or interrupted. If the virtual thread is interrupted then the
     * interrupt status will be propagated to the carrier thread.
     *
     * <p> The state is PINNED while parked and is set back to RUNNING on return.
     * The parking permit is consumed, and a JFR VirtualThreadPinnedEvent that
     * spans the park is committed.
     *
     * @param timed true for a timed park, false for untimed
     * @param nanos the waiting time in nanoseconds
     */
    private void parkOnCarrierThread(boolean timed, long nanos) {
        assert state() == PARKING;

        var pinnedEvent = new VirtualThreadPinnedEvent();
        pinnedEvent.begin();

        setState(PINNED);
        try {
            // skip the park if a permit became available in the meantime;
            // a timed park with a non-positive waiting time does not park
            if (!parkPermit) {
                if (!timed) {
                    U.park(false, 0);
                } else if (nanos > 0) {
                    U.park(false, nanos);
                }
            }
        } finally {
            setState(RUNNING);
        }

        // consume parking permit
        setParkPermit(false);

        pinnedEvent.commit();
    }

    /**
     * Schedule an unpark task to run after a given delay.
     *
     * @param unparker the task to run
     * @param nanos the delay in nanoseconds
     * @return a future representing the scheduled task
     */
    @ChangesCurrentThread
    private Future<?> scheduleUnpark(Runnable unparker, long nanos) {
        // need to switch to current carrier thread to avoid nested parking
        boolean notifyJvmti = switchToCarrierThread();
        try {
            return UNPARKER.schedule(unparker, nanos, NANOSECONDS);
        } finally {
            switchToVirtualThread(this, notifyJvmti);
        }
    }

    /**
     * Cancels a task if it has not completed.
     *
     * @param future the task to cancel; it is not interrupted if already running
     */
    @ChangesCurrentThread
    private void cancel(Future<?> future) {
        if (!future.isDone()) {
            // need to switch to current carrier thread to avoid nested parking
            boolean notifyJvmti = switchToCarrierThread();
            try {
                future.cancel(false);
            } finally {
                switchToVirtualThread(this, notifyJvmti);
            }
        }
    }

    /**
     * Re-enables this virtual thread for scheduling. If the virtual thread was
     * {@link #park() parked} then it will be unblocked, otherwise its next call
     * to {@code park} or {@linkplain #parkNanos(long) parkNanos} is guaranteed
     * not to block.
     *
     * <p> Nothing further is done if the permit was already available or if
     * the caller is this thread. A PARKED thread is made RUNNABLE and
     * re-submitted to the scheduler; a PINNED thread has its carrier thread
     * unparked.
     *
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    @ChangesCurrentThread
    void unpark() {
        Thread currentThread = Thread.currentThread();
        if (!getAndSetParkPermit(true) && currentThread != this) {
            int s = state();
            if (s == PARKED && compareAndSetState(PARKED, RUNNABLE)) {
                if (currentThread instanceof VirtualThread vthread) {
                    // the caller is itself a virtual thread: submit while the
                    // current thread is temporarily set to the caller's carrier
                    boolean notifyJvmti = vthread.switchToCarrierThread();
                    try {
                        submitRunContinuation();
                    } finally {
                        switchToVirtualThread(vthread, notifyJvmti);
                    }
                } else {
                    submitRunContinuation();
                }
            } else if (s == PINNED) {
                // unpark carrier thread when pinned.
                synchronized (carrierThreadAccessLock()) {
                    Thread carrier = carrierThread;
                    // re-check under the lock: the thread may have resumed or unmounted
                    if (carrier != null && state() == PINNED) {
                        U.unpark(carrier);
                    }
                }
            }
        }
    }

    /**
     * Attempts to yield the current virtual thread (Thread.yield).
     * If the state is still YIELDING when the continuation yield returns, the
     * state is reset to RUNNING.
     */
    void tryYield() {
        assert Thread.currentThread() == this;
        setState(YIELDING);
        try {
            yieldContinuation();
        } finally {
            assert Thread.currentThread() == this;
            if (state() != RUNNING) {
                assert state() == YIELDING;
                setState(RUNNING);
            }
        }
    }

    /**
     * Sleep the current virtual thread for the given sleep time.
     * A negative sleep time is ignored. If enabled, a JFR ThreadSleepEvent that
     * spans the sleep is committed.
     *
     * @param nanos the maximum number of nanoseconds to sleep
     * @throws InterruptedException if interrupted while sleeping
     */
    void sleepNanos(long nanos) throws InterruptedException {
        assert Thread.currentThread() == this;
        if (nanos >= 0) {
            if (ThreadSleepEvent.isTurnedOn()) {
                ThreadSleepEvent event = new ThreadSleepEvent();
                try {
                    event.time = nanos;
                    event.begin();
                    doSleepNanos(nanos);
                } finally {
                    event.commit();
                }
            } else {
                doSleepNanos(nanos);
            }
        }
    }

    /**
     * Sleep the current thread for the given sleep time (in nanoseconds). If
     * nanos is 0 then the thread will attempt to yield.
     *
     * @implNote This implementation parks the thread for the given sleeping time
     * and will therefore be observed in PARKED state during the sleep. Parking
     * will consume the parking permit so this method makes available the parking
     * permit after the sleep. This may be observed as a spurious, but benign,
     * wakeup when the thread subsequently attempts to park.
     *
     * @param nanos the sleep time in nanoseconds, must be {@code >= 0}
     * @throws InterruptedException if the interrupt status is set on entry or
     *         the thread is interrupted while sleeping; the interrupt status is
     *         cleared
     */
    private void doSleepNanos(long nanos) throws InterruptedException {
        assert nanos >= 0;
        if (getAndClearInterrupt())
            throw new InterruptedException();
        if (nanos == 0) {
            tryYield();
        } else {
            // park for the sleep time
            try {
                long remainingNanos = nanos;
                long startNanos = System.nanoTime();
                // parkNanos may return early, so loop until the full time has elapsed
                while (remainingNanos > 0) {
                    parkNanos(remainingNanos);
                    if (getAndClearInterrupt()) {
                        throw new InterruptedException();
                    }
                    remainingNanos = nanos - (System.nanoTime() - startNanos);
                }
            } finally {
                // may have been unparked while sleeping
                setParkPermit(true);
            }
        }
    }

    /**
     * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate.
     * A timeout of {@code 0} means to wait forever.
     *
     * @param nanos the maximum number of nanoseconds to wait, or 0 to wait forever
     * @throws InterruptedException if interrupted while waiting
     * @return true if the thread has terminated
     */
    boolean joinNanos(long nanos) throws InterruptedException {
        if (state() == TERMINATED)
            return true;

        // ensure termination object exists, then re-check state
        CountDownLatch termination = getTermination();
        if (state() == TERMINATED)
            return true;

        // wait for virtual thread to terminate
        if (nanos == 0) {
            termination.await();
        } else {
            boolean terminated = termination.await(nanos, NANOSECONDS);
            if (!terminated) {
                // waiting time elapsed
                return false;
            }
        }
        assert state() == TERMINATED;
        return true;
    }

    /**
     * Interrupts this virtual thread. The interrupt status is set, the carrier
     * thread's interrupt status is also set if this thread is mounted, and the
     * thread is then unparked.
     *
     * <p> When invoked by another thread, access is checked first; the status
     * is then set while holding {@code interruptLock}, and any
     * {@code nioBlocker} that is set is notified.
     */
    @Override
    @SuppressWarnings("removal")
    public void interrupt() {
        if (Thread.currentThread() != this) {
            checkAccess();
            synchronized (interruptLock) {
                interrupted = true;
                Interruptible b = nioBlocker;
                if (b != null) {
                    b.interrupt(this);
                }

                // interrupt carrier thread if mounted
                Thread carrier = carrierThread;
                if (carrier != null) carrier.setInterrupt();
            }
        } else {
            // self-interrupt: this thread is the current thread, so carrierThread is
            // read without taking interruptLock
            interrupted = true;
            carrierThread.setInterrupt();
        }
        unpark();
    }

    /**
     * Returns the interrupt status of this virtual thread without clearing it.
     */
    @Override
    public boolean isInterrupted() {
        return interrupted;
    }

    /**
     * Clears the interrupt status of this virtual thread and of its carrier
     * thread. Must be invoked on this virtual thread.
     *
     * @return the interrupt status before it was cleared
     */
    @Override
    boolean getAndClearInterrupt() {
        assert Thread.currentThread() == this;
        synchronized (interruptLock) {
            boolean oldValue = interrupted;
            if (oldValue)
                interrupted = false;
            carrierThread.clearInterrupt();
            return oldValue;
        }
    }

    /**
     * Maps the internal state to a {@link Thread.State}. When the state is
     * RUNNING and the thread is mounted, the state of the carrier thread is
     * returned.
     *
     * @throws InternalError if the internal state is not a recognized value
     */
    @Override
    Thread.State threadState() {
        switch (state()) {
            case NEW:
                return Thread.State.NEW;
            case STARTED:
                // return NEW if thread container not yet set
                if (threadContainer() == null) {
                    return Thread.State.NEW;
                } else {
                    return Thread.State.RUNNABLE;
                }
            case RUNNABLE:
            case RUNNABLE_SUSPENDED:
                // runnable, not mounted
                return Thread.State.RUNNABLE;
            case RUNNING:
                // if mounted then return state of carrier thread
                synchronized (carrierThreadAccessLock()) {
                    Thread carrierThread = this.carrierThread;
                    if (carrierThread != null) {
                        return carrierThread.threadState();
                    }
                }
                // runnable, mounted
                return Thread.State.RUNNABLE;
            case PARKING:
            case YIELDING:
                // runnable, mounted, not yet waiting
                return Thread.State.RUNNABLE;
            case PARKED:
            case PARKED_SUSPENDED:
            case PINNED:
                return Thread.State.WAITING;
            case TERMINATED:
                return Thread.State.TERMINATED;
            default:
                throw new InternalError();
        }
    }

    /**
     * Returns true if this thread has been started and has not terminated,
     * that is, its state is neither NEW nor TERMINATED.
     */
    @Override
    boolean alive() {
        int s = state;
        return (s != NEW && s != TERMINATED);
    }

    /**
     * Returns true if the state is TERMINATED.
     */
    @Override
    boolean isTerminated() {
        return (state == TERMINATED);
    }

    /**
     * Returns the stack trace of this virtual thread. If the thread is mounted
     * the superclass implementation is used, otherwise {@link #tryGetStackTrace()}
     * is used. Retries, calling {@code Thread.yield()} between attempts, until
     * a non-null stack trace is obtained.
     */
    @Override
    StackTraceElement[] asyncGetStackTrace() {
        StackTraceElement[] stackTrace;
        do {
            stackTrace = (carrierThread != null)
                    ? super.asyncGetStackTrace()  // mounted
                    : tryGetStackTrace();         // unmounted
            if (stackTrace == null) {
                Thread.yield();
            }
        } while (stackTrace == null);
        return stackTrace;
    }

    /**
     * Returns the stack trace for this virtual thread if it is unmounted.
     * Returns null if the thread is in another state.
     *
     * <p> A RUNNABLE or PARKED thread is marked suspended (the SUSPENDED bit is
     * set in its state) while the trace is captured, then restored to its
     * initial state and re-submitted to the scheduler if needed. An empty array
     * is returned for a thread that is NEW, STARTED or TERMINATED.
     */
    private StackTraceElement[] tryGetStackTrace() {
        int initialState = state();
        return switch (initialState) {
            case RUNNABLE, PARKED -> {
                int suspendedState = initialState | SUSPENDED;
                if (compareAndSetState(initialState, suspendedState)) {
                    try {
                        yield cont.getStackTrace();
                    } finally {
                        assert state == suspendedState;
                        setState(initialState);

                        // re-submit if runnable
                        // re-submit if unparked while suspended
                        if (initialState == RUNNABLE
                            || (parkPermit && compareAndSetState(PARKED, RUNNABLE))) {
                            try {
                                submitRunContinuation();
                            } catch (RejectedExecutionException ignore) { }
                        }
                    }
                }
                // lost the race to suspend; the caller retries
                yield null;
            }
            case NEW, STARTED, TERMINATED ->  new StackTraceElement[0];  // empty stack
            default -> null;
        };
    }

    /**
     * Returns a string of the form {@code VirtualThread[#<id>,<name>]/<suffix>},
     * where the name (and its comma) is omitted when empty. When mounted the
     * suffix is {@code <carrier-state>@<carrier-name>}, otherwise it is this
     * thread's state. States are printed in lower case.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("VirtualThread[#");
        sb.append(threadId());
        String name = getName();
        if (!name.isEmpty()) {
            sb.append(",");
            sb.append(name);
        }
        sb.append("]/");
        Thread carrier = carrierThread;
        if (carrier != null) {
            // include the carrier thread state and name when mounted
            synchronized (carrierThreadAccessLock()) {
                // re-read under the lock as the thread may have unmounted
                carrier = carrierThread;
                if (carrier != null) {
                    String stateAsString = carrier.threadState().toString();
                    sb.append(stateAsString.toLowerCase(Locale.ROOT));
                    sb.append('@');
                    sb.append(carrier.getName());
                }
            }
        }
        // include virtual thread state when not mounted
        if (carrier == null) {
            String stateAsString = threadState().toString();
            sb.append(stateAsString.toLowerCase(Locale.ROOT));
        }
        return sb.toString();
    }

    /**
     * Returns the thread identifier narrowed to an {@code int}.
     */
    @Override
    public int hashCode() {
        return (int) threadId();
    }

    /**
     * Returns true only if {@code obj} is this object (identity comparison).
     */
    @Override
    public boolean equals(Object obj) {
        return obj == this;
    }

    /**
     * Returns the termination object, creating it if needed.
     * If another thread installs the latch first, that thread's latch is
     * returned.
     */
    private CountDownLatch getTermination() {
        CountDownLatch termination = this.termination;
        if (termination == null) {
            termination = new CountDownLatch(1);
            if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
                // lost the race, use the latch installed by the other thread
                termination = this.termination;
            }
        }
        return termination;
    }

    /**
     * Returns the lock object to synchronize on when accessing carrierThread.
     * The lock prevents carrierThread from being reset to null during unmount.
     */
    private Object carrierThreadAccessLock() {
        // return interruptLock as unmount has to coordinate with interrupt
        return interruptLock;
    }

    // -- wrappers for get/set of state, parking permit, and carrier thread --

    /** Returns the current state (volatile read). */
    private int state() {
        return state;  // volatile read
    }

    /** Sets the state (volatile write). */
    private void setState(int newValue) {
        state = newValue;  // volatile write
    }

    /**
     * Atomically sets the state to {@code newValue} if it is currently
     * {@code expectedValue}.
     *
     * @return true if the state was updated
     */
    private boolean compareAndSetState(int expectedValue, int newValue) {
        return U.compareAndSetInt(this, STATE, expectedValue, newValue);
    }

    /**
     * Sets the parking permit, skipping the volatile write when the permit
     * already has the given value.
     */
    private void setParkPermit(boolean newValue) {
        if (parkPermit != newValue) {
            parkPermit = newValue;
        }
    }

    /**
     * Sets the parking permit and returns its previous value. The atomic
     * exchange is skipped when the permit is observed to already have the
     * given value.
     */
    private boolean getAndSetParkPermit(boolean newValue) {
        if (parkPermit != newValue) {
            return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
        } else {
            return newValue;
        }
    }

    /** Sets the carrier thread; {@code null} means not mounted. */
    private void setCarrierThread(Thread carrier) {
        // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
        this.carrierThread = carrier;
    }

    // -- JVM TI support --

    private static volatile boolean notifyJvmtiEvents;  // set by VM

    @JvmtiMountTransition
    private native void notifyJvmtiMountBegin(boolean firstMount);

    @JvmtiMountTransition
    private native void notifyJvmtiMountEnd(boolean firstMount);

    @JvmtiMountTransition
    private native void notifyJvmtiUnmountBegin(boolean lastUnmount);

    @JvmtiMountTransition
    private native void notifyJvmtiUnmountEnd(boolean lastUnmount);

    @JvmtiMountTransition
    private native void notifyJvmtiHideFrames(boolean hide);

    private static native void registerNatives();
    static {
        registerNatives();
    }

    /**
     * Creates the default scheduler.
     *
     * <p> The scheduler is a {@link ForkJoinPool} in async (FIFO) mode whose
     * worker threads are {@link CarrierThread}s. It is configured from these
     * system properties:
     * <ul>
     *   <li>{@code jdk.virtualThreadScheduler.parallelism}: defaults to the
     *       number of available processors, and is capped at the maximum pool
     *       size when that is specified</li>
     *   <li>{@code jdk.virtualThreadScheduler.maxPoolSize}: defaults to the
     *       larger of the parallelism and 256</li>
     *   <li>{@code jdk.virtualThreadScheduler.minRunnable}: defaults to the
     *       larger of half the parallelism and 1</li>
     * </ul>
     *
     * @throws NumberFormatException if a specified property value is not a
     *         parsable integer
     */
    @SuppressWarnings("removal")
    private static ForkJoinPool createDefaultScheduler() {
        ForkJoinWorkerThreadFactory factory = pool -> {
            PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
            return AccessController.doPrivileged(pa);
        };
        PrivilegedAction<ForkJoinPool> pa = () -> {
            int parallelism, maxPoolSize, minRunnable;
            String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
            String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
            String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
            if (parallelismValue != null) {
                parallelism = Integer.parseInt(parallelismValue);
            } else {
                parallelism = Runtime.getRuntime().availableProcessors();
            }
            if (maxPoolSizeValue != null) {
                maxPoolSize = Integer.parseInt(maxPoolSizeValue);
                // parallelism cannot exceed the maximum pool size
                parallelism = Integer.min(parallelism, maxPoolSize);
            } else {
                maxPoolSize = Integer.max(parallelism, 256);
            }
            if (minRunnableValue != null) {
                minRunnable = Integer.parseInt(minRunnableValue);
            } else {
                minRunnable = Integer.max(parallelism / 2, 1);
            }
            // uncaught exceptions in worker threads are ignored
            Thread.UncaughtExceptionHandler handler = (t, e) -> { };
            boolean asyncMode = true; // FIFO
            return new ForkJoinPool(parallelism, factory, handler, asyncMode,
                         0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
        };
        return AccessController.doPrivileged(pa);
    }

    /**
     * Creates the ScheduledThreadPoolExecutor used for timed unpark.
     *
     * <p> The pool size is read from the {@code jdk.unparker.maxPoolSize} system
     * property and defaults to 1. Threads are created by
     * {@code InnocuousThread.newThread} with the name
     * {@code "VirtualThread-unparker"}. Cancelled tasks are removed from the
     * work queue at the time of cancellation.
     *
     * @throws NumberFormatException if the property value is not a parsable integer
     */
    private static ScheduledExecutorService createDelayedTaskScheduler() {
        String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize");
        int poolSize;
        if (propValue != null) {
            poolSize = Integer.parseInt(propValue);
        } else {
            poolSize = 1;
        }
        ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
            Executors.newScheduledThreadPool(poolSize, task -> {
                return InnocuousThread.newThread("VirtualThread-unparker", task);
            });
        stpe.setRemoveOnCancelPolicy(true);
        return stpe;
    }

    /**
     * Reads the value of the jdk.tracePinnedThreads property to determine if stack
     * traces should be printed when a carrier thread is pinned when a virtual thread
     * attempts to park.
     *
     * @return 1 if the property is set to the empty string or "full", 2 if it
     *         is set to "short" (both compared ignoring case), otherwise 0
     */
    private static int tracePinningMode() {
        String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads");
        if (propValue != null) {
            if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue))
                return 1;
            if ("short".equalsIgnoreCase(propValue))
                return 2;
        }
        return 0;
    }
}