1 /* 2 * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.util.Locale; 28 import java.util.Objects; 29 import java.util.concurrent.CountDownLatch; 30 import java.util.concurrent.Executor; 31 import java.util.concurrent.Executors; 32 import java.util.concurrent.ForkJoinPool; 33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 34 import java.util.concurrent.ForkJoinTask; 35 import java.util.concurrent.ForkJoinWorkerThread; 36 import java.util.concurrent.Future; 37 import java.util.concurrent.RejectedExecutionException; 38 import java.util.concurrent.ScheduledExecutorService; 39 import java.util.concurrent.ScheduledThreadPoolExecutor; 40 import java.util.concurrent.TimeUnit; 41 import jdk.internal.event.VirtualThreadEndEvent; 42 import jdk.internal.event.VirtualThreadStartEvent; 43 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 44 import jdk.internal.misc.CarrierThread; 45 import jdk.internal.misc.InnocuousThread; 46 import jdk.internal.misc.Unsafe; 47 import jdk.internal.vm.Continuation; 48 import jdk.internal.vm.ContinuationScope; 49 import jdk.internal.vm.StackableScope; 50 import jdk.internal.vm.ThreadContainer; 51 import jdk.internal.vm.ThreadContainers; 52 import jdk.internal.vm.annotation.ChangesCurrentThread; 53 import jdk.internal.vm.annotation.Hidden; 54 import jdk.internal.vm.annotation.IntrinsicCandidate; 55 import jdk.internal.vm.annotation.JvmtiHideEvents; 56 import jdk.internal.vm.annotation.JvmtiMountTransition; 57 import jdk.internal.vm.annotation.ReservedStackAccess; 58 import sun.nio.ch.Interruptible; 59 import static java.util.concurrent.TimeUnit.*; 60 61 /** 62 * A thread that is scheduled by the Java virtual machine rather than the operating system. 63 */ 64 final class VirtualThread extends BaseVirtualThread { 65 private static final Unsafe U = Unsafe.getUnsafe(); 66 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 67 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler(); 68 private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers(); 69 70 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 71 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 72 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 73 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 74 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList"); 75 76 // scheduler and continuation 77 private final Executor scheduler; 78 private final Continuation cont; 79 private final Runnable runContinuation; 80 81 // virtual thread state, accessed by VM 82 private volatile int state; 83 84 /* 85 * Virtual thread state transitions: 86 * 87 * NEW -> STARTED // Thread.start, schedule to run 88 * STARTED -> TERMINATED // failed to start 89 * STARTED -> RUNNING // first run 90 * RUNNING -> TERMINATED // done 91 * 92 * RUNNING -> PARKING // Thread parking with LockSupport.park 93 * PARKING -> PARKED // cont.yield successful, parked indefinitely 94 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier 95 * PARKED -> UNPARKED // unparked, may be scheduled to continue 96 * PINNED -> RUNNING // unparked, continue execution on same carrier 97 * UNPARKED -> RUNNING // continue execution after park 98 * 99 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 100 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 101 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier 102 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue 103 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 104 * 105 * RUNNING -> BLOCKING // blocking on monitor enter 106 * BLOCKING -> BLOCKED // blocked on monitor enter 107 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue 108 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter 109 * 110 * RUNNING -> WAITING // transitional state during wait on monitor 111 * WAITING -> WAITED // waiting on monitor 112 * WAITED -> BLOCKED // notified, waiting to be unblocked by monitor owner 113 * WAITED -> UNBLOCKED // timed-out/interrupted 114 * 115 * RUNNING -> TIMED_WAITING // transition state during timed-waiting on monitor 116 * TIMED_WAITING -> TIMED_WAITED // timed-waiting on monitor 117 * TIMED_WAITED -> BLOCKED // notified, waiting to be unblocked by monitor owner 118 * TIMED_WAITED -> UNBLOCKED // timed-out/interrupted 119 * 120 * RUNNING -> YIELDING // Thread.yield 121 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue 122 * YIELDING -> RUNNING // cont.yield failed 123 * YIELDED -> RUNNING // continue execution after Thread.yield 124 */ 125 private static final int NEW = 0; 126 private static final int STARTED = 1; 127 private static final int RUNNING = 2; // runnable-mounted 128 129 // untimed and timed parking 130 private static final int PARKING = 3; 131 private static final int PARKED = 4; // unmounted 132 private static final int PINNED = 5; // mounted 133 private static final int TIMED_PARKING = 6; 134 private static final int TIMED_PARKED = 7; // unmounted 135 private static final int TIMED_PINNED = 8; // mounted 136 private static final int UNPARKED = 9; // unmounted but runnable 137 138 // Thread.yield 139 private static final int YIELDING = 10; 140 private static final int YIELDED = 11; // unmounted but runnable 141 142 // monitor enter 143 private static final int BLOCKING = 12; 144 private static final int BLOCKED = 13; // unmounted 145 private static final int UNBLOCKED = 14; // unmounted but runnable 146 147 // monitor wait/timed-wait 148 private static final int WAITING = 15; 149 private static final int WAIT = 16; // waiting in Object.wait 150 private static final int TIMED_WAITING = 17; 151 private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait 152 153 private static final int TERMINATED = 99; // final state 154 155 // can be suspended from scheduling when unmounted 156 private static final int SUSPENDED = 1 << 8; 157 158 // parking permit made available by LockSupport.unpark 159 private volatile boolean parkPermit; 160 161 // blocking permit made available by unblocker thread when another thread exits monitor 162 private volatile boolean blockPermit; 163 164 // true when on the list of virtual threads waiting to be unblocked 165 private volatile boolean onWaitingList; 166 167 // next virtual thread on the list of virtual threads waiting to be unblocked 168 private volatile VirtualThread next; 169 170 // notified by Object.notify/notifyAll while waiting in Object.wait 171 private volatile boolean notified; 172 173 // timed-wait support 174 private byte timedWaitSeqNo; 175 176 // timeout for timed-park and timed-wait, only accessed on current/carrier thread 177 private long timeout; 178 179 // timer task for timed-park and timed-wait, only accessed on current/carrier thread 180 private Future<?> timeoutTask; 181 182 // carrier thread when mounted, accessed by VM 183 private volatile Thread carrierThread; 184 185 // termination object when joining, created lazily if needed 186 private volatile CountDownLatch termination; 187 188 /** 189 * Returns the default scheduler. 190 */ 191 static Executor defaultScheduler() { 192 return DEFAULT_SCHEDULER; 193 } 194 195 /** 196 * Returns the continuation scope used for virtual threads. 197 */ 198 static ContinuationScope continuationScope() { 199 return VTHREAD_SCOPE; 200 } 201 202 /** 203 * Creates a new {@code VirtualThread} to run the given task with the given 204 * scheduler. If the given scheduler is {@code null} and the current thread 205 * is a platform thread then the newly created virtual thread will use the 206 * default scheduler. If given scheduler is {@code null} and the current 207 * thread is a virtual thread then the current thread's scheduler is used. 208 * 209 * @param scheduler the scheduler or null 210 * @param name thread name 211 * @param characteristics characteristics 212 * @param task the task to execute 213 */ 214 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 215 super(name, characteristics, /*bound*/ false); 216 Objects.requireNonNull(task); 217 218 // choose scheduler if not specified 219 if (scheduler == null) { 220 Thread parent = Thread.currentThread(); 221 if (parent instanceof VirtualThread vparent) { 222 scheduler = vparent.scheduler; 223 } else { 224 scheduler = DEFAULT_SCHEDULER; 225 } 226 } 227 228 this.scheduler = scheduler; 229 this.cont = new VThreadContinuation(this, task); 230 this.runContinuation = this::runContinuation; 231 } 232 233 /** 234 * The continuation that a virtual thread executes. 235 */ 236 private static class VThreadContinuation extends Continuation { 237 VThreadContinuation(VirtualThread vthread, Runnable task) { 238 super(VTHREAD_SCOPE, wrap(vthread, task)); 239 } 240 @Override 241 protected void onPinned(Continuation.Pinned reason) { 242 } 243 private static Runnable wrap(VirtualThread vthread, Runnable task) { 244 return new Runnable() { 245 @Hidden 246 @JvmtiHideEvents 247 public void run() { 248 vthread.notifyJvmtiStart(); // notify JVMTI 249 try { 250 vthread.run(task); 251 } finally { 252 vthread.notifyJvmtiEnd(); // notify JVMTI 253 } 254 } 255 }; 256 } 257 } 258 259 /** 260 * Runs or continues execution on the current thread. The virtual thread is mounted 261 * on the current thread before the task runs or continues. It unmounts when the 262 * task completes or yields. 263 */ 264 @ChangesCurrentThread // allow mount/unmount to be inlined 265 private void runContinuation() { 266 // the carrier must be a platform thread 267 if (Thread.currentThread().isVirtual()) { 268 throw new WrongThreadException(); 269 } 270 271 // set state to RUNNING 272 int initialState = state(); 273 if (initialState == STARTED || initialState == UNPARKED 274 || initialState == UNBLOCKED || initialState == YIELDED) { 275 // newly started or continue after parking/blocking/Thread.yield 276 if (!compareAndSetState(initialState, RUNNING)) { 277 return; 278 } 279 // consume permit when continuing after parking or blocking. If continue 280 // after a timed-park or timed-wait then the timeout task is cancelled. 281 if (initialState == UNPARKED) { 282 cancelTimeoutTask(); 283 setParkPermit(false); 284 } else if (initialState == UNBLOCKED) { 285 cancelTimeoutTask(); 286 blockPermit = false; 287 } 288 } else { 289 // not runnable 290 return; 291 } 292 293 mount(); 294 try { 295 cont.run(); 296 } finally { 297 unmount(); 298 if (cont.isDone()) { 299 afterDone(); 300 } else { 301 afterYield(); 302 } 303 } 304 } 305 306 /** 307 * Cancel timeout task when continuing after timed-park or timed-wait. 308 * The timeout task may be executing, or may have already completed. 309 */ 310 private void cancelTimeoutTask() { 311 if (timeoutTask != null) { 312 timeoutTask.cancel(false); 313 timeoutTask = null; 314 } 315 } 316 317 /** 318 * Submits the runContinuation task to the scheduler. For the default scheduler, 319 * and calling it on a worker thread, the task will be pushed to the local queue, 320 * otherwise it will be pushed to an external submission queue. 321 * @param scheduler the scheduler 322 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 323 * @throws RejectedExecutionException 324 */ 325 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) { 326 boolean done = false; 327 while (!done) { 328 try { 329 // Pin the continuation to prevent the virtual thread from unmounting 330 // when submitting a task. For the default scheduler this ensures that 331 // the carrier doesn't change when pushing a task. For other schedulers 332 // it avoids deadlock that could arise due to carriers and virtual 333 // threads contending for a lock. 334 if (currentThread().isVirtual()) { 335 Continuation.pin(); 336 try { 337 scheduler.execute(runContinuation); 338 } finally { 339 Continuation.unpin(); 340 } 341 } else { 342 scheduler.execute(runContinuation); 343 } 344 done = true; 345 } catch (RejectedExecutionException ree) { 346 submitFailed(ree); 347 throw ree; 348 } catch (OutOfMemoryError e) { 349 if (retryOnOOME) { 350 U.park(false, 100_000_000); // 100ms 351 } else { 352 throw e; 353 } 354 } 355 } 356 } 357 358 /** 359 * Submits the runContinuation task to the given scheduler as an external submit. 360 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 361 * @throws RejectedExecutionException 362 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 363 */ 364 private void externalSubmitRunContinuation(ForkJoinPool pool) { 365 assert Thread.currentThread() instanceof CarrierThread; 366 try { 367 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 368 } catch (RejectedExecutionException ree) { 369 submitFailed(ree); 370 throw ree; 371 } catch (OutOfMemoryError e) { 372 submitRunContinuation(pool, true); 373 } 374 } 375 376 /** 377 * Submits the runContinuation task to the scheduler. For the default scheduler, 378 * and calling it on a worker thread, the task will be pushed to the local queue, 379 * otherwise it will be pushed to an external submission queue. 380 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 381 * @throws RejectedExecutionException 382 */ 383 private void submitRunContinuation() { 384 submitRunContinuation(scheduler, true); 385 } 386 387 /** 388 * Lazy submit the runContinuation task if invoked on a carrier thread and its local 389 * queue is empty. If not empty, or invoked by another thread, then this method works 390 * like submitRunContinuation and just submits the task to the scheduler. 391 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 392 * @throws RejectedExecutionException 393 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 394 */ 395 private void lazySubmitRunContinuation() { 396 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 397 ForkJoinPool pool = ct.getPool(); 398 try { 399 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 400 } catch (RejectedExecutionException ree) { 401 submitFailed(ree); 402 throw ree; 403 } catch (OutOfMemoryError e) { 404 submitRunContinuation(); 405 } 406 } else { 407 submitRunContinuation(); 408 } 409 } 410 411 /** 412 * Submits the runContinuation task to the scheduler. For the default scheduler, and 413 * calling it a virtual thread that uses the default scheduler, the task will be 414 * pushed to an external submission queue. This method may throw OutOfMemoryError. 415 * @throws RejectedExecutionException 416 * @throws OutOfMemoryError 417 */ 418 private void externalSubmitRunContinuationOrThrow() { 419 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) { 420 try { 421 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 422 } catch (RejectedExecutionException ree) { 423 submitFailed(ree); 424 throw ree; 425 } 426 } else { 427 submitRunContinuation(scheduler, false); 428 } 429 } 430 431 /** 432 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 433 */ 434 private void submitFailed(RejectedExecutionException ree) { 435 var event = new VirtualThreadSubmitFailedEvent(); 436 if (event.isEnabled()) { 437 event.javaThreadId = threadId(); 438 event.exceptionMessage = ree.getMessage(); 439 event.commit(); 440 } 441 } 442 443 /** 444 * Runs a task in the context of this virtual thread. 445 */ 446 private void run(Runnable task) { 447 assert Thread.currentThread() == this && state == RUNNING; 448 449 // emit JFR event if enabled 450 if (VirtualThreadStartEvent.isTurnedOn()) { 451 var event = new VirtualThreadStartEvent(); 452 event.javaThreadId = threadId(); 453 event.commit(); 454 } 455 456 Object bindings = Thread.scopedValueBindings(); 457 try { 458 runWith(bindings, task); 459 } catch (Throwable exc) { 460 dispatchUncaughtException(exc); 461 } finally { 462 // pop any remaining scopes from the stack, this may block 463 StackableScope.popAll(); 464 465 // emit JFR event if enabled 466 if (VirtualThreadEndEvent.isTurnedOn()) { 467 var event = new VirtualThreadEndEvent(); 468 event.javaThreadId = threadId(); 469 event.commit(); 470 } 471 } 472 } 473 474 /** 475 * Mounts this virtual thread onto the current platform thread. On 476 * return, the current thread is the virtual thread. 477 */ 478 @ChangesCurrentThread 479 @ReservedStackAccess 480 private void mount() { 481 // notify JVMTI before mount 482 notifyJvmtiMount(/*hide*/true); 483 484 // sets the carrier thread 485 Thread carrier = Thread.currentCarrierThread(); 486 setCarrierThread(carrier); 487 488 // sync up carrier thread interrupt status if needed 489 if (interrupted) { 490 carrier.setInterrupt(); 491 } else if (carrier.isInterrupted()) { 492 synchronized (interruptLock) { 493 // need to recheck interrupt status 494 if (!interrupted) { 495 carrier.clearInterrupt(); 496 } 497 } 498 } 499 500 // set Thread.currentThread() to return this virtual thread 501 carrier.setCurrentThread(this); 502 } 503 504 /** 505 * Unmounts this virtual thread from the carrier. On return, the 506 * current thread is the current platform thread. 507 */ 508 @ChangesCurrentThread 509 @ReservedStackAccess 510 private void unmount() { 511 assert !Thread.holdsLock(interruptLock); 512 513 // set Thread.currentThread() to return the platform thread 514 Thread carrier = this.carrierThread; 515 carrier.setCurrentThread(carrier); 516 517 // break connection to carrier thread, synchronized with interrupt 518 synchronized (interruptLock) { 519 setCarrierThread(null); 520 } 521 carrier.clearInterrupt(); 522 523 // notify JVMTI after unmount 524 notifyJvmtiUnmount(/*hide*/false); 525 } 526 527 /** 528 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 529 * the continuation continues. 530 */ 531 @Hidden 532 private boolean yieldContinuation() { 533 notifyJvmtiUnmount(/*hide*/true); 534 try { 535 return Continuation.yield(VTHREAD_SCOPE); 536 } finally { 537 notifyJvmtiMount(/*hide*/false); 538 } 539 } 540 541 /** 542 * Invoked in the context of the carrier thread after the Continuation yields when 543 * parking, blocking on monitor enter, Object.wait, or Thread.yield. 544 */ 545 private void afterYield() { 546 assert carrierThread == null; 547 548 // re-adjust parallelism if the virtual thread yielded when compensating 549 if (currentThread() instanceof CarrierThread ct) { 550 ct.endBlocking(); 551 } 552 553 int s = state(); 554 555 // LockSupport.park/parkNanos 556 if (s == PARKING || s == TIMED_PARKING) { 557 int newState; 558 if (s == PARKING) { 559 setState(newState = PARKED); 560 } else { 561 // schedule unpark 562 assert timeout > 0; 563 timeoutTask = schedule(this::unpark, timeout, NANOSECONDS); 564 setState(newState = TIMED_PARKED); 565 } 566 567 // may have been unparked while parking 568 if (parkPermit && compareAndSetState(newState, UNPARKED)) { 569 // lazy submit if local queue is empty 570 lazySubmitRunContinuation(); 571 } 572 return; 573 } 574 575 // Thread.yield 576 if (s == YIELDING) { 577 setState(YIELDED); 578 579 // external submit if there are no tasks in the local task queue 580 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 581 externalSubmitRunContinuation(ct.getPool()); 582 } else { 583 submitRunContinuation(); 584 } 585 return; 586 } 587 588 // blocking on monitorenter 589 if (s == BLOCKING) { 590 setState(BLOCKED); 591 592 // may have been unblocked while blocking 593 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 594 // lazy submit if local queue is empty 595 lazySubmitRunContinuation(); 596 } 597 return; 598 } 599 600 // Object.wait 601 if (s == WAITING || s == TIMED_WAITING) { 602 int newState; 603 if (s == WAITING) { 604 setState(newState = WAIT); 605 } else { 606 // For timed-wait, a timeout task is scheduled to execute. The timeout 607 // task will change the thread state to UNBLOCKED and submit the thread 608 // to the scheduler. A sequence number is used to ensure that the timeout 609 // task only unblocks the thread for this timed-wait. We synchronize with 610 // the timeout task to coordinate access to the sequence number and to 611 // ensure the timeout task doesn't execute until the thread has got to 612 // the TIMED_WAIT state. 613 assert timeout > 0; 614 synchronized (timedWaitLock()) { 615 byte seqNo = ++timedWaitSeqNo; 616 timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS); 617 setState(newState = TIMED_WAIT); 618 } 619 } 620 621 // may have been notified while in transition to wait state 622 if (notified && compareAndSetState(newState, BLOCKED)) { 623 // may have even been unblocked already 624 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 625 submitRunContinuation(); 626 } 627 return; 628 } 629 630 // may have been interrupted while in transition to wait state 631 if (interrupted && compareAndSetState(newState, UNBLOCKED)) { 632 submitRunContinuation(); 633 return; 634 } 635 return; 636 } 637 638 assert false; 639 } 640 641 /** 642 * Invoked after the continuation completes. 643 */ 644 private void afterDone() { 645 afterDone(true); 646 } 647 648 /** 649 * Invoked after the continuation completes (or start failed). Sets the thread 650 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 651 * 652 * @param notifyContainer true if its container should be notified 653 */ 654 private void afterDone(boolean notifyContainer) { 655 assert carrierThread == null; 656 setState(TERMINATED); 657 658 // notify anyone waiting for this virtual thread to terminate 659 CountDownLatch termination = this.termination; 660 if (termination != null) { 661 assert termination.getCount() == 1; 662 termination.countDown(); 663 } 664 665 // notify container 666 if (notifyContainer) { 667 threadContainer().onExit(this); 668 } 669 670 // clear references to thread locals 671 clearReferences(); 672 } 673 674 /** 675 * Schedules this {@code VirtualThread} to execute. 676 * 677 * @throws IllegalStateException if the container is shutdown or closed 678 * @throws IllegalThreadStateException if the thread has already been started 679 * @throws RejectedExecutionException if the scheduler cannot accept a task 680 */ 681 @Override 682 void start(ThreadContainer container) { 683 if (!compareAndSetState(NEW, STARTED)) { 684 throw new IllegalThreadStateException("Already started"); 685 } 686 687 // bind thread to container 688 assert threadContainer() == null; 689 setThreadContainer(container); 690 691 // start thread 692 boolean addedToContainer = false; 693 boolean started = false; 694 try { 695 container.onStart(this); // may throw 696 addedToContainer = true; 697 698 // scoped values may be inherited 699 inheritScopedValueBindings(container); 700 701 // submit task to run thread, using externalSubmit if possible 702 externalSubmitRunContinuationOrThrow(); 703 started = true; 704 } finally { 705 if (!started) { 706 afterDone(addedToContainer); 707 } 708 } 709 } 710 711 @Override 712 public void start() { 713 start(ThreadContainers.root()); 714 } 715 716 @Override 717 public void run() { 718 // do nothing 719 } 720 721 /** 722 * Parks until unparked or interrupted. If already unparked then the parking 723 * permit is consumed and this method completes immediately (meaning it doesn't 724 * yield). It also completes immediately if the interrupt status is set. 725 */ 726 @Override 727 void park() { 728 assert Thread.currentThread() == this; 729 730 // complete immediately if parking permit available or interrupted 731 if (getAndSetParkPermit(false) || interrupted) 732 return; 733 734 // park the thread 735 boolean yielded = false; 736 setState(PARKING); 737 try { 738 yielded = yieldContinuation(); 739 } catch (OutOfMemoryError e) { 740 // park on carrier 741 } finally { 742 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 743 if (!yielded) { 744 assert state() == PARKING; 745 setState(RUNNING); 746 } 747 } 748 749 // park on the carrier thread when pinned 750 if (!yielded) { 751 parkOnCarrierThread(false, 0); 752 } 753 } 754 755 /** 756 * Parks up to the given waiting time or until unparked or interrupted. 757 * If already unparked then the parking permit is consumed and this method 758 * completes immediately (meaning it doesn't yield). It also completes immediately 759 * if the interrupt status is set or the waiting time is {@code <= 0}. 760 * 761 * @param nanos the maximum number of nanoseconds to wait. 762 */ 763 @Override 764 void parkNanos(long nanos) { 765 assert Thread.currentThread() == this; 766 767 // complete immediately if parking permit available or interrupted 768 if (getAndSetParkPermit(false) || interrupted) 769 return; 770 771 // park the thread for the waiting time 772 if (nanos > 0) { 773 long startTime = System.nanoTime(); 774 775 // park the thread, afterYield will schedule the thread to unpark 776 boolean yielded = false; 777 timeout = nanos; 778 setState(TIMED_PARKING); 779 try { 780 yielded = yieldContinuation(); 781 } catch (OutOfMemoryError e) { 782 // park on carrier 783 } finally { 784 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 785 if (!yielded) { 786 assert state() == TIMED_PARKING; 787 setState(RUNNING); 788 } 789 } 790 791 // park on carrier thread for remaining time when pinned (or OOME) 792 if (!yielded) { 793 long remainingNanos = nanos - (System.nanoTime() - startTime); 794 parkOnCarrierThread(true, remainingNanos); 795 } 796 } 797 } 798 799 /** 800 * Parks the current carrier thread up to the given waiting time or until 801 * unparked or interrupted. If the virtual thread is interrupted then the 802 * interrupt status will be propagated to the carrier thread. 803 * @param timed true for a timed park, false for untimed 804 * @param nanos the waiting time in nanoseconds 805 */ 806 private void parkOnCarrierThread(boolean timed, long nanos) { 807 assert state() == RUNNING; 808 809 setState(timed ? TIMED_PINNED : PINNED); 810 try { 811 if (!parkPermit) { 812 if (!timed) { 813 U.park(false, 0); 814 } else if (nanos > 0) { 815 U.park(false, nanos); 816 } 817 } 818 } finally { 819 setState(RUNNING); 820 } 821 822 // consume parking permit 823 setParkPermit(false); 824 825 // JFR jdk.VirtualThreadPinned event 826 postPinnedEvent("LockSupport.park"); 827 } 828 829 /** 830 * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event. 831 * Recording the event in the VM avoids having JFR event recorded in Java 832 * with the same name, but different ID, to events recorded by the VM. 833 */ 834 @Hidden 835 private static native void postPinnedEvent(String op); 836 837 /** 838 * Re-enables this virtual thread for scheduling. If this virtual thread is parked 839 * then its task is scheduled to continue, otherwise its next call to {@code park} or 840 * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block. 841 * @throws RejectedExecutionException if the scheduler cannot accept a task 842 */ 843 @Override 844 void unpark() { 845 if (!getAndSetParkPermit(true) && currentThread() != this) { 846 int s = state(); 847 848 // unparked while parked 849 if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) { 850 submitRunContinuation(); 851 return; 852 } 853 854 // unparked while parked when pinned 855 if (s == PINNED || s == TIMED_PINNED) { 856 // unpark carrier thread when pinned 857 disableSuspendAndPreempt(); 858 try { 859 synchronized (carrierThreadAccessLock()) { 860 Thread carrier = carrierThread; 861 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 862 U.unpark(carrier); 863 } 864 } 865 } finally { 866 enableSuspendAndPreempt(); 867 } 868 return; 869 } 870 } 871 } 872 873 /** 874 * Invoked by unblocker thread to unblock this virtual thread. 875 */ 876 private void unblock() { 877 assert !Thread.currentThread().isVirtual(); 878 blockPermit = true; 879 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 880 submitRunContinuation(); 881 } 882 } 883 884 /** 885 * Invoked by timer thread when wait timeout for virtual thread has expired. 886 * If the virtual thread is in timed-wait then this method will unblock the thread 887 * and submit its task so that it continues and attempts to reenter the monitor. 888 * This method does nothing if the thread has been woken by notify or interrupt. 889 */ 890 private void waitTimeoutExpired(byte seqNo) { 891 assert !Thread.currentThread().isVirtual(); 892 for (;;) { 893 boolean unblocked = false; 894 synchronized (timedWaitLock()) { 895 if (seqNo != timedWaitSeqNo) { 896 // this timeout task is for a past timed-wait 897 return; 898 } 899 int s = state(); 900 if (s == TIMED_WAIT) { 901 unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED); 902 } else if (s != (TIMED_WAIT | SUSPENDED)) { 903 // notified or interrupted, no longer waiting 904 return; 905 } 906 } 907 if (unblocked) { 908 submitRunContinuation(); 909 return; 910 } 911 // need to retry when thread is suspended in time-wait 912 Thread.yield(); 913 } 914 } 915 916 /** 917 * Attempts to yield the current virtual thread (Thread.yield). 918 */ 919 void tryYield() { 920 assert Thread.currentThread() == this; 921 setState(YIELDING); 922 boolean yielded = false; 923 try { 924 yielded = yieldContinuation(); // may throw 925 } finally { 926 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 927 if (!yielded) { 928 assert state() == YIELDING; 929 setState(RUNNING); 930 } 931 } 932 } 933 934 /** 935 * Sleep the current thread for the given sleep time (in nanoseconds). If 936 * nanos is 0 then the thread will attempt to yield. 937 * 938 * @implNote This implementation parks the thread for the given sleeping time 939 * and will therefore be observed in PARKED state during the sleep. Parking 940 * will consume the parking permit so this method makes available the parking 941 * permit after the sleep. This may be observed as a spurious, but benign, 942 * wakeup when the thread subsequently attempts to park. 943 * 944 * @param nanos the maximum number of nanoseconds to sleep 945 * @throws InterruptedException if interrupted while sleeping 946 */ 947 void sleepNanos(long nanos) throws InterruptedException { 948 assert Thread.currentThread() == this && nanos >= 0; 949 if (getAndClearInterrupt()) 950 throw new InterruptedException(); 951 if (nanos == 0) { 952 tryYield(); 953 } else { 954 // park for the sleep time 955 try { 956 long remainingNanos = nanos; 957 long startNanos = System.nanoTime(); 958 while (remainingNanos > 0) { 959 parkNanos(remainingNanos); 960 if (getAndClearInterrupt()) { 961 throw new InterruptedException(); 962 } 963 remainingNanos = nanos - (System.nanoTime() - startNanos); 964 } 965 } finally { 966 // may have been unparked while sleeping 967 setParkPermit(true); 968 } 969 } 970 } 971 972 /** 973 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 974 * A timeout of {@code 0} means to wait forever. 975 * 976 * @throws InterruptedException if interrupted while waiting 977 * @return true if the thread has terminated 978 */ 979 boolean joinNanos(long nanos) throws InterruptedException { 980 if (state() == TERMINATED) 981 return true; 982 983 // ensure termination object exists, then re-check state 984 CountDownLatch termination = getTermination(); 985 if (state() == TERMINATED) 986 return true; 987 988 // wait for virtual thread to terminate 989 if (nanos == 0) { 990 termination.await(); 991 } else { 992 boolean terminated = termination.await(nanos, NANOSECONDS); 993 if (!terminated) { 994 // waiting time elapsed 995 return false; 996 } 997 } 998 assert state() == TERMINATED; 999 return true; 1000 } 1001 1002 @Override 1003 void blockedOn(Interruptible b) { 1004 disableSuspendAndPreempt(); 1005 try { 1006 super.blockedOn(b); 1007 } finally { 1008 enableSuspendAndPreempt(); 1009 } 1010 } 1011 1012 @Override 1013 public void interrupt() { 1014 if (Thread.currentThread() != this) { 1015 // if current thread is a virtual thread then prevent it from being 1016 // suspended or unmounted when entering or holding interruptLock 1017 Interruptible blocker; 1018 disableSuspendAndPreempt(); 1019 try { 1020 synchronized (interruptLock) { 1021 interrupted = true; 1022 blocker = nioBlocker(); 1023 if (blocker != null) { 1024 blocker.interrupt(this); 1025 } 1026 1027 // interrupt carrier thread if mounted 1028 Thread carrier = carrierThread; 1029 if (carrier != null) carrier.setInterrupt(); 1030 } 1031 } finally { 1032 enableSuspendAndPreempt(); 1033 } 1034 1035 // notify blocker after releasing interruptLock 1036 if (blocker != null) { 1037 blocker.postInterrupt(); 1038 } 1039 1040 // make available parking permit, unpark thread if parked 1041 unpark(); 1042 1043 // if thread is waiting in Object.wait then schedule to try to reenter 1044 int s = state(); 1045 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1046 submitRunContinuation(); 1047 } 1048 1049 } else { 1050 interrupted = true; 1051 carrierThread.setInterrupt(); 1052 setParkPermit(true); 1053 } 1054 } 1055 1056 @Override 1057 public boolean isInterrupted() { 1058 return interrupted; 1059 } 1060 1061 @Override 1062 boolean getAndClearInterrupt() { 1063 assert Thread.currentThread() == this; 1064 boolean oldValue = interrupted; 1065 if (oldValue) { 1066 disableSuspendAndPreempt(); 1067 try { 1068 synchronized (interruptLock) { 1069 interrupted = false; 1070 carrierThread.clearInterrupt(); 1071 } 1072 } finally { 1073 enableSuspendAndPreempt(); 1074 } 1075 } 1076 return oldValue; 1077 } 1078 1079 @Override 1080 Thread.State threadState() { 1081 int s = state(); 1082 switch (s & ~SUSPENDED) { 1083 case NEW: 1084 return Thread.State.NEW; 1085 case STARTED: 1086 // return NEW if thread container not yet set 1087 if (threadContainer() == null) { 1088 return Thread.State.NEW; 1089 } else { 1090 return Thread.State.RUNNABLE; 1091 } 1092 case UNPARKED: 1093 case UNBLOCKED: 1094 case YIELDED: 1095 // runnable, not mounted 1096 return Thread.State.RUNNABLE; 1097 case RUNNING: 1098 // if mounted then return state of carrier thread 1099 if (Thread.currentThread() != this) { 1100 disableSuspendAndPreempt(); 1101 try { 1102 synchronized (carrierThreadAccessLock()) { 1103 Thread carrierThread = this.carrierThread; 1104 if (carrierThread != null) { 1105 return carrierThread.threadState(); 1106 } 1107 } 1108 } finally { 1109 enableSuspendAndPreempt(); 1110 } 1111 } 1112 // runnable, mounted 1113 return Thread.State.RUNNABLE; 1114 case PARKING: 1115 case TIMED_PARKING: 1116 case WAITING: 1117 case TIMED_WAITING: 1118 case YIELDING: 1119 // runnable, in transition 1120 return Thread.State.RUNNABLE; 1121 case PARKED: 1122 case PINNED: 1123 case WAIT: 1124 return Thread.State.WAITING; 1125 case TIMED_PARKED: 1126 case TIMED_PINNED: 1127 case TIMED_WAIT: 1128 return Thread.State.TIMED_WAITING; 1129 case BLOCKING: 1130 case BLOCKED: 1131 return Thread.State.BLOCKED; 1132 case TERMINATED: 1133 return Thread.State.TERMINATED; 1134 default: 1135 throw new InternalError(); 1136 } 1137 } 1138 1139 @Override 1140 boolean alive() { 1141 int s = state; 1142 return (s != NEW && s != TERMINATED); 1143 } 1144 1145 @Override 1146 boolean isTerminated() { 1147 return (state == TERMINATED); 1148 } 1149 1150 @Override 1151 StackTraceElement[] asyncGetStackTrace() { 1152 StackTraceElement[] stackTrace; 1153 do { 1154 stackTrace = (carrierThread != null) 1155 ? super.asyncGetStackTrace() // mounted 1156 : tryGetStackTrace(); // unmounted 1157 if (stackTrace == null) { 1158 Thread.yield(); 1159 } 1160 } while (stackTrace == null); 1161 return stackTrace; 1162 } 1163 1164 /** 1165 * Returns the stack trace for this virtual thread if it is unmounted. 1166 * Returns null if the thread is mounted or in transition. 1167 */ 1168 private StackTraceElement[] tryGetStackTrace() { 1169 int initialState = state() & ~SUSPENDED; 1170 switch (initialState) { 1171 case NEW, STARTED, TERMINATED -> { 1172 return new StackTraceElement[0]; // unmounted, empty stack 1173 } 1174 case RUNNING, PINNED, TIMED_PINNED -> { 1175 return null; // mounted 1176 } 1177 case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> { 1178 // unmounted, not runnable 1179 } 1180 case UNPARKED, UNBLOCKED, YIELDED -> { 1181 // unmounted, runnable 1182 } 1183 case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> { 1184 return null; // in transition 1185 } 1186 default -> throw new InternalError("" + initialState); 1187 } 1188 1189 // thread is unmounted, prevent it from continuing 1190 int suspendedState = initialState | SUSPENDED; 1191 if (!compareAndSetState(initialState, suspendedState)) { 1192 return null; 1193 } 1194 1195 // get stack trace and restore state 1196 StackTraceElement[] stack; 1197 try { 1198 stack = cont.getStackTrace(); 1199 } finally { 1200 assert state == suspendedState; 1201 setState(initialState); 1202 } 1203 boolean resubmit = switch (initialState) { 1204 case UNPARKED, UNBLOCKED, YIELDED -> { 1205 // resubmit as task may have run while suspended 1206 yield true; 1207 } 1208 case PARKED, TIMED_PARKED -> { 1209 // resubmit if unparked while suspended 1210 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1211 } 1212 case BLOCKED -> { 1213 // resubmit if unblocked while suspended 1214 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED); 1215 } 1216 case WAIT, TIMED_WAIT -> { 1217 // resubmit if notified or interrupted while waiting (Object.wait) 1218 // waitTimeoutExpired will retry if the timed expired when suspended 1219 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED); 1220 } 1221 default -> throw new InternalError(); 1222 }; 1223 if (resubmit) { 1224 submitRunContinuation(); 1225 } 1226 return stack; 1227 } 1228 1229 @Override 1230 public String toString() { 1231 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1232 sb.append(threadId()); 1233 String name = getName(); 1234 if (!name.isEmpty()) { 1235 sb.append(","); 1236 sb.append(name); 1237 } 1238 sb.append("]/"); 1239 1240 // add the carrier state and thread name when mounted 1241 boolean mounted; 1242 if (Thread.currentThread() == this) { 1243 mounted = appendCarrierInfo(sb); 1244 } else { 1245 disableSuspendAndPreempt(); 1246 try { 1247 synchronized (carrierThreadAccessLock()) { 1248 mounted = appendCarrierInfo(sb); 1249 } 1250 } finally { 1251 enableSuspendAndPreempt(); 1252 } 1253 } 1254 1255 // add virtual thread state when not mounted 1256 if (!mounted) { 1257 String stateAsString = threadState().toString(); 1258 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1259 } 1260 1261 return sb.toString(); 1262 } 1263 1264 /** 1265 * Appends the carrier state and thread name to the string buffer if mounted. 1266 * @return true if mounted, false if not mounted 1267 */ 1268 private boolean appendCarrierInfo(StringBuilder sb) { 1269 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1270 Thread carrier = carrierThread; 1271 if (carrier != null) { 1272 String stateAsString = carrier.threadState().toString(); 1273 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1274 sb.append('@'); 1275 sb.append(carrier.getName()); 1276 return true; 1277 } else { 1278 return false; 1279 } 1280 } 1281 1282 @Override 1283 public int hashCode() { 1284 return (int) threadId(); 1285 } 1286 1287 @Override 1288 public boolean equals(Object obj) { 1289 return obj == this; 1290 } 1291 1292 /** 1293 * Returns the termination object, creating it if needed. 1294 */ 1295 private CountDownLatch getTermination() { 1296 CountDownLatch termination = this.termination; 1297 if (termination == null) { 1298 termination = new CountDownLatch(1); 1299 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1300 termination = this.termination; 1301 } 1302 } 1303 return termination; 1304 } 1305 1306 /** 1307 * Returns the lock object to synchronize on when accessing carrierThread. 1308 * The lock prevents carrierThread from being reset to null during unmount. 1309 */ 1310 private Object carrierThreadAccessLock() { 1311 // return interruptLock as unmount has to coordinate with interrupt 1312 return interruptLock; 1313 } 1314 1315 /** 1316 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1317 */ 1318 private Object timedWaitLock() { 1319 // use this object for now to avoid the overhead of introducing another lock 1320 return runContinuation; 1321 } 1322 1323 /** 1324 * Disallow the current thread be suspended or preempted. 1325 */ 1326 private void disableSuspendAndPreempt() { 1327 notifyJvmtiDisableSuspend(true); 1328 Continuation.pin(); 1329 } 1330 1331 /** 1332 * Allow the current thread be suspended or preempted. 1333 */ 1334 private void enableSuspendAndPreempt() { 1335 Continuation.unpin(); 1336 notifyJvmtiDisableSuspend(false); 1337 } 1338 1339 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1340 1341 private int state() { 1342 return state; // volatile read 1343 } 1344 1345 private void setState(int newValue) { 1346 state = newValue; // volatile write 1347 } 1348 1349 private boolean compareAndSetState(int expectedValue, int newValue) { 1350 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1351 } 1352 1353 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1354 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1355 } 1356 1357 private void setParkPermit(boolean newValue) { 1358 if (parkPermit != newValue) { 1359 parkPermit = newValue; 1360 } 1361 } 1362 1363 private boolean getAndSetParkPermit(boolean newValue) { 1364 if (parkPermit != newValue) { 1365 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1366 } else { 1367 return newValue; 1368 } 1369 } 1370 1371 private void setCarrierThread(Thread carrier) { 1372 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1373 this.carrierThread = carrier; 1374 } 1375 1376 // -- JVM TI support -- 1377 1378 @IntrinsicCandidate 1379 @JvmtiMountTransition 1380 private native void notifyJvmtiStart(); 1381 1382 @IntrinsicCandidate 1383 @JvmtiMountTransition 1384 private native void notifyJvmtiEnd(); 1385 1386 @IntrinsicCandidate 1387 @JvmtiMountTransition 1388 private native void notifyJvmtiMount(boolean hide); 1389 1390 @IntrinsicCandidate 1391 @JvmtiMountTransition 1392 private native void notifyJvmtiUnmount(boolean hide); 1393 1394 @IntrinsicCandidate 1395 private static native void notifyJvmtiDisableSuspend(boolean enter); 1396 1397 private static native void registerNatives(); 1398 static { 1399 registerNatives(); 1400 1401 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1402 var group = Thread.virtualThreadGroup(); 1403 } 1404 1405 /** 1406 * Creates the default ForkJoinPool scheduler. 1407 */ 1408 private static ForkJoinPool createDefaultScheduler() { 1409 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool); 1410 int parallelism, maxPoolSize, minRunnable; 1411 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1412 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1413 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1414 if (parallelismValue != null) { 1415 parallelism = Integer.parseInt(parallelismValue); 1416 } else { 1417 parallelism = Runtime.getRuntime().availableProcessors(); 1418 } 1419 if (maxPoolSizeValue != null) { 1420 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1421 parallelism = Integer.min(parallelism, maxPoolSize); 1422 } else { 1423 maxPoolSize = Integer.max(parallelism, 256); 1424 } 1425 if (minRunnableValue != null) { 1426 minRunnable = Integer.parseInt(minRunnableValue); 1427 } else { 1428 minRunnable = Integer.max(parallelism / 2, 1); 1429 } 1430 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1431 boolean asyncMode = true; // FIFO 1432 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1433 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1434 } 1435 1436 /** 1437 * Schedule a runnable task to run after a delay. 1438 */ 1439 private static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1440 long tid = Thread.currentThread().threadId(); 1441 int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1); 1442 return DELAYED_TASK_SCHEDULERS[index].schedule(command, delay, unit); 1443 } 1444 1445 /** 1446 * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks. 1447 */ 1448 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1449 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1450 String propValue = System.getProperty(propName); 1451 int queueCount; 1452 if (propValue != null) { 1453 queueCount = Integer.parseInt(propValue); 1454 if (queueCount != Integer.highestOneBit(queueCount)) { 1455 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1456 } 1457 } else { 1458 int ncpus = Runtime.getRuntime().availableProcessors(); 1459 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1460 } 1461 var schedulers = new ScheduledExecutorService[queueCount]; 1462 for (int i = 0; i < queueCount; i++) { 1463 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1464 Executors.newScheduledThreadPool(1, task -> { 1465 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1466 t.setDaemon(true); 1467 return t; 1468 }); 1469 stpe.setRemoveOnCancelPolicy(true); 1470 schedulers[i] = stpe; 1471 } 1472 return schedulers; 1473 } 1474 1475 /** 1476 * Schedule virtual threads that are ready to be scheduled after they blocked on 1477 * monitor enter. 1478 */ 1479 private static void unblockVirtualThreads() { 1480 while (true) { 1481 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1482 while (vthread != null) { 1483 assert vthread.onWaitingList; 1484 VirtualThread nextThread = vthread.next; 1485 1486 // remove from list and unblock 1487 vthread.next = null; 1488 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1489 assert changed; 1490 vthread.unblock(); 1491 1492 vthread = nextThread; 1493 } 1494 } 1495 } 1496 1497 /** 1498 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1499 * if necessary until a list of one or more threads becomes available. 1500 */ 1501 private static native VirtualThread takeVirtualThreadListToUnblock(); 1502 1503 static { 1504 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1505 VirtualThread::unblockVirtualThreads); 1506 unblocker.setDaemon(true); 1507 unblocker.start(); 1508 } 1509 }