1 /* 2 * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.util.Arrays; 28 import java.util.Locale; 29 import java.util.Objects; 30 import java.util.concurrent.CountDownLatch; 31 import java.util.concurrent.Executor; 32 import java.util.concurrent.Executors; 33 import java.util.concurrent.ForkJoinPool; 34 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 35 import java.util.concurrent.ForkJoinTask; 36 import java.util.concurrent.Future; 37 import java.util.concurrent.RejectedExecutionException; 38 import java.util.concurrent.ScheduledExecutorService; 39 import java.util.concurrent.ScheduledThreadPoolExecutor; 40 import java.util.concurrent.TimeUnit; 41 import java.util.stream.Stream; 42 import jdk.internal.event.VirtualThreadEndEvent; 43 import jdk.internal.event.VirtualThreadStartEvent; 44 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 45 import jdk.internal.misc.CarrierThread; 46 import jdk.internal.misc.InnocuousThread; 47 import jdk.internal.misc.Unsafe; 48 import jdk.internal.vm.Continuation; 49 import jdk.internal.vm.ContinuationScope; 50 import jdk.internal.vm.StackableScope; 51 import jdk.internal.vm.ThreadContainer; 52 import jdk.internal.vm.ThreadContainers; 53 import jdk.internal.vm.annotation.ChangesCurrentThread; 54 import jdk.internal.vm.annotation.Hidden; 55 import jdk.internal.vm.annotation.IntrinsicCandidate; 56 import jdk.internal.vm.annotation.JvmtiHideEvents; 57 import jdk.internal.vm.annotation.JvmtiMountTransition; 58 import jdk.internal.vm.annotation.ReservedStackAccess; 59 import sun.nio.ch.Interruptible; 60 import static java.util.concurrent.TimeUnit.*; 61 62 /** 63 * A thread that is scheduled by the Java virtual machine rather than the operating system. 64 */ 65 final class VirtualThread extends BaseVirtualThread { 66 private static final Unsafe U = Unsafe.getUnsafe(); 67 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 68 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler(); 69 private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers(); 70 71 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 72 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 73 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 74 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 75 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList"); 76 77 // scheduler and continuation 78 private final Executor scheduler; 79 private final Continuation cont; 80 private final Runnable runContinuation; 81 82 // virtual thread state, accessed by VM 83 private volatile int state; 84 85 /* 86 * Virtual thread state transitions: 87 * 88 * NEW -> STARTED // Thread.start, schedule to run 89 * STARTED -> TERMINATED // failed to start 90 * STARTED -> RUNNING // first run 91 * RUNNING -> TERMINATED // done 92 * 93 * RUNNING -> PARKING // Thread parking with LockSupport.park 94 * PARKING -> PARKED // cont.yield successful, parked indefinitely 95 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier 96 * PARKED -> UNPARKED // unparked, may be scheduled to continue 97 * PINNED -> RUNNING // unparked, continue execution on same carrier 98 * UNPARKED -> RUNNING // continue execution after park 99 * 100 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 101 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 102 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier 103 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue 104 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 105 * 106 * RUNNING -> BLOCKING // blocking on monitor enter 107 * BLOCKING -> BLOCKED // blocked on monitor enter 108 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue 109 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter 110 * 111 * RUNNING -> WAITING // transitional state during wait on monitor 112 * WAITING -> WAIT // waiting on monitor 113 * WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 114 * WAIT -> UNBLOCKED // timed-out/interrupted 115 * 116 * RUNNING -> TIMED_WAITING // transition state during timed-waiting on monitor 117 * TIMED_WAITING -> TIMED_WAIT // timed-waiting on monitor 118 * TIMED_WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 119 * TIMED_WAIT -> UNBLOCKED // timed-out/interrupted 120 * 121 * RUNNING -> YIELDING // Thread.yield 122 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue 123 * YIELDING -> RUNNING // cont.yield failed 124 * YIELDED -> RUNNING // continue execution after Thread.yield 125 */ 126 private static final int NEW = 0; 127 private static final int STARTED = 1; 128 private static final int RUNNING = 2; // runnable-mounted 129 130 // untimed and timed parking 131 private static final int PARKING = 3; 132 private static final int PARKED = 4; // unmounted 133 private static final int PINNED = 5; // mounted 134 private static final int TIMED_PARKING = 6; 135 private static final int TIMED_PARKED = 7; // unmounted 136 private static final int TIMED_PINNED = 8; // mounted 137 private static final int UNPARKED = 9; // unmounted but runnable 138 139 // Thread.yield 140 private static final int YIELDING = 10; 141 private static final int YIELDED = 11; // unmounted but runnable 142 143 // monitor enter 144 private static final int BLOCKING = 12; 145 private static final int BLOCKED = 13; // unmounted 146 private static final int UNBLOCKED = 14; // unmounted but runnable 147 148 // monitor wait/timed-wait 149 private static final int WAITING = 15; 150 private static final int WAIT = 16; // waiting in Object.wait 151 private static final int TIMED_WAITING = 17; 152 private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait 153 154 private static final int TERMINATED = 99; // final state 155 156 // can be suspended from scheduling when unmounted 157 private static final int SUSPENDED = 1 << 8; 158 159 // parking permit made available by LockSupport.unpark 160 private volatile boolean parkPermit; 161 162 // blocking permit made available by unblocker thread when another thread exits monitor 163 private volatile boolean blockPermit; 164 165 // true when on the list of virtual threads waiting to be unblocked 166 private volatile boolean onWaitingList; 167 168 // next virtual thread on the list of virtual threads waiting to be unblocked 169 private volatile VirtualThread next; 170 171 // notified by Object.notify/notifyAll while waiting in Object.wait 172 private volatile boolean notified; 173 174 // timed-wait support 175 private byte timedWaitSeqNo; 176 177 // timeout for timed-park and timed-wait, only accessed on current/carrier thread 178 private long timeout; 179 180 // timer task for timed-park and timed-wait, only accessed on current/carrier thread 181 private Future<?> timeoutTask; 182 183 // carrier thread when mounted, accessed by VM 184 private volatile Thread carrierThread; 185 186 // termination object when joining, created lazily if needed 187 private volatile CountDownLatch termination; 188 189 /** 190 * Returns the default scheduler. 191 */ 192 static Executor defaultScheduler() { 193 return DEFAULT_SCHEDULER; 194 } 195 196 /** 197 * Returns a stream of the delayed task schedulers used to support timed operations. 198 */ 199 static Stream<ScheduledExecutorService> delayedTaskSchedulers() { 200 return Arrays.stream(DELAYED_TASK_SCHEDULERS); 201 } 202 203 /** 204 * Returns the continuation scope used for virtual threads. 205 */ 206 static ContinuationScope continuationScope() { 207 return VTHREAD_SCOPE; 208 } 209 210 /** 211 * Creates a new {@code VirtualThread} to run the given task with the given 212 * scheduler. If the given scheduler is {@code null} and the current thread 213 * is a platform thread then the newly created virtual thread will use the 214 * default scheduler. If given scheduler is {@code null} and the current 215 * thread is a virtual thread then the current thread's scheduler is used. 216 * 217 * @param scheduler the scheduler or null 218 * @param name thread name 219 * @param characteristics characteristics 220 * @param task the task to execute 221 */ 222 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 223 super(name, characteristics, /*bound*/ false); 224 Objects.requireNonNull(task); 225 226 // choose scheduler if not specified 227 if (scheduler == null) { 228 Thread parent = Thread.currentThread(); 229 if (parent instanceof VirtualThread vparent) { 230 scheduler = vparent.scheduler; 231 } else { 232 scheduler = DEFAULT_SCHEDULER; 233 } 234 } 235 236 this.scheduler = scheduler; 237 this.cont = new VThreadContinuation(this, task); 238 this.runContinuation = this::runContinuation; 239 } 240 241 /** 242 * The continuation that a virtual thread executes. 243 */ 244 private static class VThreadContinuation extends Continuation { 245 VThreadContinuation(VirtualThread vthread, Runnable task) { 246 super(VTHREAD_SCOPE, wrap(vthread, task)); 247 } 248 @Override 249 protected void onPinned(Continuation.Pinned reason) { 250 } 251 private static Runnable wrap(VirtualThread vthread, Runnable task) { 252 return new Runnable() { 253 @Hidden 254 @JvmtiHideEvents 255 public void run() { 256 vthread.notifyJvmtiStart(); // notify JVMTI 257 try { 258 vthread.run(task); 259 } finally { 260 vthread.notifyJvmtiEnd(); // notify JVMTI 261 } 262 } 263 }; 264 } 265 } 266 267 /** 268 * Runs or continues execution on the current thread. The virtual thread is mounted 269 * on the current thread before the task runs or continues. It unmounts when the 270 * task completes or yields. 271 */ 272 @ChangesCurrentThread // allow mount/unmount to be inlined 273 private void runContinuation() { 274 // the carrier must be a platform thread 275 if (Thread.currentThread().isVirtual()) { 276 throw new WrongThreadException(); 277 } 278 279 // set state to RUNNING 280 int initialState = state(); 281 if (initialState == STARTED || initialState == UNPARKED 282 || initialState == UNBLOCKED || initialState == YIELDED) { 283 // newly started or continue after parking/blocking/Thread.yield 284 if (!compareAndSetState(initialState, RUNNING)) { 285 return; 286 } 287 // consume permit when continuing after parking or blocking. If continue 288 // after a timed-park or timed-wait then the timeout task is cancelled. 289 if (initialState == UNPARKED) { 290 cancelTimeoutTask(); 291 setParkPermit(false); 292 } else if (initialState == UNBLOCKED) { 293 cancelTimeoutTask(); 294 blockPermit = false; 295 } 296 } else { 297 // not runnable 298 return; 299 } 300 301 mount(); 302 try { 303 cont.run(); 304 } finally { 305 unmount(); 306 if (cont.isDone()) { 307 afterDone(); 308 } else { 309 afterYield(); 310 } 311 } 312 } 313 314 /** 315 * Cancel timeout task when continuing after timed-park or timed-wait. 316 * The timeout task may be executing, or may have already completed. 317 */ 318 private void cancelTimeoutTask() { 319 if (timeoutTask != null) { 320 timeoutTask.cancel(false); 321 timeoutTask = null; 322 } 323 } 324 325 /** 326 * Submits the runContinuation task to the scheduler. For the default scheduler, 327 * and calling it on a worker thread, the task will be pushed to the local queue, 328 * otherwise it will be pushed to an external submission queue. 329 * @param scheduler the scheduler 330 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 331 * @throws RejectedExecutionException 332 */ 333 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) { 334 boolean done = false; 335 while (!done) { 336 try { 337 // Pin the continuation to prevent the virtual thread from unmounting 338 // when submitting a task. For the default scheduler this ensures that 339 // the carrier doesn't change when pushing a task. For other schedulers 340 // it avoids deadlock that could arise due to carriers and virtual 341 // threads contending for a lock. 342 if (currentThread().isVirtual()) { 343 Continuation.pin(); 344 try { 345 scheduler.execute(runContinuation); 346 } finally { 347 Continuation.unpin(); 348 } 349 } else { 350 scheduler.execute(runContinuation); 351 } 352 done = true; 353 } catch (RejectedExecutionException ree) { 354 submitFailed(ree); 355 throw ree; 356 } catch (OutOfMemoryError e) { 357 if (retryOnOOME) { 358 U.park(false, 100_000_000); // 100ms 359 } else { 360 throw e; 361 } 362 } 363 } 364 } 365 366 /** 367 * Submits the runContinuation task to the given scheduler as an external submit. 368 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 369 * @throws RejectedExecutionException 370 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 371 */ 372 private void externalSubmitRunContinuation(ForkJoinPool pool) { 373 assert Thread.currentThread() instanceof CarrierThread; 374 try { 375 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 376 } catch (RejectedExecutionException ree) { 377 submitFailed(ree); 378 throw ree; 379 } catch (OutOfMemoryError e) { 380 submitRunContinuation(pool, true); 381 } 382 } 383 384 /** 385 * Submits the runContinuation task to the scheduler. For the default scheduler, 386 * and calling it on a worker thread, the task will be pushed to the local queue, 387 * otherwise it will be pushed to an external submission queue. 388 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 389 * @throws RejectedExecutionException 390 */ 391 private void submitRunContinuation() { 392 submitRunContinuation(scheduler, true); 393 } 394 395 /** 396 * Lazy submit the runContinuation task if invoked on a carrier thread and its local 397 * queue is empty. If not empty, or invoked by another thread, then this method works 398 * like submitRunContinuation and just submits the task to the scheduler. 399 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 400 * @throws RejectedExecutionException 401 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 402 */ 403 private void lazySubmitRunContinuation() { 404 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 405 ForkJoinPool pool = ct.getPool(); 406 try { 407 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 408 } catch (RejectedExecutionException ree) { 409 submitFailed(ree); 410 throw ree; 411 } catch (OutOfMemoryError e) { 412 submitRunContinuation(); 413 } 414 } else { 415 submitRunContinuation(); 416 } 417 } 418 419 /** 420 * Submits the runContinuation task to the scheduler. For the default scheduler, and 421 * calling it a virtual thread that uses the default scheduler, the task will be 422 * pushed to an external submission queue. This method may throw OutOfMemoryError. 423 * @throws RejectedExecutionException 424 * @throws OutOfMemoryError 425 */ 426 private void externalSubmitRunContinuationOrThrow() { 427 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) { 428 try { 429 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 430 } catch (RejectedExecutionException ree) { 431 submitFailed(ree); 432 throw ree; 433 } 434 } else { 435 submitRunContinuation(scheduler, false); 436 } 437 } 438 439 /** 440 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 441 */ 442 private void submitFailed(RejectedExecutionException ree) { 443 var event = new VirtualThreadSubmitFailedEvent(); 444 if (event.isEnabled()) { 445 event.javaThreadId = threadId(); 446 event.exceptionMessage = ree.getMessage(); 447 event.commit(); 448 } 449 } 450 451 /** 452 * Runs a task in the context of this virtual thread. 453 */ 454 private void run(Runnable task) { 455 assert Thread.currentThread() == this && state == RUNNING; 456 457 // emit JFR event if enabled 458 if (VirtualThreadStartEvent.isTurnedOn()) { 459 var event = new VirtualThreadStartEvent(); 460 event.javaThreadId = threadId(); 461 event.commit(); 462 } 463 464 Object bindings = Thread.scopedValueBindings(); 465 try { 466 runWith(bindings, task); 467 } catch (Throwable exc) { 468 dispatchUncaughtException(exc); 469 } finally { 470 // pop any remaining scopes from the stack, this may block 471 StackableScope.popAll(); 472 473 // emit JFR event if enabled 474 if (VirtualThreadEndEvent.isTurnedOn()) { 475 var event = new VirtualThreadEndEvent(); 476 event.javaThreadId = threadId(); 477 event.commit(); 478 } 479 } 480 } 481 482 /** 483 * Mounts this virtual thread onto the current platform thread. On 484 * return, the current thread is the virtual thread. 485 */ 486 @ChangesCurrentThread 487 @ReservedStackAccess 488 private void mount() { 489 // notify JVMTI before mount 490 notifyJvmtiMount(/*hide*/true); 491 492 // sets the carrier thread 493 Thread carrier = Thread.currentCarrierThread(); 494 setCarrierThread(carrier); 495 496 // sync up carrier thread interrupt status if needed 497 if (interrupted) { 498 carrier.setInterrupt(); 499 } else if (carrier.isInterrupted()) { 500 synchronized (interruptLock) { 501 // need to recheck interrupt status 502 if (!interrupted) { 503 carrier.clearInterrupt(); 504 } 505 } 506 } 507 508 // set Thread.currentThread() to return this virtual thread 509 carrier.setCurrentThread(this); 510 } 511 512 /** 513 * Unmounts this virtual thread from the carrier. On return, the 514 * current thread is the current platform thread. 515 */ 516 @ChangesCurrentThread 517 @ReservedStackAccess 518 private void unmount() { 519 assert !Thread.holdsLock(interruptLock); 520 521 // set Thread.currentThread() to return the platform thread 522 Thread carrier = this.carrierThread; 523 carrier.setCurrentThread(carrier); 524 525 // break connection to carrier thread, synchronized with interrupt 526 synchronized (interruptLock) { 527 setCarrierThread(null); 528 } 529 carrier.clearInterrupt(); 530 531 // notify JVMTI after unmount 532 notifyJvmtiUnmount(/*hide*/false); 533 } 534 535 /** 536 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 537 * the continuation continues. 538 */ 539 @Hidden 540 private boolean yieldContinuation() { 541 notifyJvmtiUnmount(/*hide*/true); 542 try { 543 return Continuation.yield(VTHREAD_SCOPE); 544 } finally { 545 notifyJvmtiMount(/*hide*/false); 546 } 547 } 548 549 /** 550 * Invoked in the context of the carrier thread after the Continuation yields when 551 * parking, blocking on monitor enter, Object.wait, or Thread.yield. 552 */ 553 private void afterYield() { 554 assert carrierThread == null; 555 556 // re-adjust parallelism if the virtual thread yielded when compensating 557 if (currentThread() instanceof CarrierThread ct) { 558 ct.endBlocking(); 559 } 560 561 int s = state(); 562 563 // LockSupport.park/parkNanos 564 if (s == PARKING || s == TIMED_PARKING) { 565 int newState; 566 if (s == PARKING) { 567 setState(newState = PARKED); 568 } else { 569 // schedule unpark 570 assert timeout > 0; 571 timeoutTask = schedule(this::unpark, timeout, NANOSECONDS); 572 setState(newState = TIMED_PARKED); 573 } 574 575 // may have been unparked while parking 576 if (parkPermit && compareAndSetState(newState, UNPARKED)) { 577 // lazy submit if local queue is empty 578 lazySubmitRunContinuation(); 579 } 580 return; 581 } 582 583 // Thread.yield 584 if (s == YIELDING) { 585 setState(YIELDED); 586 587 // external submit if there are no tasks in the local task queue 588 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 589 externalSubmitRunContinuation(ct.getPool()); 590 } else { 591 submitRunContinuation(); 592 } 593 return; 594 } 595 596 // blocking on monitorenter 597 if (s == BLOCKING) { 598 setState(BLOCKED); 599 600 // may have been unblocked while blocking 601 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 602 // lazy submit if local queue is empty 603 lazySubmitRunContinuation(); 604 } 605 return; 606 } 607 608 // Object.wait 609 if (s == WAITING || s == TIMED_WAITING) { 610 int newState; 611 if (s == WAITING) { 612 setState(newState = WAIT); 613 } else { 614 // For timed-wait, a timeout task is scheduled to execute. The timeout 615 // task will change the thread state to UNBLOCKED and submit the thread 616 // to the scheduler. A sequence number is used to ensure that the timeout 617 // task only unblocks the thread for this timed-wait. We synchronize with 618 // the timeout task to coordinate access to the sequence number and to 619 // ensure the timeout task doesn't execute until the thread has got to 620 // the TIMED_WAIT state. 621 assert timeout > 0; 622 synchronized (timedWaitLock()) { 623 byte seqNo = ++timedWaitSeqNo; 624 timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS); 625 setState(newState = TIMED_WAIT); 626 } 627 } 628 629 // may have been notified while in transition to wait state 630 if (notified && compareAndSetState(newState, BLOCKED)) { 631 // may have even been unblocked already 632 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 633 submitRunContinuation(); 634 } 635 return; 636 } 637 638 // may have been interrupted while in transition to wait state 639 if (interrupted && compareAndSetState(newState, UNBLOCKED)) { 640 submitRunContinuation(); 641 return; 642 } 643 return; 644 } 645 646 assert false; 647 } 648 649 /** 650 * Invoked after the continuation completes. 651 */ 652 private void afterDone() { 653 afterDone(true); 654 } 655 656 /** 657 * Invoked after the continuation completes (or start failed). Sets the thread 658 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 659 * 660 * @param notifyContainer true if its container should be notified 661 */ 662 private void afterDone(boolean notifyContainer) { 663 assert carrierThread == null; 664 setState(TERMINATED); 665 666 // notify anyone waiting for this virtual thread to terminate 667 CountDownLatch termination = this.termination; 668 if (termination != null) { 669 assert termination.getCount() == 1; 670 termination.countDown(); 671 } 672 673 // notify container 674 if (notifyContainer) { 675 threadContainer().onExit(this); 676 } 677 678 // clear references to thread locals 679 clearReferences(); 680 } 681 682 /** 683 * Schedules this {@code VirtualThread} to execute. 684 * 685 * @throws IllegalStateException if the container is shutdown or closed 686 * @throws IllegalThreadStateException if the thread has already been started 687 * @throws RejectedExecutionException if the scheduler cannot accept a task 688 */ 689 @Override 690 void start(ThreadContainer container) { 691 if (!compareAndSetState(NEW, STARTED)) { 692 throw new IllegalThreadStateException("Already started"); 693 } 694 695 // bind thread to container 696 assert threadContainer() == null; 697 setThreadContainer(container); 698 699 // start thread 700 boolean addedToContainer = false; 701 boolean started = false; 702 try { 703 container.onStart(this); // may throw 704 addedToContainer = true; 705 706 // scoped values may be inherited 707 inheritScopedValueBindings(container); 708 709 // submit task to run thread, using externalSubmit if possible 710 externalSubmitRunContinuationOrThrow(); 711 started = true; 712 } finally { 713 if (!started) { 714 afterDone(addedToContainer); 715 } 716 } 717 } 718 719 @Override 720 public void start() { 721 start(ThreadContainers.root()); 722 } 723 724 @Override 725 public void run() { 726 // do nothing 727 } 728 729 /** 730 * Parks until unparked or interrupted. If already unparked then the parking 731 * permit is consumed and this method completes immediately (meaning it doesn't 732 * yield). It also completes immediately if the interrupt status is set. 733 */ 734 @Override 735 void park() { 736 assert Thread.currentThread() == this; 737 738 // complete immediately if parking permit available or interrupted 739 if (getAndSetParkPermit(false) || interrupted) 740 return; 741 742 // park the thread 743 boolean yielded = false; 744 setState(PARKING); 745 try { 746 yielded = yieldContinuation(); 747 } catch (OutOfMemoryError e) { 748 // park on carrier 749 } finally { 750 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 751 if (!yielded) { 752 assert state() == PARKING; 753 setState(RUNNING); 754 } 755 } 756 757 // park on the carrier thread when pinned 758 if (!yielded) { 759 parkOnCarrierThread(false, 0); 760 } 761 } 762 763 /** 764 * Parks up to the given waiting time or until unparked or interrupted. 765 * If already unparked then the parking permit is consumed and this method 766 * completes immediately (meaning it doesn't yield). It also completes immediately 767 * if the interrupt status is set or the waiting time is {@code <= 0}. 768 * 769 * @param nanos the maximum number of nanoseconds to wait. 770 */ 771 @Override 772 void parkNanos(long nanos) { 773 assert Thread.currentThread() == this; 774 775 // complete immediately if parking permit available or interrupted 776 if (getAndSetParkPermit(false) || interrupted) 777 return; 778 779 // park the thread for the waiting time 780 if (nanos > 0) { 781 long startTime = System.nanoTime(); 782 783 // park the thread, afterYield will schedule the thread to unpark 784 boolean yielded = false; 785 timeout = nanos; 786 setState(TIMED_PARKING); 787 try { 788 yielded = yieldContinuation(); 789 } catch (OutOfMemoryError e) { 790 // park on carrier 791 } finally { 792 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 793 if (!yielded) { 794 assert state() == TIMED_PARKING; 795 setState(RUNNING); 796 } 797 } 798 799 // park on carrier thread for remaining time when pinned (or OOME) 800 if (!yielded) { 801 long remainingNanos = nanos - (System.nanoTime() - startTime); 802 parkOnCarrierThread(true, remainingNanos); 803 } 804 } 805 } 806 807 /** 808 * Parks the current carrier thread up to the given waiting time or until 809 * unparked or interrupted. If the virtual thread is interrupted then the 810 * interrupt status will be propagated to the carrier thread. 811 * @param timed true for a timed park, false for untimed 812 * @param nanos the waiting time in nanoseconds 813 */ 814 private void parkOnCarrierThread(boolean timed, long nanos) { 815 assert state() == RUNNING; 816 817 setState(timed ? TIMED_PINNED : PINNED); 818 try { 819 if (!parkPermit) { 820 if (!timed) { 821 U.park(false, 0); 822 } else if (nanos > 0) { 823 U.park(false, nanos); 824 } 825 } 826 } finally { 827 setState(RUNNING); 828 } 829 830 // consume parking permit 831 setParkPermit(false); 832 833 // JFR jdk.VirtualThreadPinned event 834 postPinnedEvent("LockSupport.park"); 835 } 836 837 /** 838 * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event. 839 * Recording the event in the VM avoids having JFR event recorded in Java 840 * with the same name, but different ID, to events recorded by the VM. 841 */ 842 @Hidden 843 private static native void postPinnedEvent(String op); 844 845 /** 846 * Re-enables this virtual thread for scheduling. If this virtual thread is parked 847 * then its task is scheduled to continue, otherwise its next call to {@code park} or 848 * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block. 849 * @throws RejectedExecutionException if the scheduler cannot accept a task 850 */ 851 @Override 852 void unpark() { 853 if (!getAndSetParkPermit(true) && currentThread() != this) { 854 int s = state(); 855 856 // unparked while parked 857 if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) { 858 submitRunContinuation(); 859 return; 860 } 861 862 // unparked while parked when pinned 863 if (s == PINNED || s == TIMED_PINNED) { 864 // unpark carrier thread when pinned 865 disableSuspendAndPreempt(); 866 try { 867 synchronized (carrierThreadAccessLock()) { 868 Thread carrier = carrierThread; 869 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 870 U.unpark(carrier); 871 } 872 } 873 } finally { 874 enableSuspendAndPreempt(); 875 } 876 return; 877 } 878 } 879 } 880 881 /** 882 * Invoked by unblocker thread to unblock this virtual thread. 883 */ 884 private void unblock() { 885 assert !Thread.currentThread().isVirtual(); 886 blockPermit = true; 887 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 888 submitRunContinuation(); 889 } 890 } 891 892 /** 893 * Invoked by timer thread when wait timeout for virtual thread has expired. 894 * If the virtual thread is in timed-wait then this method will unblock the thread 895 * and submit its task so that it continues and attempts to reenter the monitor. 896 * This method does nothing if the thread has been woken by notify or interrupt. 897 */ 898 private void waitTimeoutExpired(byte seqNo) { 899 assert !Thread.currentThread().isVirtual(); 900 for (;;) { 901 boolean unblocked = false; 902 synchronized (timedWaitLock()) { 903 if (seqNo != timedWaitSeqNo) { 904 // this timeout task is for a past timed-wait 905 return; 906 } 907 int s = state(); 908 if (s == TIMED_WAIT) { 909 unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED); 910 } else if (s != (TIMED_WAIT | SUSPENDED)) { 911 // notified or interrupted, no longer waiting 912 return; 913 } 914 } 915 if (unblocked) { 916 submitRunContinuation(); 917 return; 918 } 919 // need to retry when thread is suspended in time-wait 920 Thread.yield(); 921 } 922 } 923 924 /** 925 * Attempts to yield the current virtual thread (Thread.yield). 926 */ 927 void tryYield() { 928 assert Thread.currentThread() == this; 929 setState(YIELDING); 930 boolean yielded = false; 931 try { 932 yielded = yieldContinuation(); // may throw 933 } finally { 934 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 935 if (!yielded) { 936 assert state() == YIELDING; 937 setState(RUNNING); 938 } 939 } 940 } 941 942 /** 943 * Sleep the current thread for the given sleep time (in nanoseconds). If 944 * nanos is 0 then the thread will attempt to yield. 945 * 946 * @implNote This implementation parks the thread for the given sleeping time 947 * and will therefore be observed in PARKED state during the sleep. Parking 948 * will consume the parking permit so this method makes available the parking 949 * permit after the sleep. This may be observed as a spurious, but benign, 950 * wakeup when the thread subsequently attempts to park. 951 * 952 * @param nanos the maximum number of nanoseconds to sleep 953 * @throws InterruptedException if interrupted while sleeping 954 */ 955 void sleepNanos(long nanos) throws InterruptedException { 956 assert Thread.currentThread() == this && nanos >= 0; 957 if (getAndClearInterrupt()) 958 throw new InterruptedException(); 959 if (nanos == 0) { 960 tryYield(); 961 } else { 962 // park for the sleep time 963 try { 964 long remainingNanos = nanos; 965 long startNanos = System.nanoTime(); 966 while (remainingNanos > 0) { 967 parkNanos(remainingNanos); 968 if (getAndClearInterrupt()) { 969 throw new InterruptedException(); 970 } 971 remainingNanos = nanos - (System.nanoTime() - startNanos); 972 } 973 } finally { 974 // may have been unparked while sleeping 975 setParkPermit(true); 976 } 977 } 978 } 979 980 /** 981 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 982 * A timeout of {@code 0} means to wait forever. 983 * 984 * @throws InterruptedException if interrupted while waiting 985 * @return true if the thread has terminated 986 */ 987 boolean joinNanos(long nanos) throws InterruptedException { 988 if (state() == TERMINATED) 989 return true; 990 991 // ensure termination object exists, then re-check state 992 CountDownLatch termination = getTermination(); 993 if (state() == TERMINATED) 994 return true; 995 996 // wait for virtual thread to terminate 997 if (nanos == 0) { 998 termination.await(); 999 } else { 1000 boolean terminated = termination.await(nanos, NANOSECONDS); 1001 if (!terminated) { 1002 // waiting time elapsed 1003 return false; 1004 } 1005 } 1006 assert state() == TERMINATED; 1007 return true; 1008 } 1009 1010 @Override 1011 void blockedOn(Interruptible b) { 1012 disableSuspendAndPreempt(); 1013 try { 1014 super.blockedOn(b); 1015 } finally { 1016 enableSuspendAndPreempt(); 1017 } 1018 } 1019 1020 @Override 1021 public void interrupt() { 1022 if (Thread.currentThread() != this) { 1023 // if current thread is a virtual thread then prevent it from being 1024 // suspended or unmounted when entering or holding interruptLock 1025 Interruptible blocker; 1026 disableSuspendAndPreempt(); 1027 try { 1028 synchronized (interruptLock) { 1029 interrupted = true; 1030 blocker = nioBlocker(); 1031 if (blocker != null) { 1032 blocker.interrupt(this); 1033 } 1034 1035 // interrupt carrier thread if mounted 1036 Thread carrier = carrierThread; 1037 if (carrier != null) carrier.setInterrupt(); 1038 } 1039 } finally { 1040 enableSuspendAndPreempt(); 1041 } 1042 1043 // notify blocker after releasing interruptLock 1044 if (blocker != null) { 1045 blocker.postInterrupt(); 1046 } 1047 1048 // make available parking permit, unpark thread if parked 1049 unpark(); 1050 1051 // if thread is waiting in Object.wait then schedule to try to reenter 1052 int s = state(); 1053 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1054 submitRunContinuation(); 1055 } 1056 1057 } else { 1058 interrupted = true; 1059 carrierThread.setInterrupt(); 1060 setParkPermit(true); 1061 } 1062 } 1063 1064 @Override 1065 public boolean isInterrupted() { 1066 return interrupted; 1067 } 1068 1069 @Override 1070 boolean getAndClearInterrupt() { 1071 assert Thread.currentThread() == this; 1072 boolean oldValue = interrupted; 1073 if (oldValue) { 1074 disableSuspendAndPreempt(); 1075 try { 1076 synchronized (interruptLock) { 1077 interrupted = false; 1078 carrierThread.clearInterrupt(); 1079 } 1080 } finally { 1081 enableSuspendAndPreempt(); 1082 } 1083 } 1084 return oldValue; 1085 } 1086 1087 @Override 1088 Thread.State threadState() { 1089 int s = state(); 1090 switch (s & ~SUSPENDED) { 1091 case NEW: 1092 return Thread.State.NEW; 1093 case STARTED: 1094 // return NEW if thread container not yet set 1095 if (threadContainer() == null) { 1096 return Thread.State.NEW; 1097 } else { 1098 return Thread.State.RUNNABLE; 1099 } 1100 case UNPARKED: 1101 case UNBLOCKED: 1102 case YIELDED: 1103 // runnable, not mounted 1104 return Thread.State.RUNNABLE; 1105 case RUNNING: 1106 // if mounted then return state of carrier thread 1107 if (Thread.currentThread() != this) { 1108 disableSuspendAndPreempt(); 1109 try { 1110 synchronized (carrierThreadAccessLock()) { 1111 Thread carrierThread = this.carrierThread; 1112 if (carrierThread != null) { 1113 return carrierThread.threadState(); 1114 } 1115 } 1116 } finally { 1117 enableSuspendAndPreempt(); 1118 } 1119 } 1120 // runnable, mounted 1121 return Thread.State.RUNNABLE; 1122 case PARKING: 1123 case TIMED_PARKING: 1124 case WAITING: 1125 case TIMED_WAITING: 1126 case YIELDING: 1127 // runnable, in transition 1128 return Thread.State.RUNNABLE; 1129 case PARKED: 1130 case PINNED: 1131 case WAIT: 1132 return Thread.State.WAITING; 1133 case TIMED_PARKED: 1134 case TIMED_PINNED: 1135 case TIMED_WAIT: 1136 return Thread.State.TIMED_WAITING; 1137 case BLOCKING: 1138 case BLOCKED: 1139 return Thread.State.BLOCKED; 1140 case TERMINATED: 1141 return Thread.State.TERMINATED; 1142 default: 1143 throw new InternalError(); 1144 } 1145 } 1146 1147 @Override 1148 boolean alive() { 1149 int s = state; 1150 return (s != NEW && s != TERMINATED); 1151 } 1152 1153 @Override 1154 boolean isTerminated() { 1155 return (state == TERMINATED); 1156 } 1157 1158 @Override 1159 StackTraceElement[] asyncGetStackTrace() { 1160 StackTraceElement[] stackTrace; 1161 do { 1162 stackTrace = (carrierThread != null) 1163 ? super.asyncGetStackTrace() // mounted 1164 : tryGetStackTrace(); // unmounted 1165 if (stackTrace == null) { 1166 Thread.yield(); 1167 } 1168 } while (stackTrace == null); 1169 return stackTrace; 1170 } 1171 1172 /** 1173 * Returns the stack trace for this virtual thread if it is unmounted. 1174 * Returns null if the thread is mounted or in transition. 1175 */ 1176 private StackTraceElement[] tryGetStackTrace() { 1177 int initialState = state() & ~SUSPENDED; 1178 switch (initialState) { 1179 case NEW, STARTED, TERMINATED -> { 1180 return new StackTraceElement[0]; // unmounted, empty stack 1181 } 1182 case RUNNING, PINNED, TIMED_PINNED -> { 1183 return null; // mounted 1184 } 1185 case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> { 1186 // unmounted, not runnable 1187 } 1188 case UNPARKED, UNBLOCKED, YIELDED -> { 1189 // unmounted, runnable 1190 } 1191 case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> { 1192 return null; // in transition 1193 } 1194 default -> throw new InternalError("" + initialState); 1195 } 1196 1197 // thread is unmounted, prevent it from continuing 1198 int suspendedState = initialState | SUSPENDED; 1199 if (!compareAndSetState(initialState, suspendedState)) { 1200 return null; 1201 } 1202 1203 // get stack trace and restore state 1204 StackTraceElement[] stack; 1205 try { 1206 stack = cont.getStackTrace(); 1207 } finally { 1208 assert state == suspendedState; 1209 setState(initialState); 1210 } 1211 boolean resubmit = switch (initialState) { 1212 case UNPARKED, UNBLOCKED, YIELDED -> { 1213 // resubmit as task may have run while suspended 1214 yield true; 1215 } 1216 case PARKED, TIMED_PARKED -> { 1217 // resubmit if unparked while suspended 1218 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1219 } 1220 case BLOCKED -> { 1221 // resubmit if unblocked while suspended 1222 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED); 1223 } 1224 case WAIT, TIMED_WAIT -> { 1225 // resubmit if notified or interrupted while waiting (Object.wait) 1226 // waitTimeoutExpired will retry if the timed expired when suspended 1227 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED); 1228 } 1229 default -> throw new InternalError(); 1230 }; 1231 if (resubmit) { 1232 submitRunContinuation(); 1233 } 1234 return stack; 1235 } 1236 1237 @Override 1238 public String toString() { 1239 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1240 sb.append(threadId()); 1241 String name = getName(); 1242 if (!name.isEmpty()) { 1243 sb.append(","); 1244 sb.append(name); 1245 } 1246 sb.append("]/"); 1247 1248 // add the carrier state and thread name when mounted 1249 boolean mounted; 1250 if (Thread.currentThread() == this) { 1251 mounted = appendCarrierInfo(sb); 1252 } else { 1253 disableSuspendAndPreempt(); 1254 try { 1255 synchronized (carrierThreadAccessLock()) { 1256 mounted = appendCarrierInfo(sb); 1257 } 1258 } finally { 1259 enableSuspendAndPreempt(); 1260 } 1261 } 1262 1263 // add virtual thread state when not mounted 1264 if (!mounted) { 1265 String stateAsString = threadState().toString(); 1266 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1267 } 1268 1269 return sb.toString(); 1270 } 1271 1272 /** 1273 * Appends the carrier state and thread name to the string buffer if mounted. 1274 * @return true if mounted, false if not mounted 1275 */ 1276 private boolean appendCarrierInfo(StringBuilder sb) { 1277 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1278 Thread carrier = carrierThread; 1279 if (carrier != null) { 1280 String stateAsString = carrier.threadState().toString(); 1281 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1282 sb.append('@'); 1283 sb.append(carrier.getName()); 1284 return true; 1285 } else { 1286 return false; 1287 } 1288 } 1289 1290 @Override 1291 public int hashCode() { 1292 return (int) threadId(); 1293 } 1294 1295 @Override 1296 public boolean equals(Object obj) { 1297 return obj == this; 1298 } 1299 1300 /** 1301 * Returns the termination object, creating it if needed. 1302 */ 1303 private CountDownLatch getTermination() { 1304 CountDownLatch termination = this.termination; 1305 if (termination == null) { 1306 termination = new CountDownLatch(1); 1307 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1308 termination = this.termination; 1309 } 1310 } 1311 return termination; 1312 } 1313 1314 /** 1315 * Returns the lock object to synchronize on when accessing carrierThread. 1316 * The lock prevents carrierThread from being reset to null during unmount. 1317 */ 1318 private Object carrierThreadAccessLock() { 1319 // return interruptLock as unmount has to coordinate with interrupt 1320 return interruptLock; 1321 } 1322 1323 /** 1324 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1325 */ 1326 private Object timedWaitLock() { 1327 // use this object for now to avoid the overhead of introducing another lock 1328 return runContinuation; 1329 } 1330 1331 /** 1332 * Disallow the current thread be suspended or preempted. 1333 */ 1334 private void disableSuspendAndPreempt() { 1335 notifyJvmtiDisableSuspend(true); 1336 Continuation.pin(); 1337 } 1338 1339 /** 1340 * Allow the current thread be suspended or preempted. 1341 */ 1342 private void enableSuspendAndPreempt() { 1343 Continuation.unpin(); 1344 notifyJvmtiDisableSuspend(false); 1345 } 1346 1347 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1348 1349 private int state() { 1350 return state; // volatile read 1351 } 1352 1353 private void setState(int newValue) { 1354 state = newValue; // volatile write 1355 } 1356 1357 private boolean compareAndSetState(int expectedValue, int newValue) { 1358 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1359 } 1360 1361 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1362 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1363 } 1364 1365 private void setParkPermit(boolean newValue) { 1366 if (parkPermit != newValue) { 1367 parkPermit = newValue; 1368 } 1369 } 1370 1371 private boolean getAndSetParkPermit(boolean newValue) { 1372 if (parkPermit != newValue) { 1373 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1374 } else { 1375 return newValue; 1376 } 1377 } 1378 1379 private void setCarrierThread(Thread carrier) { 1380 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1381 this.carrierThread = carrier; 1382 } 1383 1384 // -- JVM TI support -- 1385 1386 @IntrinsicCandidate 1387 @JvmtiMountTransition 1388 private native void notifyJvmtiStart(); 1389 1390 @IntrinsicCandidate 1391 @JvmtiMountTransition 1392 private native void notifyJvmtiEnd(); 1393 1394 @IntrinsicCandidate 1395 @JvmtiMountTransition 1396 private native void notifyJvmtiMount(boolean hide); 1397 1398 @IntrinsicCandidate 1399 @JvmtiMountTransition 1400 private native void notifyJvmtiUnmount(boolean hide); 1401 1402 @IntrinsicCandidate 1403 private static native void notifyJvmtiDisableSuspend(boolean enter); 1404 1405 private static native void registerNatives(); 1406 static { 1407 registerNatives(); 1408 1409 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1410 var group = Thread.virtualThreadGroup(); 1411 } 1412 1413 /** 1414 * Creates the default ForkJoinPool scheduler. 1415 */ 1416 private static ForkJoinPool createDefaultScheduler() { 1417 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool); 1418 int parallelism, maxPoolSize, minRunnable; 1419 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1420 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1421 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1422 if (parallelismValue != null) { 1423 parallelism = Integer.parseInt(parallelismValue); 1424 } else { 1425 parallelism = Runtime.getRuntime().availableProcessors(); 1426 } 1427 if (maxPoolSizeValue != null) { 1428 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1429 parallelism = Integer.min(parallelism, maxPoolSize); 1430 } else { 1431 maxPoolSize = Integer.max(parallelism, 256); 1432 } 1433 if (minRunnableValue != null) { 1434 minRunnable = Integer.parseInt(minRunnableValue); 1435 } else { 1436 minRunnable = Integer.max(parallelism / 2, 1); 1437 } 1438 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1439 boolean asyncMode = true; // FIFO 1440 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1441 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1442 } 1443 1444 /** 1445 * Schedule a runnable task to run after a delay. 1446 */ 1447 private static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1448 long tid = Thread.currentThread().threadId(); 1449 int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1); 1450 return DELAYED_TASK_SCHEDULERS[index].schedule(command, delay, unit); 1451 } 1452 1453 /** 1454 * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks. 1455 */ 1456 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1457 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1458 String propValue = System.getProperty(propName); 1459 int queueCount; 1460 if (propValue != null) { 1461 queueCount = Integer.parseInt(propValue); 1462 if (queueCount != Integer.highestOneBit(queueCount)) { 1463 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1464 } 1465 } else { 1466 int ncpus = Runtime.getRuntime().availableProcessors(); 1467 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1468 } 1469 var schedulers = new ScheduledExecutorService[queueCount]; 1470 for (int i = 0; i < queueCount; i++) { 1471 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1472 Executors.newScheduledThreadPool(1, task -> { 1473 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1474 t.setDaemon(true); 1475 return t; 1476 }); 1477 stpe.setRemoveOnCancelPolicy(true); 1478 schedulers[i] = stpe; 1479 } 1480 return schedulers; 1481 } 1482 1483 /** 1484 * Schedule virtual threads that are ready to be scheduled after they blocked on 1485 * monitor enter. 1486 */ 1487 private static void unblockVirtualThreads() { 1488 while (true) { 1489 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1490 while (vthread != null) { 1491 assert vthread.onWaitingList; 1492 VirtualThread nextThread = vthread.next; 1493 1494 // remove from list and unblock 1495 vthread.next = null; 1496 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1497 assert changed; 1498 vthread.unblock(); 1499 1500 vthread = nextThread; 1501 } 1502 } 1503 } 1504 1505 /** 1506 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1507 * if necessary until a list of one or more threads becomes available. 1508 */ 1509 private static native VirtualThread takeVirtualThreadListToUnblock(); 1510 1511 static { 1512 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1513 VirtualThread::unblockVirtualThreads); 1514 unblocker.setDaemon(true); 1515 unblocker.start(); 1516 } 1517 }