1 /* 2 * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.lang; 26 27 import java.security.AccessController; 28 import java.security.PrivilegedAction; 29 import java.util.Locale; 30 import java.util.Objects; 31 import java.util.concurrent.Callable; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.Executors; 35 import java.util.concurrent.ForkJoinPool; 36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 37 import java.util.concurrent.ForkJoinTask; 38 import java.util.concurrent.ForkJoinWorkerThread; 39 import java.util.concurrent.Future; 40 import java.util.concurrent.RejectedExecutionException; 41 import java.util.concurrent.ScheduledExecutorService; 42 import java.util.concurrent.ScheduledThreadPoolExecutor; 43 import java.util.concurrent.TimeUnit; 44 import jdk.internal.event.VirtualThreadEndEvent; 45 import jdk.internal.event.VirtualThreadPinnedEvent; 46 import jdk.internal.event.VirtualThreadStartEvent; 47 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 48 import jdk.internal.misc.CarrierThread; 49 import jdk.internal.misc.InnocuousThread; 50 import jdk.internal.misc.Unsafe; 51 import jdk.internal.vm.Continuation; 52 import jdk.internal.vm.ContinuationScope; 53 import jdk.internal.vm.StackableScope; 54 import jdk.internal.vm.ThreadContainer; 55 import jdk.internal.vm.ThreadContainers; 56 import jdk.internal.vm.annotation.ChangesCurrentThread; 57 import jdk.internal.vm.annotation.Hidden; 58 import jdk.internal.vm.annotation.IntrinsicCandidate; 59 import jdk.internal.vm.annotation.JvmtiMountTransition; 60 import jdk.internal.vm.annotation.ReservedStackAccess; 61 import sun.nio.ch.Interruptible; 62 import sun.security.action.GetPropertyAction; 63 import static java.util.concurrent.TimeUnit.*; 64 65 /** 66 * A thread that is scheduled by the Java virtual machine rather than the operating system. 
*/
final class VirtualThread extends BaseVirtualThread {
    private static final Unsafe U = Unsafe.getUnsafe();
    // continuation scope shared by all virtual threads, see continuationScope()
    private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
    // scheduler used when none is specified and the creating thread is not a virtual thread
    private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
    private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
    // pinned-thread tracing: values > 0 enable printing in onPinned, 1 selects "print all"
    private static final int TRACE_PINNING_MODE = tracePinningMode();

    // Unsafe field offsets of the instance fields declared below
    // NOTE(review): presumably used by the CAS/atomic accessors defined further down in this class - confirm
    private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
    private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
    private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
    private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");

    // scheduler and continuation
    private final Executor scheduler;
    private final Continuation cont;
    // task submitted to the scheduler, bound to this::runContinuation in the constructor
    private final Runnable runContinuation;

    // virtual thread state, accessed by VM
    private volatile int state;

    /*
     * Virtual thread state transitions:
     *
     *      NEW -> STARTED         // Thread.start, schedule to run
     *  STARTED -> TERMINATED      // failed to start
     *  STARTED -> RUNNING         // first run
     *  RUNNING -> TERMINATED      // done
     *
     *  RUNNING -> PARKING         // Thread parking with LockSupport.park
     *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
     *  PARKING -> PINNED          // cont.yield failed, parked indefinitely on carrier
     *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *   PINNED -> RUNNING         // unparked, continue execution on same carrier
     * UNPARKED -> RUNNING         // continue execution after park
     *
     *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos
     * TIMED_PARKING -> TIMED_PARKED    // cont.yield successful, timed-parked
     * TIMED_PARKING -> TIMED_PINNED    // cont.yield failed, timed-parked on carrier
     *  TIMED_PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *  TIMED_PINNED -> RUNNING         // unparked, continue execution on same carrier
     *
     *  RUNNING -> YIELDING        // Thread.yield
     * YIELDING -> YIELDED         // cont.yield successful, may be scheduled to continue
     * YIELDING -> RUNNING         // cont.yield failed
     *  YIELDED -> RUNNING         // continue execution after Thread.yield
     */
    private static final int NEW      = 0;
    private static final int STARTED  = 1;
    private static final int RUNNING  = 2;     // runnable-mounted

    // untimed and timed parking
    private static final int PARKING       = 3;
    private static final int PARKED        = 4;     // unmounted
    private static final int PINNED        = 5;     // mounted
    private static final int TIMED_PARKING = 6;
    private static final int TIMED_PARKED  = 7;     // unmounted
    private static final int TIMED_PINNED  = 8;     // mounted
    private static final int UNPARKED      = 9;     // unmounted but runnable

    // Thread.yield
    private static final int YIELDING = 10;
    private static final int YIELDED  = 11;         // unmounted but runnable

    private static final int TERMINATED = 99;  // final state

    // can be suspended from scheduling when unmounted
    // (bit flag OR'ed into the state value; threadState() masks it off before switching)
    private static final int SUSPENDED = 1 << 8;

    // parking permit
    private volatile boolean parkPermit;

    // carrier thread when mounted, accessed by VM
    private volatile Thread carrierThread;

    // termination object when joining, created lazily if needed
    private volatile CountDownLatch termination;

    /**
     * Returns the continuation scope used for virtual threads.
     */
    static ContinuationScope continuationScope() {
        return VTHREAD_SCOPE;
    }

    /**
     * Creates a new {@code VirtualThread} to run the given task with the given
     * scheduler. If the given scheduler is {@code null} and the current thread
     * is a platform thread then the newly created virtual thread will use the
     * default scheduler.
If given scheduler is {@code null} and the current 157 * thread is a virtual thread then the current thread's scheduler is used. 158 * 159 * @param scheduler the scheduler or null 160 * @param name thread name 161 * @param characteristics characteristics 162 * @param task the task to execute 163 */ 164 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 165 super(name, characteristics, /*bound*/ false); 166 Objects.requireNonNull(task); 167 168 // choose scheduler if not specified 169 if (scheduler == null) { 170 Thread parent = Thread.currentThread(); 171 if (parent instanceof VirtualThread vparent) { 172 scheduler = vparent.scheduler; 173 } else { 174 scheduler = DEFAULT_SCHEDULER; 175 } 176 } 177 178 this.scheduler = scheduler; 179 this.cont = new VThreadContinuation(this, task); 180 this.runContinuation = this::runContinuation; 181 } 182 183 /** 184 * The continuation that a virtual thread executes. 185 */ 186 private static class VThreadContinuation extends Continuation { 187 VThreadContinuation(VirtualThread vthread, Runnable task) { 188 super(VTHREAD_SCOPE, wrap(vthread, task)); 189 } 190 @Override 191 protected void onPinned(Continuation.Pinned reason) { 192 if (TRACE_PINNING_MODE > 0) { 193 boolean printAll = (TRACE_PINNING_MODE == 1); 194 VirtualThread vthread = (VirtualThread) Thread.currentThread(); 195 int oldState = vthread.state(); 196 try { 197 // avoid printing when in transition states 198 vthread.setState(RUNNING); 199 PinnedThreadPrinter.printStackTrace(System.out, reason, printAll); 200 } finally { 201 vthread.setState(oldState); 202 } 203 } 204 } 205 private static Runnable wrap(VirtualThread vthread, Runnable task) { 206 return new Runnable() { 207 @Hidden 208 public void run() { 209 vthread.run(task); 210 } 211 }; 212 } 213 } 214 215 /** 216 * Runs or continues execution on the current thread. The virtual thread is mounted 217 * on the current thread before the task runs or continues. 
* It unmounts when the
     * task completes or yields.
     *
     * <p> Returns without running anything when the state is not one of STARTED,
     * UNPARKED or YIELDED, or when the transition to RUNNING fails.
     *
     * @throws WrongThreadException if the current thread is a virtual thread
     */
    @ChangesCurrentThread // allow mount/unmount to be inlined
    private void runContinuation() {
        // the carrier must be a platform thread
        if (Thread.currentThread().isVirtual()) {
            throw new WrongThreadException();
        }

        // set state to RUNNING
        int initialState = state();
        if (initialState == STARTED || initialState == UNPARKED || initialState == YIELDED) {
            // newly started or continue after parking/blocking/Thread.yield
            if (!compareAndSetState(initialState, RUNNING)) {
                // state is no longer the value read above, do not run
                return;
            }
            // consume parking permit when continuing after parking
            if (initialState == UNPARKED) {
                setParkPermit(false);
            }
        } else {
            // not runnable
            return;
        }

        mount();
        try {
            cont.run();
        } finally {
            // always unmount, then finish or re-arm depending on whether the continuation completed
            unmount();
            if (cont.isDone()) {
                afterDone();
            } else {
                afterYield();
            }
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the default scheduler,
     * and calling it on a worker thread, the task will be pushed to the local queue,
     * otherwise it will be pushed to an external submission queue.
     * @param scheduler the scheduler
     * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
     * @throws RejectedExecutionException if the scheduler rejects the task; a
     *         submit-failed event is emitted (if enabled) before rethrowing
     * @throws OutOfMemoryError if thrown by the scheduler and {@code retryOnOOME} is false
     */
    @ChangesCurrentThread
    private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
        boolean done = false;
        while (!done) {
            try {
                // The scheduler's execute method is invoked in the context of the
                // carrier thread. For the default scheduler this ensures that the
                // current thread is a ForkJoinWorkerThread so the task will be pushed
                // to the local queue. For other schedulers, it avoids deadlock that
                // would arise due to platform and virtual threads contending for a
                // lock on the scheduler's submission queue.
                if (currentThread() instanceof VirtualThread vthread) {
                    vthread.switchToCarrierThread();
                    try {
                        scheduler.execute(runContinuation);
                    } finally {
                        // restore the virtual thread identity even if execute throws
                        switchToVirtualThread(vthread);
                    }
                } else {
                    scheduler.execute(runContinuation);
                }
                done = true;
            } catch (RejectedExecutionException ree) {
                submitFailed(ree);
                throw ree;
            } catch (OutOfMemoryError e) {
                if (retryOnOOME) {
                    // back off before the loop retries the submit
                    U.park(false, 100_000_000); // 100ms
                } else {
                    throw e;
                }
            }
        }
    }

    /**
     * Submits the runContinuation task to given scheduler with a lazy submit.
     * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
     * @throws RejectedExecutionException
     * @see ForkJoinPool#lazySubmit(ForkJoinTask)
     */
    private void lazySubmitRunContinuation(ForkJoinPool pool) {
        assert Thread.currentThread() instanceof CarrierThread;
        try {
            pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        } catch (OutOfMemoryError e) {
            // fall back to the general submit path, which retries on OutOfMemoryError
            submitRunContinuation(pool, true);
        }
    }

    /**
     * Submits the runContinuation task to the given scheduler as an external submit.
     * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
     * @throws RejectedExecutionException
     * @see ForkJoinPool#externalSubmit(ForkJoinTask)
     */
    private void externalSubmitRunContinuation(ForkJoinPool pool) {
        assert Thread.currentThread() instanceof CarrierThread;
        try {
            pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        } catch (OutOfMemoryError e) {
            // fall back to the general submit path, which retries on OutOfMemoryError
            submitRunContinuation(pool, true);
        }
    }

    /**
     * Submits the runContinuation task to the scheduler.
For the default scheduler, 337 * and calling it on a worker thread, the task will be pushed to the local queue, 338 * otherwise it will be pushed to an external submission queue. 339 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 340 * @throws RejectedExecutionException 341 */ 342 private void submitRunContinuation() { 343 submitRunContinuation(scheduler, true); 344 } 345 346 /** 347 * Submits the runContinuation task to the scheduler. For the default scheduler, and 348 * calling it a virtual thread that uses the default scheduler, the task will be 349 * pushed to an external submission queue. This method may throw OutOfMemoryError. 350 * @throws RejectedExecutionException 351 * @throws OutOfMemoryError 352 */ 353 private void externalSubmitRunContinuationOrThrow() { 354 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) { 355 try { 356 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 357 } catch (RejectedExecutionException ree) { 358 submitFailed(ree); 359 throw ree; 360 } 361 } else { 362 submitRunContinuation(scheduler, false); 363 } 364 } 365 366 /** 367 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 368 */ 369 private void submitFailed(RejectedExecutionException ree) { 370 var event = new VirtualThreadSubmitFailedEvent(); 371 if (event.isEnabled()) { 372 event.javaThreadId = threadId(); 373 event.exceptionMessage = ree.getMessage(); 374 event.commit(); 375 } 376 } 377 378 /** 379 * Runs a task in the context of this virtual thread. 
*
     * <p> Anything thrown by the task is handed to {@code dispatchUncaughtException}.
     * Remaining scopes are popped and the end notifications are made whether or
     * not the task completed normally.
     *
     * @param task the task to run
     */
    private void run(Runnable task) {
        assert Thread.currentThread() == this && state == RUNNING;

        // notify JVMTI, may post VirtualThreadStart event
        notifyJvmtiStart();

        // emit JFR event if enabled
        if (VirtualThreadStartEvent.isTurnedOn()) {
            var event = new VirtualThreadStartEvent();
            event.javaThreadId = threadId();
            event.commit();
        }

        Object bindings = Thread.scopedValueBindings();
        try {
            runWith(bindings, task);
        } catch (Throwable exc) {
            dispatchUncaughtException(exc);
        } finally {
            try {
                // pop any remaining scopes from the stack, this may block
                StackableScope.popAll();

                // emit JFR event if enabled
                if (VirtualThreadEndEvent.isTurnedOn()) {
                    var event = new VirtualThreadEndEvent();
                    event.javaThreadId = threadId();
                    event.commit();
                }

            } finally {
                // notify JVMTI, may post VirtualThreadEnd event
                // (inner finally: runs even if popAll or the JFR commit throws)
                notifyJvmtiEnd();
            }
        }
    }

    /**
     * Mounts this virtual thread onto the current platform thread. On
     * return, the current thread is the virtual thread.
     */
    @ChangesCurrentThread
    @ReservedStackAccess
    private void mount() {
        // notify JVMTI before mount
        notifyJvmtiMount(/*hide*/true);

        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupt status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            // carrier has a stale interrupt status; clear it under interruptLock,
            // the same lock that interrupt() holds while setting the status
            synchronized (interruptLock) {
                // need to recheck interrupt status
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread.
*/
    @ChangesCurrentThread
    @ReservedStackAccess
    private void unmount() {
        assert !Thread.holdsLock(interruptLock);

        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        // drop any interrupt status that was set on the carrier while mounted
        carrier.clearInterrupt();

        // notify JVMTI after unmount
        notifyJvmtiUnmount(/*hide*/false);
    }

    /**
     * Sets the current thread to the current carrier thread.
     * Must be paired with a later call to {@code switchToVirtualThread}; callers
     * in this class do so in a finally block.
     */
    @ChangesCurrentThread
    @JvmtiMountTransition
    private void switchToCarrierThread() {
        notifyJvmtiHideFrames(true);
        Thread carrier = this.carrierThread;
        assert Thread.currentThread() == this
                && carrier == Thread.currentCarrierThread();
        carrier.setCurrentThread(carrier);
    }

    /**
     * Sets the current thread to the given virtual thread.
     * This is the reverse of {@code switchToCarrierThread}.
     */
    @ChangesCurrentThread
    @JvmtiMountTransition
    private static void switchToVirtualThread(VirtualThread vthread) {
        Thread carrier = vthread.carrierThread;
        assert carrier == Thread.currentCarrierThread();
        carrier.setCurrentThread(vthread);
        notifyJvmtiHideFrames(false);
    }

    /**
     * Executes the given value returning task on the current carrier thread.
     * The current thread is switched back to this virtual thread before
     * returning, whether the task returns or throws.
     *
     * @param task the task to call
     * @return the value returned by the task
     * @throws Exception if the task throws
     */
    @ChangesCurrentThread
    <V> V executeOnCarrierThread(Callable<V> task) throws Exception {
        assert Thread.currentThread() == this;
        switchToCarrierThread();
        try {
            return task.call();
        } finally {
            switchToVirtualThread(this);
        }
    }

    /**
     * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
     * the continuation continues.
*
     * @return the value returned by {@code Continuation.yield}; callers in this
     *         class treat {@code false} as "failed to yield" (pinned)
     */
    @Hidden
    private boolean yieldContinuation() {
        notifyJvmtiUnmount(/*hide*/true);
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            // reached when the continuation continues, or when the yield fails or throws
            notifyJvmtiMount(/*hide*/false);
        }
    }

    /**
     * Invoked after the continuation yields. If parking then it sets the state
     * and also re-submits the task to continue if unparked while parking.
     * If yielding due to Thread.yield then it just submits the task to continue.
     */
    private void afterYield() {
        assert carrierThread == null;

        // re-adjust parallelism if the virtual thread yielded when compensating
        if (currentThread() instanceof CarrierThread ct) {
            ct.endBlocking();
        }

        int s = state();

        // LockSupport.park/parkNanos
        if (s == PARKING || s == TIMED_PARKING) {
            int newState = (s == PARKING) ? PARKED : TIMED_PARKED;
            setState(newState);

            // may have been unparked while parking
            // (the CAS ensures only one of this thread and a concurrent unpark submits the task)
            if (parkPermit && compareAndSetState(newState, UNPARKED)) {
                // lazy submit to continue on the current carrier if possible
                if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                    lazySubmitRunContinuation(ct.getPool());
                } else {
                    submitRunContinuation();
                }
            }
            return;
        }

        // Thread.yield
        if (s == YIELDING) {
            setState(YIELDED);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation(ct.getPool());
            } else {
                submitRunContinuation();
            }
            return;
        }

        // no other state is expected after a yield
        assert false;
    }

    /**
     * Invoked after the continuation completes.
     * Equivalent to {@code afterDone(true)}, i.e. the thread container is notified.
     */
    private void afterDone() {
        afterDone(true);
    }

    /**
     * Invoked after the continuation completes (or start failed). Sets the thread
     * state to TERMINATED and notifies anyone waiting for the thread to terminate.
*
     * @param notifyContainer true if its container should be notified
     */
    private void afterDone(boolean notifyContainer) {
        assert carrierThread == null;
        setState(TERMINATED);

        // notify anyone waiting for this virtual thread to terminate
        // (the latch is created lazily, so it is null when no thread has joined)
        CountDownLatch termination = this.termination;
        if (termination != null) {
            assert termination.getCount() == 1;
            termination.countDown();
        }

        // notify container
        if (notifyContainer) {
            threadContainer().onExit(this);
        }

        // clear references to thread locals
        clearReferences();
    }

    /**
     * Schedules this {@code VirtualThread} to execute.
     *
     * <p> If starting fails after the state has been set to STARTED then the
     * thread is terminated via {@code afterDone}, notifying the container only
     * if the thread had already been added to it.
     *
     * @throws IllegalStateException if the container is shutdown or closed
     * @throws IllegalThreadStateException if the thread has already been started
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void start(ThreadContainer container) {
        if (!compareAndSetState(NEW, STARTED)) {
            throw new IllegalThreadStateException("Already started");
        }

        // bind thread to container
        assert threadContainer() == null;
        setThreadContainer(container);

        // start thread
        boolean addedToContainer = false;
        boolean started = false;
        try {
            container.onStart(this);  // may throw
            addedToContainer = true;

            // scoped values may be inherited
            inheritScopedValueBindings(container);

            // submit task to run thread, using externalSubmit if possible
            externalSubmitRunContinuationOrThrow();
            started = true;
        } finally {
            // an exception is propagating: move to TERMINATED so the thread is not left in STARTED
            if (!started) {
                afterDone(addedToContainer);
            }
        }
    }

    /**
     * Schedules this virtual thread to execute in the root thread container.
     */
    @Override
    public void start() {
        start(ThreadContainers.root());
    }

    /**
     * Does nothing. The task is run by the continuation, which invokes the
     * private {@code run(Runnable)} method rather than this method.
     */
    @Override
    public void run() {
        // do nothing
    }

    /**
     * Parks until unparked or interrupted.
* If already unparked then the parking
     * permit is consumed and this method completes immediately (meaning it doesn't
     * yield). It also completes immediately if the interrupt status is set.
     * If the continuation cannot be yielded then the carrier thread is parked instead.
     */
    @Override
    void park() {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        boolean yielded = false;
        setState(PARKING);
        try {
            yielded = yieldContinuation();  // may throw
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (!yielded) {
                // yield failed or threw, so the state is still PARKING; restore RUNNING
                assert state() == PARKING;
                setState(RUNNING);
            }
        }

        // park on the carrier thread when pinned
        if (!yielded) {
            parkOnCarrierThread(false, 0);
        }
    }

    /**
     * Parks up to the given waiting time or until unparked or interrupted.
     * If already unparked then the parking permit is consumed and this method
     * completes immediately (meaning it doesn't yield). It also completes immediately
     * if the interrupt status is set or the waiting time is {@code <= 0}.
     *
     * @param nanos the maximum number of nanoseconds to wait.
*/
    @Override
    void parkNanos(long nanos) {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread for the waiting time
        if (nanos > 0) {
            long startTime = System.nanoTime();

            boolean yielded = false;
            Future<?> unparker = scheduleUnpark(nanos);  // may throw OOME
            setState(TIMED_PARKING);
            try {
                yielded = yieldContinuation();  // may throw
            } finally {
                assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
                if (!yielded) {
                    // yield failed or threw, so the state is still TIMED_PARKING; restore RUNNING
                    assert state() == TIMED_PARKING;
                    setState(RUNNING);
                }
                // the delayed unpark is no longer needed, cancel it if not yet done
                cancel(unparker);
            }

            // park on carrier thread for remaining time when pinned
            if (!yielded) {
                long remainingNanos = nanos - (System.nanoTime() - startTime);
                parkOnCarrierThread(true, remainingNanos);
            }
        }
    }

    /**
     * Parks the current carrier thread up to the given waiting time or until
     * unparked or interrupted. If the virtual thread is interrupted then the
     * interrupt status will be propagated to the carrier thread.
     * @param timed true for a timed park, false for untimed
     * @param nanos the waiting time in nanoseconds
     */
    private void parkOnCarrierThread(boolean timed, long nanos) {
        assert state() == RUNNING;

        // the pinned event is best-effort: continue without it if it cannot be created
        VirtualThreadPinnedEvent event;
        try {
            event = new VirtualThreadPinnedEvent();
            event.begin();
        } catch (OutOfMemoryError e) {
            event = null;
        }

        setState(timed ? TIMED_PINNED : PINNED);
        try {
            // skip parking if the permit was made available in the meantime
            if (!parkPermit) {
                if (!timed) {
                    U.park(false, 0);
                } else if (nanos > 0) {
                    U.park(false, nanos);
                }
            }
        } finally {
            setState(RUNNING);
        }

        // consume parking permit
        setParkPermit(false);

        if (event != null) {
            try {
                event.commit();
            } catch (OutOfMemoryError e) {
                // ignore
            }
        }
    }

    /**
     * Schedule this virtual thread to be unparked after a given delay.
     *
     * @param nanos the delay in nanoseconds
     * @return the Future for the scheduled unpark task, to be passed to {@code cancel}
     */
    @ChangesCurrentThread
    private Future<?> scheduleUnpark(long nanos) {
        assert Thread.currentThread() == this;
        // need to switch to current carrier thread to avoid nested parking
        switchToCarrierThread();
        try {
            return schedule(this::unpark, nanos, NANOSECONDS);
        } finally {
            switchToVirtualThread(this);
        }
    }

    /**
     * Cancels a task if it has not completed.
     * The task is cancelled without interrupting it ({@code cancel(false)}).
     */
    @ChangesCurrentThread
    private void cancel(Future<?> future) {
        assert Thread.currentThread() == this;
        if (!future.isDone()) {
            // need to switch to current carrier thread to avoid nested parking
            switchToCarrierThread();
            try {
                future.cancel(false);
            } finally {
                switchToVirtualThread(this);
            }
        }
    }

    /**
     * Re-enables this virtual thread for scheduling. If this virtual thread is parked
     * then its task is scheduled to continue, otherwise its next call to {@code park} or
     * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
* @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void unpark() {
        // NOTE(review): getAndSetParkPermit is defined outside this view; presumably it returns
        // the previous permit value, so nothing more is done if the permit was already set
        if (!getAndSetParkPermit(true) && currentThread() != this) {
            int s = state();

            // unparked while parked
            // (the CAS ensures that only one thread submits the task to continue)
            if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
                submitRunContinuation();
                return;
            }

            // unparked while parked when pinned
            if (s == PINNED || s == TIMED_PINNED) {
                // unpark carrier thread when pinned
                disableSuspendAndPreempt();
                try {
                    synchronized (carrierThreadAccessLock()) {
                        // re-read carrier and state under the lock before unparking the carrier
                        Thread carrier = carrierThread;
                        if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                            U.unpark(carrier);
                        }
                    }
                } finally {
                    enableSuspendAndPreempt();
                }
                return;
            }
        }
    }

    /**
     * Attempts to yield the current virtual thread (Thread.yield).
     * If the continuation cannot be yielded then the state is restored to
     * RUNNING and this method returns without yielding.
     */
    void tryYield() {
        assert Thread.currentThread() == this;
        setState(YIELDING);
        boolean yielded = false;
        try {
            yielded = yieldContinuation();  // may throw
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (!yielded) {
                // yield failed or threw, so the state is still YIELDING; restore RUNNING
                assert state() == YIELDING;
                setState(RUNNING);
            }
        }
    }

    /**
     * Sleep the current thread for the given sleep time (in nanoseconds). If
     * nanos is 0 then the thread will attempt to yield.
     *
     * @implNote This implementation parks the thread for the given sleeping time
     * and will therefore be observed in PARKED state during the sleep. Parking
     * will consume the parking permit so this method makes available the parking
     * permit after the sleep. This may be observed as a spurious, but benign,
     * wakeup when the thread subsequently attempts to park.
*
     * @param nanos the maximum number of nanoseconds to sleep
     * @throws InterruptedException if interrupted while sleeping
     */
    void sleepNanos(long nanos) throws InterruptedException {
        assert Thread.currentThread() == this && nanos >= 0;
        if (getAndClearInterrupt())
            throw new InterruptedException();
        if (nanos == 0) {
            tryYield();
        } else {
            // park for the sleep time
            try {
                long remainingNanos = nanos;
                long startNanos = System.nanoTime();
                // parkNanos may return early, so loop until the full sleep time has elapsed
                while (remainingNanos > 0) {
                    parkNanos(remainingNanos);
                    if (getAndClearInterrupt()) {
                        throw new InterruptedException();
                    }
                    remainingNanos = nanos - (System.nanoTime() - startNanos);
                }
            } finally {
                // may have been unparked while sleeping
                setParkPermit(true);
            }
        }
    }

    /**
     * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate.
     * A timeout of {@code 0} means to wait forever.
     *
     * @throws InterruptedException if interrupted while waiting
     * @return true if the thread has terminated
     */
    boolean joinNanos(long nanos) throws InterruptedException {
        if (state() == TERMINATED)
            return true;

        // ensure termination object exists, then re-check state
        // (afterDone only counts down a latch that exists when it sets TERMINATED)
        CountDownLatch termination = getTermination();
        if (state() == TERMINATED)
            return true;

        // wait for virtual thread to terminate
        if (nanos == 0) {
            termination.await();
        } else {
            boolean terminated = termination.await(nanos, NANOSECONDS);
            if (!terminated) {
                // waiting time elapsed
                return false;
            }
        }
        assert state() == TERMINATED;
        return true;
    }

    /**
     * Delegates to the superclass implementation with suspend and preempt
     * disabled for the duration of the call.
     */
    @Override
    void blockedOn(Interruptible b) {
        disableSuspendAndPreempt();
        try {
            super.blockedOn(b);
        } finally {
            enableSuspendAndPreempt();
        }
    }

    /**
     * Interrupts this virtual thread. When invoked by another thread it sets the
     * interrupt status, interrupts the blocker (if any) and the carrier thread (if
     * mounted), and then unparks this thread. When invoked by this thread it sets
     * the interrupt status on itself and its carrier and makes the parking permit
     * available.
     */
    @Override
    @SuppressWarnings("removal")
    public void interrupt() {
        if (Thread.currentThread() != this) {
            checkAccess();

            // if current thread is a virtual thread then prevent it from being
            // suspended or unmounted when entering or holding interruptLock
            Interruptible blocker;
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = true;
                    blocker = nioBlocker();
                    if (blocker != null) {
                        blocker.interrupt(this);
                    }

                    // interrupt carrier thread if mounted
                    Thread carrier = carrierThread;
                    if (carrier != null) carrier.setInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }

            // notify blocker after releasing interruptLock
            if (blocker != null) {
                blocker.postInterrupt();
            }

            // make available parking permit, unpark thread if parked
            unpark();

        } else {
            // self-interrupt: this thread is running, so there is nothing to unpark
            interrupted = true;
            carrierThread.setInterrupt();
            setParkPermit(true);
        }
    }

    /**
     * Returns the interrupt status of this virtual thread without clearing it.
     */
    @Override
    public boolean isInterrupted() {
        return interrupted;
    }

    /**
     * Returns the interrupt status of this (the current) virtual thread and, if
     * set, clears it on both this thread and its carrier thread.
     */
    @Override
    boolean getAndClearInterrupt() {
        assert Thread.currentThread() == this;
        boolean oldValue = interrupted;
        if (oldValue) {
            disableSuspendAndPreempt();
            try {
                // same lock that interrupt() holds while setting the status
                synchronized (interruptLock) {
                    interrupted = false;
                    carrierThread.clearInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }
        return oldValue;
    }

    /**
     * Maps the internal state, ignoring the SUSPENDED bit, to a {@code Thread.State}.
     *
     * @throws InternalError if the internal state is not a known value
     */
    @Override
    Thread.State threadState() {
        int s = state();
        switch (s & ~SUSPENDED) {
            case NEW:
                return Thread.State.NEW;
            case STARTED:
                // return NEW if thread container not yet set
                if (threadContainer() == null) {
                    return Thread.State.NEW;
                } else {
                    return Thread.State.RUNNABLE;
                }
            case UNPARKED:
            case YIELDED:
                // runnable, not mounted
                return Thread.State.RUNNABLE;
            case RUNNING:
                // if mounted then return state of carrier thread
                if (Thread.currentThread() != this) {
                    disableSuspendAndPreempt();
                    try {
                        synchronized (carrierThreadAccessLock()) {
                            // local deliberately shadows the field: read it once under the lock
                            Thread carrierThread = this.carrierThread;
                            if (carrierThread != null) {
                                return carrierThread.threadState();
                            }
                        }
                    } finally {
                        enableSuspendAndPreempt();
                    }
                }
                // runnable, mounted
                return Thread.State.RUNNABLE;
            case PARKING:
            case TIMED_PARKING:
            case YIELDING:
                // runnable, in transition
                return Thread.State.RUNNABLE;
            case PARKED:
            case PINNED:
                return State.WAITING;
            case TIMED_PARKED:
            case TIMED_PINNED:
                return State.TIMED_WAITING;
            case TERMINATED:
                return Thread.State.TERMINATED;
            default:
                throw new InternalError();
        }
    }

    /**
     * Returns true if this thread has been started and has not terminated,
     * i.e. its state is neither NEW nor TERMINATED.
     */
    @Override
    boolean alive() {
        int s = state;
        return (s != NEW && s != TERMINATED);
    }

    /**
     * Returns true if this thread's state is TERMINATED.
     */
    @Override
    boolean isTerminated() {
        return (state == TERMINATED);
    }

    /**
     * Returns the stack trace of this virtual thread. Uses the superclass
     * implementation when a carrier thread is set, otherwise
     * {@code tryGetStackTrace}. Retries, yielding between attempts, until a
     * non-null stack trace is obtained.
     */
    @Override
    StackTraceElement[] asyncGetStackTrace() {
        StackTraceElement[] stackTrace;
        do {
            stackTrace = (carrierThread != null)
                    ? super.asyncGetStackTrace()  // mounted
                    : tryGetStackTrace();         // unmounted
            if (stackTrace == null) {
                Thread.yield();
            }
        } while (stackTrace == null);
        return stackTrace;
    }

    /**
     * Returns the stack trace for this virtual thread if it is unmounted.
     * Returns null if the thread is mounted or in transition.
1077 */ 1078 private StackTraceElement[] tryGetStackTrace() { 1079 int initialState = state() & ~SUSPENDED; 1080 switch (initialState) { 1081 case NEW, STARTED, TERMINATED -> { 1082 return new StackTraceElement[0]; // unmounted, empty stack 1083 } 1084 case RUNNING, PINNED, TIMED_PINNED -> { 1085 return null; // mounted 1086 } 1087 case PARKED, TIMED_PARKED -> { 1088 // unmounted, not runnable 1089 } 1090 case UNPARKED, YIELDED -> { 1091 // unmounted, runnable 1092 } 1093 case PARKING, TIMED_PARKING, YIELDING -> { 1094 return null; // in transition 1095 } 1096 default -> throw new InternalError("" + initialState); 1097 } 1098 1099 // thread is unmounted, prevent it from continuing 1100 int suspendedState = initialState | SUSPENDED; 1101 if (!compareAndSetState(initialState, suspendedState)) { 1102 return null; 1103 } 1104 1105 // get stack trace and restore state 1106 StackTraceElement[] stack; 1107 try { 1108 stack = cont.getStackTrace(); 1109 } finally { 1110 assert state == suspendedState; 1111 setState(initialState); 1112 } 1113 boolean resubmit = switch (initialState) { 1114 case UNPARKED, YIELDED -> { 1115 // resubmit as task may have run while suspended 1116 yield true; 1117 } 1118 case PARKED, TIMED_PARKED -> { 1119 // resubmit if unparked while suspended 1120 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1121 } 1122 default -> throw new InternalError(); 1123 }; 1124 if (resubmit) { 1125 submitRunContinuation(); 1126 } 1127 return stack; 1128 } 1129 1130 @Override 1131 public String toString() { 1132 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1133 sb.append(threadId()); 1134 String name = getName(); 1135 if (!name.isEmpty()) { 1136 sb.append(","); 1137 sb.append(name); 1138 } 1139 sb.append("]/"); 1140 1141 // add the carrier state and thread name when mounted 1142 boolean mounted; 1143 if (Thread.currentThread() == this) { 1144 mounted = appendCarrierInfo(sb); 1145 } else { 1146 disableSuspendAndPreempt(); 1147 try { 1148 
synchronized (carrierThreadAccessLock()) { 1149 mounted = appendCarrierInfo(sb); 1150 } 1151 } finally { 1152 enableSuspendAndPreempt(); 1153 } 1154 } 1155 1156 // add virtual thread state when not mounted 1157 if (!mounted) { 1158 String stateAsString = threadState().toString(); 1159 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1160 } 1161 1162 return sb.toString(); 1163 } 1164 1165 /** 1166 * Appends the carrier state and thread name to the string buffer if mounted. 1167 * @return true if mounted, false if not mounted 1168 */ 1169 private boolean appendCarrierInfo(StringBuilder sb) { 1170 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1171 Thread carrier = carrierThread; 1172 if (carrier != null) { 1173 String stateAsString = carrier.threadState().toString(); 1174 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1175 sb.append('@'); 1176 sb.append(carrier.getName()); 1177 return true; 1178 } else { 1179 return false; 1180 } 1181 } 1182 1183 @Override 1184 public int hashCode() { 1185 return (int) threadId(); 1186 } 1187 1188 @Override 1189 public boolean equals(Object obj) { 1190 return obj == this; 1191 } 1192 1193 /** 1194 * Returns the termination object, creating it if needed. 1195 */ 1196 private CountDownLatch getTermination() { 1197 CountDownLatch termination = this.termination; 1198 if (termination == null) { 1199 termination = new CountDownLatch(1); 1200 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1201 termination = this.termination; 1202 } 1203 } 1204 return termination; 1205 } 1206 1207 /** 1208 * Returns the lock object to synchronize on when accessing carrierThread. 1209 * The lock prevents carrierThread from being reset to null during unmount. 
1210 */ 1211 private Object carrierThreadAccessLock() { 1212 // return interruptLock as unmount has to coordinate with interrupt 1213 return interruptLock; 1214 } 1215 1216 /** 1217 * Disallow the current thread be suspended or preempted. 1218 */ 1219 private void disableSuspendAndPreempt() { 1220 notifyJvmtiDisableSuspend(true); 1221 Continuation.pin(); 1222 } 1223 1224 /** 1225 * Allow the current thread be suspended or preempted. 1226 */ 1227 private void enableSuspendAndPreempt() { 1228 Continuation.unpin(); 1229 notifyJvmtiDisableSuspend(false); 1230 } 1231 1232 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1233 1234 private int state() { 1235 return state; // volatile read 1236 } 1237 1238 private void setState(int newValue) { 1239 state = newValue; // volatile write 1240 } 1241 1242 private boolean compareAndSetState(int expectedValue, int newValue) { 1243 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1244 } 1245 1246 private void setParkPermit(boolean newValue) { 1247 if (parkPermit != newValue) { 1248 parkPermit = newValue; 1249 } 1250 } 1251 1252 private boolean getAndSetParkPermit(boolean newValue) { 1253 if (parkPermit != newValue) { 1254 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1255 } else { 1256 return newValue; 1257 } 1258 } 1259 1260 private void setCarrierThread(Thread carrier) { 1261 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1262 this.carrierThread = carrier; 1263 } 1264 1265 // -- JVM TI support -- 1266 1267 @IntrinsicCandidate 1268 @JvmtiMountTransition 1269 private native void notifyJvmtiStart(); 1270 1271 @IntrinsicCandidate 1272 @JvmtiMountTransition 1273 private native void notifyJvmtiEnd(); 1274 1275 @IntrinsicCandidate 1276 @JvmtiMountTransition 1277 private native void notifyJvmtiMount(boolean hide); 1278 1279 @IntrinsicCandidate 1280 @JvmtiMountTransition 1281 private native void notifyJvmtiUnmount(boolean hide); 1282 1283 @IntrinsicCandidate 1284 
@JvmtiMountTransition 1285 private static native void notifyJvmtiHideFrames(boolean hide); 1286 1287 @IntrinsicCandidate 1288 private static native void notifyJvmtiDisableSuspend(boolean enter); 1289 1290 private static native void registerNatives(); 1291 static { 1292 registerNatives(); 1293 1294 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1295 var group = Thread.virtualThreadGroup(); 1296 1297 // ensure VirtualThreadPinnedEvent is loaded/initialized 1298 U.ensureClassInitialized(VirtualThreadPinnedEvent.class); 1299 } 1300 1301 /** 1302 * Creates the default ForkJoinPool scheduler. 1303 */ 1304 @SuppressWarnings("removal") 1305 private static ForkJoinPool createDefaultScheduler() { 1306 ForkJoinWorkerThreadFactory factory = pool -> { 1307 PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool); 1308 return AccessController.doPrivileged(pa); 1309 }; 1310 PrivilegedAction<ForkJoinPool> pa = () -> { 1311 int parallelism, maxPoolSize, minRunnable; 1312 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1313 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1314 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1315 if (parallelismValue != null) { 1316 parallelism = Integer.parseInt(parallelismValue); 1317 } else { 1318 parallelism = Runtime.getRuntime().availableProcessors(); 1319 } 1320 if (maxPoolSizeValue != null) { 1321 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1322 parallelism = Integer.min(parallelism, maxPoolSize); 1323 } else { 1324 maxPoolSize = Integer.max(parallelism, 256); 1325 } 1326 if (minRunnableValue != null) { 1327 minRunnable = Integer.parseInt(minRunnableValue); 1328 } else { 1329 minRunnable = Integer.max(parallelism / 2, 1); 1330 } 1331 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1332 boolean asyncMode = true; // FIFO 1333 return new ForkJoinPool(parallelism, factory, handler, 
asyncMode, 1334 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1335 }; 1336 return AccessController.doPrivileged(pa); 1337 } 1338 1339 /** 1340 * Schedule a runnable task to run after a delay. 1341 */ 1342 private static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1343 long tid = Thread.currentThread().threadId(); 1344 int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1); 1345 return DELAYED_TASK_SCHEDULERS[index].schedule(command, delay, unit); 1346 } 1347 1348 /** 1349 * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks. 1350 */ 1351 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1352 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1353 String propValue = GetPropertyAction.privilegedGetProperty(propName); 1354 int queueCount; 1355 if (propValue != null) { 1356 queueCount = Integer.parseInt(propValue); 1357 if (queueCount != Integer.highestOneBit(queueCount)) { 1358 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1359 } 1360 } else { 1361 int ncpus = Runtime.getRuntime().availableProcessors(); 1362 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1363 } 1364 var schedulers = new ScheduledExecutorService[queueCount]; 1365 for (int i = 0; i < queueCount; i++) { 1366 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1367 Executors.newScheduledThreadPool(1, task -> { 1368 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1369 t.setDaemon(true); 1370 return t; 1371 }); 1372 stpe.setRemoveOnCancelPolicy(true); 1373 schedulers[i] = stpe; 1374 } 1375 return schedulers; 1376 } 1377 1378 /** 1379 * Reads the value of the jdk.tracePinnedThreads property to determine if stack 1380 * traces should be printed when a carrier thread is pinned when a virtual thread 1381 * attempts to park. 
1382 */ 1383 private static int tracePinningMode() { 1384 String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads"); 1385 if (propValue != null) { 1386 if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue)) 1387 return 1; 1388 if ("short".equalsIgnoreCase(propValue)) 1389 return 2; 1390 } 1391 return 0; 1392 } 1393 }