1 /* 2 * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.lang; 26 27 import java.security.AccessController; 28 import java.security.PrivilegedAction; 29 import java.util.Locale; 30 import java.util.Objects; 31 import java.util.concurrent.Callable; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.Executors; 35 import java.util.concurrent.ForkJoinPool; 36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 37 import java.util.concurrent.ForkJoinTask; 38 import java.util.concurrent.ForkJoinWorkerThread; 39 import java.util.concurrent.Future; 40 import java.util.concurrent.RejectedExecutionException; 41 import java.util.concurrent.ScheduledExecutorService; 42 import java.util.concurrent.ScheduledThreadPoolExecutor; 43 import java.util.concurrent.TimeUnit; 44 import jdk.internal.event.VirtualThreadEndEvent; 45 import jdk.internal.event.VirtualThreadPinnedEvent; 46 import jdk.internal.event.VirtualThreadStartEvent; 47 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 48 import jdk.internal.misc.CarrierThread; 49 import jdk.internal.misc.InnocuousThread; 50 import jdk.internal.misc.Unsafe; 51 import jdk.internal.vm.Continuation; 52 import jdk.internal.vm.ContinuationScope; 53 import jdk.internal.vm.StackableScope; 54 import jdk.internal.vm.ThreadContainer; 55 import jdk.internal.vm.ThreadContainers; 56 import jdk.internal.vm.annotation.ChangesCurrentThread; 57 import jdk.internal.vm.annotation.Hidden; 58 import jdk.internal.vm.annotation.IntrinsicCandidate; 59 import jdk.internal.vm.annotation.JvmtiMountTransition; 60 import jdk.internal.vm.annotation.ReservedStackAccess; 61 import sun.nio.ch.Interruptible; 62 import sun.security.action.GetPropertyAction; 63 import static java.util.concurrent.TimeUnit.*; 64 65 /** 66 * A thread that is scheduled by the Java virtual machine rather than the operating system. 
*/
final class VirtualThread extends BaseVirtualThread {
    // Unsafe instance: used here to look up field offsets and, further down in this
    // class, to park/unpark threads (U.park / U.unpark)
    private static final Unsafe U = Unsafe.getUnsafe();
    // continuation scope passed to every VThreadContinuation and to Continuation.yield
    private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
    // scheduler used when the constructor is given no scheduler and the creating
    // thread is not itself a virtual thread
    private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
    // NOTE(review): presumably backs the schedule(...) call used for timed-park
    // wakeups; the helper is defined outside this section -- confirm
    private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
    // pinned-thread tracing: onPinned prints a stack trace when > 0, and passes
    // printAll=true when the value is exactly 1
    private static final int TRACE_PINNING_MODE = tracePinningMode();

    // Unsafe field offsets of the like-named instance fields declared below
    // (presumably used by the state/permit/carrier accessors defined later in this class)
    private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
    private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
    private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
    private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");

    // scheduler and continuation
    private final Executor scheduler;
    private final Continuation cont;
    // the task handed to the scheduler; bound to this::runContinuation in the constructor
    private final Runnable runContinuation;

    // virtual thread state, accessed by VM
    private volatile int state;

    /*
     * Virtual thread state transitions:
     *
     *      NEW -> STARTED         // Thread.start, schedule to run
     *  STARTED -> TERMINATED      // failed to start
     *  STARTED -> RUNNING         // first run
     *  RUNNING -> TERMINATED      // done
     *
     *  RUNNING -> PARKING         // Thread parking with LockSupport.park
     *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
     *  PARKING -> PINNED          // cont.yield failed, parked indefinitely on carrier
     *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *   PINNED -> RUNNING         // unparked, continue execution on same carrier
     * UNPARKED -> RUNNING         // continue execution after park
     *
     *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos
     * TIMED_PARKING -> TIMED_PARKED    // cont.yield successful, timed-parked
     * TIMED_PARKING -> TIMED_PINNED    // cont.yield failed, timed-parked on carrier
     *  TIMED_PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *  TIMED_PINNED -> RUNNING         // unparked, continue execution on same carrier
     *
     *  RUNNING -> YIELDING        // Thread.yield
     * YIELDING -> YIELDED         // cont.yield successful, may be scheduled to continue
     * YIELDING -> RUNNING         // cont.yield failed
     *  YIELDED -> RUNNING         // continue execution after Thread.yield
     */
    private static final int NEW      = 0;
    private static final int STARTED  = 1;
    private static final int RUNNING  = 2;     // runnable-mounted

    // untimed and timed parking
    private static final int PARKING       = 3;
    private static final int PARKED        = 4;     // unmounted
    private static final int PINNED        = 5;     // mounted
    private static final int TIMED_PARKING = 6;
    private static final int TIMED_PARKED  = 7;     // unmounted
    private static final int TIMED_PINNED  = 8;     // mounted
    private static final int UNPARKED      = 9;     // unmounted but runnable

    // Thread.yield
    private static final int YIELDING = 10;
    private static final int YIELDED  = 11;         // unmounted but runnable

    private static final int TERMINATED = 99;  // final state

    // can be suspended from scheduling when unmounted
    // (a flag bit above the state values; threadState() masks it off before switching)
    private static final int SUSPENDED = 1 << 8;

    // parking permit
    private volatile boolean parkPermit;

    // carrier thread when mounted, accessed by VM
    private volatile Thread carrierThread;

    // termination object when joining, created lazily if needed
    private volatile CountDownLatch termination;


    /**
     * Returns the default scheduler.
     *
     * @return the scheduler held in {@code DEFAULT_SCHEDULER}
     */
    static Executor defaultScheduler() {
        return DEFAULT_SCHEDULER;
    }

    /**
     * Returns the continuation scope used for virtual threads.
     *
     * @return the scope held in {@code VTHREAD_SCOPE}
     */
    static ContinuationScope continuationScope() {
        return VTHREAD_SCOPE;
    }

    /**
     * Creates a new {@code VirtualThread} to run the given task with the given
     * scheduler.
If the given scheduler is {@code null} and the current thread 163 * is a platform thread then the newly created virtual thread will use the 164 * default scheduler. If given scheduler is {@code null} and the current 165 * thread is a virtual thread then the current thread's scheduler is used. 166 * 167 * @param scheduler the scheduler or null 168 * @param name thread name 169 * @param characteristics characteristics 170 * @param task the task to execute 171 */ 172 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 173 super(name, characteristics, /*bound*/ false); 174 Objects.requireNonNull(task); 175 176 // choose scheduler if not specified 177 if (scheduler == null) { 178 Thread parent = Thread.currentThread(); 179 if (parent instanceof VirtualThread vparent) { 180 scheduler = vparent.scheduler; 181 } else { 182 scheduler = DEFAULT_SCHEDULER; 183 } 184 } 185 186 this.scheduler = scheduler; 187 this.cont = new VThreadContinuation(this, task); 188 this.runContinuation = this::runContinuation; 189 } 190 191 /** 192 * The continuation that a virtual thread executes. 
193 */ 194 private static class VThreadContinuation extends Continuation { 195 VThreadContinuation(VirtualThread vthread, Runnable task) { 196 super(VTHREAD_SCOPE, wrap(vthread, task)); 197 } 198 @Override 199 protected void onPinned(Continuation.Pinned reason) { 200 if (TRACE_PINNING_MODE > 0) { 201 boolean printAll = (TRACE_PINNING_MODE == 1); 202 VirtualThread vthread = (VirtualThread) Thread.currentThread(); 203 int oldState = vthread.state(); 204 try { 205 // avoid printing when in transition states 206 vthread.setState(RUNNING); 207 PinnedThreadPrinter.printStackTrace(System.out, reason, printAll); 208 } finally { 209 vthread.setState(oldState); 210 } 211 } 212 } 213 private static Runnable wrap(VirtualThread vthread, Runnable task) { 214 return new Runnable() { 215 @Hidden 216 public void run() { 217 vthread.run(task); 218 } 219 }; 220 } 221 } 222 223 /** 224 * Runs or continues execution on the current thread. The virtual thread is mounted 225 * on the current thread before the task runs or continues. It unmounts when the 226 * task completes or yields. 
*/
    @ChangesCurrentThread // allow mount/unmount to be inlined
    private void runContinuation() {
        // the carrier must be a platform thread
        if (Thread.currentThread().isVirtual()) {
            throw new WrongThreadException();
        }

        // set state to RUNNING
        int initialState = state();
        if (initialState == STARTED || initialState == UNPARKED || initialState == YIELDED) {
            // newly started or continue after parking/blocking/Thread.yield
            if (!compareAndSetState(initialState, RUNNING)) {
                // state changed since it was read above; do not run
                return;
            }
            // consume parking permit when continuing after parking
            if (initialState == UNPARKED) {
                setParkPermit(false);
            }
        } else {
            // not runnable
            return;
        }

        mount();
        try {
            cont.run();
        } finally {
            // always unmount first, then dispatch on whether the continuation
            // ran to completion or merely yielded
            unmount();
            if (cont.isDone()) {
                afterDone();
            } else {
                afterYield();
            }
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the default scheduler,
     * and calling it on a worker thread, the task will be pushed to the local queue,
     * otherwise it will be pushed to an external submission queue.
     * @param scheduler the scheduler
     * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
     * @throws RejectedExecutionException if the scheduler rejects the task; a
     *         submit-failed event is recorded via {@code submitFailed} before rethrowing
     * @throws OutOfMemoryError if thrown by the submit and {@code retryOnOOME} is false
     */
    @ChangesCurrentThread
    private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
        // loop ends after one successful execute(); the only other exits are the
        // exceptions rethrown below
        boolean done = false;
        while (!done) {
            try {
                // The scheduler's execute method is invoked in the context of the
                // carrier thread. For the default scheduler this ensures that the
                // current thread is a ForkJoinWorkerThread so the task will be pushed
                // to the local queue. For other schedulers, it avoids deadlock that
                // would arise due to platform and virtual threads contending for a
                // lock on the scheduler's submission queue.
                if (currentThread() instanceof VirtualThread vthread) {
                    vthread.switchToCarrierThread();
                    try {
                        scheduler.execute(runContinuation);
                    } finally {
                        // restore the virtual thread as current thread even if execute throws
                        switchToVirtualThread(vthread);
                    }
                } else {
                    scheduler.execute(runContinuation);
                }
                done = true;
            } catch (RejectedExecutionException ree) {
                submitFailed(ree);
                throw ree;
            } catch (OutOfMemoryError e) {
                if (retryOnOOME) {
                    // back off before the next attempt
                    U.park(false, 100_000_000); // 100ms
                } else {
                    throw e;
                }
            }
        }
    }

    /**
     * Submits the runContinuation task to given scheduler with a lazy submit.
     * If OutOfMemoryError is thrown then the submit will be retried until it succeeds
     * (the retry goes through {@code submitRunContinuation(pool, true)} rather than
     * repeating the lazy submit).
     * @param pool the pool to submit to
     * @throws RejectedExecutionException if the pool rejects the task
     * @see ForkJoinPool#lazySubmit(ForkJoinTask)
     */
    private void lazySubmitRunContinuation(ForkJoinPool pool) {
        assert Thread.currentThread() instanceof CarrierThread;
        try {
            pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        } catch (OutOfMemoryError e) {
            submitRunContinuation(pool, true);
        }
    }

    /**
     * Submits the runContinuation task to the given scheduler as an external submit.
     * If OutOfMemoryError is thrown then the submit will be retried until it succeeds
     * (the retry goes through {@code submitRunContinuation(pool, true)} rather than
     * repeating the external submit).
     * @param pool the pool to submit to
     * @throws RejectedExecutionException if the pool rejects the task
     * @see ForkJoinPool#externalSubmit(ForkJoinTask)
     */
    private void externalSubmitRunContinuation(ForkJoinPool pool) {
        assert Thread.currentThread() instanceof CarrierThread;
        try {
            pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        } catch (OutOfMemoryError e) {
            submitRunContinuation(pool, true);
        }
    }

    /**
     * Submits the runContinuation task to the scheduler.
For the default scheduler, 345 * and calling it on a worker thread, the task will be pushed to the local queue, 346 * otherwise it will be pushed to an external submission queue. 347 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 348 * @throws RejectedExecutionException 349 */ 350 private void submitRunContinuation() { 351 submitRunContinuation(scheduler, true); 352 } 353 354 /** 355 * Submits the runContinuation task to the scheduler. For the default scheduler, and 356 * calling it a virtual thread that uses the default scheduler, the task will be 357 * pushed to an external submission queue. This method may throw OutOfMemoryError. 358 * @throws RejectedExecutionException 359 * @throws OutOfMemoryError 360 */ 361 private void externalSubmitRunContinuationOrThrow() { 362 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) { 363 try { 364 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 365 } catch (RejectedExecutionException ree) { 366 submitFailed(ree); 367 throw ree; 368 } 369 } else { 370 submitRunContinuation(scheduler, false); 371 } 372 } 373 374 /** 375 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 376 */ 377 private void submitFailed(RejectedExecutionException ree) { 378 var event = new VirtualThreadSubmitFailedEvent(); 379 if (event.isEnabled()) { 380 event.javaThreadId = threadId(); 381 event.exceptionMessage = ree.getMessage(); 382 event.commit(); 383 } 384 } 385 386 /** 387 * Runs a task in the context of this virtual thread. 
*/
    private void run(Runnable task) {
        assert Thread.currentThread() == this && state == RUNNING;

        // notify JVMTI, may post VirtualThreadStart event
        notifyJvmtiStart();

        // emit JFR event if enabled
        if (VirtualThreadStartEvent.isTurnedOn()) {
            var event = new VirtualThreadStartEvent();
            event.javaThreadId = threadId();
            event.commit();
        }

        Object bindings = Thread.scopedValueBindings();
        try {
            runWith(bindings, task);
        } catch (Throwable exc) {
            // anything thrown by the task goes to the uncaught exception handling
            dispatchUncaughtException(exc);
        } finally {
            try {
                // pop any remaining scopes from the stack, this may block
                StackableScope.popAll();

                // emit JFR event if enabled
                if (VirtualThreadEndEvent.isTurnedOn()) {
                    var event = new VirtualThreadEndEvent();
                    event.javaThreadId = threadId();
                    event.commit();
                }

            } finally {
                // notify JVMTI, may post VirtualThreadEnd event
                // (nested finally: runs even if popAll or the JFR commit throws)
                notifyJvmtiEnd();
            }
        }
    }

    /**
     * Mounts this virtual thread onto the current platform thread. On
     * return, the current thread is the virtual thread.
     */
    @ChangesCurrentThread
    @ReservedStackAccess
    private void mount() {
        // notify JVMTI before mount
        notifyJvmtiMount(/*hide*/true);

        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupt status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            // carrier has interrupt status set but this thread does not: clear it,
            // re-checking under interruptLock because interrupt() sets the flag
            // while holding that lock
            synchronized (interruptLock) {
                // need to recheck interrupt status
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread.
*/
    @ChangesCurrentThread
    @ReservedStackAccess
    private void unmount() {
        assert !Thread.holdsLock(interruptLock);

        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        // drop any interrupt status on the carrier; mount() and interrupt() may
        // have set it on behalf of this virtual thread
        carrier.clearInterrupt();

        // notify JVMTI after unmount
        notifyJvmtiUnmount(/*hide*/false);
    }

    /**
     * Sets the current thread to the current carrier thread.
     * Expected to be called on this virtual thread while mounted (see the assert).
     */
    @ChangesCurrentThread
    @JvmtiMountTransition
    private void switchToCarrierThread() {
        notifyJvmtiHideFrames(true);
        Thread carrier = this.carrierThread;
        assert Thread.currentThread() == this
                && carrier == Thread.currentCarrierThread();
        carrier.setCurrentThread(carrier);
    }

    /**
     * Sets the current thread to the given virtual thread.
     * Reverses {@code switchToCarrierThread}: the current thread is set first and
     * JVMTI frame hiding is turned off afterwards.
     *
     * @param vthread the virtual thread to make current; its carrier is expected to
     *        be the current carrier thread (see the assert)
     */
    @ChangesCurrentThread
    @JvmtiMountTransition
    private static void switchToVirtualThread(VirtualThread vthread) {
        Thread carrier = vthread.carrierThread;
        assert carrier == Thread.currentCarrierThread();
        carrier.setCurrentThread(vthread);
        notifyJvmtiHideFrames(false);
    }

    /**
     * Executes the given value returning task on the current carrier thread.
     *
     * @param task the task to call while the carrier thread is the current thread
     * @return the value returned by {@code task.call()}
     * @throws Exception if thrown by the task; the current thread is switched back
     *         to this virtual thread before the exception propagates
     */
    @ChangesCurrentThread
    <V> V executeOnCarrierThread(Callable<V> task) throws Exception {
        assert Thread.currentThread() == this;
        switchToCarrierThread();
        try {
            return task.call();
        } finally {
            switchToVirtualThread(this);
        }
     }

    /**
     * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
     * the continuation continues.
*/
    @Hidden
    private boolean yieldContinuation() {
        notifyJvmtiUnmount(/*hide*/true);
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            // runs whether the yield succeeded, failed or threw
            notifyJvmtiMount(/*hide*/false);
        }
    }

    /**
     * Invoked after the continuation yields. If parking then it sets the state
     * and also re-submits the task to continue if unparked while parking.
     * If yielding due to Thread.yield then it just submits the task to continue.
     */
    private void afterYield() {
        assert carrierThread == null;

        // re-adjust parallelism if the virtual thread yielded when compensating
        if (currentThread() instanceof CarrierThread ct) {
            ct.endBlocking();
        }

        int s = state();

        // LockSupport.park/parkNanos
        if (s == PARKING || s == TIMED_PARKING) {
            int newState = (s == PARKING) ? PARKED : TIMED_PARKED;
            setState(newState);

            // may have been unparked while parking
            // (unpark() only resubmits when it observes PARKED/TIMED_PARKED, so a
            // permit set before the setState above has to be handled here; the CAS
            // ensures only one side performs the submit)
            if (parkPermit && compareAndSetState(newState, UNPARKED)) {
                // lazy submit to continue on the current carrier if possible
                if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                    lazySubmitRunContinuation(ct.getPool());
                } else {
                    submitRunContinuation();
                }
            }
            return;
        }

        // Thread.yield
        if (s == YIELDING) {
            setState(YIELDED);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation(ct.getPool());
            } else {
                submitRunContinuation();
            }
            return;
        }

        // no other state is expected after a yield
        assert false;
    }

    /**
     * Invoked after the continuation completes.
     * Delegates to {@code afterDone(true)}, so the thread container is notified.
     */
    private void afterDone() {
        afterDone(true);
    }

    /**
     * Invoked after the continuation completes (or start failed). Sets the thread
     * state to TERMINATED and notifies anyone waiting for the thread to terminate.
*
     * @param notifyContainer true if its container should be notified
     */
    private void afterDone(boolean notifyContainer) {
        assert carrierThread == null;
        setState(TERMINATED);

        // notify anyone waiting for this virtual thread to terminate
        // (the latch is only present if a joiner created it; see joinNanos)
        CountDownLatch termination = this.termination;
        if (termination != null) {
            assert termination.getCount() == 1;
            termination.countDown();
        }

        // notify container
        if (notifyContainer) {
            threadContainer().onExit(this);
        }

        // clear references to thread locals
        clearReferences();
    }

    /**
     * Schedules this {@code VirtualThread} to execute.
     *
     * @param container the thread container to bind this thread to
     * @throws IllegalStateException if the container is shutdown or closed
     * @throws IllegalThreadStateException if the thread has already been started
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void start(ThreadContainer container) {
        // NEW -> STARTED; only one caller can win this transition
        if (!compareAndSetState(NEW, STARTED)) {
            throw new IllegalThreadStateException("Already started");
        }

        // bind thread to container
        assert threadContainer() == null;
        setThreadContainer(container);

        // start thread
        boolean addedToContainer = false;
        boolean started = false;
        try {
            container.onStart(this);  // may throw
            addedToContainer = true;

            // scoped values may be inherited
            inheritScopedValueBindings(container);

            // submit task to run thread, using externalSubmit if possible
            externalSubmitRunContinuationOrThrow();
            started = true;
        } finally {
            if (!started) {
                // start failed: terminate the thread, notifying the container only
                // if onStart had completed
                afterDone(addedToContainer);
            }
        }
    }

    /**
     * Schedules this thread to execute in the root thread container.
     */
    @Override
    public void start() {
        start(ThreadContainers.root());
    }

    /**
     * Does nothing; the task is run via the continuation (see {@code run(Runnable)}).
     */
    @Override
    public void run() {
        // do nothing
    }

    /**
     * Parks until unparked or interrupted.
* If already unparked then the parking
     * permit is consumed and this method completes immediately (meaning it doesn't
     * yield). It also completes immediately if the interrupt status is set.
     */
    @Override
    void park() {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        boolean yielded = false;
        setState(PARKING);
        try {
            yielded = yieldContinuation();  // may throw
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (!yielded) {
                // yield failed or threw: still PARKING, so restore RUNNING
                assert state() == PARKING;
                setState(RUNNING);
            }
        }

        // park on the carrier thread when pinned
        if (!yielded) {
            parkOnCarrierThread(false, 0);
        }
    }

    /**
     * Parks up to the given waiting time or until unparked or interrupted.
     * If already unparked then the parking permit is consumed and this method
     * completes immediately (meaning it doesn't yield). It also completes immediately
     * if the interrupt status is set or the waiting time is {@code <= 0}.
     *
     * @param nanos the maximum number of nanoseconds to wait.
*/
    @Override
    void parkNanos(long nanos) {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread for the waiting time
        if (nanos > 0) {
            long startTime = System.nanoTime();

            boolean yielded = false;
            Future<?> unparker = scheduleUnpark(nanos);  // may throw OOME
            setState(TIMED_PARKING);
            try {
                yielded = yieldContinuation();  // may throw
            } finally {
                assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
                if (!yielded) {
                    // yield failed or threw: still TIMED_PARKING, so restore RUNNING
                    assert state() == TIMED_PARKING;
                    setState(RUNNING);
                }
                // the delayed unpark is no longer needed once execution gets here
                cancel(unparker);
            }

            // park on carrier thread for remaining time when pinned
            if (!yielded) {
                // subtract the time already spent since startTime; a non-positive
                // result makes parkOnCarrierThread skip the park
                long remainingNanos = nanos - (System.nanoTime() - startTime);
                parkOnCarrierThread(true, remainingNanos);
            }
        }
    }

    /**
     * Parks the current carrier thread up to the given waiting time or until
     * unparked or interrupted. If the virtual thread is interrupted then the
     * interrupt status will be propagated to the carrier thread.
     * @param timed true for a timed park, false for untimed
     * @param nanos the waiting time in nanoseconds
     */
    private void parkOnCarrierThread(boolean timed, long nanos) {
        assert state() == RUNNING;

        // the pinned event is best-effort: skip it if it cannot be allocated
        VirtualThreadPinnedEvent event;
        try {
            event = new VirtualThreadPinnedEvent();
            event.begin();
        } catch (OutOfMemoryError e) {
            event = null;
        }

        setState(timed ? TIMED_PINNED : PINNED);
        try {
            // skip the park if a permit became available in the meantime
            if (!parkPermit) {
                if (!timed) {
                    U.park(false, 0);
                } else if (nanos > 0) {
                    U.park(false, nanos);
                }
            }
        } finally {
            setState(RUNNING);
        }

        // consume parking permit
        setParkPermit(false);

        if (event != null) {
            try {
                event.commit();
            } catch (OutOfMemoryError e) {
                // ignore
            }
        }
    }

    /**
     * Schedule this virtual thread to be unparked after a given delay.
     *
     * @param nanos the delay in nanoseconds
     * @return the future returned by {@code schedule} for the delayed unpark
     */
    @ChangesCurrentThread
    private Future<?> scheduleUnpark(long nanos) {
        assert Thread.currentThread() == this;
        // need to switch to current carrier thread to avoid nested parking
        switchToCarrierThread();
        try {
            return schedule(this::unpark, nanos, NANOSECONDS);
        } finally {
            switchToVirtualThread(this);
        }
    }

    /**
     * Cancels a task if it has not completed.
     *
     * @param future the task to cancel; cancelled with {@code cancel(false)}
     */
    @ChangesCurrentThread
    private void cancel(Future<?> future) {
        assert Thread.currentThread() == this;
        if (!future.isDone()) {
            // need to switch to current carrier thread to avoid nested parking
            switchToCarrierThread();
            try {
                future.cancel(false);
            } finally {
                switchToVirtualThread(this);
            }
        }
    }

    /**
     * Re-enables this virtual thread for scheduling. If this virtual thread is parked
     * then its task is scheduled to continue, otherwise its next call to {@code park} or
     * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
* @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void unpark() {
        // the permit is always set; the rest is skipped when the thread unparks itself
        // or, presumably, when the permit was already set (getAndSetParkPermit is
        // defined outside this section -- assumed to return the previous value)
        if (!getAndSetParkPermit(true) && currentThread() != this) {
            int s = state();

            // unparked while parked
            if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
                submitRunContinuation();
                return;
            }

            // unparked while parked when pinned
            if (s == PINNED || s == TIMED_PINNED) {
                // unpark carrier thread when pinned
                disableSuspendAndPreempt();
                try {
                    synchronized (carrierThreadAccessLock()) {
                        // re-read carrier and state under the lock before unparking
                        Thread carrier = carrierThread;
                        if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                            U.unpark(carrier);
                        }
                    }
                } finally {
                    enableSuspendAndPreempt();
                }
                return;
            }
        }
    }

    /**
     * Attempts to yield the current virtual thread (Thread.yield).
     * If the continuation cannot yield, the state is set back to RUNNING and
     * this method returns.
     */
    void tryYield() {
        assert Thread.currentThread() == this;
        setState(YIELDING);
        boolean yielded = false;
        try {
            yielded = yieldContinuation();  // may throw
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (!yielded) {
                // yield failed or threw: still YIELDING, so restore RUNNING
                assert state() == YIELDING;
                setState(RUNNING);
            }
        }
    }

    /**
     * Sleep the current thread for the given sleep time (in nanoseconds). If
     * nanos is 0 then the thread will attempt to yield.
     *
     * @implNote This implementation parks the thread for the given sleeping time
     * and will therefore be observed in PARKED state during the sleep. Parking
     * will consume the parking permit so this method makes available the parking
     * permit after the sleep. This may be observed as a spurious, but benign,
     * wakeup when the thread subsequently attempts to park.
*
     * @param nanos the maximum number of nanoseconds to sleep
     * @throws InterruptedException if interrupted while sleeping
     */
    void sleepNanos(long nanos) throws InterruptedException {
        assert Thread.currentThread() == this && nanos >= 0;
        // an interrupt status that is already set is cleared and reported immediately
        if (getAndClearInterrupt())
            throw new InterruptedException();
        if (nanos == 0) {
            tryYield();
        } else {
            // park for the sleep time
            try {
                long remainingNanos = nanos;
                long startNanos = System.nanoTime();
                // the remaining time is recomputed after each park, parking again
                // while any of the sleep time is left
                while (remainingNanos > 0) {
                    parkNanos(remainingNanos);
                    if (getAndClearInterrupt()) {
                        throw new InterruptedException();
                    }
                    remainingNanos = nanos - (System.nanoTime() - startNanos);
                }
            } finally {
                // may have been unparked while sleeping
                setParkPermit(true);
            }
        }
    }

    /**
     * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate.
     * A timeout of {@code 0} means to wait forever.
     *
     * @param nanos the maximum time to wait in nanoseconds, {@code 0} to wait
     *        without a timeout
     * @throws InterruptedException if interrupted while waiting
     * @return true if the thread has terminated, false if the waiting time elapsed
     */
    boolean joinNanos(long nanos) throws InterruptedException {
        // fast path, avoids creating the termination object
        if (state() == TERMINATED)
            return true;

        // ensure termination object exists, then re-check state
        CountDownLatch termination = getTermination();
        if (state() == TERMINATED)
            return true;

        // wait for virtual thread to terminate
        if (nanos == 0) {
            termination.await();
        } else {
            boolean terminated = termination.await(nanos, NANOSECONDS);
            if (!terminated) {
                // waiting time elapsed
                return false;
            }
        }
        assert state() == TERMINATED;
        return true;
    }

    /**
     * Invokes the superclass {@code blockedOn} with suspend and preemption of the
     * current thread disabled for the duration of the call.
     */
    @Override
    void blockedOn(Interruptible b) {
        disableSuspendAndPreempt();
        try {
            super.blockedOn(b);
        } finally {
            enableSuspendAndPreempt();
        }
    }

    /**
     * Interrupts this virtual thread.
     *
     * <p> When invoked by another thread: the interrupt status is set while holding
     * interruptLock, the blocker returned by {@code nioBlocker()} (if any) is
     * interrupted, and the interrupt status of the carrier thread (if any) is set.
     * The blocker is then notified after the lock is released, and finally the
     * thread is unparked.
     *
     * <p> When invoked by this virtual thread: the interrupt status of this thread
     * and of its carrier thread is set, and the parking permit is made available.
     */
    @Override
    @SuppressWarnings("removal")
    public void interrupt() {
        if (Thread.currentThread() != this) {
            checkAccess();

            // if current thread is a virtual thread then prevent it from being
            // suspended or unmounted when entering or holding interruptLock
            Interruptible blocker;
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = true;
                    blocker = nioBlocker();
                    if (blocker != null) {
                        blocker.interrupt(this);
                    }

                    // interrupt carrier thread if mounted
                    Thread carrier = carrierThread;
                    if (carrier != null) carrier.setInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }

            // notify blocker after releasing interruptLock
            if (blocker != null) {
                blocker.postInterrupt();
            }

            // make available parking permit, unpark thread if parked
            unpark();

        } else {
            // self-interrupt: no lock is taken and there is nothing to unpark
            interrupted = true;
            carrierThread.setInterrupt();
            setParkPermit(true);
        }
    }

    /**
     * Returns the interrupt status of this virtual thread. The status is not
     * changed by this method.
     */
    @Override
    public boolean isInterrupted() {
        return interrupted;
    }

    /**
     * Returns the interrupt status of this virtual thread, clearing it if set.
     * When the status is set, it is cleared together with the interrupt status of
     * the carrier thread while holding interruptLock. Must be invoked on this
     * virtual thread.
     *
     * @return the interrupt status before it was cleared
     */
    @Override
    boolean getAndClearInterrupt() {
        assert Thread.currentThread() == this;
        boolean oldValue = interrupted;
        if (oldValue) {
            // interruptLock is only acquired when there is a status to clear
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = false;
                    carrierThread.clearInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }
        return oldValue;
    }

    /**
     * Maps the internal state of this virtual thread, ignoring the SUSPENDED bit,
     * to a {@code Thread.State}.
     *
     * <ul>
     *   <li> NEW maps to NEW, as does STARTED when the thread container is not yet set
     *   <li> STARTED (thread container set), UNPARKED, YIELDED, PARKING, TIMED_PARKING
     *        and YIELDING map to RUNNABLE
     *   <li> RUNNING maps to the state of the carrier thread when invoked by another
     *        thread and a carrier thread is set, otherwise to RUNNABLE
     *   <li> PARKED and PINNED map to WAITING
     *   <li> TIMED_PARKED and TIMED_PINNED map to TIMED_WAITING
     *   <li> TERMINATED maps to TERMINATED
     * </ul>
     *
     * @throws InternalError if the state is not one of the above
     */
    @Override
    Thread.State threadState() {
        int s = state();
        switch (s & ~SUSPENDED) {
            case NEW:
                return Thread.State.NEW;
            case STARTED:
                // return NEW if thread container not yet set
                if (threadContainer() == null) {
                    return Thread.State.NEW;
                } else {
                    return Thread.State.RUNNABLE;
                }
            case UNPARKED:
            case YIELDED:
                // runnable, not mounted
                return Thread.State.RUNNABLE;
            case RUNNING:
                // if mounted then return state of carrier thread
                if (Thread.currentThread() != this) {
                    disableSuspendAndPreempt();
                    try {
                        // carrierThread is read while holding the lock that is
                        // used to access it
                        synchronized (carrierThreadAccessLock()) {
                            Thread carrierThread = this.carrierThread;
                            if (carrierThread != null) {
                                return carrierThread.threadState();
                            }
                        }
                    } finally {
                        enableSuspendAndPreempt();
                    }
                }
                // runnable, mounted
                return Thread.State.RUNNABLE;
            case PARKING:
            case TIMED_PARKING:
            case YIELDING:
                // runnable, in transition
                return Thread.State.RUNNABLE;
            case PARKED:
            case PINNED:
                return State.WAITING;
            case TIMED_PARKED:
            case TIMED_PINNED:
                return State.TIMED_WAITING;
            case TERMINATED:
                return Thread.State.TERMINATED;
            default:
                throw new InternalError();
        }
    }

    /**
     * Returns true if the state of this virtual thread is neither NEW nor TERMINATED.
     */
    @Override
    boolean alive() {
        int s = state;
        return (s != NEW && s != TERMINATED);
    }

    /**
     * Returns true if the state of this virtual thread is TERMINATED.
     */
    @Override
    boolean isTerminated() {
        return (state == TERMINATED);
    }

    /**
     * Returns the stack trace of this virtual thread. The superclass implementation
     * is used when a carrier thread is set (mounted), otherwise tryGetStackTrace.
     * If an attempt does not produce a stack trace then the current thread yields
     * and the attempt is retried, until a stack trace is obtained.
     */
    @Override
    StackTraceElement[] asyncGetStackTrace() {
        StackTraceElement[] stackTrace;
        do {
            stackTrace = (carrierThread != null)
                    ? super.asyncGetStackTrace()  // mounted
                    : tryGetStackTrace();         // unmounted
            if (stackTrace == null) {
                Thread.yield();
            }
        } while (stackTrace == null);
        return stackTrace;
    }

    /**
     * Returns the stack trace for this virtual thread if it is unmounted.
     * Returns null if the thread is mounted or in transition.
*/
    private StackTraceElement[] tryGetStackTrace() {
        // the SUSPENDED bit is masked off so that the switch sees the base state
        int initialState = state() & ~SUSPENDED;
        switch (initialState) {
            case NEW, STARTED, TERMINATED -> {
                return new StackTraceElement[0];  // unmounted, empty stack
            }
            case RUNNING, PINNED, TIMED_PINNED -> {
                return null;   // mounted
            }
            case PARKED, TIMED_PARKED -> {
                // unmounted, not runnable
            }
            case UNPARKED, YIELDED -> {
                // unmounted, runnable
            }
            case PARKING, TIMED_PARKING, YIELDING -> {
                return null;  // in transition
            }
            default -> throw new InternalError("" + initialState);
        }

        // thread is unmounted, prevent it from continuing
        // (the CAS fails, and null is returned, if the state is no longer
        // initialState, which includes the case that SUSPENDED is already set)
        int suspendedState = initialState | SUSPENDED;
        if (!compareAndSetState(initialState, suspendedState)) {
            return null;
        }

        // get stack trace and restore state
        StackTraceElement[] stack;
        try {
            stack = cont.getStackTrace();
        } finally {
            assert state == suspendedState;
            setState(initialState);
        }
        // decide if the continuation needs to be submitted now that the state
        // has been restored
        boolean resubmit = switch (initialState) {
            case UNPARKED, YIELDED -> {
                // resubmit as task may have run while suspended
                yield true;
            }
            case PARKED, TIMED_PARKED -> {
                // resubmit if unparked while suspended
                yield parkPermit && compareAndSetState(initialState, UNPARKED);
            }
            default -> throw new InternalError();
        };
        if (resubmit) {
            submitRunContinuation();
        }
        return stack;
    }

    /**
     * Returns a string of the form {@code VirtualThread[#<id>,<name>]/<suffix>}.
     * The name, and the comma before it, are omitted when the name is empty. The
     * suffix is {@code <carrier state>@<carrier name>} when a carrier thread is
     * set, otherwise the state of this virtual thread. States are in lower case.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("VirtualThread[#");
        sb.append(threadId());
        String name = getName();
        if (!name.isEmpty()) {
            sb.append(",");
            sb.append(name);
        }
        sb.append("]/");

        // add the carrier state and thread name when mounted
        boolean mounted;
        if (Thread.currentThread() == this) {
            // invoked on this virtual thread, the lock is not acquired
            mounted = appendCarrierInfo(sb);
        } else {
            disableSuspendAndPreempt();
            try {
                synchronized (carrierThreadAccessLock()) {
                    mounted = appendCarrierInfo(sb);
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }

        // add virtual thread state when not mounted
        if (!mounted) {
            String stateAsString = threadState().toString();
            sb.append(stateAsString.toLowerCase(Locale.ROOT));
        }

        return sb.toString();
    }

    /**
     * Appends the carrier state and thread name to the string buffer if mounted.
     * The text appended is the carrier thread's state in lower case, then
     * {@code '@'}, then the carrier thread's name. The caller must be this virtual
     * thread or must hold the lock returned by carrierThreadAccessLock.
     *
     * @param sb the string builder to append to
     * @return true if mounted, false if not mounted
     */
    private boolean appendCarrierInfo(StringBuilder sb) {
        assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock());
        Thread carrier = carrierThread;
        if (carrier != null) {
            String stateAsString = carrier.threadState().toString();
            sb.append(stateAsString.toLowerCase(Locale.ROOT));
            sb.append('@');
            sb.append(carrier.getName());
            return true;
        } else {
            return false;
        }
    }

    /**
     * Returns the thread identifier narrowed to an {@code int}.
     */
    @Override
    public int hashCode() {
        return (int) threadId();
    }

    /**
     * Returns true only if the given object is this virtual thread (identity).
     */
    @Override
    public boolean equals(Object obj) {
        return obj == this;
    }

    /**
     * Returns the termination object, creating it if needed.
     */
    private CountDownLatch getTermination() {
        CountDownLatch termination = this.termination;
        if (termination == null) {
            termination = new CountDownLatch(1);
            // install with CAS; if the CAS fails then the field is no longer null,
            // so use the value that is there instead
            if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
                termination = this.termination;
            }
        }
        return termination;
    }

    /**
     * Returns the lock object to synchronize on when accessing carrierThread.
     * The lock prevents carrierThread from being reset to null during unmount.
1218 */ 1219 private Object carrierThreadAccessLock() { 1220 // return interruptLock as unmount has to coordinate with interrupt 1221 return interruptLock; 1222 } 1223 1224 /** 1225 * Disallow the current thread be suspended or preempted. 1226 */ 1227 private void disableSuspendAndPreempt() { 1228 notifyJvmtiDisableSuspend(true); 1229 Continuation.pin(); 1230 } 1231 1232 /** 1233 * Allow the current thread be suspended or preempted. 1234 */ 1235 private void enableSuspendAndPreempt() { 1236 Continuation.unpin(); 1237 notifyJvmtiDisableSuspend(false); 1238 } 1239 1240 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1241 1242 private int state() { 1243 return state; // volatile read 1244 } 1245 1246 private void setState(int newValue) { 1247 state = newValue; // volatile write 1248 } 1249 1250 private boolean compareAndSetState(int expectedValue, int newValue) { 1251 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1252 } 1253 1254 private void setParkPermit(boolean newValue) { 1255 if (parkPermit != newValue) { 1256 parkPermit = newValue; 1257 } 1258 } 1259 1260 private boolean getAndSetParkPermit(boolean newValue) { 1261 if (parkPermit != newValue) { 1262 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1263 } else { 1264 return newValue; 1265 } 1266 } 1267 1268 private void setCarrierThread(Thread carrier) { 1269 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1270 this.carrierThread = carrier; 1271 } 1272 1273 // -- JVM TI support -- 1274 1275 @IntrinsicCandidate 1276 @JvmtiMountTransition 1277 private native void notifyJvmtiStart(); 1278 1279 @IntrinsicCandidate 1280 @JvmtiMountTransition 1281 private native void notifyJvmtiEnd(); 1282 1283 @IntrinsicCandidate 1284 @JvmtiMountTransition 1285 private native void notifyJvmtiMount(boolean hide); 1286 1287 @IntrinsicCandidate 1288 @JvmtiMountTransition 1289 private native void notifyJvmtiUnmount(boolean hide); 1290 1291 @IntrinsicCandidate 1292 
@JvmtiMountTransition 1293 private static native void notifyJvmtiHideFrames(boolean hide); 1294 1295 @IntrinsicCandidate 1296 private static native void notifyJvmtiDisableSuspend(boolean enter); 1297 1298 private static native void registerNatives(); 1299 static { 1300 registerNatives(); 1301 1302 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1303 var group = Thread.virtualThreadGroup(); 1304 1305 // ensure VirtualThreadPinnedEvent is loaded/initialized 1306 U.ensureClassInitialized(VirtualThreadPinnedEvent.class); 1307 } 1308 1309 /** 1310 * Creates the default ForkJoinPool scheduler. 1311 */ 1312 @SuppressWarnings("removal") 1313 private static ForkJoinPool createDefaultScheduler() { 1314 ForkJoinWorkerThreadFactory factory = pool -> { 1315 PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool); 1316 return AccessController.doPrivileged(pa); 1317 }; 1318 PrivilegedAction<ForkJoinPool> pa = () -> { 1319 int parallelism, maxPoolSize, minRunnable; 1320 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1321 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1322 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1323 if (parallelismValue != null) { 1324 parallelism = Integer.parseInt(parallelismValue); 1325 } else { 1326 parallelism = Runtime.getRuntime().availableProcessors(); 1327 } 1328 if (maxPoolSizeValue != null) { 1329 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1330 parallelism = Integer.min(parallelism, maxPoolSize); 1331 } else { 1332 maxPoolSize = Integer.max(parallelism, 256); 1333 } 1334 if (minRunnableValue != null) { 1335 minRunnable = Integer.parseInt(minRunnableValue); 1336 } else { 1337 minRunnable = Integer.max(parallelism / 2, 1); 1338 } 1339 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1340 boolean asyncMode = true; // FIFO 1341 return new ForkJoinPool(parallelism, factory, handler, 
asyncMode, 1342 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1343 }; 1344 return AccessController.doPrivileged(pa); 1345 } 1346 1347 /** 1348 * Schedule a runnable task to run after a delay. 1349 */ 1350 private static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1351 long tid = Thread.currentThread().threadId(); 1352 int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1); 1353 return DELAYED_TASK_SCHEDULERS[index].schedule(command, delay, unit); 1354 } 1355 1356 /** 1357 * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks. 1358 */ 1359 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1360 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1361 String propValue = GetPropertyAction.privilegedGetProperty(propName); 1362 int queueCount; 1363 if (propValue != null) { 1364 queueCount = Integer.parseInt(propValue); 1365 if (queueCount != Integer.highestOneBit(queueCount)) { 1366 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1367 } 1368 } else { 1369 int ncpus = Runtime.getRuntime().availableProcessors(); 1370 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1371 } 1372 var schedulers = new ScheduledExecutorService[queueCount]; 1373 for (int i = 0; i < queueCount; i++) { 1374 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1375 Executors.newScheduledThreadPool(1, task -> { 1376 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1377 t.setDaemon(true); 1378 return t; 1379 }); 1380 stpe.setRemoveOnCancelPolicy(true); 1381 schedulers[i] = stpe; 1382 } 1383 return schedulers; 1384 } 1385 1386 /** 1387 * Reads the value of the jdk.tracePinnedThreads property to determine if stack 1388 * traces should be printed when a carrier thread is pinned when a virtual thread 1389 * attempts to park. 
*/
    private static int tracePinningMode() {
        String mode = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads");

        // property not set, mode is 0
        if (mode == null) {
            return 0;
        }

        // property set without a value, or set to "full" (in any case), mode is 1
        if (mode.isEmpty() || "full".equalsIgnoreCase(mode)) {
            return 1;
        }

        // "short" (in any case) is mode 2, any other value is mode 0
        return "short".equalsIgnoreCase(mode) ? 2 : 0;
    }
}