1 /* 2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.lang.reflect.Constructor; 30 import java.util.Locale; 31 import java.util.Objects; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.Executors; 35 import java.util.concurrent.ForkJoinPool; 36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 37 import java.util.concurrent.ForkJoinTask; 38 import java.util.concurrent.Future; 39 import java.util.concurrent.RejectedExecutionException; 40 import java.util.concurrent.ScheduledExecutorService; 41 import java.util.concurrent.ScheduledThreadPoolExecutor; 42 import java.util.concurrent.TimeUnit; 43 import jdk.internal.event.VirtualThreadEndEvent; 44 import jdk.internal.event.VirtualThreadStartEvent; 45 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 46 import jdk.internal.invoke.MhUtil; 47 import jdk.internal.misc.CarrierThread; 48 import jdk.internal.misc.InnocuousThread; 49 import jdk.internal.misc.Unsafe; 50 import jdk.internal.vm.Continuation; 51 import jdk.internal.vm.ContinuationScope; 52 import jdk.internal.vm.StackableScope; 53 import jdk.internal.vm.ThreadContainer; 54 import jdk.internal.vm.ThreadContainers; 55 import jdk.internal.vm.annotation.ChangesCurrentThread; 56 import jdk.internal.vm.annotation.Hidden; 57 import jdk.internal.vm.annotation.IntrinsicCandidate; 58 import jdk.internal.vm.annotation.JvmtiHideEvents; 59 import jdk.internal.vm.annotation.JvmtiMountTransition; 60 import jdk.internal.vm.annotation.ReservedStackAccess; 61 import sun.nio.ch.Interruptible; 62 import static java.util.concurrent.TimeUnit.*; 63 64 /** 65 * A thread that is scheduled by the Java virtual machine rather than the operating system. 66 */ 67 final class VirtualThread extends BaseVirtualThread { 68 private static final Unsafe U = Unsafe.getUnsafe(); 69 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 70 71 private static final Executor DEFAULT_SCHEDULER; 72 private static final boolean USE_CUSTOM_RUNNER; 73 static { 74 // experimental 75 String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass"); 76 if (propValue != null) { 77 DEFAULT_SCHEDULER = createCustomDefaultScheduler(propValue); 78 USE_CUSTOM_RUNNER = true; 79 } else { 80 DEFAULT_SCHEDULER = createDefaultForkJoinPoolScheduler(); 81 USE_CUSTOM_RUNNER = false; 82 } 83 } 84 85 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 86 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 87 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 88 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 89 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList"); 90 91 // scheduler and continuation 92 private final Executor scheduler; 93 private final Continuation cont; 94 private final Runnable runContinuation; 95 96 // virtual thread state, accessed by VM 97 private volatile int state; 98 99 /* 100 * Virtual thread state transitions: 101 * 102 * NEW -> STARTED // Thread.start, schedule to run 103 * STARTED -> TERMINATED // failed to start 104 * STARTED -> RUNNING // first run 105 * RUNNING -> TERMINATED // done 106 * 107 * RUNNING -> PARKING // Thread parking with LockSupport.park 108 * PARKING -> PARKED // cont.yield successful, parked indefinitely 109 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier 110 * PARKED -> UNPARKED // unparked, may be scheduled to continue 111 * PINNED -> RUNNING // unparked, continue execution on same carrier 112 * UNPARKED -> RUNNING // continue execution after park 113 * 114 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 115 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 116 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier 117 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue 118 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 119 * 120 * RUNNING -> BLOCKING // blocking on monitor enter 121 * BLOCKING -> BLOCKED // blocked on monitor enter 122 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue 123 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter 124 * 125 * RUNNING -> WAITING // transitional state during wait on monitor 126 * WAITING -> WAIT // waiting on monitor 127 * WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 128 * WAIT -> UNBLOCKED // timed-out/interrupted 129 * 130 * RUNNING -> TIMED_WAITING // transition state during timed-waiting on monitor 131 * TIMED_WAITING -> TIMED_WAIT // timed-waiting on monitor 132 * TIMED_WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 133 * TIMED_WAIT -> UNBLOCKED // timed-out/interrupted 134 * 135 * RUNNING -> YIELDING // Thread.yield 136 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue 137 * YIELDING -> RUNNING // cont.yield failed 138 * YIELDED -> RUNNING // continue execution after Thread.yield 139 */ 140 private static final int NEW = 0; 141 private static final int STARTED = 1; 142 private static final int RUNNING = 2; // runnable-mounted 143 144 // untimed and timed parking 145 private static final int PARKING = 3; 146 private static final int PARKED = 4; // unmounted 147 private static final int PINNED = 5; // mounted 148 private static final int TIMED_PARKING = 6; 149 private static final int TIMED_PARKED = 7; // unmounted 150 private static final int TIMED_PINNED = 8; // mounted 151 private static final int UNPARKED = 9; // unmounted but runnable 152 153 // Thread.yield 154 private static final int YIELDING = 10; 155 private static final int YIELDED = 11; // unmounted but runnable 156 157 // monitor enter 158 private static final int BLOCKING = 12; 159 private static final int BLOCKED = 13; // unmounted 160 private static final int UNBLOCKED = 14; // unmounted but runnable 161 162 // monitor wait/timed-wait 163 private static final int WAITING = 15; 164 private static final int WAIT = 16; // waiting in Object.wait 165 private static final int TIMED_WAITING = 17; 166 private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait 167 168 private static final int TERMINATED = 99; // final state 169 170 // can be suspended from scheduling when unmounted 171 private static final int SUSPENDED = 1 << 8; 172 173 // parking permit made available by LockSupport.unpark 174 private volatile boolean parkPermit; 175 176 // blocking permit made available by unblocker thread when another thread exits monitor 177 private volatile boolean blockPermit; 178 179 // true when on the list of virtual threads waiting to be unblocked 180 private volatile boolean onWaitingList; 181 182 // next virtual thread on the list of virtual threads waiting to be unblocked 183 private volatile VirtualThread next; 184 185 // notified by Object.notify/notifyAll while waiting in Object.wait 186 private volatile boolean notified; 187 188 // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait 189 private volatile boolean interruptableWait; 190 191 // timed-wait support 192 private byte timedWaitSeqNo; 193 194 // timeout for timed-park and timed-wait, only accessed on current/carrier thread 195 private long timeout; 196 197 // timer task for timed-park and timed-wait, only accessed on current/carrier thread 198 private Future<?> timeoutTask; 199 200 // carrier thread when mounted, accessed by VM 201 private volatile Thread carrierThread; 202 203 // termination object when joining, created lazily if needed 204 private volatile CountDownLatch termination; 205 206 /** 207 * Returns the default scheduler. 208 */ 209 static Executor defaultScheduler() { 210 return DEFAULT_SCHEDULER; 211 } 212 213 /** 214 * Returns the continuation scope used for virtual threads. 215 */ 216 static ContinuationScope continuationScope() { 217 return VTHREAD_SCOPE; 218 } 219 220 /** 221 * Return the scheduler for this thread. 222 */ 223 Executor scheduler() { 224 return scheduler; 225 } 226 227 /** 228 * Creates a new {@code VirtualThread} to run the given task with the given scheduler. 229 * 230 * @param scheduler the scheduler or null for default scheduler 231 * @param name thread name 232 * @param characteristics characteristics 233 * @param task the task to execute 234 */ 235 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 236 super(name, characteristics, /*bound*/ false); 237 Objects.requireNonNull(task); 238 239 // use default scheduler if not provided 240 if (scheduler == null) { 241 scheduler = DEFAULT_SCHEDULER; 242 } 243 244 this.scheduler = scheduler; 245 this.cont = new VThreadContinuation(this, task); 246 if (USE_CUSTOM_RUNNER || (scheduler != DEFAULT_SCHEDULER)) { 247 this.runContinuation = new CustomRunner(this); 248 } else { 249 this.runContinuation = this::runContinuation; 250 } 251 } 252 253 /** 254 * The continuation that a virtual thread executes. 255 */ 256 private static class VThreadContinuation extends Continuation { 257 VThreadContinuation(VirtualThread vthread, Runnable task) { 258 super(VTHREAD_SCOPE, wrap(vthread, task)); 259 } 260 @Override 261 protected void onPinned(Continuation.Pinned reason) { 262 } 263 private static Runnable wrap(VirtualThread vthread, Runnable task) { 264 return new Runnable() { 265 @Hidden 266 @JvmtiHideEvents 267 public void run() { 268 vthread.notifyJvmtiStart(); // notify JVMTI 269 try { 270 vthread.run(task); 271 } finally { 272 vthread.notifyJvmtiEnd(); // notify JVMTI 273 } 274 } 275 }; 276 } 277 } 278 279 /** 280 * The task to execute when using a custom scheduler. 281 */ 282 private static class CustomRunner implements VirtualThreadTask { 283 private static final VarHandle ATTACHMENT = 284 MhUtil.findVarHandle(MethodHandles.lookup(), "attachment", Object.class); 285 private final VirtualThread vthread; 286 private volatile Object attachment; 287 CustomRunner(VirtualThread vthread) { 288 this.vthread = vthread; 289 } 290 @Override 291 public void run() { 292 vthread.runContinuation(); 293 } 294 @Override 295 public Thread thread() { 296 return vthread; 297 } 298 @Override 299 public Object attach(Object ob) { 300 return ATTACHMENT.getAndSet(this, ob); 301 } 302 @Override 303 public Object attachment() { 304 return attachment; 305 } 306 @Override 307 public String toString() { 308 return vthread.toString(); 309 } 310 } 311 312 /** 313 * Returns the object attached to the virtual thread's task. 314 */ 315 Object currentTaskAttachment() { 316 assert Thread.currentThread() == this; 317 if (runContinuation instanceof CustomRunner runner) { 318 return runner.attachment(); 319 } else { 320 return null; 321 } 322 } 323 324 /** 325 * Runs or continues execution on the current thread. The virtual thread is mounted 326 * on the current thread before the task runs or continues. It unmounts when the 327 * task completes or yields. 328 */ 329 @ChangesCurrentThread // allow mount/unmount to be inlined 330 private void runContinuation() { 331 // the carrier must be a platform thread 332 if (Thread.currentThread().isVirtual()) { 333 throw new WrongThreadException(); 334 } 335 336 // set state to RUNNING 337 int initialState = state(); 338 if (initialState == STARTED || initialState == UNPARKED 339 || initialState == UNBLOCKED || initialState == YIELDED) { 340 // newly started or continue after parking/blocking/Thread.yield 341 if (!compareAndSetState(initialState, RUNNING)) { 342 return; 343 } 344 // consume permit when continuing after parking or blocking. If continue 345 // after a timed-park or timed-wait then the timeout task is cancelled. 346 if (initialState == UNPARKED) { 347 cancelTimeoutTask(); 348 setParkPermit(false); 349 } else if (initialState == UNBLOCKED) { 350 cancelTimeoutTask(); 351 blockPermit = false; 352 } 353 } else { 354 // not runnable 355 return; 356 } 357 358 mount(); 359 try { 360 cont.run(); 361 } finally { 362 unmount(); 363 if (cont.isDone()) { 364 afterDone(); 365 } else { 366 afterYield(); 367 } 368 } 369 } 370 371 /** 372 * Cancel timeout task when continuing after timed-park or timed-wait. 373 * The timeout task may be executing, or may have already completed. 374 */ 375 private void cancelTimeoutTask() { 376 if (timeoutTask != null) { 377 timeoutTask.cancel(false); 378 timeoutTask = null; 379 } 380 } 381 382 /** 383 * Submits the given task to the given executor. If the scheduler is a 384 * ForkJoinPool then the task is first adapted to a ForkJoinTask. 385 */ 386 private void submit(Executor executor, Runnable task) { 387 if (executor instanceof ForkJoinPool pool) { 388 pool.submit(ForkJoinTask.adapt(task)); 389 } else { 390 executor.execute(task); 391 } 392 } 393 394 /** 395 * Submits the runContinuation task to the scheduler. For the default scheduler, 396 * and calling it on a worker thread, the task will be pushed to the local queue, 397 * otherwise it will be pushed to an external submission queue. 398 * @param scheduler the scheduler 399 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 400 * @throws RejectedExecutionException 401 */ 402 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) { 403 boolean done = false; 404 while (!done) { 405 try { 406 // Pin the continuation to prevent the virtual thread from unmounting 407 // when submitting a task. For the default scheduler this ensures that 408 // the carrier doesn't change when pushing a task. For other schedulers 409 // it avoids deadlock that could arise due to carriers and virtual 410 // threads contending for a lock. 411 if (currentThread().isVirtual()) { 412 Continuation.pin(); 413 try { 414 submit(scheduler, runContinuation); 415 } finally { 416 Continuation.unpin(); 417 } 418 } else { 419 submit(scheduler, runContinuation); 420 } 421 done = true; 422 } catch (RejectedExecutionException ree) { 423 submitFailed(ree); 424 throw ree; 425 } catch (OutOfMemoryError e) { 426 if (retryOnOOME) { 427 U.park(false, 100_000_000); // 100ms 428 } else { 429 throw e; 430 } 431 } 432 } 433 } 434 435 /** 436 * Submits the runContinuation task to the given scheduler as an external submit. 437 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 438 * @throws RejectedExecutionException 439 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 440 */ 441 private void externalSubmitRunContinuation(ForkJoinPool pool) { 442 assert Thread.currentThread() instanceof CarrierThread; 443 try { 444 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 445 } catch (RejectedExecutionException ree) { 446 submitFailed(ree); 447 throw ree; 448 } catch (OutOfMemoryError e) { 449 submitRunContinuation(pool, true); 450 } 451 } 452 453 /** 454 * Submits the runContinuation task to the scheduler. For the default scheduler, 455 * and calling it on a worker thread, the task will be pushed to the local queue, 456 * otherwise it will be pushed to an external submission queue. 457 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 458 * @throws RejectedExecutionException 459 */ 460 private void submitRunContinuation() { 461 submitRunContinuation(scheduler, true); 462 } 463 464 /** 465 * Lazy submit the runContinuation task if invoked on a carrier thread and its local 466 * queue is empty. If not empty, or invoked by another thread, then this method works 467 * like submitRunContinuation and just submits the task to the scheduler. 468 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 469 * @throws RejectedExecutionException 470 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 471 */ 472 private void lazySubmitRunContinuation() { 473 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 474 ForkJoinPool pool = ct.getPool(); 475 try { 476 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 477 } catch (RejectedExecutionException ree) { 478 submitFailed(ree); 479 throw ree; 480 } catch (OutOfMemoryError e) { 481 submitRunContinuation(); 482 } 483 } else { 484 submitRunContinuation(); 485 } 486 } 487 488 /** 489 * Submits the runContinuation task to the scheduler. For the default scheduler, and 490 * calling it a virtual thread that uses the default scheduler, the task will be 491 * pushed to an external submission queue. This method may throw OutOfMemoryError. 492 * @throws RejectedExecutionException 493 * @throws OutOfMemoryError 494 */ 495 private void externalSubmitRunContinuationOrThrow() { 496 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) { 497 try { 498 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 499 } catch (RejectedExecutionException ree) { 500 submitFailed(ree); 501 throw ree; 502 } 503 } else { 504 submitRunContinuation(scheduler, false); 505 } 506 } 507 508 /** 509 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 510 */ 511 private void submitFailed(RejectedExecutionException ree) { 512 var event = new VirtualThreadSubmitFailedEvent(); 513 if (event.isEnabled()) { 514 event.javaThreadId = threadId(); 515 event.exceptionMessage = ree.getMessage(); 516 event.commit(); 517 } 518 } 519 520 /** 521 * Runs a task in the context of this virtual thread. 522 */ 523 private void run(Runnable task) { 524 assert Thread.currentThread() == this && state == RUNNING; 525 526 // emit JFR event if enabled 527 if (VirtualThreadStartEvent.isTurnedOn()) { 528 var event = new VirtualThreadStartEvent(); 529 event.javaThreadId = threadId(); 530 event.commit(); 531 } 532 533 Object bindings = Thread.scopedValueBindings(); 534 try { 535 runWith(bindings, task); 536 } catch (Throwable exc) { 537 dispatchUncaughtException(exc); 538 } finally { 539 // pop any remaining scopes from the stack, this may block 540 StackableScope.popAll(); 541 542 // emit JFR event if enabled 543 if (VirtualThreadEndEvent.isTurnedOn()) { 544 var event = new VirtualThreadEndEvent(); 545 event.javaThreadId = threadId(); 546 event.commit(); 547 } 548 } 549 } 550 551 /** 552 * Mounts this virtual thread onto the current platform thread. On 553 * return, the current thread is the virtual thread. 554 */ 555 @ChangesCurrentThread 556 @ReservedStackAccess 557 private void mount() { 558 // notify JVMTI before mount 559 notifyJvmtiMount(/*hide*/true); 560 561 // sets the carrier thread 562 Thread carrier = Thread.currentCarrierThread(); 563 setCarrierThread(carrier); 564 565 // sync up carrier thread interrupt status if needed 566 if (interrupted) { 567 carrier.setInterrupt(); 568 } else if (carrier.isInterrupted()) { 569 synchronized (interruptLock) { 570 // need to recheck interrupt status 571 if (!interrupted) { 572 carrier.clearInterrupt(); 573 } 574 } 575 } 576 577 // set Thread.currentThread() to return this virtual thread 578 carrier.setCurrentThread(this); 579 } 580 581 /** 582 * Unmounts this virtual thread from the carrier. On return, the 583 * current thread is the current platform thread. 584 */ 585 @ChangesCurrentThread 586 @ReservedStackAccess 587 private void unmount() { 588 assert !Thread.holdsLock(interruptLock); 589 590 // set Thread.currentThread() to return the platform thread 591 Thread carrier = this.carrierThread; 592 carrier.setCurrentThread(carrier); 593 594 // break connection to carrier thread, synchronized with interrupt 595 synchronized (interruptLock) { 596 setCarrierThread(null); 597 } 598 carrier.clearInterrupt(); 599 600 // notify JVMTI after unmount 601 notifyJvmtiUnmount(/*hide*/false); 602 } 603 604 /** 605 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 606 * the continuation continues. 607 */ 608 @Hidden 609 private boolean yieldContinuation() { 610 notifyJvmtiUnmount(/*hide*/true); 611 try { 612 return Continuation.yield(VTHREAD_SCOPE); 613 } finally { 614 notifyJvmtiMount(/*hide*/false); 615 } 616 } 617 618 /** 619 * Invoked in the context of the carrier thread after the Continuation yields when 620 * parking, blocking on monitor enter, Object.wait, or Thread.yield. 621 */ 622 private void afterYield() { 623 assert carrierThread == null; 624 625 // re-adjust parallelism if the virtual thread yielded when compensating 626 if (currentThread() instanceof CarrierThread ct) { 627 ct.endBlocking(); 628 } 629 630 int s = state(); 631 632 // LockSupport.park/parkNanos 633 if (s == PARKING || s == TIMED_PARKING) { 634 int newState; 635 if (s == PARKING) { 636 setState(newState = PARKED); 637 } else { 638 // schedule unpark 639 long timeout = this.timeout; 640 assert timeout > 0; 641 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS); 642 setState(newState = TIMED_PARKED); 643 } 644 645 // may have been unparked while parking 646 if (parkPermit && compareAndSetState(newState, UNPARKED)) { 647 // lazy submit if local queue is empty 648 lazySubmitRunContinuation(); 649 } 650 return; 651 } 652 653 // Thread.yield 654 if (s == YIELDING) { 655 setState(YIELDED); 656 657 // external submit if there are no tasks in the local task queue 658 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 659 externalSubmitRunContinuation(ct.getPool()); 660 } else { 661 submitRunContinuation(); 662 } 663 return; 664 } 665 666 // blocking on monitorenter 667 if (s == BLOCKING) { 668 setState(BLOCKED); 669 670 // may have been unblocked while blocking 671 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 672 // lazy submit if local queue is empty 673 lazySubmitRunContinuation(); 674 } 675 return; 676 } 677 678 // Object.wait 679 if (s == WAITING || s == TIMED_WAITING) { 680 int newState; 681 boolean interruptable = interruptableWait; 682 if (s == WAITING) { 683 setState(newState = WAIT); 684 } else { 685 // For timed-wait, a timeout task is scheduled to execute. The timeout 686 // task will change the thread state to UNBLOCKED and submit the thread 687 // to the scheduler. A sequence number is used to ensure that the timeout 688 // task only unblocks the thread for this timed-wait. We synchronize with 689 // the timeout task to coordinate access to the sequence number and to 690 // ensure the timeout task doesn't execute until the thread has got to 691 // the TIMED_WAIT state. 692 long timeout = this.timeout; 693 assert timeout > 0; 694 synchronized (timedWaitLock()) { 695 byte seqNo = ++timedWaitSeqNo; 696 timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS); 697 setState(newState = TIMED_WAIT); 698 } 699 } 700 701 // may have been notified while in transition to wait state 702 if (notified && compareAndSetState(newState, BLOCKED)) { 703 // may have even been unblocked already 704 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 705 submitRunContinuation(); 706 } 707 return; 708 } 709 710 // may have been interrupted while in transition to wait state 711 if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) { 712 submitRunContinuation(); 713 return; 714 } 715 return; 716 } 717 718 assert false; 719 } 720 721 /** 722 * Invoked after the continuation completes. 723 */ 724 private void afterDone() { 725 afterDone(true); 726 } 727 728 /** 729 * Invoked after the continuation completes (or start failed). Sets the thread 730 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 731 * 732 * @param notifyContainer true if its container should be notified 733 */ 734 private void afterDone(boolean notifyContainer) { 735 assert carrierThread == null; 736 setState(TERMINATED); 737 738 // notify anyone waiting for this virtual thread to terminate 739 CountDownLatch termination = this.termination; 740 if (termination != null) { 741 assert termination.getCount() == 1; 742 termination.countDown(); 743 } 744 745 // notify container 746 if (notifyContainer) { 747 threadContainer().remove(this); 748 } 749 750 // clear references to thread locals 751 clearReferences(); 752 } 753 754 /** 755 * Schedules this {@code VirtualThread} to execute. 756 * 757 * @throws IllegalStateException if the container is shutdown or closed 758 * @throws IllegalThreadStateException if the thread has already been started 759 * @throws RejectedExecutionException if the scheduler cannot accept a task 760 */ 761 @Override 762 void start(ThreadContainer container) { 763 if (!compareAndSetState(NEW, STARTED)) { 764 throw new IllegalThreadStateException("Already started"); 765 } 766 767 // bind thread to container 768 assert threadContainer() == null; 769 setThreadContainer(container); 770 771 // start thread 772 boolean addedToContainer = false; 773 boolean started = false; 774 try { 775 container.add(this); // may throw 776 addedToContainer = true; 777 778 // scoped values may be inherited 779 inheritScopedValueBindings(container); 780 781 // submit task to run thread, using externalSubmit if possible 782 externalSubmitRunContinuationOrThrow(); 783 started = true; 784 } finally { 785 if (!started) { 786 afterDone(addedToContainer); 787 } 788 } 789 } 790 791 @Override 792 public void start() { 793 start(ThreadContainers.root()); 794 } 795 796 @Override 797 public void run() { 798 // do nothing 799 } 800 801 /** 802 * Parks until unparked or interrupted. If already unparked then the parking 803 * permit is consumed and this method completes immediately (meaning it doesn't 804 * yield). It also completes immediately if the interrupt status is set. 805 */ 806 @Override 807 void park() { 808 assert Thread.currentThread() == this; 809 810 // complete immediately if parking permit available or interrupted 811 if (getAndSetParkPermit(false) || interrupted) 812 return; 813 814 // park the thread 815 boolean yielded = false; 816 setState(PARKING); 817 try { 818 yielded = yieldContinuation(); 819 } catch (OutOfMemoryError e) { 820 // park on carrier 821 } finally { 822 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 823 if (!yielded) { 824 assert state() == PARKING; 825 setState(RUNNING); 826 } 827 } 828 829 // park on the carrier thread when pinned 830 if (!yielded) { 831 parkOnCarrierThread(false, 0); 832 } 833 } 834 835 /** 836 * Parks up to the given waiting time or until unparked or interrupted. 837 * If already unparked then the parking permit is consumed and this method 838 * completes immediately (meaning it doesn't yield). It also completes immediately 839 * if the interrupt status is set or the waiting time is {@code <= 0}. 840 * 841 * @param nanos the maximum number of nanoseconds to wait. 842 */ 843 @Override 844 void parkNanos(long nanos) { 845 assert Thread.currentThread() == this; 846 847 // complete immediately if parking permit available or interrupted 848 if (getAndSetParkPermit(false) || interrupted) 849 return; 850 851 // park the thread for the waiting time 852 if (nanos > 0) { 853 long startTime = System.nanoTime(); 854 855 // park the thread, afterYield will schedule the thread to unpark 856 boolean yielded = false; 857 timeout = nanos; 858 setState(TIMED_PARKING); 859 try { 860 yielded = yieldContinuation(); 861 } catch (OutOfMemoryError e) { 862 // park on carrier 863 } finally { 864 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 865 if (!yielded) { 866 assert state() == TIMED_PARKING; 867 setState(RUNNING); 868 } 869 } 870 871 // park on carrier thread for remaining time when pinned (or OOME) 872 if (!yielded) { 873 long remainingNanos = nanos - (System.nanoTime() - startTime); 874 parkOnCarrierThread(true, remainingNanos); 875 } 876 } 877 } 878 879 /** 880 * Parks the current carrier thread up to the given waiting time or until 881 * unparked or interrupted. If the virtual thread is interrupted then the 882 * interrupt status will be propagated to the carrier thread. 883 * @param timed true for a timed park, false for untimed 884 * @param nanos the waiting time in nanoseconds 885 */ 886 private void parkOnCarrierThread(boolean timed, long nanos) { 887 assert state() == RUNNING; 888 889 setState(timed ? TIMED_PINNED : PINNED); 890 try { 891 if (!parkPermit) { 892 if (!timed) { 893 U.park(false, 0); 894 } else if (nanos > 0) { 895 U.park(false, nanos); 896 } 897 } 898 } finally { 899 setState(RUNNING); 900 } 901 902 // consume parking permit 903 setParkPermit(false); 904 905 // JFR jdk.VirtualThreadPinned event 906 postPinnedEvent("LockSupport.park"); 907 } 908 909 /** 910 * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event. 911 * Recording the event in the VM avoids having JFR event recorded in Java 912 * with the same name, but different ID, to events recorded by the VM. 913 */ 914 @Hidden 915 private static native void postPinnedEvent(String op); 916 917 /** 918 * Re-enables this virtual thread for scheduling. If this virtual thread is parked 919 * then its task is scheduled to continue, otherwise its next call to {@code park} or 920 * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block. 921 * @throws RejectedExecutionException if the scheduler cannot accept a task 922 */ 923 @Override 924 void unpark() { 925 if (!getAndSetParkPermit(true) && currentThread() != this) { 926 int s = state(); 927 928 // unparked while parked 929 if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) { 930 submitRunContinuation(); 931 return; 932 } 933 934 // unparked while parked when pinned 935 if (s == PINNED || s == TIMED_PINNED) { 936 // unpark carrier thread when pinned 937 disableSuspendAndPreempt(); 938 try { 939 synchronized (carrierThreadAccessLock()) { 940 Thread carrier = carrierThread; 941 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 942 U.unpark(carrier); 943 } 944 } 945 } finally { 946 enableSuspendAndPreempt(); 947 } 948 return; 949 } 950 } 951 } 952 953 /** 954 * Invoked by unblocker thread to unblock this virtual thread. 955 */ 956 private void unblock() { 957 assert !Thread.currentThread().isVirtual(); 958 blockPermit = true; 959 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 960 submitRunContinuation(); 961 } 962 } 963 964 /** 965 * Invoked by FJP worker thread or STPE thread when park timeout expires. 966 */ 967 private void parkTimeoutExpired() { 968 assert !VirtualThread.currentThread().isVirtual(); 969 if (!getAndSetParkPermit(true) 970 && (state() == TIMED_PARKED) 971 && compareAndSetState(TIMED_PARKED, UNPARKED)) { 972 lazySubmitRunContinuation(); 973 } 974 } 975 976 /** 977 * Invoked by FJP worker thread or STPE thread when wait timeout expires. 978 * If the virtual thread is in timed-wait then this method will unblock the thread 979 * and submit its task so that it continues and attempts to reenter the monitor. 980 * This method does nothing if the thread has been woken by notify or interrupt. 981 */ 982 private void waitTimeoutExpired(byte seqNo) { 983 assert !Thread.currentThread().isVirtual(); 984 for (;;) { 985 boolean unblocked = false; 986 synchronized (timedWaitLock()) { 987 if (seqNo != timedWaitSeqNo) { 988 // this timeout task is for a past timed-wait 989 return; 990 } 991 int s = state(); 992 if (s == TIMED_WAIT) { 993 unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED); 994 } else if (s != (TIMED_WAIT | SUSPENDED)) { 995 // notified or interrupted, no longer waiting 996 return; 997 } 998 } 999 if (unblocked) { 1000 lazySubmitRunContinuation(); 1001 return; 1002 } 1003 // need to retry when thread is suspended in time-wait 1004 Thread.yield(); 1005 } 1006 } 1007 1008 /** 1009 * Attempts to yield the current virtual thread (Thread.yield). 1010 */ 1011 void tryYield() { 1012 assert Thread.currentThread() == this; 1013 setState(YIELDING); 1014 boolean yielded = false; 1015 try { 1016 yielded = yieldContinuation(); // may throw 1017 } finally { 1018 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 1019 if (!yielded) { 1020 assert state() == YIELDING; 1021 setState(RUNNING); 1022 } 1023 } 1024 } 1025 1026 /** 1027 * Sleep the current thread for the given sleep time (in nanoseconds). If 1028 * nanos is 0 then the thread will attempt to yield. 1029 * 1030 * @implNote This implementation parks the thread for the given sleeping time 1031 * and will therefore be observed in PARKED state during the sleep. Parking 1032 * will consume the parking permit so this method makes available the parking 1033 * permit after the sleep. This may be observed as a spurious, but benign, 1034 * wakeup when the thread subsequently attempts to park. 1035 * 1036 * @param nanos the maximum number of nanoseconds to sleep 1037 * @throws InterruptedException if interrupted while sleeping 1038 */ 1039 void sleepNanos(long nanos) throws InterruptedException { 1040 assert Thread.currentThread() == this && nanos >= 0; 1041 if (getAndClearInterrupt()) 1042 throw new InterruptedException(); 1043 if (nanos == 0) { 1044 tryYield(); 1045 } else { 1046 // park for the sleep time 1047 try { 1048 long remainingNanos = nanos; 1049 long startNanos = System.nanoTime(); 1050 while (remainingNanos > 0) { 1051 parkNanos(remainingNanos); 1052 if (getAndClearInterrupt()) { 1053 throw new InterruptedException(); 1054 } 1055 remainingNanos = nanos - (System.nanoTime() - startNanos); 1056 } 1057 } finally { 1058 // may have been unparked while sleeping 1059 setParkPermit(true); 1060 } 1061 } 1062 } 1063 1064 /** 1065 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 1066 * A timeout of {@code 0} means to wait forever. 1067 * 1068 * @throws InterruptedException if interrupted while waiting 1069 * @return true if the thread has terminated 1070 */ 1071 boolean joinNanos(long nanos) throws InterruptedException { 1072 if (state() == TERMINATED) 1073 return true; 1074 1075 // ensure termination object exists, then re-check state 1076 CountDownLatch termination = getTermination(); 1077 if (state() == TERMINATED) 1078 return true; 1079 1080 // wait for virtual thread to terminate 1081 if (nanos == 0) { 1082 termination.await(); 1083 } else { 1084 boolean terminated = termination.await(nanos, NANOSECONDS); 1085 if (!terminated) { 1086 // waiting time elapsed 1087 return false; 1088 } 1089 } 1090 assert state() == TERMINATED; 1091 return true; 1092 } 1093 1094 @Override 1095 void blockedOn(Interruptible b) { 1096 disableSuspendAndPreempt(); 1097 try { 1098 super.blockedOn(b); 1099 } finally { 1100 enableSuspendAndPreempt(); 1101 } 1102 } 1103 1104 @Override 1105 public void interrupt() { 1106 if (Thread.currentThread() != this) { 1107 // if current thread is a virtual thread then prevent it from being 1108 // suspended or unmounted when entering or holding interruptLock 1109 Interruptible blocker; 1110 disableSuspendAndPreempt(); 1111 try { 1112 synchronized (interruptLock) { 1113 interrupted = true; 1114 blocker = nioBlocker(); 1115 if (blocker != null) { 1116 blocker.interrupt(this); 1117 } 1118 1119 // interrupt carrier thread if mounted 1120 Thread carrier = carrierThread; 1121 if (carrier != null) carrier.setInterrupt(); 1122 } 1123 } finally { 1124 enableSuspendAndPreempt(); 1125 } 1126 1127 // notify blocker after releasing interruptLock 1128 if (blocker != null) { 1129 blocker.postInterrupt(); 1130 } 1131 1132 // make available parking permit, unpark thread if parked 1133 unpark(); 1134 1135 // if thread is waiting in Object.wait then schedule to try to reenter 1136 int s = state(); 1137 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1138 submitRunContinuation(); 1139 } 1140 1141 } else { 1142 interrupted = true; 1143 carrierThread.setInterrupt(); 1144 setParkPermit(true); 1145 } 1146 } 1147 1148 @Override 1149 public boolean isInterrupted() { 1150 return interrupted; 1151 } 1152 1153 @Override 1154 boolean getAndClearInterrupt() { 1155 assert Thread.currentThread() == this; 1156 boolean oldValue = interrupted; 1157 if (oldValue) { 1158 disableSuspendAndPreempt(); 1159 try { 1160 synchronized (interruptLock) { 1161 interrupted = false; 1162 carrierThread.clearInterrupt(); 1163 } 1164 } finally { 1165 enableSuspendAndPreempt(); 1166 } 1167 } 1168 return oldValue; 1169 } 1170 1171 @Override 1172 Thread.State threadState() { 1173 int s = state(); 1174 switch (s & ~SUSPENDED) { 1175 case NEW: 1176 return Thread.State.NEW; 1177 case STARTED: 1178 // return NEW if thread container not yet set 1179 if (threadContainer() == null) { 1180 return Thread.State.NEW; 1181 } else { 1182 return Thread.State.RUNNABLE; 1183 } 1184 case UNPARKED: 1185 case UNBLOCKED: 1186 case YIELDED: 1187 // runnable, not mounted 1188 return Thread.State.RUNNABLE; 1189 case RUNNING: 1190 // if mounted then return state of carrier thread 1191 if (Thread.currentThread() != this) { 1192 disableSuspendAndPreempt(); 1193 try { 1194 synchronized (carrierThreadAccessLock()) { 1195 Thread carrierThread = this.carrierThread; 1196 if (carrierThread != null) { 1197 return carrierThread.threadState(); 1198 } 1199 } 1200 } finally { 1201 enableSuspendAndPreempt(); 1202 } 1203 } 1204 // runnable, mounted 1205 return Thread.State.RUNNABLE; 1206 case PARKING: 1207 case TIMED_PARKING: 1208 case WAITING: 1209 case TIMED_WAITING: 1210 case YIELDING: 1211 // runnable, in transition 1212 return Thread.State.RUNNABLE; 1213 case PARKED: 1214 case PINNED: 1215 case WAIT: 1216 return Thread.State.WAITING; 1217 case TIMED_PARKED: 1218 case TIMED_PINNED: 1219 case TIMED_WAIT: 1220 return Thread.State.TIMED_WAITING; 1221 case BLOCKING: 1222 case BLOCKED: 1223 return Thread.State.BLOCKED; 1224 case TERMINATED: 1225 return Thread.State.TERMINATED; 1226 default: 1227 throw new InternalError(); 1228 } 1229 } 1230 1231 @Override 1232 boolean alive() { 1233 int s = state; 1234 return (s != NEW && s != TERMINATED); 1235 } 1236 1237 @Override 1238 boolean isTerminated() { 1239 return (state == TERMINATED); 1240 } 1241 1242 @Override 1243 StackTraceElement[] asyncGetStackTrace() { 1244 StackTraceElement[] stackTrace; 1245 do { 1246 stackTrace = (carrierThread != null) 1247 ? super.asyncGetStackTrace() // mounted 1248 : tryGetStackTrace(); // unmounted 1249 if (stackTrace == null) { 1250 Thread.yield(); 1251 } 1252 } while (stackTrace == null); 1253 return stackTrace; 1254 } 1255 1256 /** 1257 * Returns the stack trace for this virtual thread if it is unmounted. 1258 * Returns null if the thread is mounted or in transition. 1259 */ 1260 private StackTraceElement[] tryGetStackTrace() { 1261 int initialState = state() & ~SUSPENDED; 1262 switch (initialState) { 1263 case NEW, STARTED, TERMINATED -> { 1264 return new StackTraceElement[0]; // unmounted, empty stack 1265 } 1266 case RUNNING, PINNED, TIMED_PINNED -> { 1267 return null; // mounted 1268 } 1269 case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> { 1270 // unmounted, not runnable 1271 } 1272 case UNPARKED, UNBLOCKED, YIELDED -> { 1273 // unmounted, runnable 1274 } 1275 case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> { 1276 return null; // in transition 1277 } 1278 default -> throw new InternalError("" + initialState); 1279 } 1280 1281 // thread is unmounted, prevent it from continuing 1282 int suspendedState = initialState | SUSPENDED; 1283 if (!compareAndSetState(initialState, suspendedState)) { 1284 return null; 1285 } 1286 1287 // get stack trace and restore state 1288 StackTraceElement[] stack; 1289 try { 1290 stack = cont.getStackTrace(); 1291 } finally { 1292 assert state == suspendedState; 1293 setState(initialState); 1294 } 1295 boolean resubmit = switch (initialState) { 1296 case UNPARKED, UNBLOCKED, YIELDED -> { 1297 // resubmit as task may have run while suspended 1298 yield true; 1299 } 1300 case PARKED, TIMED_PARKED -> { 1301 // resubmit if unparked while suspended 1302 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1303 } 1304 case BLOCKED -> { 1305 // resubmit if unblocked while suspended 1306 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED); 1307 } 1308 case WAIT, TIMED_WAIT -> { 1309 // resubmit if notified or interrupted while waiting (Object.wait) 1310 // waitTimeoutExpired will retry if the timed expired when suspended 1311 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED); 1312 } 1313 default -> throw new InternalError(); 1314 }; 1315 if (resubmit) { 1316 submitRunContinuation(); 1317 } 1318 return stack; 1319 } 1320 1321 @Override 1322 public String toString() { 1323 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1324 sb.append(threadId()); 1325 String name = getName(); 1326 if (!name.isEmpty()) { 1327 sb.append(","); 1328 sb.append(name); 1329 } 1330 sb.append("]/"); 1331 1332 // add the carrier state and thread name when mounted 1333 boolean mounted; 1334 if (Thread.currentThread() == this) { 1335 mounted = appendCarrierInfo(sb); 1336 } else { 1337 disableSuspendAndPreempt(); 1338 try { 1339 synchronized (carrierThreadAccessLock()) { 1340 mounted = appendCarrierInfo(sb); 1341 } 1342 } finally { 1343 enableSuspendAndPreempt(); 1344 } 1345 } 1346 1347 // add virtual thread state when not mounted 1348 if (!mounted) { 1349 String stateAsString = threadState().toString(); 1350 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1351 } 1352 1353 return sb.toString(); 1354 } 1355 1356 /** 1357 * Appends the carrier state and thread name to the string buffer if mounted. 1358 * @return true if mounted, false if not mounted 1359 */ 1360 private boolean appendCarrierInfo(StringBuilder sb) { 1361 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1362 Thread carrier = carrierThread; 1363 if (carrier != null) { 1364 String stateAsString = carrier.threadState().toString(); 1365 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1366 sb.append('@'); 1367 sb.append(carrier.getName()); 1368 return true; 1369 } else { 1370 return false; 1371 } 1372 } 1373 1374 @Override 1375 public int hashCode() { 1376 return (int) threadId(); 1377 } 1378 1379 @Override 1380 public boolean equals(Object obj) { 1381 return obj == this; 1382 } 1383 1384 /** 1385 * Returns the termination object, creating it if needed. 1386 */ 1387 private CountDownLatch getTermination() { 1388 CountDownLatch termination = this.termination; 1389 if (termination == null) { 1390 termination = new CountDownLatch(1); 1391 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1392 termination = this.termination; 1393 } 1394 } 1395 return termination; 1396 } 1397 1398 /** 1399 * Returns the lock object to synchronize on when accessing carrierThread. 1400 * The lock prevents carrierThread from being reset to null during unmount. 1401 */ 1402 private Object carrierThreadAccessLock() { 1403 // return interruptLock as unmount has to coordinate with interrupt 1404 return interruptLock; 1405 } 1406 1407 /** 1408 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1409 */ 1410 private Object timedWaitLock() { 1411 // use this object for now to avoid the overhead of introducing another lock 1412 return runContinuation; 1413 } 1414 1415 /** 1416 * Disallow the current thread be suspended or preempted. 1417 */ 1418 private void disableSuspendAndPreempt() { 1419 notifyJvmtiDisableSuspend(true); 1420 Continuation.pin(); 1421 } 1422 1423 /** 1424 * Allow the current thread be suspended or preempted. 1425 */ 1426 private void enableSuspendAndPreempt() { 1427 Continuation.unpin(); 1428 notifyJvmtiDisableSuspend(false); 1429 } 1430 1431 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1432 1433 private int state() { 1434 return state; // volatile read 1435 } 1436 1437 private void setState(int newValue) { 1438 state = newValue; // volatile write 1439 } 1440 1441 private boolean compareAndSetState(int expectedValue, int newValue) { 1442 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1443 } 1444 1445 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1446 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1447 } 1448 1449 private void setParkPermit(boolean newValue) { 1450 if (parkPermit != newValue) { 1451 parkPermit = newValue; 1452 } 1453 } 1454 1455 private boolean getAndSetParkPermit(boolean newValue) { 1456 if (parkPermit != newValue) { 1457 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1458 } else { 1459 return newValue; 1460 } 1461 } 1462 1463 private void setCarrierThread(Thread carrier) { 1464 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1465 this.carrierThread = carrier; 1466 } 1467 1468 // -- JVM TI support -- 1469 1470 @IntrinsicCandidate 1471 @JvmtiMountTransition 1472 private native void notifyJvmtiStart(); 1473 1474 @IntrinsicCandidate 1475 @JvmtiMountTransition 1476 private native void notifyJvmtiEnd(); 1477 1478 @IntrinsicCandidate 1479 @JvmtiMountTransition 1480 private native void notifyJvmtiMount(boolean hide); 1481 1482 @IntrinsicCandidate 1483 @JvmtiMountTransition 1484 private native void notifyJvmtiUnmount(boolean hide); 1485 1486 @IntrinsicCandidate 1487 private static native void notifyJvmtiDisableSuspend(boolean enter); 1488 1489 private static native void registerNatives(); 1490 static { 1491 registerNatives(); 1492 1493 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1494 var group = Thread.virtualThreadGroup(); 1495 } 1496 1497 /** 1498 * Loads a java.util.concurrent.Executor with the given class name to use at the 1499 * default scheduler. The class is public in an exported package, has a public 1500 * no-arg constructor, and is visible to the system class loader. 1501 */ 1502 private static Executor createCustomDefaultScheduler(String cn) { 1503 try { 1504 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); 1505 Constructor<?> ctor = clazz.getConstructor(); 1506 var scheduler = (Executor) ctor.newInstance(); 1507 System.err.println(""" 1508 WARNING: Using custom default scheduler, this is an experimental feature!"""); 1509 return scheduler; 1510 } catch (Exception ex) { 1511 throw new Error(ex); 1512 } 1513 } 1514 1515 /** 1516 * Creates the default ForkJoinPool scheduler. 1517 */ 1518 private static ForkJoinPool createDefaultForkJoinPoolScheduler() { 1519 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool); 1520 int parallelism, maxPoolSize, minRunnable; 1521 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1522 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1523 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1524 if (parallelismValue != null) { 1525 parallelism = Integer.parseInt(parallelismValue); 1526 } else { 1527 parallelism = Runtime.getRuntime().availableProcessors(); 1528 } 1529 if (maxPoolSizeValue != null) { 1530 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1531 parallelism = Integer.min(parallelism, maxPoolSize); 1532 } else { 1533 maxPoolSize = Integer.max(parallelism, 256); 1534 } 1535 if (minRunnableValue != null) { 1536 minRunnable = Integer.parseInt(minRunnableValue); 1537 } else { 1538 minRunnable = Integer.max(parallelism / 2, 1); 1539 } 1540 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1541 boolean asyncMode = true; // FIFO 1542 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1543 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1544 } 1545 1546 /** 1547 * Schedule a runnable task to run after a delay. 1548 */ 1549 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1550 if (scheduler instanceof ForkJoinPool pool) { 1551 return pool.schedule(command, delay, unit); 1552 } else { 1553 return DelayedTaskSchedulers.schedule(command, delay, unit); 1554 } 1555 } 1556 1557 /** 1558 * Supports scheduling a runnable task to run after a delay. It uses a number 1559 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1560 * work queue used. This class is used when using a custom scheduler. 1561 */ 1562 private static class DelayedTaskSchedulers { 1563 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1564 1565 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1566 long tid = Thread.currentThread().threadId(); 1567 int index = (int) tid & (INSTANCE.length - 1); 1568 return INSTANCE[index].schedule(command, delay, unit); 1569 } 1570 1571 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1572 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1573 String propValue = System.getProperty(propName); 1574 int queueCount; 1575 if (propValue != null) { 1576 queueCount = Integer.parseInt(propValue); 1577 if (queueCount != Integer.highestOneBit(queueCount)) { 1578 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1579 } 1580 } else { 1581 int ncpus = Runtime.getRuntime().availableProcessors(); 1582 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1583 } 1584 var schedulers = new ScheduledExecutorService[queueCount]; 1585 for (int i = 0; i < queueCount; i++) { 1586 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1587 Executors.newScheduledThreadPool(1, task -> { 1588 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1589 t.setDaemon(true); 1590 return t; 1591 }); 1592 stpe.setRemoveOnCancelPolicy(true); 1593 schedulers[i] = stpe; 1594 } 1595 return schedulers; 1596 } 1597 } 1598 1599 /** 1600 * Schedule virtual threads that are ready to be scheduled after they blocked on 1601 * monitor enter. 1602 */ 1603 private static void unblockVirtualThreads() { 1604 while (true) { 1605 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1606 while (vthread != null) { 1607 assert vthread.onWaitingList; 1608 VirtualThread nextThread = vthread.next; 1609 1610 // remove from list and unblock 1611 vthread.next = null; 1612 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1613 assert changed; 1614 vthread.unblock(); 1615 1616 vthread = nextThread; 1617 } 1618 } 1619 } 1620 1621 /** 1622 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1623 * if necessary until a list of one or more threads becomes available. 1624 */ 1625 private static native VirtualThread takeVirtualThreadListToUnblock(); 1626 1627 static { 1628 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1629 VirtualThread::unblockVirtualThreads); 1630 unblocker.setDaemon(true); 1631 unblocker.start(); 1632 } 1633 }