1 /* 2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.lang; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.reflect.Constructor; 29 import java.util.Locale; 30 import java.util.Objects; 31 import java.util.concurrent.CountDownLatch; 32 import java.util.concurrent.Executors; 33 import java.util.concurrent.ForkJoinPool; 34 import java.util.concurrent.ForkJoinTask; 35 import java.util.concurrent.Future; 36 import java.util.concurrent.RejectedExecutionException; 37 import java.util.concurrent.ScheduledExecutorService; 38 import java.util.concurrent.ScheduledThreadPoolExecutor; 39 import java.util.concurrent.TimeUnit; 40 import jdk.internal.event.VirtualThreadEndEvent; 41 import jdk.internal.event.VirtualThreadParkEvent; 42 import jdk.internal.event.VirtualThreadStartEvent; 43 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 44 import jdk.internal.misc.CarrierThread; 45 import jdk.internal.misc.InnocuousThread; 46 import jdk.internal.misc.Unsafe; 47 import jdk.internal.vm.Continuation; 48 import jdk.internal.vm.ContinuationScope; 49 import jdk.internal.vm.StackableScope; 50 import jdk.internal.vm.ThreadContainer; 51 import jdk.internal.vm.ThreadContainers; 52 import jdk.internal.vm.annotation.ChangesCurrentThread; 53 import jdk.internal.vm.annotation.Hidden; 54 import jdk.internal.vm.annotation.IntrinsicCandidate; 55 import jdk.internal.vm.annotation.JvmtiHideEvents; 56 import jdk.internal.vm.annotation.JvmtiMountTransition; 57 import jdk.internal.vm.annotation.ReservedStackAccess; 58 import sun.nio.ch.Interruptible; 59 import static java.util.concurrent.TimeUnit.*; 60 61 /** 62 * A thread that is scheduled by the Java virtual machine rather than the operating system. 
*/
final class VirtualThread extends BaseVirtualThread {
    private static final Unsafe U = Unsafe.getUnsafe();
    private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");

    // BUILTIN_DEFAULT_SCHEDULER is always the built-in scheduler. DEFAULT_SCHEDULER is
    // either that same object, or the result of loading the custom scheduler class(es)
    // named by the system property read in the static initializer below.
    private static final ForkJoinPool BUILTIN_DEFAULT_SCHEDULER;
    private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
    static {
        // experimental
        String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
        if (propValue != null) {
            var builtinScheduler = createBuiltinDefaultScheduler(true);
            VirtualThreadScheduler defaultScheduler = builtinScheduler.externalView();
            // the property is a comma-separated list; each class is loaded in order and
            // is handed the scheduler produced by the previous step
            for (String cn: propValue.split(",")) {
                defaultScheduler = loadCustomScheduler(defaultScheduler, cn);
            }
            BUILTIN_DEFAULT_SCHEDULER = builtinScheduler;
            DEFAULT_SCHEDULER = defaultScheduler;
        } else {
            var builtinScheduler = createBuiltinDefaultScheduler(false);
            BUILTIN_DEFAULT_SCHEDULER = builtinScheduler;
            DEFAULT_SCHEDULER = builtinScheduler;
        }
    }

    // field offsets, for access to the named fields with Unsafe
    private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
    private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
    private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
    private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
    private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");

    // scheduler and continuation
    private final VirtualThreadScheduler scheduler;
    private final Continuation cont;
    private final Runnable runContinuation;

    // virtual thread state, accessed by VM
    private volatile int state;

    /*
     * Virtual thread state transitions:
     *
     *      NEW -> STARTED         // Thread.start, schedule to run
     *  STARTED -> TERMINATED      // failed to start
     *  STARTED -> RUNNING         // first run
     *  RUNNING -> TERMINATED      // done
     *
     *  RUNNING -> PARKING         // Thread parking with LockSupport.park
     *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
     *  PARKING -> PINNED          // cont.yield failed, parked indefinitely on carrier
     *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *   PINNED -> RUNNING         // unparked, continue execution on same carrier
     * UNPARKED -> RUNNING         // continue execution after park
     *
     *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos
     * TIMED_PARKING -> TIMED_PARKED    // cont.yield successful, timed-parked
     * TIMED_PARKING -> TIMED_PINNED    // cont.yield failed, timed-parked on carrier
     *  TIMED_PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *  TIMED_PINNED -> RUNNING         // unparked, continue execution on same carrier
     *
     *   RUNNING -> BLOCKING       // blocking on monitor enter
     *  BLOCKING -> BLOCKED        // blocked on monitor enter
     *   BLOCKED -> UNBLOCKED      // unblocked, may be scheduled to continue
     * UNBLOCKED -> RUNNING        // continue execution after blocked on monitor enter
     *
     *   RUNNING -> WAITING        // transitional state during wait on monitor
     *   WAITING -> WAIT           // waiting on monitor
     *      WAIT -> BLOCKED        // notified, waiting to be unblocked by monitor owner
     *      WAIT -> UNBLOCKED      // timed-out/interrupted
     *
     *       RUNNING -> TIMED_WAITING   // transitional state during timed-wait on monitor
     * TIMED_WAITING -> TIMED_WAIT      // timed-waiting on monitor
     *    TIMED_WAIT -> BLOCKED         // notified, waiting to be unblocked by monitor owner
     *    TIMED_WAIT -> UNBLOCKED       // timed-out/interrupted
     *
     *  RUNNING -> YIELDING        // Thread.yield
     * YIELDING -> YIELDED         // cont.yield successful, may be scheduled to continue
     * YIELDING -> RUNNING         // cont.yield failed
     *  YIELDED -> RUNNING         // continue execution after Thread.yield
     */
    private static final int NEW      = 0;
    private static final int STARTED  = 1;
    private static final int RUNNING  = 2;     // runnable-mounted

    // untimed and timed parking
    private static final int PARKING       = 3;
    private static final int PARKED        = 4;     // unmounted
    private static final int PINNED        = 5;     // mounted
    private static final int TIMED_PARKING = 6;
    private static final int TIMED_PARKED  = 7;     // unmounted
    private static final int TIMED_PINNED  = 8;     // mounted
    private static final int UNPARKED      = 9;     // unmounted but runnable

    // Thread.yield
    private static final int YIELDING = 10;
    private static final int YIELDED  = 11;         // unmounted but runnable

    // monitor enter
    private static final int BLOCKING  = 12;
    private static final int BLOCKED   = 13;        // unmounted
    private static final int UNBLOCKED = 14;        // unmounted but runnable

    // monitor wait/timed-wait
    private static final int WAITING       = 15;
    private static final int WAIT          = 16;    // waiting in Object.wait
    private static final int TIMED_WAITING = 17;
    private static final int TIMED_WAIT    = 18;    // waiting in timed-Object.wait

    private static final int TERMINATED = 99;  // final state

    // can be suspended from scheduling when unmounted; this is a flag bit that is
    // combined with one of the state values above (e.g. TIMED_WAIT | SUSPENDED)
    private static final int SUSPENDED = 1 << 8;

    // parking permit made available by LockSupport.unpark
    private volatile boolean parkPermit;

    // blocking permit made available by unblocker thread when another thread exits monitor
    private volatile boolean blockPermit;

    // true when on the list of virtual threads waiting to be unblocked
    private volatile boolean onWaitingList;

    // next virtual thread on the list of virtual threads waiting to be unblocked
    private volatile VirtualThread next;

    // notified by Object.notify/notifyAll while waiting in Object.wait
    private volatile boolean notified;

    // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
    private volatile boolean interruptibleWait;

    // timed-wait support: incremented for each timed-wait so that a timeout task can
    // tell whether it belongs to the current timed-wait or to an earlier one
    private byte timedWaitSeqNo;

197 // timeout for timed-park and timed-wait, only accessed on current/carrier thread 198 private long timeout; 199 200 // timer task for timed-park and timed-wait, only accessed on current/carrier thread 201 private Future<?> timeoutTask; 202 203 // carrier thread when mounted, accessed by VM 204 private volatile Thread carrierThread; 205 206 // termination object when joining, created lazily if needed 207 private volatile CountDownLatch termination; 208 209 /** 210 * Returns the default scheduler. 211 */ 212 static VirtualThreadScheduler defaultScheduler() { 213 return DEFAULT_SCHEDULER; 214 } 215 216 /** 217 * Returns true if using a custom default scheduler. 218 */ 219 static boolean isCustomDefaultScheduler() { 220 return DEFAULT_SCHEDULER != BUILTIN_DEFAULT_SCHEDULER; 221 } 222 223 /** 224 * Returns the continuation scope used for virtual threads. 225 */ 226 static ContinuationScope continuationScope() { 227 return VTHREAD_SCOPE; 228 } 229 230 /** 231 * Return the scheduler for this thread. 232 * @param revealBuiltin true to reveal the built-in default scheduler, false to hide 233 */ 234 VirtualThreadScheduler scheduler(boolean revealBuiltin) { 235 if (scheduler instanceof BuiltinDefaultScheduler builtin && !revealBuiltin) { 236 return builtin.externalView(); 237 } else { 238 return scheduler; 239 } 240 } 241 242 /** 243 * Creates a new {@code VirtualThread} to run the given task with the given scheduler. 
244 * 245 * @param scheduler the scheduler or null for default scheduler 246 * @param name thread name 247 * @param characteristics characteristics 248 * @param task the task to execute 249 */ 250 VirtualThread(VirtualThreadScheduler scheduler, 251 String name, 252 int characteristics, 253 Runnable task) { 254 super(name, characteristics, /*bound*/ false); 255 Objects.requireNonNull(task); 256 257 // use default scheduler if not provided 258 if (scheduler == null) { 259 scheduler = DEFAULT_SCHEDULER; 260 } 261 262 this.scheduler = scheduler; 263 this.cont = new VThreadContinuation(this, task); 264 this.runContinuation = this::runContinuation; 265 } 266 267 /** 268 * The continuation that a virtual thread executes. 269 */ 270 private static class VThreadContinuation extends Continuation { 271 VThreadContinuation(VirtualThread vthread, Runnable task) { 272 super(VTHREAD_SCOPE, wrap(vthread, task)); 273 } 274 @Override 275 protected void onPinned(Continuation.Pinned reason) { 276 } 277 private static Runnable wrap(VirtualThread vthread, Runnable task) { 278 return new Runnable() { 279 @Hidden 280 @JvmtiHideEvents 281 public void run() { 282 vthread.notifyJvmtiStart(); // notify JVMTI 283 try { 284 vthread.run(task); 285 } finally { 286 vthread.notifyJvmtiEnd(); // notify JVMTI 287 } 288 } 289 }; 290 } 291 } 292 293 /** 294 * Runs or continues execution on the current thread. The virtual thread is mounted 295 * on the current thread before the task runs or continues. It unmounts when the 296 * task completes or yields. 
*/
    @ChangesCurrentThread // allow mount/unmount to be inlined
    private void runContinuation() {
        // the carrier must be a platform thread
        if (Thread.currentThread().isVirtual()) {
            throw new WrongThreadException();
        }

        // set state to RUNNING
        int initialState = state();
        if (initialState == STARTED || initialState == UNPARKED
                || initialState == UNBLOCKED || initialState == YIELDED) {
            // newly started or continue after parking/blocking/Thread.yield
            if (!compareAndSetState(initialState, RUNNING)) {
                // state changed since it was read, do not run
                return;
            }
            // consume permit when continuing after parking or blocking. If continue
            // after a timed-park or timed-wait then the timeout task is cancelled.
            if (initialState == UNPARKED) {
                cancelTimeoutTask();
                setParkPermit(false);
            } else if (initialState == UNBLOCKED) {
                cancelTimeoutTask();
                blockPermit = false;
            }
        } else {
            // not runnable
            return;
        }

        mount();
        try {
            cont.run();
        } finally {
            // always unmount, then do the post-processing for whichever way
            // the continuation returned: completed or yielded
            unmount();
            if (cont.isDone()) {
                afterDone();
            } else {
                afterYield();
            }
        }
    }

    /**
     * Cancel timeout task when continuing after timed-park or timed-wait.
     * The timeout task may be executing, or may have already completed.
     * Does nothing if there is no timeout task.
     */
    private void cancelTimeoutTask() {
        if (timeoutTask != null) {
            // false: do not interrupt the task if it is already running
            timeoutTask.cancel(false);
            timeoutTask = null;
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the built-in default
     * scheduler, the task will be pushed to the local queue if possible, otherwise it
     * will be pushed to an external submission queue.
355 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 356 * @throws RejectedExecutionException 357 */ 358 private void submitRunContinuation(boolean retryOnOOME) { 359 boolean done = false; 360 while (!done) { 361 try { 362 // Pin the continuation to prevent the virtual thread from unmounting 363 // when submitting a task. For the default scheduler this ensures that 364 // the carrier doesn't change when pushing a task. For other schedulers 365 // it avoids deadlock that could arise due to carriers and virtual 366 // threads contending for a lock. 367 if (currentThread().isVirtual()) { 368 Continuation.pin(); 369 try { 370 scheduler.onContinue(this, runContinuation); 371 } finally { 372 Continuation.unpin(); 373 } 374 } else { 375 scheduler.onContinue(this, runContinuation); 376 } 377 done = true; 378 } catch (RejectedExecutionException ree) { 379 submitFailed(ree); 380 throw ree; 381 } catch (OutOfMemoryError e) { 382 if (retryOnOOME) { 383 U.park(false, 100_000_000); // 100ms 384 } else { 385 throw e; 386 } 387 } 388 } 389 } 390 391 /** 392 * Submits the runContinuation task to the scheduler. For the default scheduler, 393 * and calling it on a worker thread, the task will be pushed to the local queue, 394 * otherwise it will be pushed to an external submission queue. 395 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 396 * @throws RejectedExecutionException 397 */ 398 private void submitRunContinuation() { 399 submitRunContinuation(true); 400 } 401 402 /** 403 * Invoked from a carrier thread to lazy submit the runContinuation task to the 404 * carrier's local queue if the queue is empty. If not empty, or invoked by a thread 405 * for a custom scheduler, then it just submits the task to the scheduler. 406 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 
407 * @throws RejectedExecutionException 408 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 409 */ 410 private void lazySubmitRunContinuation() { 411 assert !currentThread().isVirtual(); 412 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 413 try { 414 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation)); 415 } catch (RejectedExecutionException ree) { 416 submitFailed(ree); 417 throw ree; 418 } catch (OutOfMemoryError e) { 419 submitRunContinuation(); 420 } 421 } else { 422 submitRunContinuation(); 423 } 424 } 425 426 /** 427 * Invoked from a carrier thread to externally submit the runContinuation task to the 428 * scheduler. If invoked by a thread for a custom scheduler, then it just submits the 429 * task to the scheduler. 430 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 431 * @throws RejectedExecutionException 432 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 433 */ 434 private void externalSubmitRunContinuation() { 435 assert !currentThread().isVirtual(); 436 if (currentThread() instanceof CarrierThread ct) { 437 try { 438 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 439 } catch (RejectedExecutionException ree) { 440 submitFailed(ree); 441 throw ree; 442 } catch (OutOfMemoryError e) { 443 submitRunContinuation(); 444 } 445 } else { 446 submitRunContinuation(); 447 } 448 } 449 450 /** 451 * Invoked from Thread.start to externally submit the runContinuation task to the 452 * scheduler. If this virtual thread is scheduled by the built-in default scheduler, 453 * and this method is called from a virtual thread scheduled by the built-in default 454 * scheduler, then it uses externalSubmit to ensure that the task is pushed to an 455 * external submission queue rather than the local queue. 
456 * @throws RejectedExecutionException 457 * @throws OutOfMemoryError 458 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 459 */ 460 private void externalSubmitRunContinuationOrThrow() { 461 try { 462 if (currentThread().isVirtual()) { 463 // Pin the continuation to prevent the virtual thread from unmounting 464 // when submitting a task. This avoids deadlock that could arise due to 465 // carriers and virtual threads contending for a lock. 466 Continuation.pin(); 467 try { 468 if (scheduler == BUILTIN_DEFAULT_SCHEDULER 469 && currentCarrierThread() instanceof CarrierThread ct) { 470 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 471 } else { 472 scheduler.onStart(this, runContinuation); 473 } 474 } finally { 475 Continuation.unpin(); 476 } 477 } else { 478 scheduler.onStart(this, runContinuation); 479 } 480 } catch (RejectedExecutionException ree) { 481 submitFailed(ree); 482 throw ree; 483 } 484 } 485 486 /** 487 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 488 */ 489 private void submitFailed(RejectedExecutionException ree) { 490 var event = new VirtualThreadSubmitFailedEvent(); 491 if (event.isEnabled()) { 492 event.javaThreadId = threadId(); 493 event.exceptionMessage = ree.getMessage(); 494 event.commit(); 495 } 496 } 497 498 /** 499 * Runs a task in the context of this virtual thread. 
*/
    private void run(Runnable task) {
        assert Thread.currentThread() == this && state == RUNNING;

        // emit JFR event if enabled
        if (VirtualThreadStartEvent.isTurnedOn()) {
            var event = new VirtualThreadStartEvent();
            event.javaThreadId = threadId();
            event.commit();
        }

        Object bindings = Thread.scopedValueBindings();
        try {
            runWith(bindings, task);
        } catch (Throwable exc) {
            // anything thrown by the task goes to the uncaught exception handling
            dispatchUncaughtException(exc);
        } finally {
            // pop any remaining scopes from the stack, this may block
            StackableScope.popAll();

            // emit JFR event if enabled
            if (VirtualThreadEndEvent.isTurnedOn()) {
                var event = new VirtualThreadEndEvent();
                event.javaThreadId = threadId();
                event.commit();
            }
        }
    }

    /**
     * Mounts this virtual thread onto the current platform thread. On
     * return, the current thread is the virtual thread.
     */
    @ChangesCurrentThread
    @ReservedStackAccess
    private void mount() {
        // notify JVMTI before mount
        notifyJvmtiMount(/*hide*/true);

        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupted status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            synchronized (interruptLock) {
                // need to recheck interrupted status
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread.
*/
    @ChangesCurrentThread
    @ReservedStackAccess
    private void unmount() {
        assert !Thread.holdsLock(interruptLock);

        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        // clear the carrier's interrupted status now that it is no longer linked
        // to this virtual thread
        carrier.clearInterrupt();

        // notify JVMTI after unmount
        notifyJvmtiUnmount(/*hide*/false);
    }

    /**
     * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
     * the continuation continues.
     *
     * @return the value returned by Continuation.yield
     */
    @Hidden
    private boolean yieldContinuation() {
        notifyJvmtiUnmount(/*hide*/true);
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            // runs whether the yield returned normally or threw
            notifyJvmtiMount(/*hide*/false);
        }
    }

    /**
     * Invoked in the context of the carrier thread after the Continuation yields when
     * parking, blocking on monitor enter, Object.wait, or Thread.yield.
*/
    private void afterYield() {
        assert carrierThread == null;

        // re-adjust parallelism if the virtual thread yielded when compensating
        if (currentThread() instanceof CarrierThread ct) {
            ct.endBlocking();
        }

        // the transitional state set by the virtual thread before it yielded
        int s = state();

        // LockSupport.park/parkNanos
        if (s == PARKING || s == TIMED_PARKING) {
            int newState;
            if (s == PARKING) {
                setState(newState = PARKED);
            } else {
                // schedule unpark
                long timeout = this.timeout;
                assert timeout > 0;
                timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
                setState(newState = TIMED_PARKED);
            }

            // may have been unparked while parking
            if (parkPermit && compareAndSetState(newState, UNPARKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Thread.yield
        if (s == YIELDING) {
            setState(YIELDED);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation();
            } else {
                submitRunContinuation();
            }
            return;
        }

        // blocking on monitorenter
        if (s == BLOCKING) {
            setState(BLOCKED);

            // may have been unblocked while blocking
            if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Object.wait
        if (s == WAITING || s == TIMED_WAITING) {
            int newState;
            // read before the state change below
            boolean interruptible = interruptibleWait;
            if (s == WAITING) {
                setState(newState = WAIT);
            } else {
                // For timed-wait, a timeout task is scheduled to execute. The timeout
                // task will change the thread state to UNBLOCKED and submit the thread
                // to the scheduler. A sequence number is used to ensure that the timeout
                // task only unblocks the thread for this timed-wait. We synchronize with
                // the timeout task to coordinate access to the sequence number and to
                // ensure the timeout task doesn't execute until the thread has got to
                // the TIMED_WAIT state.
                long timeout = this.timeout;
                assert timeout > 0;
                synchronized (timedWaitLock()) {
                    byte seqNo = ++timedWaitSeqNo;
                    timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
                    setState(newState = TIMED_WAIT);
                }
            }

            // may have been notified while in transition to wait state
            if (notified && compareAndSetState(newState, BLOCKED)) {
                // may have even been unblocked already
                if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                    submitRunContinuation();
                }
                return;
            }

            // may have been interrupted while in transition to wait state
            if (interruptible && interrupted && compareAndSetState(newState, UNBLOCKED)) {
                submitRunContinuation();
                return;
            }
            return;
        }

        // not reached: the state must be one of the transitional states handled above
        assert false;
    }

    /**
     * Invoked after the continuation completes. The thread's container is notified.
     */
    private void afterDone() {
        afterDone(true);
    }

    /**
     * Invoked after the continuation completes (or start failed). Sets the thread
     * state to TERMINATED and notifies anyone waiting for the thread to terminate.
     *
     * @param notifyContainer true if its container should be notified
     */
    private void afterDone(boolean notifyContainer) {
        assert carrierThread == null;
        setState(TERMINATED);

        // notify anyone waiting for this virtual thread to terminate
        CountDownLatch termination = this.termination;
        if (termination != null) {
            assert termination.getCount() == 1;
            termination.countDown();
        }

        // notify container
        if (notifyContainer) {
            threadContainer().remove(this);
        }

        // clear references to thread locals
        clearReferences();
    }

    /**
     * Schedules this {@code VirtualThread} to execute.
*
     * @param container the thread container to add this thread to
     * @throws IllegalStateException if the container is shutdown or closed
     * @throws IllegalThreadStateException if the thread has already been started
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void start(ThreadContainer container) {
        // only the caller that wins the NEW -> STARTED transition may start the thread
        if (!compareAndSetState(NEW, STARTED)) {
            throw new IllegalThreadStateException("Already started");
        }

        // bind thread to container
        assert threadContainer() == null;
        setThreadContainer(container);

        // start thread
        boolean addedToContainer = false;
        boolean started = false;
        try {
            container.add(this);  // may throw
            addedToContainer = true;

            // scoped values may be inherited
            inheritScopedValueBindings(container);

            // submit task to run thread, using externalSubmit if possible
            externalSubmitRunContinuationOrThrow();
            started = true;
        } finally {
            // start failed: terminate the thread, notifying the container only if
            // the thread was added to it
            if (!started) {
                afterDone(addedToContainer);
            }
        }
    }

    /**
     * Schedules this virtual thread to execute, using the root thread container.
     */
    @Override
    public void start() {
        start(ThreadContainers.root());
    }

    /**
     * Does nothing. The task given to the constructor is run by the continuation.
     */
    @Override
    public void run() {
        // do nothing
    }

    /**
     * Parks until unparked or interrupted. If already unparked then the parking
     * permit is consumed and this method completes immediately (meaning it doesn't
     * yield). It also completes immediately if the interrupted status is set.
*/
    @Override
    void park() {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        boolean yielded = false;
        long eventStartTime = VirtualThreadParkEvent.eventStartTime();
        setState(PARKING);
        try {
            yielded = yieldContinuation();
        } catch (OutOfMemoryError e) {
            // park on carrier
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (yielded) {
                VirtualThreadParkEvent.offer(eventStartTime, Long.MIN_VALUE);
            } else {
                // did not yield, undo the transition to PARKING
                assert state() == PARKING;
                setState(RUNNING);
            }
        }

        // park on the carrier thread when pinned
        if (!yielded) {
            parkOnCarrierThread(false, 0);
        }
    }

    /**
     * Parks up to the given waiting time or until unparked or interrupted.
     * If already unparked then the parking permit is consumed and this method
     * completes immediately (meaning it doesn't yield). It also completes immediately
     * if the interrupted status is set or the waiting time is {@code <= 0}.
     *
     * @param nanos the maximum number of nanoseconds to wait.
*/
    @Override
    void parkNanos(long nanos) {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread for the waiting time
        if (nanos > 0) {
            long startTime = System.nanoTime();

            // park the thread, afterYield will schedule the thread to unpark
            boolean yielded = false;
            long eventStartTime = VirtualThreadParkEvent.eventStartTime();
            timeout = nanos;
            setState(TIMED_PARKING);
            try {
                yielded = yieldContinuation();
            } catch (OutOfMemoryError e) {
                // park on carrier
            } finally {
                assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
                if (yielded) {
                    VirtualThreadParkEvent.offer(eventStartTime, nanos);
                } else {
                    // did not yield, undo the transition to TIMED_PARKING
                    assert state() == TIMED_PARKING;
                    setState(RUNNING);
                }
            }

            // park on carrier thread for remaining time when pinned (or OOME)
            if (!yielded) {
                long remainingNanos = nanos - (System.nanoTime() - startTime);
                parkOnCarrierThread(true, remainingNanos);
            }
        }
    }

    /**
     * Parks the current carrier thread up to the given waiting time or until
     * unparked or interrupted. If the virtual thread is interrupted then the
     * interrupted status will be propagated to the carrier thread.
     * @param timed true for a timed park, false for untimed
     * @param nanos the waiting time in nanoseconds
     */
    private void parkOnCarrierThread(boolean timed, long nanos) {
        assert state() == RUNNING;

        setState(timed ? TIMED_PINNED : PINNED);
        try {
            // skip the park if a permit is already available; a timed park with no
            // time remaining does not park either
            if (!parkPermit) {
                if (!timed) {
                    U.park(false, 0);
                } else if (nanos > 0) {
                    U.park(false, nanos);
                }
            }
        } finally {
            setState(RUNNING);
        }

        // consume parking permit
        setParkPermit(false);

        // JFR jdk.VirtualThreadPinned event
        postPinnedEvent("LockSupport.park");
    }

    /**
     * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
     * Recording the event in the VM avoids having JFR event recorded in Java
     * with the same name, but different ID, to events recorded by the VM.
     */
    @Hidden
    private static native void postPinnedEvent(String op);

    /**
     * Re-enables this virtual thread for scheduling. If this virtual thread is parked
     * then its task is scheduled to continue, otherwise its next call to {@code park} or
     * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void unpark() {
        // nothing more to do if the permit was already available, or if the thread
        // is unparking itself (the permit has been set)
        if (!getAndSetParkPermit(true) && currentThread() != this) {
            int s = state();

            // unparked while parked
            if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
                submitRunContinuation();
                return;
            }

            // unparked while parked when pinned
            if (s == PINNED || s == TIMED_PINNED) {
                // unpark carrier thread when pinned
                disableSuspendAndPreempt();
                try {
                    synchronized (carrierThreadAccessLock()) {
                        Thread carrier = carrierThread;
                        // re-read the state while holding the lock
                        if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                            U.unpark(carrier);
                        }
                    }
                } finally {
                    enableSuspendAndPreempt();
                }
                return;
            }
        }
    }

    /**
     * Invoked by unblocker thread to unblock this virtual thread.
*/
    private void unblock() {
        assert !Thread.currentThread().isVirtual();
        // make the permit available before checking the state; afterYield checks the
        // permit after it sets the state to BLOCKED
        blockPermit = true;
        if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
            submitRunContinuation();
        }
    }

    /**
     * Invoked by FJP worker thread or STPE thread when park timeout expires.
     * Makes the parking permit available and, if the thread is still timed-parked,
     * schedules it to continue.
     */
    private void parkTimeoutExpired() {
        assert !VirtualThread.currentThread().isVirtual();
        if (!getAndSetParkPermit(true)
                && (state() == TIMED_PARKED)
                && compareAndSetState(TIMED_PARKED, UNPARKED)) {
            lazySubmitRunContinuation();
        }
    }

    /**
     * Invoked by FJP worker thread or STPE thread when wait timeout expires.
     * If the virtual thread is in timed-wait then this method will unblock the thread
     * and submit its task so that it continues and attempts to reenter the monitor.
     * This method does nothing if the thread has been woken by notify or interrupt.
     *
     * @param seqNo the sequence number of the timed-wait that this timeout is for
     */
    private void waitTimeoutExpired(byte seqNo) {
        assert !Thread.currentThread().isVirtual();
        for (;;) {
            boolean unblocked = false;
            synchronized (timedWaitLock()) {
                if (seqNo != timedWaitSeqNo) {
                    // this timeout task is for a past timed-wait
                    return;
                }
                int s = state();
                if (s == TIMED_WAIT) {
                    unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED);
                } else if (s != (TIMED_WAIT | SUSPENDED)) {
                    // notified or interrupted, no longer waiting
                    return;
                }
            }
            // submit after releasing the lock
            if (unblocked) {
                lazySubmitRunContinuation();
                return;
            }
            // need to retry when thread is suspended in timed-wait
            Thread.yield();
        }
    }

    /**
     * Attempts to yield the current virtual thread (Thread.yield).
994 */ 995 void tryYield() { 996 assert Thread.currentThread() == this; 997 setState(YIELDING); 998 boolean yielded = false; 999 try { 1000 yielded = yieldContinuation(); // may throw 1001 } finally { 1002 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 1003 if (!yielded) { 1004 assert state() == YIELDING; 1005 setState(RUNNING); 1006 } 1007 } 1008 } 1009 1010 /** 1011 * Sleep the current thread for the given sleep time (in nanoseconds). If 1012 * nanos is 0 then the thread will attempt to yield. 1013 * 1014 * @implNote This implementation parks the thread for the given sleeping time 1015 * and will therefore be observed in PARKED state during the sleep. Parking 1016 * will consume the parking permit so this method makes available the parking 1017 * permit after the sleep. This may be observed as a spurious, but benign, 1018 * wakeup when the thread subsequently attempts to park. 1019 * 1020 * @param nanos the maximum number of nanoseconds to sleep 1021 * @throws InterruptedException if interrupted while sleeping 1022 */ 1023 void sleepNanos(long nanos) throws InterruptedException { 1024 assert Thread.currentThread() == this && nanos >= 0; 1025 if (getAndClearInterrupt()) 1026 throw new InterruptedException(); 1027 if (nanos == 0) { 1028 tryYield(); 1029 } else { 1030 // park for the sleep time 1031 try { 1032 long remainingNanos = nanos; 1033 long startNanos = System.nanoTime(); 1034 while (remainingNanos > 0) { 1035 parkNanos(remainingNanos); 1036 if (getAndClearInterrupt()) { 1037 throw new InterruptedException(); 1038 } 1039 remainingNanos = nanos - (System.nanoTime() - startNanos); 1040 } 1041 } finally { 1042 // may have been unparked while sleeping 1043 setParkPermit(true); 1044 } 1045 } 1046 } 1047 1048 /** 1049 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 1050 * A timeout of {@code 0} means to wait forever. 
*
     * @param nanos the maximum time to wait in nanoseconds, or {@code 0} to wait
     *              without a time limit
     * @throws InterruptedException if interrupted while waiting
     * @return true if the thread has terminated
     */
    boolean joinNanos(long nanos) throws InterruptedException {
        // fast path, avoids creating the termination object
        if (state() == TERMINATED)
            return true;

        // ensure termination object exists, then re-check state
        CountDownLatch termination = getTermination();
        if (state() == TERMINATED)
            return true;

        // wait for virtual thread to terminate
        if (nanos == 0) {
            termination.await();
        } else {
            boolean terminated = termination.await(nanos, NANOSECONDS);
            if (!terminated) {
                // waiting time elapsed
                return false;
            }
        }
        assert state() == TERMINATED;
        return true;
    }

    /**
     * Delegates to {@code super.blockedOn} with suspension and preemption of the
     * current thread disabled for the duration of the call.
     */
    @Override
    void blockedOn(Interruptible b) {
        disableSuspendAndPreempt();
        try {
            super.blockedOn(b);
        } finally {
            enableSuspendAndPreempt();
        }
    }

    /**
     * Interrupts this virtual thread.
     *
     * <p> When invoked by another thread: sets the interrupt status, interrupts the
     * NIO blocker (if any) and the carrier thread (if mounted) while holding
     * {@code interruptLock}, then unparks the thread and, if it is waiting in
     * {@code Object.wait}, schedules it to continue. When invoked by this thread:
     * sets the interrupt status of this thread and its carrier, and makes the
     * parking permit available.
     */
    @Override
    public void interrupt() {
        if (Thread.currentThread() != this) {
            // if current thread is a virtual thread then prevent it from being
            // suspended or unmounted when entering or holding interruptLock
            Interruptible blocker;
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = true;
                    blocker = nioBlocker();
                    if (blocker != null) {
                        blocker.interrupt(this);
                    }

                    // interrupt carrier thread if mounted
                    Thread carrier = carrierThread;
                    if (carrier != null) carrier.setInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }

            // notify blocker after releasing interruptLock
            if (blocker != null) {
                blocker.postInterrupt();
            }

            // make available parking permit, unpark thread if parked
            unpark();

            // if thread is waiting in Object.wait then schedule to try to reenter
            int s = state();
            if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) {
                submitRunContinuation();
            }

        } else {
            // self-interrupt: the thread is running, so only the interrupt status of
            // this thread and its carrier, and the parking permit, need to be set
            interrupted = true;
            carrierThread.setInterrupt();
            setParkPermit(true);
        }
    }

    /**
     * Returns the interrupt status of this virtual thread without changing it.
     */
    @Override
    public boolean isInterrupted() {
        return interrupted;
    }

    /**
     * Returns the interrupt status of this virtual thread and clears it. If the
     * status was set then it is cleared, together with the carrier thread's
     * interrupt status, while holding {@code interruptLock}. Must be invoked by
     * this virtual thread (asserted).
     *
     * @return the interrupt status before it was cleared
     */
    @Override
    boolean getAndClearInterrupt() {
        assert Thread.currentThread() == this;
        boolean oldValue = interrupted;
        if (oldValue) {
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = false;
                    carrierThread.clearInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }
        return oldValue;
    }

    /**
     * Maps the internal state of this virtual thread to a {@link Thread.State}.
     * The SUSPENDED bit is ignored. For a thread in the RUNNING state that is
     * queried by another thread, the state of the carrier thread is returned if
     * the thread is mounted.
     *
     * @throws InternalError if the internal state is not one of the known states
     */
    @Override
    Thread.State threadState() {
        int s = state();
        switch (s & ~SUSPENDED) {
            case NEW:
                return Thread.State.NEW;
            case STARTED:
                // return NEW if thread container not yet set
                if (threadContainer() == null) {
                    return Thread.State.NEW;
                } else {
                    return Thread.State.RUNNABLE;
                }
            case UNPARKED:
            case UNBLOCKED:
            case YIELDED:
                // runnable, not mounted
                return Thread.State.RUNNABLE;
            case RUNNING:
                // if mounted then return state of carrier thread
                if (Thread.currentThread() != this) {
                    disableSuspendAndPreempt();
                    try {
                        synchronized (carrierThreadAccessLock()) {
                            Thread carrierThread = this.carrierThread;
                            if (carrierThread != null) {
                                return carrierThread.threadState();
                            }
                        }
                    } finally {
                        enableSuspendAndPreempt();
                    }
                }
                // runnable, mounted
                return Thread.State.RUNNABLE;
            case PARKING:
            case TIMED_PARKING:
            case WAITING:
            case TIMED_WAITING:
            case YIELDING:
                // runnable, in transition
                return Thread.State.RUNNABLE;
            case PARKED:
            case PINNED:
            case WAIT:
                return Thread.State.WAITING;
            case TIMED_PARKED:
            case TIMED_PINNED:
            case TIMED_WAIT:
                return Thread.State.TIMED_WAITING;
            case BLOCKING:
            case BLOCKED:
                return Thread.State.BLOCKED;
            case TERMINATED:
                return Thread.State.TERMINATED;
            default:
                throw new InternalError();
        }
    }

    /**
     * Returns true if this thread's state is neither NEW nor TERMINATED.
     */
    @Override
    boolean alive() {
        int s = state;
        return (s != NEW && s != TERMINATED);
    }

    /**
     * Returns true if this thread's state is TERMINATED.
     */
    @Override
    boolean isTerminated() {
        return (state == TERMINATED);
    }

    /**
     * Returns the stack trace of this virtual thread. Uses
     * {@code super.asyncGetStackTrace()} while a carrier thread is set and
     * {@link #tryGetStackTrace()} otherwise, yielding and retrying for as long as
     * the attempt produces {@code null}.
     */
    @Override
    StackTraceElement[] asyncGetStackTrace() {
        StackTraceElement[] stackTrace;
        do {
            stackTrace = (carrierThread != null)
                    ? super.asyncGetStackTrace()  // mounted
                    : tryGetStackTrace();         // unmounted
            if (stackTrace == null) {
                Thread.yield();
            }
        } while (stackTrace == null);
        return stackTrace;
    }

    /**
     * Returns the stack trace for this virtual thread if it is unmounted.
     * Returns null if the thread is mounted or in transition.
     *
     * <p> To take the trace of an unmounted thread, the SUSPENDED bit is set in its
     * state (by compare-and-set) while the stack is read, and the state is restored
     * afterwards. The task is then resubmitted if the thread was runnable, or became
     * eligible to run, while it was suspended.
     */
    private StackTraceElement[] tryGetStackTrace() {
        int initialState = state() & ~SUSPENDED;
        switch (initialState) {
            case NEW, STARTED, TERMINATED -> {
                return new StackTraceElement[0];  // unmounted, empty stack
            }
            case RUNNING, PINNED, TIMED_PINNED -> {
                return null;   // mounted
            }
            case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
                // unmounted, not runnable
            }
            case UNPARKED, UNBLOCKED, YIELDED -> {
                // unmounted, runnable
            }
            case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
                return null;  // in transition
            }
            default -> throw new InternalError("" + initialState);
        }

        // thread is unmounted, prevent it from continuing
        int suspendedState = initialState | SUSPENDED;
        if (!compareAndSetState(initialState, suspendedState)) {
            // state changed (or already suspended) since it was read, caller retries
            return null;
        }

        // get stack trace and restore state
        StackTraceElement[] stack;
        try {
            stack = cont.getStackTrace();
        } finally {
            assert state == suspendedState;
            setState(initialState);
        }
        // decide whether the task must be (re)submitted now that the state is restored
        boolean resubmit = switch (initialState) {
            case UNPARKED, UNBLOCKED, YIELDED -> {
                // resubmit as task may have run while suspended
                yield true;
            }
            case PARKED, TIMED_PARKED -> {
                // resubmit if unparked while suspended
                yield parkPermit && compareAndSetState(initialState, UNPARKED);
            }
            case BLOCKED -> {
                // resubmit if unblocked while suspended
                yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
            }
            case WAIT, TIMED_WAIT -> {
                // resubmit if notified or interrupted while waiting (Object.wait)
                // waitTimeoutExpired will retry if the timeout expired while suspended
                yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
            }
            default -> throw new InternalError();
        };
        if (resubmit) {
            submitRunContinuation();
        }
        return stack;
    }

    /**
     * Returns a string of the form {@code VirtualThread[#<id>,<name>]/<suffix>}.
     * The name (and its leading comma) is omitted when empty. The suffix is
     * {@code <carrier state>@<carrier name>} when mounted, otherwise the
     * lower-cased thread state of this virtual thread.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("VirtualThread[#");
        sb.append(threadId());
        String name = getName();
        if (!name.isEmpty()) {
            sb.append(",");
            sb.append(name);
        }
        sb.append("]/");

        // add the carrier state and thread name when mounted
        boolean mounted;
        if (Thread.currentThread() == this) {
            mounted = appendCarrierInfo(sb);
        } else {
            disableSuspendAndPreempt();
            try {
                synchronized (carrierThreadAccessLock()) {
                    mounted = appendCarrierInfo(sb);
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }

        // add virtual thread state when not mounted
        if (!mounted) {
            String stateAsString = threadState().toString();
            sb.append(stateAsString.toLowerCase(Locale.ROOT));
        }

        return sb.toString();
    }

    /**
     * Appends the carrier state and thread name to the string buffer if mounted.
1342 * @return true if mounted, false if not mounted 1343 */ 1344 private boolean appendCarrierInfo(StringBuilder sb) { 1345 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1346 Thread carrier = carrierThread; 1347 if (carrier != null) { 1348 String stateAsString = carrier.threadState().toString(); 1349 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1350 sb.append('@'); 1351 sb.append(carrier.getName()); 1352 return true; 1353 } else { 1354 return false; 1355 } 1356 } 1357 1358 @Override 1359 public int hashCode() { 1360 return (int) threadId(); 1361 } 1362 1363 @Override 1364 public boolean equals(Object obj) { 1365 return obj == this; 1366 } 1367 1368 /** 1369 * Returns the termination object, creating it if needed. 1370 */ 1371 private CountDownLatch getTermination() { 1372 CountDownLatch termination = this.termination; 1373 if (termination == null) { 1374 termination = new CountDownLatch(1); 1375 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1376 termination = this.termination; 1377 } 1378 } 1379 return termination; 1380 } 1381 1382 /** 1383 * Returns the lock object to synchronize on when accessing carrierThread. 1384 * The lock prevents carrierThread from being reset to null during unmount. 1385 */ 1386 private Object carrierThreadAccessLock() { 1387 // return interruptLock as unmount has to coordinate with interrupt 1388 return interruptLock; 1389 } 1390 1391 /** 1392 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1393 */ 1394 private Object timedWaitLock() { 1395 // use this object for now to avoid the overhead of introducing another lock 1396 return runContinuation; 1397 } 1398 1399 /** 1400 * Disallow the current thread be suspended or preempted. 1401 */ 1402 private void disableSuspendAndPreempt() { 1403 notifyJvmtiDisableSuspend(true); 1404 Continuation.pin(); 1405 } 1406 1407 /** 1408 * Allow the current thread be suspended or preempted. 
1409 */ 1410 private void enableSuspendAndPreempt() { 1411 Continuation.unpin(); 1412 notifyJvmtiDisableSuspend(false); 1413 } 1414 1415 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1416 1417 private int state() { 1418 return state; // volatile read 1419 } 1420 1421 private void setState(int newValue) { 1422 state = newValue; // volatile write 1423 } 1424 1425 private boolean compareAndSetState(int expectedValue, int newValue) { 1426 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1427 } 1428 1429 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1430 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1431 } 1432 1433 private void setParkPermit(boolean newValue) { 1434 if (parkPermit != newValue) { 1435 parkPermit = newValue; 1436 } 1437 } 1438 1439 private boolean getAndSetParkPermit(boolean newValue) { 1440 if (parkPermit != newValue) { 1441 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1442 } else { 1443 return newValue; 1444 } 1445 } 1446 1447 private void setCarrierThread(Thread carrier) { 1448 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1449 this.carrierThread = carrier; 1450 } 1451 1452 // -- JVM TI support -- 1453 1454 @IntrinsicCandidate 1455 @JvmtiMountTransition 1456 private native void notifyJvmtiStart(); 1457 1458 @IntrinsicCandidate 1459 @JvmtiMountTransition 1460 private native void notifyJvmtiEnd(); 1461 1462 @IntrinsicCandidate 1463 @JvmtiMountTransition 1464 private native void notifyJvmtiMount(boolean hide); 1465 1466 @IntrinsicCandidate 1467 @JvmtiMountTransition 1468 private native void notifyJvmtiUnmount(boolean hide); 1469 1470 @IntrinsicCandidate 1471 private static native void notifyJvmtiDisableSuspend(boolean enter); 1472 1473 private static native void registerNatives(); 1474 static { 1475 registerNatives(); 1476 1477 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1478 var group = 
Thread.virtualThreadGroup(); 1479 1480 // ensure event class is initialized 1481 try { 1482 MethodHandles.lookup().ensureInitialized(VirtualThreadParkEvent.class); 1483 } catch (IllegalAccessException e) { 1484 throw new ExceptionInInitializerError(e); 1485 } 1486 } 1487 1488 /** 1489 * Loads a VirtualThreadScheduler with the given class name. The class must be public 1490 * in an exported package, with public one-arg or no-arg constructor, and be visible 1491 * to the system class loader. 1492 * @param delegate the scheduler that the custom scheduler may delegate to 1493 * @param cn the class name of the custom scheduler 1494 */ 1495 private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) { 1496 try { 1497 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); 1498 VirtualThreadScheduler scheduler; 1499 try { 1500 // 1-arg constructor 1501 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class); 1502 scheduler = (VirtualThreadScheduler) ctor.newInstance(delegate); 1503 } catch (NoSuchMethodException e) { 1504 // 0-arg constructor 1505 Constructor<?> ctor = clazz.getConstructor(); 1506 scheduler = (VirtualThreadScheduler) ctor.newInstance(); 1507 } 1508 System.err.println(""" 1509 WARNING: Using custom default scheduler, this is an experimental feature!"""); 1510 return scheduler; 1511 } catch (Exception ex) { 1512 throw new Error(ex); 1513 } 1514 } 1515 1516 /** 1517 * Creates the built-in default ForkJoinPool scheduler. 
1518 * @param wrapped true if wrapped by a custom default scheduler 1519 */ 1520 private static BuiltinDefaultScheduler createBuiltinDefaultScheduler(boolean wrapped) { 1521 int parallelism, maxPoolSize, minRunnable; 1522 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1523 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1524 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1525 if (parallelismValue != null) { 1526 parallelism = Integer.parseInt(parallelismValue); 1527 } else { 1528 parallelism = Runtime.getRuntime().availableProcessors(); 1529 } 1530 if (maxPoolSizeValue != null) { 1531 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1532 parallelism = Integer.min(parallelism, maxPoolSize); 1533 } else { 1534 maxPoolSize = Integer.max(parallelism, 256); 1535 } 1536 if (minRunnableValue != null) { 1537 minRunnable = Integer.parseInt(minRunnableValue); 1538 } else { 1539 minRunnable = Integer.max(parallelism / 2, 1); 1540 } 1541 return new BuiltinDefaultScheduler(parallelism, maxPoolSize, minRunnable, wrapped); 1542 } 1543 1544 /** 1545 * The built-in default ForkJoinPool scheduler. 1546 */ 1547 private static class BuiltinDefaultScheduler 1548 extends ForkJoinPool implements VirtualThreadScheduler { 1549 1550 private static final StableValue<VirtualThreadScheduler> VIEW = StableValue.of(); 1551 1552 BuiltinDefaultScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) { 1553 ForkJoinWorkerThreadFactory factory = wrapped 1554 ? 
ForkJoinPool.defaultForkJoinWorkerThreadFactory 1555 : CarrierThread::new; 1556 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1557 boolean asyncMode = true; // FIFO 1558 super(parallelism, factory, handler, asyncMode, 1559 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1560 } 1561 1562 private void adaptAndExecute(Runnable task) { 1563 execute(ForkJoinTask.adapt(task)); 1564 } 1565 1566 @Override 1567 public void onStart(Thread vthread, Runnable task) { 1568 adaptAndExecute(task); 1569 } 1570 1571 @Override 1572 public void onContinue(Thread vthread, Runnable task) { 1573 adaptAndExecute(task); 1574 } 1575 1576 /** 1577 * Wraps the scheduler to avoid leaking a direct reference. 1578 */ 1579 VirtualThreadScheduler externalView() { 1580 BuiltinDefaultScheduler builtin = this; 1581 return VIEW.orElseSet(() -> { 1582 return new VirtualThreadScheduler() { 1583 private void execute(Thread thread, Runnable task) { 1584 Objects.requireNonNull(thread); 1585 if (thread instanceof VirtualThread vthread) { 1586 VirtualThreadScheduler scheduler = vthread.scheduler; 1587 if (scheduler == this || scheduler == DEFAULT_SCHEDULER) { 1588 builtin.adaptAndExecute(task); 1589 } else { 1590 throw new IllegalArgumentException(); 1591 } 1592 } else { 1593 throw new UnsupportedOperationException(); 1594 } 1595 } 1596 @Override 1597 public void onStart(Thread thread, Runnable task) { 1598 execute(thread, task); 1599 } 1600 @Override 1601 public void onContinue(Thread thread, Runnable task) { 1602 execute(thread, task); 1603 } 1604 }; 1605 }); 1606 } 1607 } 1608 1609 /** 1610 * Schedule a runnable task to run after a delay. 
1611 */ 1612 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1613 if (scheduler instanceof ForkJoinPool pool) { 1614 return pool.schedule(command, delay, unit); 1615 } else { 1616 return DelayedTaskSchedulers.schedule(command, delay, unit); 1617 } 1618 } 1619 1620 /** 1621 * Supports scheduling a runnable task to run after a delay. It uses a number 1622 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1623 * work queue used. This class is used when using a custom scheduler. 1624 */ 1625 private static class DelayedTaskSchedulers { 1626 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1627 1628 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1629 long tid = Thread.currentThread().threadId(); 1630 int index = (int) tid & (INSTANCE.length - 1); 1631 return INSTANCE[index].schedule(command, delay, unit); 1632 } 1633 1634 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1635 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1636 String propValue = System.getProperty(propName); 1637 int queueCount; 1638 if (propValue != null) { 1639 queueCount = Integer.parseInt(propValue); 1640 if (queueCount != Integer.highestOneBit(queueCount)) { 1641 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1642 } 1643 } else { 1644 int ncpus = Runtime.getRuntime().availableProcessors(); 1645 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1646 } 1647 var schedulers = new ScheduledExecutorService[queueCount]; 1648 for (int i = 0; i < queueCount; i++) { 1649 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1650 Executors.newScheduledThreadPool(1, task -> { 1651 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1652 t.setDaemon(true); 1653 return t; 1654 }); 1655 stpe.setRemoveOnCancelPolicy(true); 1656 schedulers[i] = stpe; 1657 } 1658 return schedulers; 1659 } 1660 
} 1661 1662 /** 1663 * Schedule virtual threads that are ready to be scheduled after they blocked on 1664 * monitor enter. 1665 */ 1666 private static void unblockVirtualThreads() { 1667 while (true) { 1668 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1669 while (vthread != null) { 1670 assert vthread.onWaitingList; 1671 VirtualThread nextThread = vthread.next; 1672 1673 // remove from list and unblock 1674 vthread.next = null; 1675 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1676 assert changed; 1677 vthread.unblock(); 1678 1679 vthread = nextThread; 1680 } 1681 } 1682 } 1683 1684 /** 1685 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1686 * if necessary until a list of one or more threads becomes available. 1687 */ 1688 private static native VirtualThread takeVirtualThreadListToUnblock(); 1689 1690 static { 1691 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1692 VirtualThread::unblockVirtualThreads); 1693 unblocker.setDaemon(true); 1694 unblocker.start(); 1695 } 1696 } --- EOF ---