1 /* 2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.nio.charset.StandardCharsets; 28 import java.security.AccessController; 29 import java.security.PrivilegedAction; 30 import java.util.Locale; 31 import java.util.Objects; 32 import java.util.concurrent.Callable; 33 import java.util.concurrent.CountDownLatch; 34 import java.util.concurrent.Executor; 35 import java.util.concurrent.Executors; 36 import java.util.concurrent.ForkJoinPool; 37 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 38 import java.util.concurrent.ForkJoinTask; 39 import java.util.concurrent.ForkJoinWorkerThread; 40 import java.util.concurrent.Future; 41 import java.util.concurrent.RejectedExecutionException; 42 import java.util.concurrent.ScheduledExecutorService; 43 import java.util.concurrent.ScheduledThreadPoolExecutor; 44 import jdk.internal.event.VirtualThreadEndEvent; 45 import jdk.internal.event.VirtualThreadPinnedEvent; 46 import jdk.internal.event.VirtualThreadStartEvent; 47 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 48 import jdk.internal.misc.CarrierThread; 49 import jdk.internal.misc.InnocuousThread; 50 import jdk.internal.misc.Unsafe; 51 import jdk.internal.vm.Continuation; 52 import jdk.internal.vm.ContinuationScope; 53 import jdk.internal.vm.StackableScope; 54 import jdk.internal.vm.ThreadContainer; 55 import jdk.internal.vm.ThreadContainers; 56 import jdk.internal.vm.annotation.ChangesCurrentThread; 57 import jdk.internal.vm.annotation.Hidden; 58 import jdk.internal.vm.annotation.IntrinsicCandidate; 59 import jdk.internal.vm.annotation.JvmtiMountTransition; 60 import jdk.internal.vm.annotation.ReservedStackAccess; 61 import sun.nio.ch.Interruptible; 62 import sun.security.action.GetPropertyAction; 63 import static java.util.concurrent.TimeUnit.*; 64 65 /** 66 * A thread that is scheduled by the Java virtual machine rather than the operating 67 * system. 68 */ 69 final class VirtualThread extends BaseVirtualThread { 70 private static final Unsafe U = Unsafe.getUnsafe(); 71 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 72 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler(); 73 private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers(); 74 private static final int TRACE_PINNING_MODE = tracePinningMode(); 75 76 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 77 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 78 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 79 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 80 81 // scheduler and continuation 82 private final Executor scheduler; 83 private final Continuation cont; 84 private final Runnable runContinuation; 85 86 // virtual thread state, accessed by VM 87 private volatile int state; 88 89 /* 90 * Virtual thread state transitions: 91 * 92 * NEW -> STARTED // Thread.start, schedule to run 93 * STARTED -> TERMINATED // failed to start 94 * STARTED -> RUNNING // first run 95 * RUNNING -> TERMINATED // done 96 * 97 * RUNNING -> PARKING // Thread parking with LockSupport.park 98 * PARKING -> PARKED // cont.yield successful, parked indefinitely 99 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier 100 * PARKED -> UNPARKED // unparked, may be scheduled to continue 101 * PINNED -> RUNNING // unparked, continue execution on same carrier 102 * UNPARKED -> RUNNING // continue execution after park 103 * 104 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 105 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 106 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier 107 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue 108 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 109 * 110 * RUNNABLE -> BLOCKING // blocking on monitor enter 111 * BLOCKING -> BLOCKED // blocked on monitor enter 112 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue 113 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter 114 * 115 * RUNNING -> YIELDING // Thread.yield 116 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue 117 * YIELDING -> RUNNING // cont.yield failed 118 * YIELDED -> RUNNING // continue execution after Thread.yield 119 */ 120 private static final int NEW = 0; 121 private static final int STARTED = 1; 122 private static final int RUNNING = 2; // runnable-mounted 123 124 // untimed and timed parking 125 private static final int PARKING = 3; 126 private static final int PARKED = 4; // unmounted 127 private static final int PINNED = 5; // mounted 128 private static final int TIMED_PARKING = 6; 129 private static final int TIMED_PARKED = 7; // unmounted 130 private static final int TIMED_PINNED = 8; // mounted 131 private static final int UNPARKED = 9; // unmounted but runnable 132 133 // Thread.yield 134 private static final int YIELDING = 10; 135 private static final int YIELDED = 11; // unmounted but runnable 136 137 // monitor enter 138 private static final int BLOCKING = 12; 139 private static final int BLOCKED = 13; // unmounted 140 private static final int UNBLOCKED = 14; // unmounted but runnable 141 142 private static final int TERMINATED = 99; // final state 143 144 // can be suspended from scheduling when unmounted 145 private static final int SUSPENDED = 1 << 8; 146 147 // parking permit 148 private volatile boolean parkPermit; 149 150 // unblocked 151 private volatile boolean unblocked; 152 153 // carrier thread when mounted, accessed by VM 154 private volatile Thread carrierThread; 155 156 // termination object when joining, created lazily if needed 157 private volatile CountDownLatch termination; 158 159 /** 160 * Returns the continuation scope used for virtual threads. 161 */ 162 static ContinuationScope continuationScope() { 163 return VTHREAD_SCOPE; 164 } 165 166 /** 167 * Creates a new {@code VirtualThread} to run the given task with the given 168 * scheduler. If the given scheduler is {@code null} and the current thread 169 * is a platform thread then the newly created virtual thread will use the 170 * default scheduler. If given scheduler is {@code null} and the current 171 * thread is a virtual thread then the current thread's scheduler is used. 172 * 173 * @param scheduler the scheduler or null 174 * @param name thread name 175 * @param characteristics characteristics 176 * @param task the task to execute 177 */ 178 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 179 super(name, characteristics, /*bound*/ false); 180 Objects.requireNonNull(task); 181 182 // choose scheduler if not specified 183 if (scheduler == null) { 184 Thread parent = Thread.currentThread(); 185 if (parent instanceof VirtualThread vparent) { 186 scheduler = vparent.scheduler; 187 } else { 188 scheduler = DEFAULT_SCHEDULER; 189 } 190 } 191 192 this.scheduler = scheduler; 193 this.cont = new VThreadContinuation(this, task); 194 this.runContinuation = this::runContinuation; 195 } 196 197 /** 198 * The continuation that a virtual thread executes. 199 */ 200 private static class VThreadContinuation extends Continuation { 201 VThreadContinuation(VirtualThread vthread, Runnable task) { 202 super(VTHREAD_SCOPE, wrap(vthread, task)); 203 } 204 @Override 205 protected void onPinned(Continuation.Pinned reason) { 206 if (TRACE_PINNING_MODE > 0) { 207 boolean printAll = (TRACE_PINNING_MODE == 1); 208 VirtualThread vthread = (VirtualThread) Thread.currentThread(); 209 int oldState = vthread.state(); 210 try { 211 // avoid printing when in transition states 212 vthread.setState(RUNNING); 213 PinnedThreadPrinter.printStackTrace(System.out, printAll); 214 } finally { 215 vthread.setState(oldState); 216 } 217 } 218 } 219 private static Runnable wrap(VirtualThread vthread, Runnable task) { 220 return new Runnable() { 221 @Hidden 222 public void run() { 223 vthread.run(task); 224 } 225 }; 226 } 227 } 228 229 /** 230 * Runs or continues execution on the current thread. The virtual thread is mounted 231 * on the current thread before the task runs or continues. It unmounts when the 232 * task completes or yields. 233 */ 234 @ChangesCurrentThread 235 private void runContinuation() { 236 // the carrier must be a platform thread 237 if (Thread.currentThread().isVirtual()) { 238 throw new WrongThreadException(); 239 } 240 241 // set state to RUNNING 242 int initialState = state(); 243 if (initialState == STARTED || initialState == UNPARKED 244 || initialState == UNBLOCKED || initialState == YIELDED) { 245 // newly started or continue after parking/blocking/Thread.yield 246 if (!compareAndSetState(initialState, RUNNING)) { 247 return; 248 } 249 // consume parking permit when continuing after parking 250 if (initialState == UNPARKED) { 251 setParkPermit(false); 252 } 253 } else { 254 // not runnable 255 return; 256 } 257 258 mount(); 259 try { 260 cont.run(); 261 } finally { 262 unmount(); 263 if (cont.isDone()) { 264 afterDone(); 265 } else { 266 afterYield(); 267 } 268 } 269 } 270 271 /** 272 * Submits the runContinuation task to the scheduler. For the default scheduler, 273 * and calling it on a worker thread, the task will be pushed to the local queue, 274 * otherwise it will be pushed to an external submission queue. 275 * @throws RejectedExecutionException 276 */ 277 private void submitRunContinuation() { 278 try { 279 scheduler.execute(runContinuation); 280 } catch (RejectedExecutionException ree) { 281 submitFailed(ree); 282 throw ree; 283 } 284 } 285 286 /** 287 * Submits the runContinuation task the scheduler. For the default scheduler, the task 288 * will be pushed to an external submission queue. 289 * @throws RejectedExecutionException 290 */ 291 private void externalSubmitRunContinuation() { 292 if (scheduler == DEFAULT_SCHEDULER 293 && currentCarrierThread() instanceof CarrierThread ct) { 294 externalSubmitRunContinuation(ct.getPool()); 295 } else { 296 submitRunContinuation(); 297 } 298 } 299 300 /** 301 * Submits the runContinuation task to given scheduler with a lazy submit. 302 * @throws RejectedExecutionException 303 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 304 */ 305 private void lazySubmitRunContinuation(ForkJoinPool pool) { 306 try { 307 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 308 } catch (RejectedExecutionException ree) { 309 submitFailed(ree); 310 throw ree; 311 } 312 } 313 314 /** 315 * Submits the runContinuation task to the given scheduler as an external submit. 316 * @throws RejectedExecutionException 317 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 318 */ 319 private void externalSubmitRunContinuation(ForkJoinPool pool) { 320 try { 321 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 322 } catch (RejectedExecutionException ree) { 323 submitFailed(ree); 324 throw ree; 325 } 326 } 327 328 /** 329 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 330 */ 331 private void submitFailed(RejectedExecutionException ree) { 332 var event = new VirtualThreadSubmitFailedEvent(); 333 if (event.isEnabled()) { 334 event.javaThreadId = threadId(); 335 event.exceptionMessage = ree.getMessage(); 336 event.commit(); 337 } 338 } 339 340 /** 341 * Runs a task in the context of this virtual thread. 342 */ 343 private void run(Runnable task) { 344 assert Thread.currentThread() == this && state == RUNNING; 345 346 // notify JVMTI, may post VirtualThreadStart event 347 notifyJvmtiStart(); 348 349 // emit JFR event if enabled 350 if (VirtualThreadStartEvent.isTurnedOn()) { 351 var event = new VirtualThreadStartEvent(); 352 event.javaThreadId = threadId(); 353 event.commit(); 354 } 355 356 Object bindings = Thread.scopedValueBindings(); 357 try { 358 runWith(bindings, task); 359 } catch (Throwable exc) { 360 dispatchUncaughtException(exc); 361 } finally { 362 try { 363 // pop any remaining scopes from the stack, this may block 364 StackableScope.popAll(); 365 366 // emit JFR event if enabled 367 if (VirtualThreadEndEvent.isTurnedOn()) { 368 var event = new VirtualThreadEndEvent(); 369 event.javaThreadId = threadId(); 370 event.commit(); 371 } 372 373 } finally { 374 // notify JVMTI, may post VirtualThreadEnd event 375 notifyJvmtiEnd(); 376 } 377 } 378 } 379 380 /** 381 * Mounts this virtual thread onto the current platform thread. On 382 * return, the current thread is the virtual thread. 383 */ 384 @ChangesCurrentThread 385 @ReservedStackAccess 386 private void mount() { 387 // notify JVMTI before mount 388 notifyJvmtiMount(/*hide*/true); 389 390 // sets the carrier thread 391 Thread carrier = Thread.currentCarrierThread(); 392 setCarrierThread(carrier); 393 394 // sync up carrier thread interrupt status if needed 395 if (interrupted) { 396 carrier.setInterrupt(); 397 } else if (carrier.isInterrupted()) { 398 synchronized (interruptLock) { 399 // need to recheck interrupt status 400 if (!interrupted) { 401 carrier.clearInterrupt(); 402 } 403 } 404 } 405 406 // set Thread.currentThread() to return this virtual thread 407 carrier.setCurrentThread(this); 408 } 409 410 /** 411 * Unmounts this virtual thread from the carrier. On return, the 412 * current thread is the current platform thread. 413 */ 414 @ChangesCurrentThread 415 @ReservedStackAccess 416 private void unmount() { 417 assert !Thread.holdsLock(interruptLock); 418 419 // set Thread.currentThread() to return the platform thread 420 Thread carrier = this.carrierThread; 421 carrier.setCurrentThread(carrier); 422 423 // break connection to carrier thread, synchronized with interrupt 424 synchronized (interruptLock) { 425 setCarrierThread(null); 426 } 427 carrier.clearInterrupt(); 428 429 // notify JVMTI after unmount 430 notifyJvmtiUnmount(/*hide*/false); 431 } 432 433 /** 434 * Sets the current thread to the current carrier thread. 435 */ 436 @ChangesCurrentThread 437 @JvmtiMountTransition 438 private void switchToCarrierThread() { 439 notifyJvmtiHideFrames(true); 440 Thread carrier = this.carrierThread; 441 assert Thread.currentThread() == this 442 && carrier == Thread.currentCarrierThread(); 443 carrier.setCurrentThread(carrier); 444 } 445 446 /** 447 * Sets the current thread to the given virtual thread. 448 */ 449 @ChangesCurrentThread 450 @JvmtiMountTransition 451 private void switchToVirtualThread(VirtualThread vthread) { 452 Thread carrier = vthread.carrierThread; 453 assert carrier == Thread.currentCarrierThread(); 454 carrier.setCurrentThread(vthread); 455 notifyJvmtiHideFrames(false); 456 } 457 458 /** 459 * Executes the given value returning task on the current carrier thread. 460 */ 461 @ChangesCurrentThread 462 <V> V executeOnCarrierThread(Callable<V> task) throws Exception { 463 assert Thread.currentThread() == this; 464 switchToCarrierThread(); 465 try { 466 return task.call(); 467 } finally { 468 switchToVirtualThread(this); 469 } 470 } 471 472 /** 473 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 474 * the continuation continues. 475 */ 476 @Hidden 477 private boolean yieldContinuation() { 478 notifyJvmtiUnmount(/*hide*/true); 479 try { 480 return Continuation.yield(VTHREAD_SCOPE); 481 } finally { 482 notifyJvmtiMount(/*hide*/false); 483 } 484 } 485 486 /** 487 * Invoked after the continuation yields. If parking then it sets the state 488 * and also re-submits the task to continue if unparked while parking. 489 * If yielding due to Thread.yield then it just submits the task to continue. 490 */ 491 private void afterYield() { 492 assert carrierThread == null; 493 494 int s = state(); 495 496 // LockSupport.park/parkNanos 497 if (s == PARKING || s == TIMED_PARKING) { 498 int newState = (s == PARKING) ? PARKED : TIMED_PARKED; 499 setState(newState); 500 501 // may have been unparked while parking 502 if (parkPermit && compareAndSetState(newState, UNPARKED)) { 503 // lazy submit to continue on the current thread as carrier if possible 504 if (currentThread() instanceof CarrierThread ct) { 505 lazySubmitRunContinuation(ct.getPool()); 506 } else { 507 submitRunContinuation(); 508 } 509 510 } 511 return; 512 } 513 514 // Thread.yield 515 if (s == YIELDING) { 516 setState(YIELDED); 517 518 // external submit if there are no tasks in the local task queue 519 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 520 externalSubmitRunContinuation(ct.getPool()); 521 } else { 522 submitRunContinuation(); 523 } 524 return; 525 } 526 527 // blocking on monitorenter 528 if (s == BLOCKING) { 529 setState(BLOCKED); 530 if (unblocked && compareAndSetState(BLOCKED, UNBLOCKED)) { 531 unblocked = false; 532 submitRunContinuation(); 533 } 534 return; 535 } 536 537 assert false; 538 } 539 540 /** 541 * Invoked after the continuation completes. 542 */ 543 private void afterDone() { 544 afterDone(true); 545 } 546 547 /** 548 * Invoked after the continuation completes (or start failed). Sets the thread 549 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 550 * 551 * @param notifyContainer true if its container should be notified 552 */ 553 private void afterDone(boolean notifyContainer) { 554 assert carrierThread == null; 555 setState(TERMINATED); 556 557 // notify anyone waiting for this virtual thread to terminate 558 CountDownLatch termination = this.termination; 559 if (termination != null) { 560 assert termination.getCount() == 1; 561 termination.countDown(); 562 } 563 564 // notify container 565 if (notifyContainer) { 566 threadContainer().onExit(this); 567 } 568 569 // clear references to thread locals 570 clearReferences(); 571 } 572 573 /** 574 * Schedules this {@code VirtualThread} to execute. 575 * 576 * @throws IllegalStateException if the container is shutdown or closed 577 * @throws IllegalThreadStateException if the thread has already been started 578 * @throws RejectedExecutionException if the scheduler cannot accept a task 579 */ 580 @Override 581 void start(ThreadContainer container) { 582 if (!compareAndSetState(NEW, STARTED)) { 583 throw new IllegalThreadStateException("Already started"); 584 } 585 586 // bind thread to container 587 assert threadContainer() == null; 588 setThreadContainer(container); 589 590 // start thread 591 boolean addedToContainer = false; 592 boolean started = false; 593 try { 594 container.onStart(this); // may throw 595 addedToContainer = true; 596 597 // scoped values may be inherited 598 inheritScopedValueBindings(container); 599 600 // submit task to run thread, using externalSubmit if possible 601 externalSubmitRunContinuation(); 602 started = true; 603 } finally { 604 if (!started) { 605 afterDone(addedToContainer); 606 } 607 } 608 } 609 610 @Override 611 public void start() { 612 start(ThreadContainers.root()); 613 } 614 615 @Override 616 public void run() { 617 // do nothing 618 } 619 620 /** 621 * Parks until unparked or interrupted. If already unparked then the parking 622 * permit is consumed and this method completes immediately (meaning it doesn't 623 * yield). It also completes immediately if the interrupt status is set. 624 */ 625 @Override 626 void park() { 627 assert Thread.currentThread() == this; 628 629 // complete immediately if parking permit available or interrupted 630 if (getAndSetParkPermit(false) || interrupted) 631 return; 632 633 // park the thread 634 boolean yielded = false; 635 setState(PARKING); 636 try { 637 yielded = yieldContinuation(); // may throw 638 } finally { 639 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 640 if (!yielded) { 641 assert state() == PARKING; 642 setState(RUNNING); 643 } 644 } 645 646 // park on the carrier thread when pinned 647 if (!yielded) { 648 parkOnCarrierThread(false, 0); 649 } 650 } 651 652 /** 653 * Parks up to the given waiting time or until unparked or interrupted. 654 * If already unparked then the parking permit is consumed and this method 655 * completes immediately (meaning it doesn't yield). It also completes immediately 656 * if the interrupt status is set or the waiting time is {@code <= 0}. 657 * 658 * @param nanos the maximum number of nanoseconds to wait. 659 */ 660 @Override 661 void parkNanos(long nanos) { 662 assert Thread.currentThread() == this; 663 664 // complete immediately if parking permit available or interrupted 665 if (getAndSetParkPermit(false) || interrupted) 666 return; 667 668 // park the thread for the waiting time 669 if (nanos > 0) { 670 long startTime = System.nanoTime(); 671 672 boolean yielded = false; 673 Future<?> unparker = scheduleUnpark(nanos); // may throw OOME 674 setState(TIMED_PARKING); 675 try { 676 yielded = yieldContinuation(); // may throw 677 } finally { 678 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 679 if (!yielded) { 680 assert state() == TIMED_PARKING; 681 setState(RUNNING); 682 } 683 cancel(unparker); 684 } 685 686 // park on carrier thread for remaining time when pinned 687 if (!yielded) { 688 long remainingNanos = nanos - (System.nanoTime() - startTime); 689 parkOnCarrierThread(true, remainingNanos); 690 } 691 } 692 } 693 694 /** 695 * Parks the current carrier thread up to the given waiting time or until 696 * unparked or interrupted. If the virtual thread is interrupted then the 697 * interrupt status will be propagated to the carrier thread. 698 * @param timed true for a timed park, false for untimed 699 * @param nanos the waiting time in nanoseconds 700 */ 701 private void parkOnCarrierThread(boolean timed, long nanos) { 702 assert state() == RUNNING; 703 704 VirtualThreadPinnedEvent event; 705 try { 706 event = new VirtualThreadPinnedEvent(); 707 event.begin(); 708 } catch (OutOfMemoryError e) { 709 event = null; 710 } 711 712 setState(timed ? TIMED_PINNED : PINNED); 713 try { 714 if (!parkPermit) { 715 if (!timed) { 716 U.park(false, 0); 717 } else if (nanos > 0) { 718 U.park(false, nanos); 719 } 720 } 721 } finally { 722 setState(RUNNING); 723 } 724 725 // consume parking permit 726 setParkPermit(false); 727 728 if (event != null) { 729 try { 730 event.commit(); 731 } catch (OutOfMemoryError e) { 732 // ignore 733 } 734 } 735 } 736 737 /** 738 * Schedule this virtual thread to be unparked after a given delay. 739 */ 740 @ChangesCurrentThread 741 private Future<?> scheduleUnpark(long nanos) { 742 assert Thread.currentThread() == this; 743 // need to switch to current carrier thread to avoid nested parking 744 switchToCarrierThread(); 745 try { 746 return delayedTaskScheduler().schedule(this::unpark, nanos, NANOSECONDS); 747 } finally { 748 switchToVirtualThread(this); 749 } 750 } 751 752 /** 753 * Cancels a task if it has not completed. 754 */ 755 @ChangesCurrentThread 756 private void cancel(Future<?> future) { 757 assert Thread.currentThread() == this; 758 if (!future.isDone()) { 759 // need to switch to current carrier thread to avoid nested parking 760 switchToCarrierThread(); 761 try { 762 future.cancel(false); 763 } finally { 764 switchToVirtualThread(this); 765 } 766 } 767 } 768 769 /** 770 * Re-enables this virtual thread for scheduling. If the virtual thread was 771 * {@link #park() parked} then it will be unblocked, otherwise its next call 772 * to {@code park} or {@linkplain #parkNanos(long) parkNanos} is guaranteed 773 * not to block. 774 * @throws RejectedExecutionException if the scheduler cannot accept a task 775 */ 776 @Override 777 @ChangesCurrentThread 778 void unpark() { 779 Thread currentThread = Thread.currentThread(); 780 if (!getAndSetParkPermit(true) && currentThread != this) { 781 int s = state(); 782 boolean parked = (s == PARKED) || (s == TIMED_PARKED); 783 if (parked && compareAndSetState(s, UNPARKED)) { 784 if (currentThread instanceof VirtualThread vthread) { 785 vthread.switchToCarrierThread(); 786 try { 787 submitRunContinuation(); 788 } finally { 789 switchToVirtualThread(vthread); 790 } 791 } else { 792 submitRunContinuation(); 793 } 794 } else if ((s == PINNED) || (s == TIMED_PINNED)) { 795 // unpark carrier thread when pinned 796 synchronized (carrierThreadAccessLock()) { 797 Thread carrier = carrierThread; 798 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 799 U.unpark(carrier); 800 } 801 } 802 } 803 } 804 } 805 806 /** 807 * Re-enables this virtual thread for scheduling after blocking on monitor enter. 808 * @throws RejectedExecutionException if the scheduler cannot accept a task 809 */ 810 private void unblock() { 811 assert !Thread.currentThread().isVirtual(); 812 unblocked = true; 813 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 814 unblocked = false; 815 submitRunContinuation(); 816 } 817 } 818 819 /** 820 * Attempts to yield the current virtual thread (Thread.yield). 821 */ 822 void tryYield() { 823 assert Thread.currentThread() == this; 824 setState(YIELDING); 825 boolean yielded = false; 826 try { 827 yielded = yieldContinuation(); // may throw 828 } finally { 829 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 830 if (!yielded) { 831 assert state() == YIELDING; 832 setState(RUNNING); 833 } 834 } 835 } 836 837 /** 838 * Sleep the current thread for the given sleep time (in nanoseconds). If 839 * nanos is 0 then the thread will attempt to yield. 840 * 841 * @implNote This implementation parks the thread for the given sleeping time 842 * and will therefore be observed in PARKED state during the sleep. Parking 843 * will consume the parking permit so this method makes available the parking 844 * permit after the sleep. This may be observed as a spurious, but benign, 845 * wakeup when the thread subsequently attempts to park. 846 * 847 * @param nanos the maximum number of nanoseconds to sleep 848 * @throws InterruptedException if interrupted while sleeping 849 */ 850 void sleepNanos(long nanos) throws InterruptedException { 851 assert Thread.currentThread() == this && nanos >= 0; 852 if (getAndClearInterrupt()) 853 throw new InterruptedException(); 854 if (nanos == 0) { 855 tryYield(); 856 } else { 857 // park for the sleep time 858 try { 859 long remainingNanos = nanos; 860 long startNanos = System.nanoTime(); 861 while (remainingNanos > 0) { 862 parkNanos(remainingNanos); 863 if (getAndClearInterrupt()) { 864 throw new InterruptedException(); 865 } 866 remainingNanos = nanos - (System.nanoTime() - startNanos); 867 } 868 } finally { 869 // may have been unparked while sleeping 870 setParkPermit(true); 871 } 872 } 873 } 874 875 /** 876 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 877 * A timeout of {@code 0} means to wait forever. 878 * 879 * @throws InterruptedException if interrupted while waiting 880 * @return true if the thread has terminated 881 */ 882 boolean joinNanos(long nanos) throws InterruptedException { 883 if (state() == TERMINATED) 884 return true; 885 886 // ensure termination object exists, then re-check state 887 CountDownLatch termination = getTermination(); 888 if (state() == TERMINATED) 889 return true; 890 891 // wait for virtual thread to terminate 892 if (nanos == 0) { 893 termination.await(); 894 } else { 895 boolean terminated = termination.await(nanos, NANOSECONDS); 896 if (!terminated) { 897 // waiting time elapsed 898 return false; 899 } 900 } 901 assert state() == TERMINATED; 902 return true; 903 } 904 905 @Override 906 @SuppressWarnings("removal") 907 public void interrupt() { 908 if (Thread.currentThread() != this) { 909 checkAccess(); 910 synchronized (interruptLock) { 911 interrupted = true; 912 Interruptible b = nioBlocker; 913 if (b != null) { 914 // ensure current thread doesn't unmount while holding interruptLock 915 Continuation.pin(); 916 try { 917 b.interrupt(this); 918 } finally { 919 Continuation.unpin(); 920 } 921 } 922 923 // interrupt carrier thread if mounted 924 Thread carrier = carrierThread; 925 if (carrier != null) carrier.setInterrupt(); 926 } 927 } else { 928 interrupted = true; 929 carrierThread.setInterrupt(); 930 } 931 unpark(); 932 } 933 934 @Override 935 public boolean isInterrupted() { 936 return interrupted; 937 } 938 939 @Override 940 boolean getAndClearInterrupt() { 941 assert Thread.currentThread() == this; 942 boolean oldValue = interrupted; 943 if (oldValue) { 944 // ensure current thread doesn't unmount trying to enter interruptLock 945 Continuation.pin(); 946 try { 947 synchronized (interruptLock) { 948 interrupted = false; 949 carrierThread.clearInterrupt(); 950 } 951 } finally { 952 Continuation.unpin(); 953 } 954 } 955 return oldValue; 956 } 957 958 @Override 959 Thread.State threadState() { 960 int s = state(); 961 switch (s & ~SUSPENDED) { 962 case NEW: 963 return Thread.State.NEW; 964 case STARTED: 965 // return NEW if thread container not yet set 966 if (threadContainer() == null) { 967 return Thread.State.NEW; 968 } else { 969 return Thread.State.RUNNABLE; 970 } 971 case UNPARKED: 972 case UNBLOCKED: 973 case YIELDED: 974 // runnable, not mounted 975 return Thread.State.RUNNABLE; 976 case RUNNING: 977 // if mounted then return state of carrier thread 978 if (Thread.currentThread() != this) { 979 synchronized (carrierThreadAccessLock()) { 980 Thread carrier = this.carrierThread; 981 if (carrier != null) { 982 return carrier.threadState(); 983 } 984 } 985 } 986 // runnable, mounted 987 return Thread.State.RUNNABLE; 988 case PARKING: 989 case TIMED_PARKING: 990 case YIELDING: 991 case BLOCKING: 992 // runnable, in transition 993 return Thread.State.RUNNABLE; 994 case PARKED: 995 case PINNED: 996 return State.WAITING; 997 case TIMED_PARKED: 998 case TIMED_PINNED: 999 return State.TIMED_WAITING; 1000 case BLOCKED: 1001 return State.BLOCKED; 1002 case TERMINATED: 1003 return Thread.State.TERMINATED; 1004 default: 1005 throw new InternalError(); 1006 } 1007 } 1008 1009 @Override 1010 boolean alive() { 1011 int s = state; 1012 return (s != NEW && s != TERMINATED); 1013 } 1014 1015 @Override 1016 boolean isTerminated() { 1017 return (state == TERMINATED); 1018 } 1019 1020 @Override 1021 StackTraceElement[] asyncGetStackTrace() { 1022 StackTraceElement[] stackTrace; 1023 do { 1024 stackTrace = (carrierThread != null) 1025 ? super.asyncGetStackTrace() // mounted 1026 : tryGetStackTrace(); // unmounted 1027 if (stackTrace == null) { 1028 Thread.yield(); 1029 } 1030 } while (stackTrace == null); 1031 return stackTrace; 1032 } 1033 1034 /** 1035 * Returns the stack trace for this virtual thread if it is unmounted. 1036 * Returns null if the thread is mounted or in transition. 1037 */ 1038 private StackTraceElement[] tryGetStackTrace() { 1039 int initialState = state(); 1040 switch (initialState) { 1041 case NEW, STARTED, TERMINATED -> { 1042 return new StackTraceElement[0]; // unmounted, empty stack 1043 } 1044 case RUNNING, PINNED -> { 1045 return null; // mounted 1046 } 1047 case PARKED, TIMED_PARKED, BLOCKED -> { 1048 // unmounted, not runnable 1049 } 1050 case UNPARKED, UNBLOCKED, YIELDED -> { 1051 // unmounted, runnable 1052 } 1053 case PARKING, TIMED_PARKING, BLOCKING, YIELDING -> { 1054 return null; // in transition 1055 } 1056 default -> throw new InternalError(); 1057 } 1058 1059 // thread is unmounted, prevent it from continuing 1060 int suspendedState = initialState | SUSPENDED; 1061 if (!compareAndSetState(initialState, suspendedState)) { 1062 return null; 1063 } 1064 1065 // get stack trace and restore state 1066 StackTraceElement[] stack; 1067 try { 1068 stack = cont.getStackTrace(); 1069 } finally { 1070 assert state == suspendedState; 1071 setState(initialState); 1072 } 1073 boolean resubmit = switch (initialState) { 1074 case UNPARKED, UNBLOCKED, YIELDED -> { 1075 // resubmit as task may have run while suspended 1076 yield true; 1077 } 1078 case PARKED, TIMED_PARKED -> { 1079 // resubmit if unparked while suspended 1080 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1081 } 1082 case BLOCKED -> { 1083 // resubmit if unblocked while suspended 1084 yield unblocked && compareAndSetState(BLOCKED, UNBLOCKED); 1085 } 1086 default -> throw new InternalError(); 1087 }; 1088 if (resubmit) { 1089 submitRunContinuation(); 1090 } 1091 return stack; 1092 } 1093 1094 @Override 1095 public String toString() { 1096 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1097 sb.append(threadId()); 1098 String name = getName(); 1099 if (!name.isEmpty()) { 1100 sb.append(","); 1101 sb.append(name); 1102 } 1103 sb.append("]/"); 1104 1105 // add the carrier state and thread name when mounted 1106 boolean mounted; 1107 if (Thread.currentThread() == this) { 1108 mounted = appendCarrierInfo(sb); 1109 } else { 1110 synchronized (carrierThreadAccessLock()) { 1111 mounted = appendCarrierInfo(sb); 1112 } 1113 } 1114 1115 // add virtual thread state when not mounted 1116 if (!mounted) { 1117 String stateAsString = threadState().toString(); 1118 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1119 } 1120 1121 return sb.toString(); 1122 } 1123 1124 /** 1125 * Appends the carrier state and thread name to the string buffer if mounted. 1126 * @return true if mounted, false if not mounted 1127 */ 1128 private boolean appendCarrierInfo(StringBuilder sb) { 1129 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1130 Thread carrier = carrierThread; 1131 if (carrier != null) { 1132 String stateAsString = carrier.threadState().toString(); 1133 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1134 sb.append('@'); 1135 sb.append(carrier.getName()); 1136 return true; 1137 } else { 1138 return false; 1139 } 1140 } 1141 1142 @Override 1143 public int hashCode() { 1144 return (int) threadId(); 1145 } 1146 1147 @Override 1148 public boolean equals(Object obj) { 1149 return obj == this; 1150 } 1151 1152 /** 1153 * Returns a ScheduledExecutorService to execute a delayed task. 1154 */ 1155 private ScheduledExecutorService delayedTaskScheduler() { 1156 long tid = Thread.currentThread().threadId(); 1157 int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1); 1158 return DELAYED_TASK_SCHEDULERS[index]; 1159 } 1160 1161 /** 1162 * Returns the termination object, creating it if needed. 1163 */ 1164 private CountDownLatch getTermination() { 1165 CountDownLatch termination = this.termination; 1166 if (termination == null) { 1167 termination = new CountDownLatch(1); 1168 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1169 termination = this.termination; 1170 } 1171 } 1172 return termination; 1173 } 1174 1175 /** 1176 * Returns the lock object to synchronize on when accessing carrierThread. 1177 * The lock prevents carrierThread from being reset to null during unmount. 1178 */ 1179 private Object carrierThreadAccessLock() { 1180 // return interruptLock as unmount has to coordinate with interrupt 1181 return interruptLock; 1182 } 1183 1184 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1185 1186 private int state() { 1187 return state; // volatile read 1188 } 1189 1190 private void setState(int newValue) { 1191 state = newValue; // volatile write 1192 } 1193 1194 private boolean compareAndSetState(int expectedValue, int newValue) { 1195 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1196 } 1197 1198 private void setParkPermit(boolean newValue) { 1199 if (parkPermit != newValue) { 1200 parkPermit = newValue; 1201 } 1202 } 1203 1204 private boolean getAndSetParkPermit(boolean newValue) { 1205 if (parkPermit != newValue) { 1206 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1207 } else { 1208 return newValue; 1209 } 1210 } 1211 1212 private void setCarrierThread(Thread carrier) { 1213 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1214 this.carrierThread = carrier; 1215 } 1216 1217 // -- JVM TI support -- 1218 1219 @IntrinsicCandidate 1220 @JvmtiMountTransition 1221 private native void notifyJvmtiStart(); 1222 1223 @IntrinsicCandidate 1224 @JvmtiMountTransition 1225 private native void notifyJvmtiEnd(); 1226 1227 @IntrinsicCandidate 1228 @JvmtiMountTransition 1229 private native void notifyJvmtiMount(boolean hide); 1230 1231 @IntrinsicCandidate 1232 @JvmtiMountTransition 1233 private native void notifyJvmtiUnmount(boolean hide); 1234 1235 @IntrinsicCandidate 1236 @JvmtiMountTransition 1237 private native void notifyJvmtiHideFrames(boolean hide); 1238 1239 private static native void registerNatives(); 1240 static { 1241 registerNatives(); 1242 } 1243 1244 /** 1245 * Creates the default scheduler. 1246 */ 1247 @SuppressWarnings("removal") 1248 private static ForkJoinPool createDefaultScheduler() { 1249 ForkJoinWorkerThreadFactory factory = pool -> { 1250 PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool); 1251 return AccessController.doPrivileged(pa); 1252 }; 1253 PrivilegedAction<ForkJoinPool> pa = () -> { 1254 int parallelism, maxPoolSize, minRunnable; 1255 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1256 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1257 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1258 if (parallelismValue != null) { 1259 parallelism = Integer.max(Integer.parseInt(parallelismValue), 1); 1260 } else { 1261 parallelism = Runtime.getRuntime().availableProcessors(); 1262 } 1263 if (maxPoolSizeValue != null) { 1264 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1265 if (maxPoolSize > 0) { 1266 parallelism = Integer.min(parallelism, maxPoolSize); 1267 } else { 1268 maxPoolSize = parallelism; // no spares 1269 } 1270 } else { 1271 maxPoolSize = Integer.max(parallelism, 256); 1272 } 1273 if (minRunnableValue != null) { 1274 minRunnable = Integer.parseInt(minRunnableValue); 1275 } else { 1276 minRunnable = Integer.max(parallelism / 2, 1); 1277 } 1278 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1279 boolean asyncMode = true; // FIFO 1280 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1281 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1282 }; 1283 return AccessController.doPrivileged(pa); 1284 } 1285 1286 /** 1287 * Invoked by the VM for the Thread.vthread_scheduler diagnostic command. 1288 */ 1289 private static byte[] printDefaultScheduler() { 1290 return String.format("%s%n", DEFAULT_SCHEDULER.toString()) 1291 .getBytes(StandardCharsets.UTF_8); 1292 } 1293 1294 /** 1295 * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks. 1296 */ 1297 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1298 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1299 String propValue = GetPropertyAction.privilegedGetProperty(propName); 1300 int queueCount; 1301 if (propValue != null) { 1302 queueCount = Integer.parseInt(propValue); 1303 if (queueCount != Integer.highestOneBit(queueCount)) { 1304 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1305 } 1306 } else { 1307 int ncpus = Runtime.getRuntime().availableProcessors(); 1308 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1309 } 1310 var schedulers = new ScheduledExecutorService[queueCount]; 1311 for (int i = 0; i < queueCount; i++) { 1312 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1313 Executors.newScheduledThreadPool(1, task -> { 1314 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1315 t.setDaemon(true); 1316 return t; 1317 }); 1318 stpe.setRemoveOnCancelPolicy(true); 1319 schedulers[i] = stpe; 1320 } 1321 return schedulers; 1322 } 1323 1324 /** 1325 * Reads the value of the jdk.tracePinnedThreads property to determine if stack 1326 * traces should be printed when a carrier thread is pinned when a virtual thread 1327 * attempts to park. 1328 */ 1329 private static int tracePinningMode() { 1330 String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads"); 1331 if (propValue != null) { 1332 if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue)) 1333 return 1; 1334 if ("short".equalsIgnoreCase(propValue)) 1335 return 2; 1336 } 1337 return 0; 1338 } 1339 1340 /** 1341 * Unblock virtual threads that are ready to be scheduled again. 1342 */ 1343 private static void processPendingList() { 1344 // TBD invoke unblock 1345 } 1346 1347 static { 1348 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1349 VirtualThread::processPendingList); 1350 unblocker.setDaemon(true); 1351 unblocker.start(); 1352 } 1353 }