1 /* 2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.lang; 26 27 import java.lang.reflect.Constructor; 28 import java.util.Locale; 29 import java.util.Objects; 30 import java.util.concurrent.CountDownLatch; 31 import java.util.concurrent.Executors; 32 import java.util.concurrent.ForkJoinPool; 33 import java.util.concurrent.ForkJoinTask; 34 import java.util.concurrent.Future; 35 import java.util.concurrent.RejectedExecutionException; 36 import java.util.concurrent.ScheduledExecutorService; 37 import java.util.concurrent.ScheduledThreadPoolExecutor; 38 import java.util.concurrent.TimeUnit; 39 import java.util.function.Supplier; 40 import jdk.internal.event.VirtualThreadEndEvent; 41 import jdk.internal.event.VirtualThreadStartEvent; 42 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 43 import jdk.internal.misc.CarrierThread; 44 import jdk.internal.misc.InnocuousThread; 45 import jdk.internal.misc.Unsafe; 46 import jdk.internal.vm.Continuation; 47 import jdk.internal.vm.ContinuationScope; 48 import jdk.internal.vm.StackableScope; 49 import jdk.internal.vm.ThreadContainer; 50 import jdk.internal.vm.ThreadContainers; 51 import jdk.internal.vm.annotation.ChangesCurrentThread; 52 import jdk.internal.vm.annotation.Hidden; 53 import jdk.internal.vm.annotation.IntrinsicCandidate; 54 import jdk.internal.vm.annotation.JvmtiHideEvents; 55 import jdk.internal.vm.annotation.JvmtiMountTransition; 56 import jdk.internal.vm.annotation.ReservedStackAccess; 57 import sun.nio.ch.Interruptible; 58 import static java.util.concurrent.TimeUnit.*; 59 60 /** 61 * A thread that is scheduled by the Java virtual machine rather than the operating system. 
*/
final class VirtualThread extends BaseVirtualThread {
    // Unsafe instance, used below to obtain field offsets and elsewhere in this
    // class to park/unpark (U.park, U.unpark)
    private static final Unsafe U = Unsafe.getUnsafe();

    // continuation scope used for the continuation of every virtual thread
    private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");

    // default scheduler, and whether it was created from the system property below
    private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
    private static final boolean IS_CUSTOM_DEFAULT_SCHEDULER;
    static {
        // experimental
        // If the jdk.virtualThreadScheduler.implClass system property is set then its
        // value is passed to createCustomDefaultScheduler, otherwise the scheduler is
        // created with createDefaultForkJoinPoolScheduler.
        String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
        if (propValue != null) {
            DEFAULT_SCHEDULER = createCustomDefaultScheduler(propValue);
            IS_CUSTOM_DEFAULT_SCHEDULER = true;
        } else {
            DEFAULT_SCHEDULER = createDefaultForkJoinPoolScheduler();
            IS_CUSTOM_DEFAULT_SCHEDULER = false;
        }
    }

    // offsets of the named instance fields, as returned by Unsafe.objectFieldOffset
    private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
    private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
    private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
    private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
    private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");

    // scheduler and continuation
    // (runContinuation is the task handed to the scheduler; it is bound to the
    // runContinuation() method in the constructor)
    private final VirtualThreadScheduler scheduler;
    private final Continuation cont;
    private final Runnable runContinuation;

    // virtual thread state, accessed by VM
    private volatile int state;

    /*
     * Virtual thread state transitions:
     *
     *      NEW -> STARTED         // Thread.start, schedule to run
     *  STARTED -> TERMINATED      // failed to start
     *  STARTED -> RUNNING         // first run
     *  RUNNING -> TERMINATED      // done
     *
     *  RUNNING -> PARKING         // Thread parking with LockSupport.park
     *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
     *  PARKING -> PINNED          // cont.yield failed, parked indefinitely on carrier
     *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *   PINNED -> RUNNING         // unparked, continue execution on same carrier
     * UNPARKED -> RUNNING         // continue execution after park
     *
     *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos
     * TIMED_PARKING -> TIMED_PARKED    // cont.yield successful, timed-parked
     * TIMED_PARKING -> TIMED_PINNED    // cont.yield failed, timed-parked on carrier
     *  TIMED_PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *  TIMED_PINNED -> RUNNING         // unparked, continue execution on same carrier
     *
     *   RUNNING -> BLOCKING       // blocking on monitor enter
     *  BLOCKING -> BLOCKED        // blocked on monitor enter
     *   BLOCKED -> UNBLOCKED      // unblocked, may be scheduled to continue
     * UNBLOCKED -> RUNNING        // continue execution after blocked on monitor enter
     *
     *   RUNNING -> WAITING        // transitional state during wait on monitor
     *   WAITING -> WAIT           // waiting on monitor
     *      WAIT -> BLOCKED        // notified, waiting to be unblocked by monitor owner
     *      WAIT -> UNBLOCKED      // timed-out/interrupted
     *
     *       RUNNING -> TIMED_WAITING   // transitional state during timed-waiting on monitor
     * TIMED_WAITING -> TIMED_WAIT      // timed-waiting on monitor
     *    TIMED_WAIT -> BLOCKED         // notified, waiting to be unblocked by monitor owner
     *    TIMED_WAIT -> UNBLOCKED       // timed-out/interrupted
     *
     *  RUNNING -> YIELDING        // Thread.yield
     * YIELDING -> YIELDED         // cont.yield successful, may be scheduled to continue
     * YIELDING -> RUNNING         // cont.yield failed
     *  YIELDED -> RUNNING         // continue execution after Thread.yield
     */
    private static final int NEW      = 0;
    private static final int STARTED  = 1;
    private static final int RUNNING  = 2;     // runnable-mounted

    // untimed and timed parking
    private static final int PARKING       = 3;
    private static final int PARKED        = 4;     // unmounted
    private static final int PINNED        = 5;     // mounted
    private static final int TIMED_PARKING = 6;
    private static final int TIMED_PARKED  = 7;     // unmounted
    private static final int TIMED_PINNED  = 8;     // mounted
    private static final int UNPARKED      = 9;     // unmounted but runnable

    // Thread.yield
    private static final int YIELDING = 10;
    private static final int YIELDED  = 11;         // unmounted but runnable

    // monitor enter
    private static final int BLOCKING  = 12;
    private static final int BLOCKED   = 13;        // unmounted
    private static final int UNBLOCKED = 14;        // unmounted but runnable

    // monitor wait/timed-wait
    private static final int WAITING       = 15;
    private static final int WAIT          = 16;    // waiting in Object.wait
    private static final int TIMED_WAITING = 17;
    private static final int TIMED_WAIT    = 18;    // waiting in timed-Object.wait

    private static final int TERMINATED = 99;  // final state

    // can be suspended from scheduling when unmounted
    // (a bit that is OR-ed with a state value, e.g. waitTimeoutExpired tests for
    // TIMED_WAIT | SUSPENDED)
    private static final int SUSPENDED = 1 << 8;

    // parking permit made available by LockSupport.unpark
    private volatile boolean parkPermit;

    // blocking permit made available by unblocker thread when another thread exits monitor
    private volatile boolean blockPermit;

    // true when on the list of virtual threads waiting to be unblocked
    private volatile boolean onWaitingList;

    // next virtual thread on the list of virtual threads waiting to be unblocked
    private volatile VirtualThread next;

    // notified by Object.notify/notifyAll while waiting in Object.wait
    private volatile boolean notified;

    // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
    private volatile boolean interruptableWait;

    // timed-wait support
    // (sequence number incremented in afterYield for each timed-wait and compared
    // in waitTimeoutExpired so that a stale timeout task does nothing)
    private byte timedWaitSeqNo;

    // timeout for timed-park and timed-wait, only accessed on current/carrier thread
    // (afterYield schedules it as NANOSECONDS for timed-park and MILLISECONDS for timed-wait)
    private long timeout;

    // timer task for timed-park and timed-wait, only accessed on current/carrier thread
    private Future<?> timeoutTask;

    // carrier thread when mounted, accessed by VM
private volatile Thread carrierThread; 198 199 // termination object when joining, created lazily if needed 200 private volatile CountDownLatch termination; 201 202 /** 203 * Returns the default scheduler. 204 */ 205 static VirtualThreadScheduler defaultScheduler() { 206 return DEFAULT_SCHEDULER; 207 } 208 209 /** 210 * Returns true if using a custom default scheduler. 211 */ 212 static boolean isCustomDefaultScheduler() { 213 return IS_CUSTOM_DEFAULT_SCHEDULER; 214 } 215 216 /** 217 * Returns the continuation scope used for virtual threads. 218 */ 219 static ContinuationScope continuationScope() { 220 return VTHREAD_SCOPE; 221 } 222 223 /** 224 * Return the scheduler for this thread. 225 * @param revealBuiltin true to reveal the built-in default scheduler, false to hide 226 */ 227 VirtualThreadScheduler scheduler(boolean revealBuiltin) { 228 if (scheduler instanceof BuiltinDefaultScheduler builtin && !revealBuiltin) { 229 return builtin.externalView(); 230 } else { 231 return scheduler; 232 } 233 } 234 235 /** 236 * Creates a new {@code VirtualThread} to run the given task with the given scheduler. 237 * 238 * @param scheduler the scheduler or null for default scheduler 239 * @param name thread name 240 * @param characteristics characteristics 241 * @param task the task to execute 242 */ 243 VirtualThread(VirtualThreadScheduler scheduler, 244 String name, 245 int characteristics, 246 Runnable task) { 247 super(name, characteristics, /*bound*/ false); 248 Objects.requireNonNull(task); 249 250 // use default scheduler if not provided 251 if (scheduler == null) { 252 scheduler = DEFAULT_SCHEDULER; 253 } 254 255 this.scheduler = scheduler; 256 this.cont = new VThreadContinuation(this, task); 257 this.runContinuation = this::runContinuation; 258 } 259 260 /** 261 * The continuation that a virtual thread executes. 
*/
    private static class VThreadContinuation extends Continuation {
        // Creates the continuation in VTHREAD_SCOPE; its target is the Runnable
        // produced by wrap(vthread, task)
        VThreadContinuation(VirtualThread vthread, Runnable task) {
            super(VTHREAD_SCOPE, wrap(vthread, task));
        }
        // Intentionally empty: this override does nothing with the pinned reason
        @Override
        protected void onPinned(Continuation.Pinned reason) {
        }
        // Wraps the task so that vthread.run(task) is bracketed by the JVMTI start and
        // end notifications; the end notification is made even if run throws
        private static Runnable wrap(VirtualThread vthread, Runnable task) {
            return new Runnable() {
                @Hidden
                @JvmtiHideEvents
                public void run() {
                    vthread.notifyJvmtiStart(); // notify JVMTI
                    try {
                        vthread.run(task);
                    } finally {
                        vthread.notifyJvmtiEnd(); // notify JVMTI
                    }
                }
            };
        }
    }

    /**
     * Runs or continues execution on the current thread. The virtual thread is mounted
     * on the current thread before the task runs or continues. It unmounts when the
     * task completes or yields.
     *
     * <p> This method returns without doing anything if the thread is not in one of
     * the STARTED, UNPARKED, UNBLOCKED or YIELDED states, or if the attempt to change
     * the state to RUNNING fails.
     *
     * @throws WrongThreadException if the current thread is a virtual thread
     */
    @ChangesCurrentThread // allow mount/unmount to be inlined
    private void runContinuation() {
        // the carrier must be a platform thread
        if (Thread.currentThread().isVirtual()) {
            throw new WrongThreadException();
        }

        // set state to RUNNING
        int initialState = state();
        if (initialState == STARTED || initialState == UNPARKED
                || initialState == UNBLOCKED || initialState == YIELDED) {
            // newly started or continue after parking/blocking/Thread.yield
            if (!compareAndSetState(initialState, RUNNING)) {
                // state changed since it was read, nothing to run
                return;
            }
            // consume permit when continuing after parking or blocking. If continue
            // after a timed-park or timed-wait then the timeout task is cancelled.
            if (initialState == UNPARKED) {
                cancelTimeoutTask();
                setParkPermit(false);
            } else if (initialState == UNBLOCKED) {
                cancelTimeoutTask();
                blockPermit = false;
            }
        } else {
            // not runnable
            return;
        }

        mount();
        try {
            cont.run();
        } finally {
            // always unmount, then either terminate or handle the yield
            unmount();
            if (cont.isDone()) {
                afterDone();
            } else {
                afterYield();
            }
        }
    }

    /**
     * Cancel timeout task when continuing after timed-park or timed-wait.
     * The timeout task may be executing, or may have already completed.
     * Does nothing if there is no timeout task.
     */
    private void cancelTimeoutTask() {
        if (timeoutTask != null) {
            // cancel(false): the task is not interrupted if it is already running
            timeoutTask.cancel(false);
            timeoutTask = null;
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the default scheduler,
     * and calling it on a worker thread, the task will be pushed to the local queue,
     * otherwise it will be pushed to an external submission queue.
     * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
     * @throws RejectedExecutionException if the scheduler rejects the task; submitFailed
     *         is invoked before the exception is rethrown
     * @throws OutOfMemoryError if thrown when submitting and retryOnOOME is false
     */
    private void submitRunContinuation(boolean retryOnOOME) {
        boolean done = false;
        while (!done) {
            try {
                // Pin the continuation to prevent the virtual thread from unmounting
                // when submitting a task. For the default scheduler this ensures that
                // the carrier doesn't change when pushing a task. For other schedulers
                // it avoids deadlock that could arise due to carriers and virtual
                // threads contending for a lock.
                if (currentThread().isVirtual()) {
                    Continuation.pin();
                    try {
                        scheduler.execute(this, runContinuation);
                    } finally {
                        Continuation.unpin();
                    }
                } else {
                    scheduler.execute(this, runContinuation);
                }
                done = true;
            } catch (RejectedExecutionException ree) {
                submitFailed(ree);
                throw ree;
            } catch (OutOfMemoryError e) {
                if (retryOnOOME) {
                    // wait before the loop retries the submit
                    U.park(false, 100_000_000); // 100ms
                } else {
                    throw e;
                }
            }
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the default scheduler,
     * and calling it on a worker thread, the task will be pushed to the local queue,
     * otherwise it will be pushed to an external submission queue.
     * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
     * @throws RejectedExecutionException if the scheduler rejects the task
     */
    private void submitRunContinuation() {
        submitRunContinuation(true);
    }

    /**
     * Lazy submit the runContinuation task if invoked on a carrier thread and its local
     * queue is empty. If not empty, or invoked by another thread, then this method works
     * like submitRunContinuation and just submits the task to the scheduler.
     * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
     * @throws RejectedExecutionException if the task is rejected
     * @see ForkJoinPool#lazySubmit(ForkJoinTask)
     */
    private void lazySubmitRunContinuation() {
        if (scheduler == DEFAULT_SCHEDULER
                && currentCarrierThread() instanceof CarrierThread ct
                && ct.getQueuedTaskCount() == 0) {
            try {
                ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
            } catch (RejectedExecutionException ree) {
                submitFailed(ree);
                throw ree;
            } catch (OutOfMemoryError e) {
                // fall back to the submit that retries on OutOfMemoryError
                submitRunContinuation();
            }
        } else {
            submitRunContinuation();
        }
    }

    /**
     * Submits the runContinuation task to the scheduler.
For the default scheduler, and 422 * calling it a virtual thread that uses the default scheduler, the task will be 423 * pushed to an external submission queue. 424 * @throws RejectedExecutionException 425 */ 426 private void externalSubmitRunContinuation() { 427 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) { 428 try { 429 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 430 } catch (RejectedExecutionException ree) { 431 submitFailed(ree); 432 throw ree; 433 } catch (OutOfMemoryError e) { 434 submitRunContinuation(); 435 } 436 } else { 437 submitRunContinuation(); 438 } 439 } 440 441 /** 442 * Submits the runContinuation task to the scheduler. For the default scheduler, and 443 * calling it a virtual thread that uses the default scheduler, the task will be 444 * pushed to an external submission queue. This method may throw OutOfMemoryError. 445 * @throws RejectedExecutionException 446 * @throws OutOfMemoryError 447 */ 448 private void externalSubmitRunContinuationOrThrow() { 449 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) { 450 try { 451 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 452 } catch (RejectedExecutionException ree) { 453 submitFailed(ree); 454 throw ree; 455 } 456 } else { 457 submitRunContinuation(false); 458 } 459 } 460 461 /** 462 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 463 */ 464 private void submitFailed(RejectedExecutionException ree) { 465 var event = new VirtualThreadSubmitFailedEvent(); 466 if (event.isEnabled()) { 467 event.javaThreadId = threadId(); 468 event.exceptionMessage = ree.getMessage(); 469 event.commit(); 470 } 471 } 472 473 /** 474 * Runs a task in the context of this virtual thread. 
*/
    private void run(Runnable task) {
        assert Thread.currentThread() == this && state == RUNNING;

        // emit JFR event if enabled
        if (VirtualThreadStartEvent.isTurnedOn()) {
            var event = new VirtualThreadStartEvent();
            event.javaThreadId = threadId();
            event.commit();
        }

        // run the task with the scoped value bindings; any Throwable that escapes
        // the task is passed to dispatchUncaughtException rather than propagated
        Object bindings = Thread.scopedValueBindings();
        try {
            runWith(bindings, task);
        } catch (Throwable exc) {
            dispatchUncaughtException(exc);
        } finally {
            // pop any remaining scopes from the stack, this may block
            StackableScope.popAll();

            // emit JFR event if enabled
            if (VirtualThreadEndEvent.isTurnedOn()) {
                var event = new VirtualThreadEndEvent();
                event.javaThreadId = threadId();
                event.commit();
            }
        }
    }

    /**
     * Mounts this virtual thread onto the current platform thread. On
     * return, the current thread is the virtual thread.
     */
    @ChangesCurrentThread
    @ReservedStackAccess
    private void mount() {
        // notify JVMTI before mount
        notifyJvmtiMount(/*hide*/true);

        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupt status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            synchronized (interruptLock) {
                // need to recheck interrupt status
                // (the first read of interrupted was made without holding interruptLock)
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread.
*/
    @ChangesCurrentThread
    @ReservedStackAccess
    private void unmount() {
        assert !Thread.holdsLock(interruptLock);

        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        // clear the carrier's interrupt status once the connection is broken
        carrier.clearInterrupt();

        // notify JVMTI after unmount
        notifyJvmtiUnmount(/*hide*/false);
    }

    /**
     * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
     * the continuation continues.
     *
     * @return the value returned by {@code Continuation.yield(VTHREAD_SCOPE)}
     */
    @Hidden
    private boolean yieldContinuation() {
        notifyJvmtiUnmount(/*hide*/true);
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            // executed whether yield returns normally or throws
            notifyJvmtiMount(/*hide*/false);
        }
    }

    /**
     * Invoked in the context of the carrier thread after the Continuation yields when
     * parking, blocking on monitor enter, Object.wait, or Thread.yield.
*/
    private void afterYield() {
        assert carrierThread == null;

        // re-adjust parallelism if the virtual thread yielded when compensating
        if (currentThread() instanceof CarrierThread ct) {
            ct.endBlocking();
        }

        // the transitional state set by the thread before it yielded
        int s = state();

        // LockSupport.park/parkNanos
        if (s == PARKING || s == TIMED_PARKING) {
            int newState;
            if (s == PARKING) {
                setState(newState = PARKED);
            } else {
                // schedule unpark
                long timeout = this.timeout;
                assert timeout > 0;
                timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
                setState(newState = TIMED_PARKED);
            }

            // may have been unparked while parking
            if (parkPermit && compareAndSetState(newState, UNPARKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Thread.yield
        if (s == YIELDING) {
            setState(YIELDED);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation();
            } else {
                submitRunContinuation();
            }
            return;
        }

        // blocking on monitorenter
        if (s == BLOCKING) {
            setState(BLOCKED);

            // may have been unblocked while blocking
            if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Object.wait
        if (s == WAITING || s == TIMED_WAITING) {
            int newState;
            // read before the state is changed to WAIT/TIMED_WAIT
            boolean interruptable = interruptableWait;
            if (s == WAITING) {
                setState(newState = WAIT);
            } else {
                // For timed-wait, a timeout task is scheduled to execute. The timeout
                // task will change the thread state to UNBLOCKED and submit the thread
                // to the scheduler. A sequence number is used to ensure that the timeout
                // task only unblocks the thread for this timed-wait. We synchronize with
                // the timeout task to coordinate access to the sequence number and to
                // ensure the timeout task doesn't execute until the thread has got to
                // the TIMED_WAIT state.
                long timeout = this.timeout;
                assert timeout > 0;
                synchronized (timedWaitLock()) {
                    byte seqNo = ++timedWaitSeqNo;
                    timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
                    setState(newState = TIMED_WAIT);
                }
            }

            // may have been notified while in transition to wait state
            if (notified && compareAndSetState(newState, BLOCKED)) {
                // may have even been unblocked already
                if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                    submitRunContinuation();
                }
                return;
            }

            // may have been interrupted while in transition to wait state
            if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
                submitRunContinuation();
                return;
            }
            return;
        }

        // not reached unless the state is not one of the transitional states above
        assert false;
    }

    /**
     * Invoked after the continuation completes.
     * Equivalent to {@code afterDone(true)}, so the thread's container is notified.
     */
    private void afterDone() {
        afterDone(true);
    }

    /**
     * Invoked after the continuation completes (or start failed). Sets the thread
     * state to TERMINATED and notifies anyone waiting for the thread to terminate.
     *
     * @param notifyContainer true if its container should be notified
     */
    private void afterDone(boolean notifyContainer) {
        assert carrierThread == null;
        setState(TERMINATED);

        // notify anyone waiting for this virtual thread to terminate
        // (the latch is null if it was never created)
        CountDownLatch termination = this.termination;
        if (termination != null) {
            assert termination.getCount() == 1;
            termination.countDown();
        }

        // notify container
        if (notifyContainer) {
            threadContainer().remove(this);
        }

        // clear references to thread locals
        clearReferences();
    }

    /**
     * Schedules this {@code VirtualThread} to execute.
*
     * @param container the thread container that this thread is bound and added to
     * @throws IllegalStateException if the container is shutdown or closed
     * @throws IllegalThreadStateException if the thread has already been started
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void start(ThreadContainer container) {
        // only one caller can move the state from NEW to STARTED
        if (!compareAndSetState(NEW, STARTED)) {
            throw new IllegalThreadStateException("Already started");
        }

        // bind thread to container
        assert threadContainer() == null;
        setThreadContainer(container);

        // start thread
        boolean addedToContainer = false;
        boolean started = false;
        try {
            container.add(this);  // may throw
            addedToContainer = true;

            // scoped values may be inherited
            inheritScopedValueBindings(container);

            // submit task to run thread, using externalSubmit if possible
            externalSubmitRunContinuationOrThrow();
            started = true;
        } finally {
            // failed to start: terminate the thread, only notifying the container
            // if the thread was added to it
            if (!started) {
                afterDone(addedToContainer);
            }
        }
    }

    /**
     * Schedules this thread to execute in the container returned by
     * {@code ThreadContainers.root()}.
     */
    @Override
    public void start() {
        start(ThreadContainers.root());
    }

    /**
     * This implementation does nothing.
     */
    @Override
    public void run() {
        // do nothing
    }

    /**
     * Parks until unparked or interrupted. If already unparked then the parking
     * permit is consumed and this method completes immediately (meaning it doesn't
     * yield). It also completes immediately if the interrupt status is set.
*/
    @Override
    void park() {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        boolean yielded = false;
        setState(PARKING);
        try {
            yielded = yieldContinuation();
        } catch (OutOfMemoryError e) {
            // park on carrier
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            // did not yield (or OutOfMemoryError), so restore the RUNNING state
            if (!yielded) {
                assert state() == PARKING;
                setState(RUNNING);
            }
        }

        // park on the carrier thread when pinned
        if (!yielded) {
            parkOnCarrierThread(false, 0);
        }
    }

    /**
     * Parks up to the given waiting time or until unparked or interrupted.
     * If already unparked then the parking permit is consumed and this method
     * completes immediately (meaning it doesn't yield). It also completes immediately
     * if the interrupt status is set or the waiting time is {@code <= 0}.
     *
     * @param nanos the maximum number of nanoseconds to wait.
*/
    @Override
    void parkNanos(long nanos) {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread for the waiting time
        if (nanos > 0) {
            // used to compute the remaining time if the thread has to park on its carrier
            long startTime = System.nanoTime();

            // park the thread, afterYield will schedule the thread to unpark
            boolean yielded = false;
            timeout = nanos;
            setState(TIMED_PARKING);
            try {
                yielded = yieldContinuation();
            } catch (OutOfMemoryError e) {
                // park on carrier
            } finally {
                assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
                // did not yield (or OutOfMemoryError), so restore the RUNNING state
                if (!yielded) {
                    assert state() == TIMED_PARKING;
                    setState(RUNNING);
                }
            }

            // park on carrier thread for remaining time when pinned (or OOME)
            if (!yielded) {
                long remainingNanos = nanos - (System.nanoTime() - startTime);
                parkOnCarrierThread(true, remainingNanos);
            }
        }
    }

    /**
     * Parks the current carrier thread up to the given waiting time or until
     * unparked or interrupted. If the virtual thread is interrupted then the
     * interrupt status will be propagated to the carrier thread.
     * The state is PINNED or TIMED_PINNED while parked and is restored to RUNNING
     * before this method returns. The parking permit is consumed.
     * @param timed true for a timed park, false for untimed
     * @param nanos the waiting time in nanoseconds
     */
    private void parkOnCarrierThread(boolean timed, long nanos) {
        assert state() == RUNNING;

        setState(timed ? TIMED_PINNED : PINNED);
        try {
            // skip parking if the permit is already available; a timed park with a
            // waiting time that is not positive does not park either
            if (!parkPermit) {
                if (!timed) {
                    U.park(false, 0);
                } else if (nanos > 0) {
                    U.park(false, nanos);
                }
            }
        } finally {
            setState(RUNNING);
        }

        // consume parking permit
        setParkPermit(false);

        // JFR jdk.VirtualThreadPinned event
        postPinnedEvent("LockSupport.park");
    }

    /**
     * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
* Recording the event in the VM avoids having JFR event recorded in Java
     * with the same name, but different ID, to events recorded by the VM.
     */
    @Hidden
    private static native void postPinnedEvent(String op);

    /**
     * Re-enables this virtual thread for scheduling. If this virtual thread is parked
     * then its task is scheduled to continue, otherwise its next call to {@code park} or
     * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void unpark() {
        // nothing more to do if the permit was already available or if the thread
        // is unparking itself
        if (!getAndSetParkPermit(true) && currentThread() != this) {
            int s = state();

            // unparked while parked
            if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
                submitRunContinuation();
                return;
            }

            // unparked while parked when pinned
            if (s == PINNED || s == TIMED_PINNED) {
                // unpark carrier thread when pinned
                disableSuspendAndPreempt();
                try {
                    synchronized (carrierThreadAccessLock()) {
                        // re-read the carrier and state while holding the lock, only
                        // unpark the carrier if the thread is still pinned
                        Thread carrier = carrierThread;
                        if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                            U.unpark(carrier);
                        }
                    }
                } finally {
                    enableSuspendAndPreempt();
                }
                return;
            }
        }
    }

    /**
     * Invoked by unblocker thread to unblock this virtual thread.
     * Makes the blocking permit available and, if the thread is BLOCKED, changes its
     * state to UNBLOCKED and submits its task to the scheduler.
     */
    private void unblock() {
        assert !Thread.currentThread().isVirtual();
        blockPermit = true;
        if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
            submitRunContinuation();
        }
    }

    /**
     * Invoked by FJP worker thread or STPE thread when park timeout expires.
*/
    private void parkTimeoutExpired() {
        assert !VirtualThread.currentThread().isVirtual();
        // make the parking permit available; only submit the task if the permit was
        // not already available and the thread is still timed-parked
        if (!getAndSetParkPermit(true)
                && (state() == TIMED_PARKED)
                && compareAndSetState(TIMED_PARKED, UNPARKED)) {
            lazySubmitRunContinuation();
        }
    }

    /**
     * Invoked by FJP worker thread or STPE thread when wait timeout expires.
     * If the virtual thread is in timed-wait then this method will unblock the thread
     * and submit its task so that it continues and attempts to reenter the monitor.
     * This method does nothing if the thread has been woken by notify or interrupt.
     *
     * @param seqNo the sequence number of the timed-wait that this timeout is for
     */
    private void waitTimeoutExpired(byte seqNo) {
        assert !Thread.currentThread().isVirtual();
        for (;;) {
            boolean unblocked = false;
            synchronized (timedWaitLock()) {
                if (seqNo != timedWaitSeqNo) {
                    // this timeout task is for a past timed-wait
                    return;
                }
                int s = state();
                if (s == TIMED_WAIT) {
                    unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED);
                } else if (s != (TIMED_WAIT | SUSPENDED)) {
                    // notified or interrupted, no longer waiting
                    return;
                }
            }
            // submit outside of the synchronized block
            if (unblocked) {
                lazySubmitRunContinuation();
                return;
            }
            // need to retry when thread is suspended in timed-wait
            Thread.yield();
        }
    }

    /**
     * Attempts to yield the current virtual thread (Thread.yield).
     * The state is YIELDING while attempting to yield and is restored to RUNNING
     * if the continuation did not yield.
     */
    void tryYield() {
        assert Thread.currentThread() == this;
        setState(YIELDING);
        boolean yielded = false;
        try {
            yielded = yieldContinuation();  // may throw
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (!yielded) {
                assert state() == YIELDING;
                setState(RUNNING);
            }
        }
    }

    /**
     * Sleep the current thread for the given sleep time (in nanoseconds). If
     * nanos is 0 then the thread will attempt to yield.
982 * 983 * @implNote This implementation parks the thread for the given sleeping time 984 * and will therefore be observed in PARKED state during the sleep. Parking 985 * will consume the parking permit so this method makes available the parking 986 * permit after the sleep. This may be observed as a spurious, but benign, 987 * wakeup when the thread subsequently attempts to park. 988 * 989 * @param nanos the maximum number of nanoseconds to sleep 990 * @throws InterruptedException if interrupted while sleeping 991 */ 992 void sleepNanos(long nanos) throws InterruptedException { 993 assert Thread.currentThread() == this && nanos >= 0; 994 if (getAndClearInterrupt()) 995 throw new InterruptedException(); 996 if (nanos == 0) { 997 tryYield(); 998 } else { 999 // park for the sleep time 1000 try { 1001 long remainingNanos = nanos; 1002 long startNanos = System.nanoTime(); 1003 while (remainingNanos > 0) { 1004 parkNanos(remainingNanos); 1005 if (getAndClearInterrupt()) { 1006 throw new InterruptedException(); 1007 } 1008 remainingNanos = nanos - (System.nanoTime() - startNanos); 1009 } 1010 } finally { 1011 // may have been unparked while sleeping 1012 setParkPermit(true); 1013 } 1014 } 1015 } 1016 1017 /** 1018 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 1019 * A timeout of {@code 0} means to wait forever. 
1020 * 1021 * @throws InterruptedException if interrupted while waiting 1022 * @return true if the thread has terminated 1023 */ 1024 boolean joinNanos(long nanos) throws InterruptedException { 1025 if (state() == TERMINATED) 1026 return true; 1027 1028 // ensure termination object exists, then re-check state 1029 CountDownLatch termination = getTermination(); 1030 if (state() == TERMINATED) 1031 return true; 1032 1033 // wait for virtual thread to terminate 1034 if (nanos == 0) { 1035 termination.await(); 1036 } else { 1037 boolean terminated = termination.await(nanos, NANOSECONDS); 1038 if (!terminated) { 1039 // waiting time elapsed 1040 return false; 1041 } 1042 } 1043 assert state() == TERMINATED; 1044 return true; 1045 } 1046 1047 @Override 1048 void blockedOn(Interruptible b) { 1049 disableSuspendAndPreempt(); 1050 try { 1051 super.blockedOn(b); 1052 } finally { 1053 enableSuspendAndPreempt(); 1054 } 1055 } 1056 1057 @Override 1058 public void interrupt() { 1059 if (Thread.currentThread() != this) { 1060 // if current thread is a virtual thread then prevent it from being 1061 // suspended or unmounted when entering or holding interruptLock 1062 Interruptible blocker; 1063 disableSuspendAndPreempt(); 1064 try { 1065 synchronized (interruptLock) { 1066 interrupted = true; 1067 blocker = nioBlocker(); 1068 if (blocker != null) { 1069 blocker.interrupt(this); 1070 } 1071 1072 // interrupt carrier thread if mounted 1073 Thread carrier = carrierThread; 1074 if (carrier != null) carrier.setInterrupt(); 1075 } 1076 } finally { 1077 enableSuspendAndPreempt(); 1078 } 1079 1080 // notify blocker after releasing interruptLock 1081 if (blocker != null) { 1082 blocker.postInterrupt(); 1083 } 1084 1085 // make available parking permit, unpark thread if parked 1086 unpark(); 1087 1088 // if thread is waiting in Object.wait then schedule to try to reenter 1089 int s = state(); 1090 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1091 
submitRunContinuation(); 1092 } 1093 1094 } else { 1095 interrupted = true; 1096 carrierThread.setInterrupt(); 1097 setParkPermit(true); 1098 } 1099 } 1100 1101 @Override 1102 public boolean isInterrupted() { 1103 return interrupted; 1104 } 1105 1106 @Override 1107 boolean getAndClearInterrupt() { 1108 assert Thread.currentThread() == this; 1109 boolean oldValue = interrupted; 1110 if (oldValue) { 1111 disableSuspendAndPreempt(); 1112 try { 1113 synchronized (interruptLock) { 1114 interrupted = false; 1115 carrierThread.clearInterrupt(); 1116 } 1117 } finally { 1118 enableSuspendAndPreempt(); 1119 } 1120 } 1121 return oldValue; 1122 } 1123 1124 @Override 1125 Thread.State threadState() { 1126 int s = state(); 1127 switch (s & ~SUSPENDED) { 1128 case NEW: 1129 return Thread.State.NEW; 1130 case STARTED: 1131 // return NEW if thread container not yet set 1132 if (threadContainer() == null) { 1133 return Thread.State.NEW; 1134 } else { 1135 return Thread.State.RUNNABLE; 1136 } 1137 case UNPARKED: 1138 case UNBLOCKED: 1139 case YIELDED: 1140 // runnable, not mounted 1141 return Thread.State.RUNNABLE; 1142 case RUNNING: 1143 // if mounted then return state of carrier thread 1144 if (Thread.currentThread() != this) { 1145 disableSuspendAndPreempt(); 1146 try { 1147 synchronized (carrierThreadAccessLock()) { 1148 Thread carrierThread = this.carrierThread; 1149 if (carrierThread != null) { 1150 return carrierThread.threadState(); 1151 } 1152 } 1153 } finally { 1154 enableSuspendAndPreempt(); 1155 } 1156 } 1157 // runnable, mounted 1158 return Thread.State.RUNNABLE; 1159 case PARKING: 1160 case TIMED_PARKING: 1161 case WAITING: 1162 case TIMED_WAITING: 1163 case YIELDING: 1164 // runnable, in transition 1165 return Thread.State.RUNNABLE; 1166 case PARKED: 1167 case PINNED: 1168 case WAIT: 1169 return Thread.State.WAITING; 1170 case TIMED_PARKED: 1171 case TIMED_PINNED: 1172 case TIMED_WAIT: 1173 return Thread.State.TIMED_WAITING; 1174 case BLOCKING: 1175 case BLOCKED: 1176 
return Thread.State.BLOCKED; 1177 case TERMINATED: 1178 return Thread.State.TERMINATED; 1179 default: 1180 throw new InternalError(); 1181 } 1182 } 1183 1184 @Override 1185 boolean alive() { 1186 int s = state; 1187 return (s != NEW && s != TERMINATED); 1188 } 1189 1190 @Override 1191 boolean isTerminated() { 1192 return (state == TERMINATED); 1193 } 1194 1195 @Override 1196 StackTraceElement[] asyncGetStackTrace() { 1197 StackTraceElement[] stackTrace; 1198 do { 1199 stackTrace = (carrierThread != null) 1200 ? super.asyncGetStackTrace() // mounted 1201 : supplyIfUnmounted(cont::getStackTrace, // unmounted 1202 () -> new StackTraceElement[0]); 1203 if (stackTrace == null) { 1204 Thread.yield(); 1205 } 1206 } while (stackTrace == null); 1207 return stackTrace; 1208 } 1209 1210 /** 1211 * Invokes a supplier to produce a non-null result if this virtual thread is not mounted. 1212 * @param supplier1 invoked if this virtual thread is alive and unmounted 1213 * @param supplier2 invoked if this virtual thread is not alive 1214 * @return the result; {@code null} if this virtual thread is mounted or in transition 1215 */ 1216 <T> T supplyIfUnmounted(Supplier<T> supplier1, Supplier<T> supplier2) { 1217 int initialState = state() & ~SUSPENDED; 1218 switch (initialState) { 1219 case NEW, STARTED, TERMINATED -> { 1220 return supplier2.get(); // terminated or not started 1221 } 1222 case RUNNING, PINNED, TIMED_PINNED -> { 1223 return null; // mounted 1224 } 1225 case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> { 1226 // unmounted, not runnable 1227 } 1228 case UNPARKED, UNBLOCKED, YIELDED -> { 1229 // unmounted, runnable 1230 } 1231 case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> { 1232 return null; // in transition 1233 } 1234 default -> throw new InternalError("" + initialState); 1235 } 1236 1237 // thread is unmounted, prevent it from continuing 1238 int suspendedState = initialState | SUSPENDED; 1239 if (!compareAndSetState(initialState, 
suspendedState)) { 1240 return null; 1241 } 1242 1243 try { 1244 return supplier1.get(); 1245 } finally { 1246 assert state == suspendedState; 1247 setState(initialState); 1248 1249 boolean resubmit = switch (initialState) { 1250 case UNPARKED, UNBLOCKED, YIELDED -> { 1251 // resubmit as task may have run while suspended 1252 yield true; 1253 } 1254 case PARKED, TIMED_PARKED -> { 1255 // resubmit if unparked while suspended 1256 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1257 } 1258 case BLOCKED -> { 1259 // resubmit if unblocked while suspended 1260 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED); 1261 } 1262 case WAIT, TIMED_WAIT -> { 1263 // resubmit if notified or interrupted while waiting (Object.wait) 1264 // waitTimeoutExpired will retry if the timed expired when suspended 1265 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED); 1266 } 1267 default -> throw new InternalError(); 1268 }; 1269 if (resubmit) { 1270 submitRunContinuation(); 1271 } 1272 } 1273 1274 } 1275 1276 @Override 1277 public String toString() { 1278 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1279 sb.append(threadId()); 1280 String name = getName(); 1281 if (!name.isEmpty()) { 1282 sb.append(","); 1283 sb.append(name); 1284 } 1285 sb.append("]/"); 1286 1287 // add the carrier state and thread name when mounted 1288 boolean mounted; 1289 if (Thread.currentThread() == this) { 1290 mounted = appendCarrierInfo(sb); 1291 } else { 1292 disableSuspendAndPreempt(); 1293 try { 1294 synchronized (carrierThreadAccessLock()) { 1295 mounted = appendCarrierInfo(sb); 1296 } 1297 } finally { 1298 enableSuspendAndPreempt(); 1299 } 1300 } 1301 1302 // add virtual thread state when not mounted 1303 if (!mounted) { 1304 String stateAsString = threadState().toString(); 1305 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1306 } 1307 1308 return sb.toString(); 1309 } 1310 1311 /** 1312 * Appends the carrier state and thread name to the 
string buffer if mounted. 1313 * @return true if mounted, false if not mounted 1314 */ 1315 private boolean appendCarrierInfo(StringBuilder sb) { 1316 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1317 Thread carrier = carrierThread; 1318 if (carrier != null) { 1319 String stateAsString = carrier.threadState().toString(); 1320 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1321 sb.append('@'); 1322 sb.append(carrier.getName()); 1323 return true; 1324 } else { 1325 return false; 1326 } 1327 } 1328 1329 @Override 1330 public int hashCode() { 1331 return (int) threadId(); 1332 } 1333 1334 @Override 1335 public boolean equals(Object obj) { 1336 return obj == this; 1337 } 1338 1339 /** 1340 * Returns the termination object, creating it if needed. 1341 */ 1342 private CountDownLatch getTermination() { 1343 CountDownLatch termination = this.termination; 1344 if (termination == null) { 1345 termination = new CountDownLatch(1); 1346 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1347 termination = this.termination; 1348 } 1349 } 1350 return termination; 1351 } 1352 1353 /** 1354 * Returns the lock object to synchronize on when accessing carrierThread. 1355 * The lock prevents carrierThread from being reset to null during unmount. 1356 */ 1357 private Object carrierThreadAccessLock() { 1358 // return interruptLock as unmount has to coordinate with interrupt 1359 return interruptLock; 1360 } 1361 1362 /** 1363 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1364 */ 1365 private Object timedWaitLock() { 1366 // use this object for now to avoid the overhead of introducing another lock 1367 return runContinuation; 1368 } 1369 1370 /** 1371 * Disallow the current thread be suspended or preempted. 
1372 */ 1373 private void disableSuspendAndPreempt() { 1374 notifyJvmtiDisableSuspend(true); 1375 Continuation.pin(); 1376 } 1377 1378 /** 1379 * Allow the current thread be suspended or preempted. 1380 */ 1381 private void enableSuspendAndPreempt() { 1382 Continuation.unpin(); 1383 notifyJvmtiDisableSuspend(false); 1384 } 1385 1386 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1387 1388 private int state() { 1389 return state; // volatile read 1390 } 1391 1392 private void setState(int newValue) { 1393 state = newValue; // volatile write 1394 } 1395 1396 private boolean compareAndSetState(int expectedValue, int newValue) { 1397 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1398 } 1399 1400 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1401 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1402 } 1403 1404 private void setParkPermit(boolean newValue) { 1405 if (parkPermit != newValue) { 1406 parkPermit = newValue; 1407 } 1408 } 1409 1410 private boolean getAndSetParkPermit(boolean newValue) { 1411 if (parkPermit != newValue) { 1412 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1413 } else { 1414 return newValue; 1415 } 1416 } 1417 1418 private void setCarrierThread(Thread carrier) { 1419 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1420 this.carrierThread = carrier; 1421 } 1422 1423 // -- JVM TI support -- 1424 1425 @IntrinsicCandidate 1426 @JvmtiMountTransition 1427 private native void notifyJvmtiStart(); 1428 1429 @IntrinsicCandidate 1430 @JvmtiMountTransition 1431 private native void notifyJvmtiEnd(); 1432 1433 @IntrinsicCandidate 1434 @JvmtiMountTransition 1435 private native void notifyJvmtiMount(boolean hide); 1436 1437 @IntrinsicCandidate 1438 @JvmtiMountTransition 1439 private native void notifyJvmtiUnmount(boolean hide); 1440 1441 @IntrinsicCandidate 1442 private static native void notifyJvmtiDisableSuspend(boolean 
enter); 1443 1444 private static native void registerNatives(); 1445 static { 1446 registerNatives(); 1447 1448 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1449 var group = Thread.virtualThreadGroup(); 1450 } 1451 1452 /** 1453 * Loads a VirtualThreadScheduler with the given class name to use at the 1454 * default scheduler. The class is public in an exported package, has a public 1455 * one-arg or no-arg constructor, and is visible to the system class loader. 1456 */ 1457 private static VirtualThreadScheduler createCustomDefaultScheduler(String cn) { 1458 try { 1459 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); 1460 VirtualThreadScheduler scheduler; 1461 try { 1462 // 1-arg constructor 1463 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class); 1464 var builtin = createDefaultForkJoinPoolScheduler(); 1465 scheduler = (VirtualThreadScheduler) ctor.newInstance(builtin.externalView()); 1466 } catch (NoSuchMethodException e) { 1467 // 0-arg constructor 1468 Constructor<?> ctor = clazz.getConstructor(); 1469 scheduler = (VirtualThreadScheduler) ctor.newInstance(); 1470 } 1471 System.err.println(""" 1472 WARNING: Using custom default scheduler, this is an experimental feature!"""); 1473 return scheduler; 1474 } catch (Exception ex) { 1475 throw new Error(ex); 1476 } 1477 } 1478 1479 /** 1480 * Creates the built-in default ForkJoinPool scheduler. 
1481 */ 1482 private static BuiltinDefaultScheduler createDefaultForkJoinPoolScheduler() { 1483 int parallelism, maxPoolSize, minRunnable; 1484 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1485 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1486 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1487 if (parallelismValue != null) { 1488 parallelism = Integer.parseInt(parallelismValue); 1489 } else { 1490 parallelism = Runtime.getRuntime().availableProcessors(); 1491 } 1492 if (maxPoolSizeValue != null) { 1493 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1494 parallelism = Integer.min(parallelism, maxPoolSize); 1495 } else { 1496 maxPoolSize = Integer.max(parallelism, 256); 1497 } 1498 if (minRunnableValue != null) { 1499 minRunnable = Integer.parseInt(minRunnableValue); 1500 } else { 1501 minRunnable = Integer.max(parallelism / 2, 1); 1502 } 1503 return new BuiltinDefaultScheduler(parallelism, maxPoolSize, minRunnable); 1504 } 1505 1506 /** 1507 * The built-in default ForkJoinPool scheduler. 1508 */ 1509 private static class BuiltinDefaultScheduler 1510 extends ForkJoinPool implements VirtualThreadScheduler { 1511 1512 private static final StableValue<VirtualThreadScheduler> VIEW = StableValue.of(); 1513 1514 BuiltinDefaultScheduler(int parallelism, int maxPoolSize, int minRunnable) { 1515 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool); 1516 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1517 boolean asyncMode = true; // FIFO 1518 super(parallelism, factory, handler, asyncMode, 1519 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1520 } 1521 1522 @Override 1523 public void execute(Thread vthread, Runnable task) { 1524 execute(ForkJoinTask.adapt(task)); 1525 } 1526 1527 /** 1528 * Wraps the scheduler to avoid leaking a direct reference. 
1529 */ 1530 VirtualThreadScheduler externalView() { 1531 VirtualThreadScheduler builtin = this; 1532 return VIEW.orElseSet(() -> { 1533 return new VirtualThreadScheduler() { 1534 @Override 1535 public void execute(Thread thread, Runnable task) { 1536 Objects.requireNonNull(thread); 1537 if (thread instanceof VirtualThread vthread) { 1538 VirtualThreadScheduler scheduler = vthread.scheduler; 1539 if (scheduler == this || scheduler == DEFAULT_SCHEDULER) { 1540 builtin.execute(thread, task); 1541 } else { 1542 throw new IllegalArgumentException(); 1543 } 1544 } else { 1545 throw new UnsupportedOperationException(); 1546 } 1547 } 1548 }; 1549 }); 1550 } 1551 } 1552 1553 /** 1554 * Schedule a runnable task to run after a delay. 1555 */ 1556 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1557 if (scheduler instanceof ForkJoinPool pool) { 1558 return pool.schedule(command, delay, unit); 1559 } else { 1560 return DelayedTaskSchedulers.schedule(command, delay, unit); 1561 } 1562 } 1563 1564 /** 1565 * Supports scheduling a runnable task to run after a delay. It uses a number 1566 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1567 * work queue used. This class is used when using a custom scheduler. 
1568 */ 1569 private static class DelayedTaskSchedulers { 1570 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1571 1572 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1573 long tid = Thread.currentThread().threadId(); 1574 int index = (int) tid & (INSTANCE.length - 1); 1575 return INSTANCE[index].schedule(command, delay, unit); 1576 } 1577 1578 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1579 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1580 String propValue = System.getProperty(propName); 1581 int queueCount; 1582 if (propValue != null) { 1583 queueCount = Integer.parseInt(propValue); 1584 if (queueCount != Integer.highestOneBit(queueCount)) { 1585 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1586 } 1587 } else { 1588 int ncpus = Runtime.getRuntime().availableProcessors(); 1589 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1590 } 1591 var schedulers = new ScheduledExecutorService[queueCount]; 1592 for (int i = 0; i < queueCount; i++) { 1593 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1594 Executors.newScheduledThreadPool(1, task -> { 1595 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1596 t.setDaemon(true); 1597 return t; 1598 }); 1599 stpe.setRemoveOnCancelPolicy(true); 1600 schedulers[i] = stpe; 1601 } 1602 return schedulers; 1603 } 1604 } 1605 1606 /** 1607 * Schedule virtual threads that are ready to be scheduled after they blocked on 1608 * monitor enter. 
1609 */ 1610 private static void unblockVirtualThreads() { 1611 while (true) { 1612 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1613 while (vthread != null) { 1614 assert vthread.onWaitingList; 1615 VirtualThread nextThread = vthread.next; 1616 1617 // remove from list and unblock 1618 vthread.next = null; 1619 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1620 assert changed; 1621 vthread.unblock(); 1622 1623 vthread = nextThread; 1624 } 1625 } 1626 } 1627 1628 /** 1629 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1630 * if necessary until a list of one or more threads becomes available. 1631 */ 1632 private static native VirtualThread takeVirtualThreadListToUnblock(); 1633 1634 static { 1635 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1636 VirtualThread::unblockVirtualThreads); 1637 unblocker.setDaemon(true); 1638 unblocker.start(); 1639 } 1640 }