1 /* 2 * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.nio.charset.StandardCharsets; 28 import java.security.AccessController; 29 import java.security.PrivilegedAction; 30 import java.util.Locale; 31 import java.util.Objects; 32 import java.util.concurrent.Callable; 33 import java.util.concurrent.CountDownLatch; 34 import java.util.concurrent.Executor; 35 import java.util.concurrent.Executors; 36 import java.util.concurrent.ForkJoinPool; 37 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 38 import java.util.concurrent.ForkJoinTask; 39 import java.util.concurrent.ForkJoinWorkerThread; 40 import java.util.concurrent.Future; 41 import java.util.concurrent.RejectedExecutionException; 42 import java.util.concurrent.ScheduledExecutorService; 43 import java.util.concurrent.ScheduledThreadPoolExecutor; 44 import jdk.internal.event.VirtualThreadEndEvent; 45 import jdk.internal.event.VirtualThreadStartEvent; 46 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 47 import jdk.internal.misc.CarrierThread; 48 import jdk.internal.misc.InnocuousThread; 49 import jdk.internal.misc.Unsafe; 50 import jdk.internal.vm.Continuation; 51 import jdk.internal.vm.ContinuationScope; 52 import jdk.internal.vm.StackableScope; 53 import jdk.internal.vm.ThreadContainer; 54 import jdk.internal.vm.ThreadContainers; 55 import jdk.internal.vm.annotation.ChangesCurrentThread; 56 import jdk.internal.vm.annotation.Hidden; 57 import jdk.internal.vm.annotation.IntrinsicCandidate; 58 import jdk.internal.vm.annotation.JvmtiMountTransition; 59 import jdk.internal.vm.annotation.ReservedStackAccess; 60 import sun.nio.ch.Interruptible; 61 import sun.security.action.GetPropertyAction; 62 import static java.util.concurrent.TimeUnit.*; 63 64 /** 65 * A thread that is scheduled by the Java virtual machine rather than the operating 66 * system. 67 */ 68 final class VirtualThread extends BaseVirtualThread { 69 private static final Unsafe U = Unsafe.getUnsafe(); 70 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 71 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler(); 72 private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers(); 73 74 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 75 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 76 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 77 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 78 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList"); 79 80 // scheduler and continuation 81 private final Executor scheduler; 82 private final Continuation cont; 83 private final Runnable runContinuation; 84 85 // virtual thread state, accessed by VM 86 private volatile int state; 87 88 /* 89 * Virtual thread state transitions: 90 * 91 * NEW -> STARTED // Thread.start, schedule to run 92 * STARTED -> TERMINATED // failed to start 93 * STARTED -> RUNNING // first run 94 * RUNNING -> TERMINATED // done 95 * 96 * RUNNING -> PARKING // Thread parking with LockSupport.park 97 * PARKING -> PARKED // cont.yield successful, parked indefinitely 98 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier 99 * PARKED -> UNPARKED // unparked, may be scheduled to continue 100 * PINNED -> RUNNING // unparked, continue execution on same carrier 101 * UNPARKED -> RUNNING // continue execution after park 102 * 103 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 104 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 105 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier 106 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue 107 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 108 * 109 * RUNNABLE -> BLOCKING // blocking on monitor enter 110 * BLOCKING -> BLOCKED // blocked on monitor enter 111 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue 112 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter 113 * 114 * RUNNING -> YIELDING // Thread.yield 115 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue 116 * YIELDING -> RUNNING // cont.yield failed 117 * YIELDED -> RUNNING // continue execution after Thread.yield 118 */ 119 private static final int NEW = 0; 120 private static final int STARTED = 1; 121 private static final int RUNNING = 2; // runnable-mounted 122 123 // untimed and timed parking 124 private static final int PARKING = 3; 125 private static final int PARKED = 4; // unmounted 126 private static final int PINNED = 5; // mounted 127 private static final int TIMED_PARKING = 6; 128 private static final int TIMED_PARKED = 7; // unmounted 129 private static final int TIMED_PINNED = 8; // mounted 130 private static final int UNPARKED = 9; // unmounted but runnable 131 132 // Thread.yield 133 private static final int YIELDING = 10; 134 private static final int YIELDED = 11; // unmounted but runnable 135 136 // monitor enter 137 private static final int BLOCKING = 12; 138 private static final int BLOCKED = 13; // unmounted 139 private static final int UNBLOCKED = 14; // unmounted but runnable 140 141 private static final int TERMINATED = 99; // final state 142 143 // can be suspended from scheduling when unmounted 144 private static final int SUSPENDED = 1 << 8; 145 146 // parking permit 147 private volatile boolean parkPermit; 148 149 // used to mark thread as ready to be unblocked while it is concurrently blocking 150 private volatile boolean unblocked; 151 152 // a positive value if "responsible thread" blocked on monitor enter, accessed by VM 153 private volatile byte recheckInterval; 154 155 // carrier thread when mounted, accessed by VM 156 private volatile Thread carrierThread; 157 158 // termination object when joining, created lazily if needed 159 private volatile CountDownLatch termination; 160 161 // has the value 1 when on the list of virtual threads waiting to be unblocked 162 private volatile byte onWaitingList; 163 164 // next virtual thread on the list of virtual threads waiting to be unblocked 165 private volatile VirtualThread next; 166 167 /** 168 * Returns the continuation scope used for virtual threads. 169 */ 170 static ContinuationScope continuationScope() { 171 return VTHREAD_SCOPE; 172 } 173 174 /** 175 * Creates a new {@code VirtualThread} to run the given task with the given 176 * scheduler. If the given scheduler is {@code null} and the current thread 177 * is a platform thread then the newly created virtual thread will use the 178 * default scheduler. If given scheduler is {@code null} and the current 179 * thread is a virtual thread then the current thread's scheduler is used. 180 * 181 * @param scheduler the scheduler or null 182 * @param name thread name 183 * @param characteristics characteristics 184 * @param task the task to execute 185 */ 186 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 187 super(name, characteristics, /*bound*/ false); 188 Objects.requireNonNull(task); 189 190 // choose scheduler if not specified 191 if (scheduler == null) { 192 Thread parent = Thread.currentThread(); 193 if (parent instanceof VirtualThread vparent) { 194 scheduler = vparent.scheduler; 195 } else { 196 scheduler = DEFAULT_SCHEDULER; 197 } 198 } 199 200 this.scheduler = scheduler; 201 this.cont = new VThreadContinuation(this, task); 202 this.runContinuation = this::runContinuation; 203 } 204 205 /** 206 * The continuation that a virtual thread executes. 207 */ 208 private static class VThreadContinuation extends Continuation { 209 VThreadContinuation(VirtualThread vthread, Runnable task) { 210 super(VTHREAD_SCOPE, wrap(vthread, task)); 211 } 212 @Override 213 protected void onPinned(Continuation.Pinned reason) { 214 // emit JFR event 215 virtualThreadPinnedEvent(reason.value()); 216 } 217 private static Runnable wrap(VirtualThread vthread, Runnable task) { 218 return new Runnable() { 219 @Hidden 220 public void run() { 221 vthread.run(task); 222 } 223 }; 224 } 225 } 226 227 /** 228 * Runs or continues execution on the current thread. The virtual thread is mounted 229 * on the current thread before the task runs or continues. It unmounts when the 230 * task completes or yields. 231 */ 232 @ChangesCurrentThread 233 private void runContinuation() { 234 // the carrier must be a platform thread 235 if (Thread.currentThread().isVirtual()) { 236 throw new WrongThreadException(); 237 } 238 239 // set state to RUNNING 240 int initialState = state(); 241 if (initialState == STARTED || initialState == UNPARKED 242 || initialState == UNBLOCKED || initialState == YIELDED) { 243 // newly started or continue after parking/blocking/Thread.yield 244 if (!compareAndSetState(initialState, RUNNING)) { 245 return; 246 } 247 // consume parking permit when continuing after parking 248 if (initialState == UNPARKED) { 249 setParkPermit(false); 250 } 251 } else { 252 // not runnable 253 return; 254 } 255 256 mount(); 257 try { 258 cont.run(); 259 } finally { 260 unmount(); 261 if (cont.isDone()) { 262 afterDone(); 263 } else { 264 afterYield(); 265 } 266 } 267 } 268 269 /** 270 * Submits the runContinuation task to the scheduler. For the default scheduler, 271 * and calling it on a worker thread, the task will be pushed to the local queue, 272 * otherwise it will be pushed to an external submission queue. 273 * @throws RejectedExecutionException 274 */ 275 private void submitRunContinuation() { 276 try { 277 scheduler.execute(runContinuation); 278 } catch (RejectedExecutionException ree) { 279 submitFailed(ree); 280 throw ree; 281 } 282 } 283 284 /** 285 * Submits the runContinuation task the scheduler. For the default scheduler, the task 286 * will be pushed to an external submission queue. 287 * @throws RejectedExecutionException 288 */ 289 private void externalSubmitRunContinuation() { 290 if (scheduler == DEFAULT_SCHEDULER 291 && currentCarrierThread() instanceof CarrierThread ct) { 292 externalSubmitRunContinuation(ct.getPool()); 293 } else { 294 submitRunContinuation(); 295 } 296 } 297 298 /** 299 * Submits the runContinuation task to given scheduler with a lazy submit. 300 * @throws RejectedExecutionException 301 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 302 */ 303 private void lazySubmitRunContinuation(ForkJoinPool pool) { 304 try { 305 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 306 } catch (RejectedExecutionException ree) { 307 submitFailed(ree); 308 throw ree; 309 } 310 } 311 312 /** 313 * Submits the runContinuation task to the given scheduler as an external submit. 314 * @throws RejectedExecutionException 315 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 316 */ 317 private void externalSubmitRunContinuation(ForkJoinPool pool) { 318 try { 319 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 320 } catch (RejectedExecutionException ree) { 321 submitFailed(ree); 322 throw ree; 323 } 324 } 325 326 /** 327 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 328 */ 329 private void submitFailed(RejectedExecutionException ree) { 330 var event = new VirtualThreadSubmitFailedEvent(); 331 if (event.isEnabled()) { 332 event.javaThreadId = threadId(); 333 event.exceptionMessage = ree.getMessage(); 334 event.commit(); 335 } 336 } 337 338 /** 339 * Runs a task in the context of this virtual thread. 340 */ 341 private void run(Runnable task) { 342 assert Thread.currentThread() == this && state == RUNNING; 343 344 // notify JVMTI, may post VirtualThreadStart event 345 notifyJvmtiStart(); 346 347 // emit JFR event if enabled 348 if (VirtualThreadStartEvent.isTurnedOn()) { 349 var event = new VirtualThreadStartEvent(); 350 event.javaThreadId = threadId(); 351 event.commit(); 352 } 353 354 Object bindings = Thread.scopedValueBindings(); 355 try { 356 runWith(bindings, task); 357 } catch (Throwable exc) { 358 dispatchUncaughtException(exc); 359 } finally { 360 try { 361 // pop any remaining scopes from the stack, this may block 362 StackableScope.popAll(); 363 364 // emit JFR event if enabled 365 if (VirtualThreadEndEvent.isTurnedOn()) { 366 var event = new VirtualThreadEndEvent(); 367 event.javaThreadId = threadId(); 368 event.commit(); 369 } 370 371 } finally { 372 // notify JVMTI, may post VirtualThreadEnd event 373 notifyJvmtiEnd(); 374 } 375 } 376 } 377 378 /** 379 * Mounts this virtual thread onto the current platform thread. On 380 * return, the current thread is the virtual thread. 381 */ 382 @ChangesCurrentThread 383 @ReservedStackAccess 384 private void mount() { 385 // notify JVMTI before mount 386 notifyJvmtiMount(/*hide*/true); 387 388 // sets the carrier thread 389 Thread carrier = Thread.currentCarrierThread(); 390 setCarrierThread(carrier); 391 392 // sync up carrier thread interrupt status if needed 393 if (interrupted) { 394 carrier.setInterrupt(); 395 } else if (carrier.isInterrupted()) { 396 synchronized (interruptLock) { 397 // need to recheck interrupt status 398 if (!interrupted) { 399 carrier.clearInterrupt(); 400 } 401 } 402 } 403 404 // set Thread.currentThread() to return this virtual thread 405 carrier.setCurrentThread(this); 406 } 407 408 /** 409 * Unmounts this virtual thread from the carrier. On return, the 410 * current thread is the current platform thread. 411 */ 412 @ChangesCurrentThread 413 @ReservedStackAccess 414 private void unmount() { 415 assert !Thread.holdsLock(interruptLock); 416 417 // set Thread.currentThread() to return the platform thread 418 Thread carrier = this.carrierThread; 419 carrier.setCurrentThread(carrier); 420 421 // break connection to carrier thread, synchronized with interrupt 422 synchronized (interruptLock) { 423 setCarrierThread(null); 424 } 425 carrier.clearInterrupt(); 426 427 // notify JVMTI after unmount 428 notifyJvmtiUnmount(/*hide*/false); 429 } 430 431 /** 432 * Sets the current thread to the current carrier thread. 433 */ 434 @ChangesCurrentThread 435 @JvmtiMountTransition 436 private void switchToCarrierThread() { 437 notifyJvmtiHideFrames(true); 438 Thread carrier = this.carrierThread; 439 assert Thread.currentThread() == this 440 && carrier == Thread.currentCarrierThread(); 441 carrier.setCurrentThread(carrier); 442 setLockId(this.threadId()); // keep lockid of vthread 443 } 444 445 /** 446 * Sets the current thread to the given virtual thread. 447 */ 448 @ChangesCurrentThread 449 @JvmtiMountTransition 450 private void switchToVirtualThread(VirtualThread vthread) { 451 Thread carrier = vthread.carrierThread; 452 assert carrier == Thread.currentCarrierThread(); 453 carrier.setCurrentThread(vthread); 454 notifyJvmtiHideFrames(false); 455 } 456 457 /** 458 * Executes the given value returning task on the current carrier thread. 459 */ 460 @ChangesCurrentThread 461 <V> V executeOnCarrierThread(Callable<V> task) throws Exception { 462 assert Thread.currentThread() == this; 463 switchToCarrierThread(); 464 try { 465 return task.call(); 466 } finally { 467 switchToVirtualThread(this); 468 } 469 } 470 471 /** 472 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 473 * the continuation continues. 474 */ 475 @Hidden 476 private boolean yieldContinuation() { 477 notifyJvmtiUnmount(/*hide*/true); 478 try { 479 return Continuation.yield(VTHREAD_SCOPE); 480 } finally { 481 notifyJvmtiMount(/*hide*/false); 482 } 483 } 484 485 /** 486 * Invoked after the continuation yields. If parking then it sets the state 487 * and also re-submits the task to continue if unparked while parking. 488 * If yielding due to Thread.yield then it just submits the task to continue. 489 */ 490 private void afterYield() { 491 assert carrierThread == null; 492 493 // re-adjust parallelism if the virtual thread yielded when compensating 494 if (currentThread() instanceof CarrierThread ct) { 495 ct.endBlocking(); 496 } 497 498 int s = state(); 499 500 // LockSupport.park/parkNanos 501 if (s == PARKING || s == TIMED_PARKING) { 502 int newState = (s == PARKING) ? PARKED : TIMED_PARKED; 503 setState(newState); 504 505 // may have been unparked while parking 506 if (parkPermit && compareAndSetState(newState, UNPARKED)) { 507 // lazy submit to continue on the current thread as carrier if possible 508 if (currentThread() instanceof CarrierThread ct) { 509 lazySubmitRunContinuation(ct.getPool()); 510 } else { 511 submitRunContinuation(); 512 } 513 514 } 515 return; 516 } 517 518 // Thread.yield 519 if (s == YIELDING) { 520 setState(YIELDED); 521 522 // external submit if there are no tasks in the local task queue 523 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 524 externalSubmitRunContinuation(ct.getPool()); 525 } else { 526 submitRunContinuation(); 527 } 528 return; 529 } 530 531 // blocking on monitorenter 532 if (s == BLOCKING) { 533 setState(BLOCKED); 534 535 // may have been unblocked while blocking 536 if (unblocked && compareAndSetState(BLOCKED, UNBLOCKED)) { 537 unblocked = false; 538 submitRunContinuation(); 539 return; 540 } 541 542 // if thread is the designated responsible thread for a monitor then schedule 543 // it to wakeup so that it can check and recover. See objectMonitor.cpp. 544 int recheckInterval = this.recheckInterval; 545 if (recheckInterval > 0 && state() == BLOCKED) { 546 assert recheckInterval >= 1 && recheckInterval <= 6; 547 // 4 ^ (recheckInterval - 1) = 1, 4, 16, ... 1024 548 long delay = 1 << (recheckInterval - 1) << (recheckInterval - 1); 549 Future<?> unblocker = delayedTaskScheduler().schedule(this::unblock, delay, MILLISECONDS); 550 // cancel if unblocked while scheduling the unblock 551 if (state() != BLOCKED) { 552 unblocker.cancel(false); 553 } 554 } 555 return; 556 } 557 558 assert false; 559 } 560 561 /** 562 * Invoked after the continuation completes. 563 */ 564 private void afterDone() { 565 afterDone(true); 566 } 567 568 /** 569 * Invoked after the continuation completes (or start failed). Sets the thread 570 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 571 * 572 * @param notifyContainer true if its container should be notified 573 */ 574 private void afterDone(boolean notifyContainer) { 575 assert carrierThread == null; 576 setState(TERMINATED); 577 578 // notify anyone waiting for this virtual thread to terminate 579 CountDownLatch termination = this.termination; 580 if (termination != null) { 581 assert termination.getCount() == 1; 582 termination.countDown(); 583 } 584 585 // notify container 586 if (notifyContainer) { 587 threadContainer().onExit(this); 588 } 589 590 // clear references to thread locals 591 clearReferences(); 592 } 593 594 /** 595 * Schedules this {@code VirtualThread} to execute. 596 * 597 * @throws IllegalStateException if the container is shutdown or closed 598 * @throws IllegalThreadStateException if the thread has already been started 599 * @throws RejectedExecutionException if the scheduler cannot accept a task 600 */ 601 @Override 602 void start(ThreadContainer container) { 603 if (!compareAndSetState(NEW, STARTED)) { 604 throw new IllegalThreadStateException("Already started"); 605 } 606 607 // bind thread to container 608 assert threadContainer() == null; 609 setThreadContainer(container); 610 611 // start thread 612 boolean addedToContainer = false; 613 boolean started = false; 614 try { 615 container.onStart(this); // may throw 616 addedToContainer = true; 617 618 // scoped values may be inherited 619 inheritScopedValueBindings(container); 620 621 // submit task to run thread, using externalSubmit if possible 622 externalSubmitRunContinuation(); 623 started = true; 624 } finally { 625 if (!started) { 626 afterDone(addedToContainer); 627 } 628 } 629 } 630 631 @Override 632 public void start() { 633 start(ThreadContainers.root()); 634 } 635 636 @Override 637 public void run() { 638 // do nothing 639 } 640 641 /** 642 * Parks until unparked or interrupted. If already unparked then the parking 643 * permit is consumed and this method completes immediately (meaning it doesn't 644 * yield). It also completes immediately if the interrupt status is set. 645 */ 646 @Override 647 void park() { 648 assert Thread.currentThread() == this; 649 650 // complete immediately if parking permit available or interrupted 651 if (getAndSetParkPermit(false) || interrupted) 652 return; 653 654 // park the thread 655 boolean yielded = false; 656 setState(PARKING); 657 try { 658 yielded = yieldContinuation(); // may throw 659 } finally { 660 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 661 if (!yielded) { 662 assert state() == PARKING; 663 setState(RUNNING); 664 } 665 } 666 667 // park on the carrier thread when pinned 668 if (!yielded) { 669 parkOnCarrierThread(false, 0); 670 } 671 } 672 673 /** 674 * Parks up to the given waiting time or until unparked or interrupted. 675 * If already unparked then the parking permit is consumed and this method 676 * completes immediately (meaning it doesn't yield). It also completes immediately 677 * if the interrupt status is set or the waiting time is {@code <= 0}. 678 * 679 * @param nanos the maximum number of nanoseconds to wait. 680 */ 681 @Override 682 void parkNanos(long nanos) { 683 assert Thread.currentThread() == this; 684 685 // complete immediately if parking permit available or interrupted 686 if (getAndSetParkPermit(false) || interrupted) 687 return; 688 689 // park the thread for the waiting time 690 if (nanos > 0) { 691 long startTime = System.nanoTime(); 692 693 boolean yielded = false; 694 Future<?> unparker = scheduleUnpark(nanos); // may throw OOME 695 setState(TIMED_PARKING); 696 try { 697 yielded = yieldContinuation(); // may throw 698 } finally { 699 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 700 if (!yielded) { 701 assert state() == TIMED_PARKING; 702 setState(RUNNING); 703 } 704 cancel(unparker); 705 } 706 707 // park on carrier thread for remaining time when pinned 708 if (!yielded) { 709 long remainingNanos = nanos - (System.nanoTime() - startTime); 710 parkOnCarrierThread(true, remainingNanos); 711 } 712 } 713 } 714 715 /** 716 * Parks the current carrier thread up to the given waiting time or until 717 * unparked or interrupted. If the virtual thread is interrupted then the 718 * interrupt status will be propagated to the carrier thread. 719 * @param timed true for a timed park, false for untimed 720 * @param nanos the waiting time in nanoseconds 721 */ 722 private void parkOnCarrierThread(boolean timed, long nanos) { 723 assert state() == RUNNING; 724 725 setState(timed ? TIMED_PINNED : PINNED); 726 try { 727 if (!parkPermit) { 728 if (!timed) { 729 U.park(false, 0); 730 } else if (nanos > 0) { 731 U.park(false, nanos); 732 } 733 } 734 } finally { 735 setState(RUNNING); 736 } 737 738 // consume parking permit 739 setParkPermit(false); 740 } 741 742 /** 743 * jdk.VirtualThreadPinned is emitted by HotSpot VM when pinned. Call into VM to 744 * emit event to avoid having a JFR in Java with the same name (but different ID) 745 * to events emitted by the VM. 746 */ 747 private static native void virtualThreadPinnedEvent(int reason); 748 749 /** 750 * Schedule this virtual thread to be unparked after a given delay. 751 */ 752 @ChangesCurrentThread 753 private Future<?> scheduleUnpark(long nanos) { 754 assert Thread.currentThread() == this; 755 // need to switch to current carrier thread to avoid nested parking 756 switchToCarrierThread(); 757 try { 758 return delayedTaskScheduler().schedule(this::unpark, nanos, NANOSECONDS); 759 } finally { 760 switchToVirtualThread(this); 761 } 762 } 763 764 /** 765 * Cancels a task if it has not completed. 766 */ 767 @ChangesCurrentThread 768 private void cancel(Future<?> future) { 769 assert Thread.currentThread() == this; 770 if (!future.isDone()) { 771 // need to switch to current carrier thread to avoid nested parking 772 switchToCarrierThread(); 773 try { 774 future.cancel(false); 775 } finally { 776 switchToVirtualThread(this); 777 } 778 } 779 } 780 781 /** 782 * Re-enables this virtual thread for scheduling. If the virtual thread was 783 * {@link #park() parked} then it will be unblocked, otherwise its next call 784 * to {@code park} or {@linkplain #parkNanos(long) parkNanos} is guaranteed 785 * not to block. 786 * @throws RejectedExecutionException if the scheduler cannot accept a task 787 */ 788 @Override 789 @ChangesCurrentThread 790 void unpark() { 791 Thread currentThread = Thread.currentThread(); 792 if (!getAndSetParkPermit(true) && currentThread != this) { 793 int s = state(); 794 boolean parked = (s == PARKED) || (s == TIMED_PARKED); 795 if (parked && compareAndSetState(s, UNPARKED)) { 796 if (currentThread instanceof VirtualThread vthread) { 797 vthread.switchToCarrierThread(); 798 try { 799 submitRunContinuation(); 800 } finally { 801 switchToVirtualThread(vthread); 802 } 803 } else { 804 submitRunContinuation(); 805 } 806 } else if ((s == PINNED) || (s == TIMED_PINNED)) { 807 // unpark carrier thread when pinned 808 disableSuspendAndPreempt(); 809 try { 810 synchronized (carrierThreadAccessLock()) { 811 Thread carrier = carrierThread; 812 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 813 U.unpark(carrier); 814 } 815 } 816 } finally { 817 enableSuspendAndPreempt(); 818 } 819 } 820 } 821 } 822 823 /** 824 * Re-enables this virtual thread for scheduling after blocking on monitor enter. 825 * @throws RejectedExecutionException if the scheduler cannot accept a task 826 */ 827 private void unblock() { 828 assert !Thread.currentThread().isVirtual(); 829 unblocked = true; 830 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 831 unblocked = false; 832 submitRunContinuation(); 833 } 834 } 835 836 /** 837 * Attempts to yield the current virtual thread (Thread.yield). 838 */ 839 void tryYield() { 840 assert Thread.currentThread() == this; 841 setState(YIELDING); 842 boolean yielded = false; 843 try { 844 yielded = yieldContinuation(); // may throw 845 } finally { 846 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 847 if (!yielded) { 848 assert state() == YIELDING; 849 setState(RUNNING); 850 } 851 } 852 } 853 854 /** 855 * Sleep the current thread for the given sleep time (in nanoseconds). If 856 * nanos is 0 then the thread will attempt to yield. 857 * 858 * @implNote This implementation parks the thread for the given sleeping time 859 * and will therefore be observed in PARKED state during the sleep. Parking 860 * will consume the parking permit so this method makes available the parking 861 * permit after the sleep. This may be observed as a spurious, but benign, 862 * wakeup when the thread subsequently attempts to park. 863 * 864 * @param nanos the maximum number of nanoseconds to sleep 865 * @throws InterruptedException if interrupted while sleeping 866 */ 867 void sleepNanos(long nanos) throws InterruptedException { 868 assert Thread.currentThread() == this && nanos >= 0; 869 if (getAndClearInterrupt()) 870 throw new InterruptedException(); 871 if (nanos == 0) { 872 tryYield(); 873 } else { 874 // park for the sleep time 875 try { 876 long remainingNanos = nanos; 877 long startNanos = System.nanoTime(); 878 while (remainingNanos > 0) { 879 parkNanos(remainingNanos); 880 if (getAndClearInterrupt()) { 881 throw new InterruptedException(); 882 } 883 remainingNanos = nanos - (System.nanoTime() - startNanos); 884 } 885 } finally { 886 // may have been unparked while sleeping 887 setParkPermit(true); 888 } 889 } 890 } 891 892 /** 893 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 894 * A timeout of {@code 0} means to wait forever. 895 * 896 * @throws InterruptedException if interrupted while waiting 897 * @return true if the thread has terminated 898 */ 899 boolean joinNanos(long nanos) throws InterruptedException { 900 if (state() == TERMINATED) 901 return true; 902 903 // ensure termination object exists, then re-check state 904 CountDownLatch termination = getTermination(); 905 if (state() == TERMINATED) 906 return true; 907 908 // wait for virtual thread to terminate 909 if (nanos == 0) { 910 termination.await(); 911 } else { 912 boolean terminated = termination.await(nanos, NANOSECONDS); 913 if (!terminated) { 914 // waiting time elapsed 915 return false; 916 } 917 } 918 assert state() == TERMINATED; 919 return true; 920 } 921 922 @Override 923 void blockedOn(Interruptible b) { 924 disableSuspendAndPreempt(); 925 try { 926 super.blockedOn(b); 927 } finally { 928 enableSuspendAndPreempt(); 929 } 930 } 931 932 @Override 933 @SuppressWarnings("removal") 934 public void interrupt() { 935 if (Thread.currentThread() != this) { 936 checkAccess(); 937 938 // if current thread is a virtual thread then prevent it from being 939 // suspended or unmounted when entering or holding interruptLock 940 Interruptible blocker; 941 disableSuspendAndPreempt(); 942 try { 943 synchronized (interruptLock) { 944 interrupted = true; 945 blocker = nioBlocker(); 946 if (blocker != null) { 947 blocker.interrupt(this); 948 } 949 950 // interrupt carrier thread if mounted 951 Thread carrier = carrierThread; 952 if (carrier != null) carrier.setInterrupt(); 953 } 954 } finally { 955 enableSuspendAndPreempt(); 956 } 957 958 // notify blocker after releasing interruptLock 959 if (blocker != null) { 960 blocker.postInterrupt(); 961 } 962 963 } else { 964 interrupted = true; 965 carrierThread.setInterrupt(); 966 } 967 unpark(); 968 } 969 970 @Override 971 public boolean isInterrupted() { 972 return interrupted; 973 } 974 975 @Override 976 boolean getAndClearInterrupt() { 977 assert Thread.currentThread() == this; 978 boolean oldValue = interrupted; 979 if (oldValue) { 980 disableSuspendAndPreempt(); 981 try { 982 synchronized (interruptLock) { 983 interrupted = false; 984 carrierThread.clearInterrupt(); 985 } 986 } finally { 987 enableSuspendAndPreempt(); 988 } 989 } 990 return oldValue; 991 } 992 993 @Override 994 Thread.State threadState() { 995 int s = state(); 996 switch (s & ~SUSPENDED) { 997 case NEW: 998 return Thread.State.NEW; 999 case STARTED: 1000 // return NEW if thread container not yet set 1001 if (threadContainer() == null) { 1002 return Thread.State.NEW; 1003 } else { 1004 return Thread.State.RUNNABLE; 1005 } 1006 case UNPARKED: 1007 case YIELDED: 1008 // runnable, not mounted 1009 return Thread.State.RUNNABLE; 1010 case UNBLOCKED: 1011 // if designated responsible thread for monitor then thread is blocked 1012 if (isResponsibleForMonitor()) { 1013 return Thread.State.BLOCKED; 1014 } else { 1015 return Thread.State.RUNNABLE; 1016 } 1017 case RUNNING: 1018 // if designated responsible thread for monitor then thread is blocked 1019 if (isResponsibleForMonitor()) { 1020 return Thread.State.BLOCKED; 1021 } 1022 // if mounted then return state of carrier thread 1023 if (Thread.currentThread() != this) { 1024 disableSuspendAndPreempt(); 1025 try { 1026 synchronized (carrierThreadAccessLock()) { 1027 Thread carrierThread = this.carrierThread; 1028 if (carrierThread != null) { 1029 return carrierThread.threadState(); 1030 } 1031 } 1032 } finally { 1033 enableSuspendAndPreempt(); 1034 } 1035 } 1036 // runnable, mounted 1037 return Thread.State.RUNNABLE; 1038 case PARKING: 1039 case TIMED_PARKING: 1040 case YIELDING: 1041 // runnable, in transition 1042 return Thread.State.RUNNABLE; 1043 case PARKED: 1044 case PINNED: 1045 return State.WAITING; 1046 case TIMED_PARKED: 1047 case TIMED_PINNED: 1048 return State.TIMED_WAITING; 1049 case BLOCKING: 1050 case BLOCKED: 1051 return State.BLOCKED; 1052 case TERMINATED: 1053 return Thread.State.TERMINATED; 1054 default: 1055 throw new InternalError(); 1056 } 1057 } 1058 1059 /** 1060 * Returns true if thread is the designated responsible thread for a monitor. 1061 * See objectMonitor.cpp for details. 1062 */ 1063 private boolean isResponsibleForMonitor() { 1064 return (recheckInterval > 0); 1065 } 1066 1067 @Override 1068 boolean alive() { 1069 int s = state; 1070 return (s != NEW && s != TERMINATED); 1071 } 1072 1073 @Override 1074 boolean isTerminated() { 1075 return (state == TERMINATED); 1076 } 1077 1078 @Override 1079 StackTraceElement[] asyncGetStackTrace() { 1080 StackTraceElement[] stackTrace; 1081 do { 1082 stackTrace = (carrierThread != null) 1083 ? super.asyncGetStackTrace() // mounted 1084 : tryGetStackTrace(); // unmounted 1085 if (stackTrace == null) { 1086 Thread.yield(); 1087 } 1088 } while (stackTrace == null); 1089 return stackTrace; 1090 } 1091 1092 /** 1093 * Returns the stack trace for this virtual thread if it is unmounted. 1094 * Returns null if the thread is mounted or in transition. 1095 */ 1096 private StackTraceElement[] tryGetStackTrace() { 1097 int initialState = state() & ~SUSPENDED; 1098 switch (initialState) { 1099 case NEW, STARTED, TERMINATED -> { 1100 return new StackTraceElement[0]; // unmounted, empty stack 1101 } 1102 case RUNNING, PINNED, TIMED_PINNED -> { 1103 return null; // mounted 1104 } 1105 case PARKED, TIMED_PARKED, BLOCKED -> { 1106 // unmounted, not runnable 1107 } 1108 case UNPARKED, UNBLOCKED, YIELDED -> { 1109 // unmounted, runnable 1110 } 1111 case PARKING, TIMED_PARKING, BLOCKING, YIELDING -> { 1112 return null; // in transition 1113 } 1114 default -> throw new InternalError("" + initialState); 1115 } 1116 1117 // thread is unmounted, prevent it from continuing 1118 int suspendedState = initialState | SUSPENDED; 1119 if (!compareAndSetState(initialState, suspendedState)) { 1120 return null; 1121 } 1122 1123 // get stack trace and restore state 1124 StackTraceElement[] stack; 1125 try { 1126 stack = cont.getStackTrace(); 1127 } finally { 1128 assert state == suspendedState; 1129 setState(initialState); 1130 } 1131 boolean resubmit = switch (initialState) { 1132 case UNPARKED, UNBLOCKED, YIELDED -> { 1133 // resubmit as task may have run while suspended 1134 yield true; 1135 } 1136 case PARKED, TIMED_PARKED -> { 1137 // resubmit if unparked while suspended 1138 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1139 } 1140 case BLOCKED -> { 1141 // resubmit if unblocked while suspended 1142 yield unblocked && compareAndSetState(BLOCKED, UNBLOCKED); 1143 } 1144 default -> throw new InternalError(); 1145 }; 1146 if (resubmit) { 1147 submitRunContinuation(); 1148 } 1149 return stack; 1150 } 1151 1152 @Override 1153 public String toString() { 1154 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1155 sb.append(threadId()); 1156 String name = getName(); 1157 if (!name.isEmpty()) { 1158 sb.append(","); 1159 sb.append(name); 1160 } 1161 sb.append("]/"); 1162 1163 // add the carrier state and thread name when mounted 1164 boolean mounted; 1165 if (Thread.currentThread() == this) { 1166 mounted = appendCarrierInfo(sb); 1167 } else { 1168 disableSuspendAndPreempt(); 1169 try { 1170 synchronized (carrierThreadAccessLock()) { 1171 mounted = appendCarrierInfo(sb); 1172 } 1173 } finally { 1174 enableSuspendAndPreempt(); 1175 } 1176 } 1177 1178 // add virtual thread state when not mounted 1179 if (!mounted) { 1180 String stateAsString = threadState().toString(); 1181 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1182 } 1183 1184 return sb.toString(); 1185 } 1186 1187 /** 1188 * Appends the carrier state and thread name to the string buffer if mounted. 1189 * @return true if mounted, false if not mounted 1190 */ 1191 private boolean appendCarrierInfo(StringBuilder sb) { 1192 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1193 Thread carrier = carrierThread; 1194 if (carrier != null) { 1195 String stateAsString = carrier.threadState().toString(); 1196 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1197 sb.append('@'); 1198 sb.append(carrier.getName()); 1199 return true; 1200 } else { 1201 return false; 1202 } 1203 } 1204 1205 @Override 1206 public int hashCode() { 1207 return (int) threadId(); 1208 } 1209 1210 @Override 1211 public boolean equals(Object obj) { 1212 return obj == this; 1213 } 1214 1215 /** 1216 * Returns a ScheduledExecutorService to execute a delayed task. 1217 */ 1218 private ScheduledExecutorService delayedTaskScheduler() { 1219 long tid = Thread.currentThread().threadId(); 1220 int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1); 1221 return DELAYED_TASK_SCHEDULERS[index]; 1222 } 1223 1224 /** 1225 * Returns the termination object, creating it if needed. 1226 */ 1227 private CountDownLatch getTermination() { 1228 CountDownLatch termination = this.termination; 1229 if (termination == null) { 1230 termination = new CountDownLatch(1); 1231 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1232 termination = this.termination; 1233 } 1234 } 1235 return termination; 1236 } 1237 1238 /** 1239 * Returns the lock object to synchronize on when accessing carrierThread. 1240 * The lock prevents carrierThread from being reset to null during unmount. 1241 */ 1242 private Object carrierThreadAccessLock() { 1243 // return interruptLock as unmount has to coordinate with interrupt 1244 return interruptLock; 1245 } 1246 1247 /** 1248 * Disallow the current thread be suspended or preempted. 1249 */ 1250 private void disableSuspendAndPreempt() { 1251 notifyJvmtiDisableSuspend(true); 1252 Continuation.pin(); 1253 } 1254 1255 /** 1256 * Allow the current thread be suspended or preempted. 1257 */ 1258 private void enableSuspendAndPreempt() { 1259 Continuation.unpin(); 1260 notifyJvmtiDisableSuspend(false); 1261 } 1262 1263 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1264 1265 private int state() { 1266 return state; // volatile read 1267 } 1268 1269 private void setState(int newValue) { 1270 state = newValue; // volatile write 1271 } 1272 1273 private boolean compareAndSetState(int expectedValue, int newValue) { 1274 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1275 } 1276 1277 private boolean compareAndSetOnWaitingList(byte expectedValue, byte newValue) { 1278 return U.compareAndSetByte(this, ON_WAITING_LIST, expectedValue, newValue); 1279 } 1280 1281 private void setParkPermit(boolean newValue) { 1282 if (parkPermit != newValue) { 1283 parkPermit = newValue; 1284 } 1285 } 1286 1287 private boolean getAndSetParkPermit(boolean newValue) { 1288 if (parkPermit != newValue) { 1289 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1290 } else { 1291 return newValue; 1292 } 1293 } 1294 1295 private void setCarrierThread(Thread carrier) { 1296 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1297 this.carrierThread = carrier; 1298 } 1299 1300 @IntrinsicCandidate 1301 private static native void setLockId(long tid); 1302 1303 // -- JVM TI support -- 1304 1305 @IntrinsicCandidate 1306 @JvmtiMountTransition 1307 private native void notifyJvmtiStart(); 1308 1309 @IntrinsicCandidate 1310 @JvmtiMountTransition 1311 private native void notifyJvmtiEnd(); 1312 1313 @IntrinsicCandidate 1314 @JvmtiMountTransition 1315 private native void notifyJvmtiMount(boolean hide); 1316 1317 @IntrinsicCandidate 1318 @JvmtiMountTransition 1319 private native void notifyJvmtiUnmount(boolean hide); 1320 1321 @IntrinsicCandidate 1322 @JvmtiMountTransition 1323 private static native void notifyJvmtiHideFrames(boolean hide); 1324 1325 @IntrinsicCandidate 1326 private static native void notifyJvmtiDisableSuspend(boolean enter); 1327 1328 private static native void registerNatives(); 1329 static { 1330 registerNatives(); 1331 } 1332 1333 /** 1334 * Creates the default scheduler. 1335 */ 1336 @SuppressWarnings("removal") 1337 private static ForkJoinPool createDefaultScheduler() { 1338 ForkJoinWorkerThreadFactory factory = pool -> { 1339 PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool); 1340 return AccessController.doPrivileged(pa); 1341 }; 1342 PrivilegedAction<ForkJoinPool> pa = () -> { 1343 int parallelism, maxPoolSize, minRunnable; 1344 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1345 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1346 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1347 if (parallelismValue != null) { 1348 parallelism = Integer.parseInt(parallelismValue); 1349 } else { 1350 parallelism = Runtime.getRuntime().availableProcessors(); 1351 } 1352 if (maxPoolSizeValue != null) { 1353 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1354 parallelism = Integer.min(parallelism, maxPoolSize); 1355 } else { 1356 maxPoolSize = Integer.max(parallelism, 256); 1357 } 1358 if (minRunnableValue != null) { 1359 minRunnable = Integer.parseInt(minRunnableValue); 1360 } else { 1361 minRunnable = Integer.max(parallelism / 2, 1); 1362 } 1363 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1364 boolean asyncMode = true; // FIFO 1365 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1366 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1367 }; 1368 return AccessController.doPrivileged(pa); 1369 } 1370 1371 /** 1372 * Invoked by the VM for the Thread.vthread_scheduler diagnostic command. 1373 */ 1374 private static byte[] printDefaultScheduler() { 1375 return String.format("%s%n", DEFAULT_SCHEDULER.toString()) 1376 .getBytes(StandardCharsets.UTF_8); 1377 } 1378 1379 /** 1380 * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks. 1381 */ 1382 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1383 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1384 String propValue = GetPropertyAction.privilegedGetProperty(propName); 1385 int queueCount; 1386 if (propValue != null) { 1387 queueCount = Integer.parseInt(propValue); 1388 if (queueCount != Integer.highestOneBit(queueCount)) { 1389 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1390 } 1391 } else { 1392 int ncpus = Runtime.getRuntime().availableProcessors(); 1393 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1394 } 1395 var schedulers = new ScheduledExecutorService[queueCount]; 1396 for (int i = 0; i < queueCount; i++) { 1397 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1398 Executors.newScheduledThreadPool(1, task -> { 1399 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1400 t.setDaemon(true); 1401 return t; 1402 }); 1403 stpe.setRemoveOnCancelPolicy(true); 1404 schedulers[i] = stpe; 1405 } 1406 return schedulers; 1407 } 1408 1409 /** 1410 * Schedule virtual threads that are ready to be scheduled after they blocked on 1411 * monitor enter. 1412 */ 1413 private static void unblockVirtualThreads() { 1414 while (true) { 1415 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1416 while (vthread != null) { 1417 assert vthread.onWaitingList == 1; 1418 VirtualThread nextThread = vthread.next; 1419 1420 // remove from list and unblock 1421 vthread.next = null; 1422 boolean changed = vthread.compareAndSetOnWaitingList((byte) 1, (byte) 0); 1423 assert changed; 1424 vthread.unblock(); 1425 1426 vthread = nextThread; 1427 } 1428 } 1429 } 1430 1431 /** 1432 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1433 * if necessary until a list of one or more threads becomes available. 1434 */ 1435 private static native VirtualThread takeVirtualThreadListToUnblock(); 1436 1437 static { 1438 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1439 VirtualThread::unblockVirtualThreads); 1440 unblocker.setDaemon(true); 1441 unblocker.start(); 1442 } 1443 }