1 /* 2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.util.Locale; 28 import java.util.Objects; 29 import java.util.concurrent.CountDownLatch; 30 import java.util.concurrent.Executor; 31 import java.util.concurrent.Executors; 32 import java.util.concurrent.ForkJoinPool; 33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 34 import java.util.concurrent.ForkJoinTask; 35 import java.util.concurrent.Future; 36 import java.util.concurrent.RejectedExecutionException; 37 import java.util.concurrent.ScheduledExecutorService; 38 import java.util.concurrent.ScheduledThreadPoolExecutor; 39 import java.util.concurrent.TimeUnit; 40 import jdk.internal.event.VirtualThreadEndEvent; 41 import jdk.internal.event.VirtualThreadStartEvent; 42 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 43 import jdk.internal.misc.CarrierThread; 44 import jdk.internal.misc.InnocuousThread; 45 import jdk.internal.misc.Unsafe; 46 import jdk.internal.vm.Continuation; 47 import jdk.internal.vm.ContinuationScope; 48 import jdk.internal.vm.StackableScope; 49 import jdk.internal.vm.ThreadContainer; 50 import jdk.internal.vm.ThreadContainers; 51 import jdk.internal.vm.annotation.ChangesCurrentThread; 52 import jdk.internal.vm.annotation.Hidden; 53 import jdk.internal.vm.annotation.IntrinsicCandidate; 54 import jdk.internal.vm.annotation.JvmtiHideEvents; 55 import jdk.internal.vm.annotation.JvmtiMountTransition; 56 import jdk.internal.vm.annotation.ReservedStackAccess; 57 import sun.nio.ch.Interruptible; 58 import static java.util.concurrent.TimeUnit.*; 59 60 /** 61 * A thread that is scheduled by the Java virtual machine rather than the operating system. 62 */ 63 final class VirtualThread extends BaseVirtualThread { 64 private static final Unsafe U = Unsafe.getUnsafe(); 65 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 66 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler(); 67 68 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 69 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 70 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 71 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 72 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList"); 73 74 // scheduler and continuation 75 private final Executor scheduler; 76 private final Continuation cont; 77 private final Runnable runContinuation; 78 79 // virtual thread state, accessed by VM 80 private volatile int state; 81 82 /* 83 * Virtual thread state transitions: 84 * 85 * NEW -> STARTED // Thread.start, schedule to run 86 * STARTED -> TERMINATED // failed to start 87 * STARTED -> RUNNING // first run 88 * RUNNING -> TERMINATED // done 89 * 90 * RUNNING -> PARKING // Thread parking with LockSupport.park 91 * PARKING -> PARKED // cont.yield successful, parked indefinitely 92 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier 93 * PARKED -> UNPARKED // unparked, may be scheduled to continue 94 * PINNED -> RUNNING // unparked, continue execution on same carrier 95 * UNPARKED -> RUNNING // continue execution after park 96 * 97 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 98 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 99 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier 100 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue 101 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 102 * 103 * RUNNING -> BLOCKING // blocking on monitor enter 104 * BLOCKING -> BLOCKED // blocked on monitor enter 105 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue 106 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter 107 * 108 * RUNNING -> WAITING // transitional state during wait on monitor 109 * WAITING -> WAIT // waiting on monitor 110 * WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 111 * WAIT -> UNBLOCKED // timed-out/interrupted 112 * 113 * RUNNING -> TIMED_WAITING // transition state during timed-waiting on monitor 114 * TIMED_WAITING -> TIMED_WAIT // timed-waiting on monitor 115 * TIMED_WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 116 * TIMED_WAIT -> UNBLOCKED // timed-out/interrupted 117 * 118 * RUNNING -> YIELDING // Thread.yield 119 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue 120 * YIELDING -> RUNNING // cont.yield failed 121 * YIELDED -> RUNNING // continue execution after Thread.yield 122 */ 123 private static final int NEW = 0; 124 private static final int STARTED = 1; 125 private static final int RUNNING = 2; // runnable-mounted 126 127 // untimed and timed parking 128 private static final int PARKING = 3; 129 private static final int PARKED = 4; // unmounted 130 private static final int PINNED = 5; // mounted 131 private static final int TIMED_PARKING = 6; 132 private static final int TIMED_PARKED = 7; // unmounted 133 private static final int TIMED_PINNED = 8; // mounted 134 private static final int UNPARKED = 9; // unmounted but runnable 135 136 // Thread.yield 137 private static final int YIELDING = 10; 138 private static final int YIELDED = 11; // unmounted but runnable 139 140 // monitor enter 141 private static final int BLOCKING = 12; 142 private static final int BLOCKED = 13; // unmounted 143 private static final int UNBLOCKED = 14; // unmounted but runnable 144 145 // monitor wait/timed-wait 146 private static final int WAITING = 15; 147 private static final int WAIT = 16; // waiting in Object.wait 148 private static final int TIMED_WAITING = 17; 149 private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait 150 151 private static final int TERMINATED = 99; // final state 152 153 // can be suspended from scheduling when unmounted 154 private static final int SUSPENDED = 1 << 8; 155 156 // parking permit made available by LockSupport.unpark 157 private volatile boolean parkPermit; 158 159 // blocking permit made available by unblocker thread when another thread exits monitor 160 private volatile boolean blockPermit; 161 162 // true when on the list of virtual threads waiting to be unblocked 163 private volatile boolean onWaitingList; 164 165 // next virtual thread on the list of virtual threads waiting to be unblocked 166 private volatile VirtualThread next; 167 168 // notified by Object.notify/notifyAll while waiting in Object.wait 169 private volatile boolean notified; 170 171 // timed-wait support 172 private byte timedWaitSeqNo; 173 174 // timeout for timed-park and timed-wait, only accessed on current/carrier thread 175 private long timeout; 176 177 // timer task for timed-park and timed-wait, only accessed on current/carrier thread 178 private Future<?> timeoutTask; 179 180 // carrier thread when mounted, accessed by VM 181 private volatile Thread carrierThread; 182 183 // termination object when joining, created lazily if needed 184 private volatile CountDownLatch termination; 185 186 /** 187 * Returns the default scheduler. 188 */ 189 static Executor defaultScheduler() { 190 return DEFAULT_SCHEDULER; 191 } 192 193 /** 194 * Returns the continuation scope used for virtual threads. 195 */ 196 static ContinuationScope continuationScope() { 197 return VTHREAD_SCOPE; 198 } 199 200 /** 201 * Creates a new {@code VirtualThread} to run the given task with the given 202 * scheduler. If the given scheduler is {@code null} and the current thread 203 * is a platform thread then the newly created virtual thread will use the 204 * default scheduler. If given scheduler is {@code null} and the current 205 * thread is a virtual thread then the current thread's scheduler is used. 206 * 207 * @param scheduler the scheduler or null 208 * @param name thread name 209 * @param characteristics characteristics 210 * @param task the task to execute 211 */ 212 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 213 super(name, characteristics, /*bound*/ false); 214 Objects.requireNonNull(task); 215 216 // choose scheduler if not specified 217 if (scheduler == null) { 218 Thread parent = Thread.currentThread(); 219 if (parent instanceof VirtualThread vparent) { 220 scheduler = vparent.scheduler; 221 } else { 222 scheduler = DEFAULT_SCHEDULER; 223 } 224 } 225 226 this.scheduler = scheduler; 227 this.cont = new VThreadContinuation(this, task); 228 this.runContinuation = this::runContinuation; 229 } 230 231 /** 232 * The continuation that a virtual thread executes. 233 */ 234 private static class VThreadContinuation extends Continuation { 235 VThreadContinuation(VirtualThread vthread, Runnable task) { 236 super(VTHREAD_SCOPE, wrap(vthread, task)); 237 } 238 @Override 239 protected void onPinned(Continuation.Pinned reason) { 240 } 241 private static Runnable wrap(VirtualThread vthread, Runnable task) { 242 return new Runnable() { 243 @Hidden 244 @JvmtiHideEvents 245 public void run() { 246 vthread.notifyJvmtiStart(); // notify JVMTI 247 try { 248 vthread.run(task); 249 } finally { 250 vthread.notifyJvmtiEnd(); // notify JVMTI 251 } 252 } 253 }; 254 } 255 } 256 257 /** 258 * Runs or continues execution on the current thread. The virtual thread is mounted 259 * on the current thread before the task runs or continues. It unmounts when the 260 * task completes or yields. 261 */ 262 @ChangesCurrentThread // allow mount/unmount to be inlined 263 private void runContinuation() { 264 // the carrier must be a platform thread 265 if (Thread.currentThread().isVirtual()) { 266 throw new WrongThreadException(); 267 } 268 269 // set state to RUNNING 270 int initialState = state(); 271 if (initialState == STARTED || initialState == UNPARKED 272 || initialState == UNBLOCKED || initialState == YIELDED) { 273 // newly started or continue after parking/blocking/Thread.yield 274 if (!compareAndSetState(initialState, RUNNING)) { 275 return; 276 } 277 // consume permit when continuing after parking or blocking. If continue 278 // after a timed-park or timed-wait then the timeout task is cancelled. 279 if (initialState == UNPARKED) { 280 cancelTimeoutTask(); 281 setParkPermit(false); 282 } else if (initialState == UNBLOCKED) { 283 cancelTimeoutTask(); 284 blockPermit = false; 285 } 286 } else { 287 // not runnable 288 return; 289 } 290 291 mount(); 292 try { 293 cont.run(); 294 } finally { 295 unmount(); 296 if (cont.isDone()) { 297 afterDone(); 298 } else { 299 afterYield(); 300 } 301 } 302 } 303 304 /** 305 * Cancel timeout task when continuing after timed-park or timed-wait. 306 * The timeout task may be executing, or may have already completed. 307 */ 308 private void cancelTimeoutTask() { 309 if (timeoutTask != null) { 310 timeoutTask.cancel(false); 311 timeoutTask = null; 312 } 313 } 314 315 /** 316 * Submits the runContinuation task to the scheduler. For the default scheduler, 317 * and calling it on a worker thread, the task will be pushed to the local queue, 318 * otherwise it will be pushed to an external submission queue. 319 * @param scheduler the scheduler 320 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 321 * @throws RejectedExecutionException 322 */ 323 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) { 324 boolean done = false; 325 while (!done) { 326 try { 327 // Pin the continuation to prevent the virtual thread from unmounting 328 // when submitting a task. For the default scheduler this ensures that 329 // the carrier doesn't change when pushing a task. For other schedulers 330 // it avoids deadlock that could arise due to carriers and virtual 331 // threads contending for a lock. 332 if (currentThread().isVirtual()) { 333 Continuation.pin(); 334 try { 335 scheduler.execute(runContinuation); 336 } finally { 337 Continuation.unpin(); 338 } 339 } else { 340 scheduler.execute(runContinuation); 341 } 342 done = true; 343 } catch (RejectedExecutionException ree) { 344 submitFailed(ree); 345 throw ree; 346 } catch (OutOfMemoryError e) { 347 if (retryOnOOME) { 348 U.park(false, 100_000_000); // 100ms 349 } else { 350 throw e; 351 } 352 } 353 } 354 } 355 356 /** 357 * Submits the runContinuation task to the given scheduler as an external submit. 358 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 359 * @throws RejectedExecutionException 360 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 361 */ 362 private void externalSubmitRunContinuation(ForkJoinPool pool) { 363 assert Thread.currentThread() instanceof CarrierThread; 364 try { 365 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 366 } catch (RejectedExecutionException ree) { 367 submitFailed(ree); 368 throw ree; 369 } catch (OutOfMemoryError e) { 370 submitRunContinuation(pool, true); 371 } 372 } 373 374 /** 375 * Submits the runContinuation task to the scheduler. For the default scheduler, 376 * and calling it on a worker thread, the task will be pushed to the local queue, 377 * otherwise it will be pushed to an external submission queue. 378 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 379 * @throws RejectedExecutionException 380 */ 381 private void submitRunContinuation() { 382 submitRunContinuation(scheduler, true); 383 } 384 385 /** 386 * Lazy submit the runContinuation task if invoked on a carrier thread and its local 387 * queue is empty. If not empty, or invoked by another thread, then this method works 388 * like submitRunContinuation and just submits the task to the scheduler. 389 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 390 * @throws RejectedExecutionException 391 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 392 */ 393 private void lazySubmitRunContinuation() { 394 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 395 ForkJoinPool pool = ct.getPool(); 396 try { 397 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 398 } catch (RejectedExecutionException ree) { 399 submitFailed(ree); 400 throw ree; 401 } catch (OutOfMemoryError e) { 402 submitRunContinuation(); 403 } 404 } else { 405 submitRunContinuation(); 406 } 407 } 408 409 /** 410 * Submits the runContinuation task to the scheduler. For the default scheduler, and 411 * calling it a virtual thread that uses the default scheduler, the task will be 412 * pushed to an external submission queue. This method may throw OutOfMemoryError. 413 * @throws RejectedExecutionException 414 * @throws OutOfMemoryError 415 */ 416 private void externalSubmitRunContinuationOrThrow() { 417 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) { 418 try { 419 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 420 } catch (RejectedExecutionException ree) { 421 submitFailed(ree); 422 throw ree; 423 } 424 } else { 425 submitRunContinuation(scheduler, false); 426 } 427 } 428 429 /** 430 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 431 */ 432 private void submitFailed(RejectedExecutionException ree) { 433 var event = new VirtualThreadSubmitFailedEvent(); 434 if (event.isEnabled()) { 435 event.javaThreadId = threadId(); 436 event.exceptionMessage = ree.getMessage(); 437 event.commit(); 438 } 439 } 440 441 /** 442 * Runs a task in the context of this virtual thread. 443 */ 444 private void run(Runnable task) { 445 assert Thread.currentThread() == this && state == RUNNING; 446 447 // emit JFR event if enabled 448 if (VirtualThreadStartEvent.isTurnedOn()) { 449 var event = new VirtualThreadStartEvent(); 450 event.javaThreadId = threadId(); 451 event.commit(); 452 } 453 454 Object bindings = Thread.scopedValueBindings(); 455 try { 456 runWith(bindings, task); 457 } catch (Throwable exc) { 458 dispatchUncaughtException(exc); 459 } finally { 460 // pop any remaining scopes from the stack, this may block 461 StackableScope.popAll(); 462 463 // emit JFR event if enabled 464 if (VirtualThreadEndEvent.isTurnedOn()) { 465 var event = new VirtualThreadEndEvent(); 466 event.javaThreadId = threadId(); 467 event.commit(); 468 } 469 } 470 } 471 472 /** 473 * Mounts this virtual thread onto the current platform thread. On 474 * return, the current thread is the virtual thread. 475 */ 476 @ChangesCurrentThread 477 @ReservedStackAccess 478 private void mount() { 479 // notify JVMTI before mount 480 notifyJvmtiMount(/*hide*/true); 481 482 // sets the carrier thread 483 Thread carrier = Thread.currentCarrierThread(); 484 setCarrierThread(carrier); 485 486 // sync up carrier thread interrupt status if needed 487 if (interrupted) { 488 carrier.setInterrupt(); 489 } else if (carrier.isInterrupted()) { 490 synchronized (interruptLock) { 491 // need to recheck interrupt status 492 if (!interrupted) { 493 carrier.clearInterrupt(); 494 } 495 } 496 } 497 498 // set Thread.currentThread() to return this virtual thread 499 carrier.setCurrentThread(this); 500 } 501 502 /** 503 * Unmounts this virtual thread from the carrier. On return, the 504 * current thread is the current platform thread. 505 */ 506 @ChangesCurrentThread 507 @ReservedStackAccess 508 private void unmount() { 509 assert !Thread.holdsLock(interruptLock); 510 511 // set Thread.currentThread() to return the platform thread 512 Thread carrier = this.carrierThread; 513 carrier.setCurrentThread(carrier); 514 515 // break connection to carrier thread, synchronized with interrupt 516 synchronized (interruptLock) { 517 setCarrierThread(null); 518 } 519 carrier.clearInterrupt(); 520 521 // notify JVMTI after unmount 522 notifyJvmtiUnmount(/*hide*/false); 523 } 524 525 /** 526 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 527 * the continuation continues. 528 */ 529 @Hidden 530 private boolean yieldContinuation() { 531 notifyJvmtiUnmount(/*hide*/true); 532 try { 533 return Continuation.yield(VTHREAD_SCOPE); 534 } finally { 535 notifyJvmtiMount(/*hide*/false); 536 } 537 } 538 539 /** 540 * Invoked in the context of the carrier thread after the Continuation yields when 541 * parking, blocking on monitor enter, Object.wait, or Thread.yield. 542 */ 543 private void afterYield() { 544 assert carrierThread == null; 545 546 // re-adjust parallelism if the virtual thread yielded when compensating 547 if (currentThread() instanceof CarrierThread ct) { 548 ct.endBlocking(); 549 } 550 551 int s = state(); 552 553 // LockSupport.park/parkNanos 554 if (s == PARKING || s == TIMED_PARKING) { 555 int newState; 556 if (s == PARKING) { 557 setState(newState = PARKED); 558 } else { 559 // schedule unpark 560 long timeout = this.timeout; 561 assert timeout > 0; 562 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS); 563 setState(newState = TIMED_PARKED); 564 } 565 566 // may have been unparked while parking 567 if (parkPermit && compareAndSetState(newState, UNPARKED)) { 568 // lazy submit if local queue is empty 569 lazySubmitRunContinuation(); 570 } 571 return; 572 } 573 574 // Thread.yield 575 if (s == YIELDING) { 576 setState(YIELDED); 577 578 // external submit if there are no tasks in the local task queue 579 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 580 externalSubmitRunContinuation(ct.getPool()); 581 } else { 582 submitRunContinuation(); 583 } 584 return; 585 } 586 587 // blocking on monitorenter 588 if (s == BLOCKING) { 589 setState(BLOCKED); 590 591 // may have been unblocked while blocking 592 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 593 // lazy submit if local queue is empty 594 lazySubmitRunContinuation(); 595 } 596 return; 597 } 598 599 // Object.wait 600 if (s == WAITING || s == TIMED_WAITING) { 601 int newState; 602 if (s == WAITING) { 603 setState(newState = WAIT); 604 } else { 605 // For timed-wait, a timeout task is scheduled to execute. The timeout 606 // task will change the thread state to UNBLOCKED and submit the thread 607 // to the scheduler. A sequence number is used to ensure that the timeout 608 // task only unblocks the thread for this timed-wait. We synchronize with 609 // the timeout task to coordinate access to the sequence number and to 610 // ensure the timeout task doesn't execute until the thread has got to 611 // the TIMED_WAIT state. 612 long timeout = this.timeout; 613 assert timeout > 0; 614 synchronized (timedWaitLock()) { 615 byte seqNo = ++timedWaitSeqNo; 616 timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS); 617 setState(newState = TIMED_WAIT); 618 } 619 } 620 621 // may have been notified while in transition to wait state 622 if (notified && compareAndSetState(newState, BLOCKED)) { 623 // may have even been unblocked already 624 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 625 submitRunContinuation(); 626 } 627 return; 628 } 629 630 // may have been interrupted while in transition to wait state 631 if (interrupted && compareAndSetState(newState, UNBLOCKED)) { 632 submitRunContinuation(); 633 return; 634 } 635 return; 636 } 637 638 assert false; 639 } 640 641 /** 642 * Invoked after the continuation completes. 643 */ 644 private void afterDone() { 645 afterDone(true); 646 } 647 648 /** 649 * Invoked after the continuation completes (or start failed). Sets the thread 650 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 651 * 652 * @param notifyContainer true if its container should be notified 653 */ 654 private void afterDone(boolean notifyContainer) { 655 assert carrierThread == null; 656 setState(TERMINATED); 657 658 // notify anyone waiting for this virtual thread to terminate 659 CountDownLatch termination = this.termination; 660 if (termination != null) { 661 assert termination.getCount() == 1; 662 termination.countDown(); 663 } 664 665 // notify container 666 if (notifyContainer) { 667 threadContainer().onExit(this); 668 } 669 670 // clear references to thread locals 671 clearReferences(); 672 } 673 674 /** 675 * Schedules this {@code VirtualThread} to execute. 676 * 677 * @throws IllegalStateException if the container is shutdown or closed 678 * @throws IllegalThreadStateException if the thread has already been started 679 * @throws RejectedExecutionException if the scheduler cannot accept a task 680 */ 681 @Override 682 void start(ThreadContainer container) { 683 if (!compareAndSetState(NEW, STARTED)) { 684 throw new IllegalThreadStateException("Already started"); 685 } 686 687 // bind thread to container 688 assert threadContainer() == null; 689 setThreadContainer(container); 690 691 // start thread 692 boolean addedToContainer = false; 693 boolean started = false; 694 try { 695 container.onStart(this); // may throw 696 addedToContainer = true; 697 698 // scoped values may be inherited 699 inheritScopedValueBindings(container); 700 701 // submit task to run thread, using externalSubmit if possible 702 externalSubmitRunContinuationOrThrow(); 703 started = true; 704 } finally { 705 if (!started) { 706 afterDone(addedToContainer); 707 } 708 } 709 } 710 711 @Override 712 public void start() { 713 start(ThreadContainers.root()); 714 } 715 716 @Override 717 public void run() { 718 // do nothing 719 } 720 721 /** 722 * Parks until unparked or interrupted. If already unparked then the parking 723 * permit is consumed and this method completes immediately (meaning it doesn't 724 * yield). It also completes immediately if the interrupt status is set. 725 */ 726 @Override 727 void park() { 728 assert Thread.currentThread() == this; 729 730 // complete immediately if parking permit available or interrupted 731 if (getAndSetParkPermit(false) || interrupted) 732 return; 733 734 // park the thread 735 boolean yielded = false; 736 setState(PARKING); 737 try { 738 yielded = yieldContinuation(); 739 } catch (OutOfMemoryError e) { 740 // park on carrier 741 } finally { 742 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 743 if (!yielded) { 744 assert state() == PARKING; 745 setState(RUNNING); 746 } 747 } 748 749 // park on the carrier thread when pinned 750 if (!yielded) { 751 parkOnCarrierThread(false, 0); 752 } 753 } 754 755 /** 756 * Parks up to the given waiting time or until unparked or interrupted. 757 * If already unparked then the parking permit is consumed and this method 758 * completes immediately (meaning it doesn't yield). It also completes immediately 759 * if the interrupt status is set or the waiting time is {@code <= 0}. 760 * 761 * @param nanos the maximum number of nanoseconds to wait. 762 */ 763 @Override 764 void parkNanos(long nanos) { 765 assert Thread.currentThread() == this; 766 767 // complete immediately if parking permit available or interrupted 768 if (getAndSetParkPermit(false) || interrupted) 769 return; 770 771 // park the thread for the waiting time 772 if (nanos > 0) { 773 long startTime = System.nanoTime(); 774 775 // park the thread, afterYield will schedule the thread to unpark 776 boolean yielded = false; 777 timeout = nanos; 778 setState(TIMED_PARKING); 779 try { 780 yielded = yieldContinuation(); 781 } catch (OutOfMemoryError e) { 782 // park on carrier 783 } finally { 784 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 785 if (!yielded) { 786 assert state() == TIMED_PARKING; 787 setState(RUNNING); 788 } 789 } 790 791 // park on carrier thread for remaining time when pinned (or OOME) 792 if (!yielded) { 793 long remainingNanos = nanos - (System.nanoTime() - startTime); 794 parkOnCarrierThread(true, remainingNanos); 795 } 796 } 797 } 798 799 /** 800 * Parks the current carrier thread up to the given waiting time or until 801 * unparked or interrupted. If the virtual thread is interrupted then the 802 * interrupt status will be propagated to the carrier thread. 803 * @param timed true for a timed park, false for untimed 804 * @param nanos the waiting time in nanoseconds 805 */ 806 private void parkOnCarrierThread(boolean timed, long nanos) { 807 assert state() == RUNNING; 808 809 setState(timed ? TIMED_PINNED : PINNED); 810 try { 811 if (!parkPermit) { 812 if (!timed) { 813 U.park(false, 0); 814 } else if (nanos > 0) { 815 U.park(false, nanos); 816 } 817 } 818 } finally { 819 setState(RUNNING); 820 } 821 822 // consume parking permit 823 setParkPermit(false); 824 825 // JFR jdk.VirtualThreadPinned event 826 postPinnedEvent("LockSupport.park"); 827 } 828 829 /** 830 * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event. 831 * Recording the event in the VM avoids having JFR event recorded in Java 832 * with the same name, but different ID, to events recorded by the VM. 833 */ 834 @Hidden 835 private static native void postPinnedEvent(String op); 836 837 /** 838 * Re-enables this virtual thread for scheduling. If this virtual thread is parked 839 * then its task is scheduled to continue, otherwise its next call to {@code park} or 840 * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block. 841 * @throws RejectedExecutionException if the scheduler cannot accept a task 842 */ 843 @Override 844 void unpark() { 845 if (!getAndSetParkPermit(true) && currentThread() != this) { 846 int s = state(); 847 848 // unparked while parked 849 if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) { 850 submitRunContinuation(); 851 return; 852 } 853 854 // unparked while parked when pinned 855 if (s == PINNED || s == TIMED_PINNED) { 856 // unpark carrier thread when pinned 857 disableSuspendAndPreempt(); 858 try { 859 synchronized (carrierThreadAccessLock()) { 860 Thread carrier = carrierThread; 861 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 862 U.unpark(carrier); 863 } 864 } 865 } finally { 866 enableSuspendAndPreempt(); 867 } 868 return; 869 } 870 } 871 } 872 873 /** 874 * Invoked by unblocker thread to unblock this virtual thread. 875 */ 876 private void unblock() { 877 assert !Thread.currentThread().isVirtual(); 878 blockPermit = true; 879 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 880 submitRunContinuation(); 881 } 882 } 883 884 /** 885 * Invoked by FJP worker thread or STPE thread when park timeout expires. 886 */ 887 private void parkTimeoutExpired() { 888 assert !VirtualThread.currentThread().isVirtual(); 889 if (!getAndSetParkPermit(true) 890 && (state() == TIMED_PARKED) 891 && compareAndSetState(TIMED_PARKED, UNPARKED)) { 892 lazySubmitRunContinuation(); 893 } 894 } 895 896 /** 897 * Invoked by FJP worker thread or STPE thread when wait timeout expires. 898 * If the virtual thread is in timed-wait then this method will unblock the thread 899 * and submit its task so that it continues and attempts to reenter the monitor. 900 * This method does nothing if the thread has been woken by notify or interrupt. 901 */ 902 private void waitTimeoutExpired(byte seqNo) { 903 assert !Thread.currentThread().isVirtual(); 904 for (;;) { 905 boolean unblocked = false; 906 synchronized (timedWaitLock()) { 907 if (seqNo != timedWaitSeqNo) { 908 // this timeout task is for a past timed-wait 909 return; 910 } 911 int s = state(); 912 if (s == TIMED_WAIT) { 913 unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED); 914 } else if (s != (TIMED_WAIT | SUSPENDED)) { 915 // notified or interrupted, no longer waiting 916 return; 917 } 918 } 919 if (unblocked) { 920 lazySubmitRunContinuation(); 921 return; 922 } 923 // need to retry when thread is suspended in time-wait 924 Thread.yield(); 925 } 926 } 927 928 /** 929 * Attempts to yield the current virtual thread (Thread.yield). 930 */ 931 void tryYield() { 932 assert Thread.currentThread() == this; 933 setState(YIELDING); 934 boolean yielded = false; 935 try { 936 yielded = yieldContinuation(); // may throw 937 } finally { 938 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 939 if (!yielded) { 940 assert state() == YIELDING; 941 setState(RUNNING); 942 } 943 } 944 } 945 946 /** 947 * Sleep the current thread for the given sleep time (in nanoseconds). If 948 * nanos is 0 then the thread will attempt to yield. 949 * 950 * @implNote This implementation parks the thread for the given sleeping time 951 * and will therefore be observed in PARKED state during the sleep. Parking 952 * will consume the parking permit so this method makes available the parking 953 * permit after the sleep. This may be observed as a spurious, but benign, 954 * wakeup when the thread subsequently attempts to park. 955 * 956 * @param nanos the maximum number of nanoseconds to sleep 957 * @throws InterruptedException if interrupted while sleeping 958 */ 959 void sleepNanos(long nanos) throws InterruptedException { 960 assert Thread.currentThread() == this && nanos >= 0; 961 if (getAndClearInterrupt()) 962 throw new InterruptedException(); 963 if (nanos == 0) { 964 tryYield(); 965 } else { 966 // park for the sleep time 967 try { 968 long remainingNanos = nanos; 969 long startNanos = System.nanoTime(); 970 while (remainingNanos > 0) { 971 parkNanos(remainingNanos); 972 if (getAndClearInterrupt()) { 973 throw new InterruptedException(); 974 } 975 remainingNanos = nanos - (System.nanoTime() - startNanos); 976 } 977 } finally { 978 // may have been unparked while sleeping 979 setParkPermit(true); 980 } 981 } 982 } 983 984 /** 985 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 986 * A timeout of {@code 0} means to wait forever. 987 * 988 * @throws InterruptedException if interrupted while waiting 989 * @return true if the thread has terminated 990 */ 991 boolean joinNanos(long nanos) throws InterruptedException { 992 if (state() == TERMINATED) 993 return true; 994 995 // ensure termination object exists, then re-check state 996 CountDownLatch termination = getTermination(); 997 if (state() == TERMINATED) 998 return true; 999 1000 // wait for virtual thread to terminate 1001 if (nanos == 0) { 1002 termination.await(); 1003 } else { 1004 boolean terminated = termination.await(nanos, NANOSECONDS); 1005 if (!terminated) { 1006 // waiting time elapsed 1007 return false; 1008 } 1009 } 1010 assert state() == TERMINATED; 1011 return true; 1012 } 1013 1014 @Override 1015 void blockedOn(Interruptible b) { 1016 disableSuspendAndPreempt(); 1017 try { 1018 super.blockedOn(b); 1019 } finally { 1020 enableSuspendAndPreempt(); 1021 } 1022 } 1023 1024 @Override 1025 public void interrupt() { 1026 if (Thread.currentThread() != this) { 1027 // if current thread is a virtual thread then prevent it from being 1028 // suspended or unmounted when entering or holding interruptLock 1029 Interruptible blocker; 1030 disableSuspendAndPreempt(); 1031 try { 1032 synchronized (interruptLock) { 1033 interrupted = true; 1034 blocker = nioBlocker(); 1035 if (blocker != null) { 1036 blocker.interrupt(this); 1037 } 1038 1039 // interrupt carrier thread if mounted 1040 Thread carrier = carrierThread; 1041 if (carrier != null) carrier.setInterrupt(); 1042 } 1043 } finally { 1044 enableSuspendAndPreempt(); 1045 } 1046 1047 // notify blocker after releasing interruptLock 1048 if (blocker != null) { 1049 blocker.postInterrupt(); 1050 } 1051 1052 // make available parking permit, unpark thread if parked 1053 unpark(); 1054 1055 // if thread is waiting in Object.wait then schedule to try to reenter 1056 int s = state(); 1057 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1058 submitRunContinuation(); 1059 } 1060 1061 } else { 1062 interrupted = true; 1063 carrierThread.setInterrupt(); 1064 setParkPermit(true); 1065 } 1066 } 1067 1068 @Override 1069 public boolean isInterrupted() { 1070 return interrupted; 1071 } 1072 1073 @Override 1074 boolean getAndClearInterrupt() { 1075 assert Thread.currentThread() == this; 1076 boolean oldValue = interrupted; 1077 if (oldValue) { 1078 disableSuspendAndPreempt(); 1079 try { 1080 synchronized (interruptLock) { 1081 interrupted = false; 1082 carrierThread.clearInterrupt(); 1083 } 1084 } finally { 1085 enableSuspendAndPreempt(); 1086 } 1087 } 1088 return oldValue; 1089 } 1090 1091 @Override 1092 Thread.State threadState() { 1093 int s = state(); 1094 switch (s & ~SUSPENDED) { 1095 case NEW: 1096 return Thread.State.NEW; 1097 case STARTED: 1098 // return NEW if thread container not yet set 1099 if (threadContainer() == null) { 1100 return Thread.State.NEW; 1101 } else { 1102 return Thread.State.RUNNABLE; 1103 } 1104 case UNPARKED: 1105 case UNBLOCKED: 1106 case YIELDED: 1107 // runnable, not mounted 1108 return Thread.State.RUNNABLE; 1109 case RUNNING: 1110 // if mounted then return state of carrier thread 1111 if (Thread.currentThread() != this) { 1112 disableSuspendAndPreempt(); 1113 try { 1114 synchronized (carrierThreadAccessLock()) { 1115 Thread carrierThread = this.carrierThread; 1116 if (carrierThread != null) { 1117 return carrierThread.threadState(); 1118 } 1119 } 1120 } finally { 1121 enableSuspendAndPreempt(); 1122 } 1123 } 1124 // runnable, mounted 1125 return Thread.State.RUNNABLE; 1126 case PARKING: 1127 case TIMED_PARKING: 1128 case WAITING: 1129 case TIMED_WAITING: 1130 case YIELDING: 1131 // runnable, in transition 1132 return Thread.State.RUNNABLE; 1133 case PARKED: 1134 case PINNED: 1135 case WAIT: 1136 return Thread.State.WAITING; 1137 case TIMED_PARKED: 1138 case TIMED_PINNED: 1139 case TIMED_WAIT: 1140 return Thread.State.TIMED_WAITING; 1141 case BLOCKING: 1142 case BLOCKED: 1143 return Thread.State.BLOCKED; 1144 case TERMINATED: 1145 return Thread.State.TERMINATED; 1146 default: 1147 throw new InternalError(); 1148 } 1149 } 1150 1151 @Override 1152 boolean alive() { 1153 int s = state; 1154 return (s != NEW && s != TERMINATED); 1155 } 1156 1157 @Override 1158 boolean isTerminated() { 1159 return (state == TERMINATED); 1160 } 1161 1162 @Override 1163 StackTraceElement[] asyncGetStackTrace() { 1164 StackTraceElement[] stackTrace; 1165 do { 1166 stackTrace = (carrierThread != null) 1167 ? super.asyncGetStackTrace() // mounted 1168 : tryGetStackTrace(); // unmounted 1169 if (stackTrace == null) { 1170 Thread.yield(); 1171 } 1172 } while (stackTrace == null); 1173 return stackTrace; 1174 } 1175 1176 /** 1177 * Returns the stack trace for this virtual thread if it is unmounted. 1178 * Returns null if the thread is mounted or in transition. 1179 */ 1180 private StackTraceElement[] tryGetStackTrace() { 1181 int initialState = state() & ~SUSPENDED; 1182 switch (initialState) { 1183 case NEW, STARTED, TERMINATED -> { 1184 return new StackTraceElement[0]; // unmounted, empty stack 1185 } 1186 case RUNNING, PINNED, TIMED_PINNED -> { 1187 return null; // mounted 1188 } 1189 case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> { 1190 // unmounted, not runnable 1191 } 1192 case UNPARKED, UNBLOCKED, YIELDED -> { 1193 // unmounted, runnable 1194 } 1195 case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> { 1196 return null; // in transition 1197 } 1198 default -> throw new InternalError("" + initialState); 1199 } 1200 1201 // thread is unmounted, prevent it from continuing 1202 int suspendedState = initialState | SUSPENDED; 1203 if (!compareAndSetState(initialState, suspendedState)) { 1204 return null; 1205 } 1206 1207 // get stack trace and restore state 1208 StackTraceElement[] stack; 1209 try { 1210 stack = cont.getStackTrace(); 1211 } finally { 1212 assert state == suspendedState; 1213 setState(initialState); 1214 } 1215 boolean resubmit = switch (initialState) { 1216 case UNPARKED, UNBLOCKED, YIELDED -> { 1217 // resubmit as task may have run while suspended 1218 yield true; 1219 } 1220 case PARKED, TIMED_PARKED -> { 1221 // resubmit if unparked while suspended 1222 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1223 } 1224 case BLOCKED -> { 1225 // resubmit if unblocked while suspended 1226 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED); 1227 } 1228 case WAIT, TIMED_WAIT -> { 1229 // resubmit if notified or interrupted while waiting (Object.wait) 1230 // waitTimeoutExpired will retry if the timed expired when suspended 1231 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED); 1232 } 1233 default -> throw new InternalError(); 1234 }; 1235 if (resubmit) { 1236 submitRunContinuation(); 1237 } 1238 return stack; 1239 } 1240 1241 @Override 1242 public String toString() { 1243 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1244 sb.append(threadId()); 1245 String name = getName(); 1246 if (!name.isEmpty()) { 1247 sb.append(","); 1248 sb.append(name); 1249 } 1250 sb.append("]/"); 1251 1252 // add the carrier state and thread name when mounted 1253 boolean mounted; 1254 if (Thread.currentThread() == this) { 1255 mounted = appendCarrierInfo(sb); 1256 } else { 1257 disableSuspendAndPreempt(); 1258 try { 1259 synchronized (carrierThreadAccessLock()) { 1260 mounted = appendCarrierInfo(sb); 1261 } 1262 } finally { 1263 enableSuspendAndPreempt(); 1264 } 1265 } 1266 1267 // add virtual thread state when not mounted 1268 if (!mounted) { 1269 String stateAsString = threadState().toString(); 1270 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1271 } 1272 1273 return sb.toString(); 1274 } 1275 1276 /** 1277 * Appends the carrier state and thread name to the string buffer if mounted. 1278 * @return true if mounted, false if not mounted 1279 */ 1280 private boolean appendCarrierInfo(StringBuilder sb) { 1281 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1282 Thread carrier = carrierThread; 1283 if (carrier != null) { 1284 String stateAsString = carrier.threadState().toString(); 1285 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1286 sb.append('@'); 1287 sb.append(carrier.getName()); 1288 return true; 1289 } else { 1290 return false; 1291 } 1292 } 1293 1294 @Override 1295 public int hashCode() { 1296 return (int) threadId(); 1297 } 1298 1299 @Override 1300 public boolean equals(Object obj) { 1301 return obj == this; 1302 } 1303 1304 /** 1305 * Returns the termination object, creating it if needed. 1306 */ 1307 private CountDownLatch getTermination() { 1308 CountDownLatch termination = this.termination; 1309 if (termination == null) { 1310 termination = new CountDownLatch(1); 1311 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1312 termination = this.termination; 1313 } 1314 } 1315 return termination; 1316 } 1317 1318 /** 1319 * Returns the lock object to synchronize on when accessing carrierThread. 1320 * The lock prevents carrierThread from being reset to null during unmount. 1321 */ 1322 private Object carrierThreadAccessLock() { 1323 // return interruptLock as unmount has to coordinate with interrupt 1324 return interruptLock; 1325 } 1326 1327 /** 1328 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1329 */ 1330 private Object timedWaitLock() { 1331 // use this object for now to avoid the overhead of introducing another lock 1332 return runContinuation; 1333 } 1334 1335 /** 1336 * Disallow the current thread be suspended or preempted. 1337 */ 1338 private void disableSuspendAndPreempt() { 1339 notifyJvmtiDisableSuspend(true); 1340 Continuation.pin(); 1341 } 1342 1343 /** 1344 * Allow the current thread be suspended or preempted. 1345 */ 1346 private void enableSuspendAndPreempt() { 1347 Continuation.unpin(); 1348 notifyJvmtiDisableSuspend(false); 1349 } 1350 1351 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1352 1353 private int state() { 1354 return state; // volatile read 1355 } 1356 1357 private void setState(int newValue) { 1358 state = newValue; // volatile write 1359 } 1360 1361 private boolean compareAndSetState(int expectedValue, int newValue) { 1362 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1363 } 1364 1365 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1366 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1367 } 1368 1369 private void setParkPermit(boolean newValue) { 1370 if (parkPermit != newValue) { 1371 parkPermit = newValue; 1372 } 1373 } 1374 1375 private boolean getAndSetParkPermit(boolean newValue) { 1376 if (parkPermit != newValue) { 1377 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1378 } else { 1379 return newValue; 1380 } 1381 } 1382 1383 private void setCarrierThread(Thread carrier) { 1384 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1385 this.carrierThread = carrier; 1386 } 1387 1388 // -- JVM TI support -- 1389 1390 @IntrinsicCandidate 1391 @JvmtiMountTransition 1392 private native void notifyJvmtiStart(); 1393 1394 @IntrinsicCandidate 1395 @JvmtiMountTransition 1396 private native void notifyJvmtiEnd(); 1397 1398 @IntrinsicCandidate 1399 @JvmtiMountTransition 1400 private native void notifyJvmtiMount(boolean hide); 1401 1402 @IntrinsicCandidate 1403 @JvmtiMountTransition 1404 private native void notifyJvmtiUnmount(boolean hide); 1405 1406 @IntrinsicCandidate 1407 private static native void notifyJvmtiDisableSuspend(boolean enter); 1408 1409 private static native void registerNatives(); 1410 static { 1411 registerNatives(); 1412 1413 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1414 var group = Thread.virtualThreadGroup(); 1415 } 1416 1417 /** 1418 * Creates the default ForkJoinPool scheduler. 1419 */ 1420 private static ForkJoinPool createDefaultScheduler() { 1421 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool); 1422 int parallelism, maxPoolSize, minRunnable; 1423 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1424 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1425 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1426 if (parallelismValue != null) { 1427 parallelism = Integer.parseInt(parallelismValue); 1428 } else { 1429 parallelism = Runtime.getRuntime().availableProcessors(); 1430 } 1431 if (maxPoolSizeValue != null) { 1432 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1433 parallelism = Integer.min(parallelism, maxPoolSize); 1434 } else { 1435 maxPoolSize = Integer.max(parallelism, 256); 1436 } 1437 if (minRunnableValue != null) { 1438 minRunnable = Integer.parseInt(minRunnableValue); 1439 } else { 1440 minRunnable = Integer.max(parallelism / 2, 1); 1441 } 1442 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1443 boolean asyncMode = true; // FIFO 1444 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1445 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1446 } 1447 1448 /** 1449 * Schedule a runnable task to run after a delay. 1450 */ 1451 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1452 if (scheduler instanceof ForkJoinPool pool) { 1453 return pool.schedule(command, delay, unit); 1454 } else { 1455 return DelayedTaskSchedulers.schedule(command, delay, unit); 1456 } 1457 } 1458 1459 /** 1460 * Supports scheduling a runnable task to run after a delay. It uses a number 1461 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1462 * work queue used. This class is used when using a custom scheduler. 1463 */ 1464 private static class DelayedTaskSchedulers { 1465 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1466 1467 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1468 long tid = Thread.currentThread().threadId(); 1469 int index = (int) tid & (INSTANCE.length - 1); 1470 return INSTANCE[index].schedule(command, delay, unit); 1471 } 1472 1473 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1474 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1475 String propValue = System.getProperty(propName); 1476 int queueCount; 1477 if (propValue != null) { 1478 queueCount = Integer.parseInt(propValue); 1479 if (queueCount != Integer.highestOneBit(queueCount)) { 1480 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1481 } 1482 } else { 1483 int ncpus = Runtime.getRuntime().availableProcessors(); 1484 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1485 } 1486 var schedulers = new ScheduledExecutorService[queueCount]; 1487 for (int i = 0; i < queueCount; i++) { 1488 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1489 Executors.newScheduledThreadPool(1, task -> { 1490 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1491 t.setDaemon(true); 1492 return t; 1493 }); 1494 stpe.setRemoveOnCancelPolicy(true); 1495 schedulers[i] = stpe; 1496 } 1497 return schedulers; 1498 } 1499 } 1500 1501 /** 1502 * Schedule virtual threads that are ready to be scheduled after they blocked on 1503 * monitor enter. 1504 */ 1505 private static void unblockVirtualThreads() { 1506 while (true) { 1507 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1508 while (vthread != null) { 1509 assert vthread.onWaitingList; 1510 VirtualThread nextThread = vthread.next; 1511 1512 // remove from list and unblock 1513 vthread.next = null; 1514 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1515 assert changed; 1516 vthread.unblock(); 1517 1518 vthread = nextThread; 1519 } 1520 } 1521 } 1522 1523 /** 1524 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1525 * if necessary until a list of one or more threads becomes available. 1526 */ 1527 private static native VirtualThread takeVirtualThreadListToUnblock(); 1528 1529 static { 1530 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1531 VirtualThread::unblockVirtualThreads); 1532 unblocker.setDaemon(true); 1533 unblocker.start(); 1534 } 1535 }