1 /* 2 * Copyright (c) 2018, 2026, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.lang; 26 27 import java.util.Locale; 28 import java.util.Objects; 29 import java.util.concurrent.CountDownLatch; 30 import java.util.concurrent.Executor; 31 import java.util.concurrent.Executors; 32 import java.util.concurrent.ForkJoinPool; 33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 34 import java.util.concurrent.ForkJoinTask; 35 import java.util.concurrent.Future; 36 import java.util.concurrent.RejectedExecutionException; 37 import java.util.concurrent.ScheduledExecutorService; 38 import java.util.concurrent.ScheduledThreadPoolExecutor; 39 import java.util.concurrent.TimeUnit; 40 import jdk.internal.event.VirtualThreadEndEvent; 41 import jdk.internal.event.VirtualThreadStartEvent; 42 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 43 import jdk.internal.misc.CarrierThread; 44 import jdk.internal.misc.InnocuousThread; 45 import jdk.internal.misc.Unsafe; 46 import jdk.internal.vm.Continuation; 47 import jdk.internal.vm.ContinuationScope; 48 import jdk.internal.vm.StackableScope; 49 import jdk.internal.vm.ThreadContainer; 50 import jdk.internal.vm.ThreadContainers; 51 import jdk.internal.vm.annotation.ChangesCurrentThread; 52 import jdk.internal.vm.annotation.Hidden; 53 import jdk.internal.vm.annotation.IntrinsicCandidate; 54 import jdk.internal.vm.annotation.JvmtiHideEvents; 55 import jdk.internal.vm.annotation.JvmtiMountTransition; 56 import jdk.internal.vm.annotation.ReservedStackAccess; 57 import sun.nio.ch.Interruptible; 58 import static java.util.concurrent.TimeUnit.*; 59 60 /** 61 * A thread that is scheduled by the Java virtual machine rather than the operating system. 
*/
final class VirtualThread extends BaseVirtualThread {
    private static final Unsafe U = Unsafe.getUnsafe();
    private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
    private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();

    // Unsafe offsets of the named instance fields.
    // NOTE(review): presumably consumed by CAS/atomic accessors defined outside
    // this excerpt (e.g. compareAndSetState, getAndSetParkPermit) -- confirm.
    private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
    private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
    private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
    private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
    private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");

    // scheduler and continuation
    private final Executor scheduler;
    private final Continuation cont;
    private final Runnable runContinuation;

    // virtual thread state, accessed by VM
    private volatile int state;

    /*
     * Virtual thread state transitions:
     *
     *      NEW -> STARTED         // Thread.start, schedule to run
     *  STARTED -> TERMINATED      // failed to start
     *  STARTED -> RUNNING         // first run
     *  RUNNING -> TERMINATED      // done
     *
     *  RUNNING -> PARKING         // Thread parking with LockSupport.park
     *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
     *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
     * UNPARKED -> RUNNING         // continue execution after park
     *
     *  PARKING -> RUNNING         // cont.yield failed, need to park on carrier
     *  RUNNING -> PINNED          // park on carrier
     *   PINNED -> RUNNING         // unparked, continue execution on same carrier
     *
     *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos
     * TIMED_PARKING -> TIMED_PARKED    // cont.yield successful, timed-parked
     *  TIMED_PARKED -> UNPARKED        // unparked or timed out, may be scheduled to continue
     *
     * TIMED_PARKING -> RUNNING         // cont.yield failed, need to park on carrier
     *       RUNNING -> TIMED_PINNED    // park on carrier
     *  TIMED_PINNED -> RUNNING         // unparked or timed out, continue execution on same carrier
     *
     *   RUNNING -> BLOCKING       // blocking on monitor enter
     *  BLOCKING -> BLOCKED        // blocked on monitor enter
     *   BLOCKED -> UNBLOCKED      // unblocked, may be scheduled to continue
     * UNBLOCKED -> RUNNING        // continue execution after blocked on monitor enter
     *
     *   RUNNING -> WAITING        // transitional state during wait on monitor
     *   WAITING -> WAIT           // waiting on monitor
     *      WAIT -> BLOCKED        // notified, waiting to be unblocked by monitor owner
     *      WAIT -> UNBLOCKED      // interrupted
     *
     *       RUNNING -> TIMED_WAITING   // transitional state during timed-waiting on monitor
     * TIMED_WAITING -> TIMED_WAIT      // timed-waiting on monitor
     *    TIMED_WAIT -> BLOCKED         // notified, waiting to be unblocked by monitor owner
     *    TIMED_WAIT -> UNBLOCKED       // timed-out/interrupted
     *
     *  RUNNING -> YIELDING        // Thread.yield
     * YIELDING -> YIELDED         // cont.yield successful, may be scheduled to continue
     * YIELDING -> RUNNING         // cont.yield failed
     *  YIELDED -> RUNNING         // continue execution after Thread.yield
     */
    private static final int NEW      = 0;
    private static final int STARTED  = 1;
    private static final int RUNNING  = 2;     // runnable-mounted

    // untimed and timed parking
    private static final int PARKING       = 3;
    private static final int PARKED        = 4;     // unmounted
    private static final int PINNED        = 5;     // mounted
    private static final int TIMED_PARKING = 6;
    private static final int TIMED_PARKED  = 7;     // unmounted
    private static final int TIMED_PINNED  = 8;     // mounted
    private static final int UNPARKED      = 9;     // unmounted but runnable

    // Thread.yield
    private static final int YIELDING = 10;
    private static final int YIELDED  = 11;         // unmounted but runnable

    // monitor enter
    private static final int BLOCKING  = 12;
    private static final int BLOCKED   = 13;        // unmounted
    private static final int UNBLOCKED = 14;        // unmounted but runnable

    // monitor wait/timed-wait
    private static final int WAITING       = 15;
    private static final int WAIT          = 16;    // waiting in Object.wait
    private static final int TIMED_WAITING = 17;
    private static final int TIMED_WAIT    = 18;    // waiting in timed-Object.wait

    private static final int TERMINATED = 99;  // final state

    // can be suspended from scheduling when unmounted; this is a bit flag that is
    // OR'ed with a base state (waitTimeoutExpired tests for TIMED_WAIT | SUSPENDED)
    private static final int SUSPENDED = 1 << 8;

    // parking permit made available by LockSupport.unpark
    private volatile boolean parkPermit;

    // blocking permit made available by unblocker thread when another thread exits monitor
    private volatile boolean blockPermit;

    // true when on the list of virtual threads waiting to be unblocked
    private volatile boolean onWaitingList;

    // next virtual thread on the list of virtual threads waiting to be unblocked
    private volatile VirtualThread next;

    // notified by Object.notify/notifyAll while waiting in Object.wait
    private volatile boolean notified;

    // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
    private volatile boolean interruptibleWait;

    // timed-wait support: sequence number incremented for each timed-wait so that a
    // stale timeout task can be recognised (see afterYield and waitTimeoutExpired)
    private byte timedWaitSeqNo;

    // timeout for timed-park and timed-wait, only accessed on current/carrier thread
    // (scheduled as nanoseconds for timed-park, milliseconds for timed-wait)
    private long timeout;

    // timer task for timed-park and timed-wait, only accessed on current/carrier thread
    private Future<?> timeoutTask;

    // carrier thread when mounted, accessed by VM
    private volatile Thread carrierThread;

    // termination object when joining, created lazily if needed
    private volatile CountDownLatch termination;

    /**
     * Returns the default scheduler.
195 */ 196 static Executor defaultScheduler() { 197 return DEFAULT_SCHEDULER; 198 } 199 200 /** 201 * Returns the continuation scope used for virtual threads. 202 */ 203 static ContinuationScope continuationScope() { 204 return VTHREAD_SCOPE; 205 } 206 207 /** 208 * Creates a new {@code VirtualThread} to run the given task with the given 209 * scheduler. If the given scheduler is {@code null} and the current thread 210 * is a platform thread then the newly created virtual thread will use the 211 * default scheduler. If given scheduler is {@code null} and the current 212 * thread is a virtual thread then the current thread's scheduler is used. 213 * 214 * @param scheduler the scheduler or null 215 * @param name thread name 216 * @param characteristics characteristics 217 * @param task the task to execute 218 */ 219 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 220 super(name, characteristics, /*bound*/ false); 221 Objects.requireNonNull(task); 222 223 // choose scheduler if not specified 224 if (scheduler == null) { 225 Thread parent = Thread.currentThread(); 226 if (parent instanceof VirtualThread vparent) { 227 scheduler = vparent.scheduler; 228 } else { 229 scheduler = DEFAULT_SCHEDULER; 230 } 231 } 232 233 this.scheduler = scheduler; 234 this.cont = new VThreadContinuation(this, task); 235 this.runContinuation = this::runContinuation; 236 } 237 238 /** 239 * The continuation that a virtual thread executes. 
*/
    private static class VThreadContinuation extends Continuation {
        /**
         * Creates the continuation in the virtual thread scope; the task is wrapped
         * so that it is run through {@code vthread.run(task)}.
         */
        VThreadContinuation(VirtualThread vthread, Runnable task) {
            super(VTHREAD_SCOPE, wrap(vthread, task));
        }
        // no-op override: nothing is done here when the continuation is pinned
        @Override
        protected void onPinned(Continuation.Pinned reason) {
        }
        /**
         * Returns a Runnable that invokes {@code vthread.endFirstTransition()}, then
         * {@code vthread.run(task)}, and finally (even if the run throws)
         * {@code vthread.startFinalTransition()}.
         */
        private static Runnable wrap(VirtualThread vthread, Runnable task) {
            return new Runnable() {
                @Hidden
                @JvmtiHideEvents
                public void run() {
                    vthread.endFirstTransition();
                    try {
                        vthread.run(task);
                    } finally {
                        vthread.startFinalTransition();
                    }
                }
            };
        }
    }

    /**
     * Runs or continues execution on the current thread. The virtual thread is mounted
     * on the current thread before the task runs or continues. It unmounts when the
     * task completes or yields.
     *
     * <p>This method returns without doing anything if the thread is not in a
     * runnable state (STARTED, UNPARKED, UNBLOCKED or YIELDED) or if the attempt to
     * change that state to RUNNING fails.
     *
     * @throws WrongThreadException if the current thread is a virtual thread
     */
    @ChangesCurrentThread // allow mount/unmount to be inlined
    private void runContinuation() {
        // the carrier must be a platform thread
        if (Thread.currentThread().isVirtual()) {
            throw new WrongThreadException();
        }

        // set state to RUNNING
        int initialState = state();
        if (initialState == STARTED || initialState == UNPARKED
                || initialState == UNBLOCKED || initialState == YIELDED) {
            // newly started or continue after parking/blocking/Thread.yield
            if (!compareAndSetState(initialState, RUNNING)) {
                // state changed since it was read, nothing to run
                return;
            }
            // consume permit when continuing after parking or blocking. If continue
            // after a timed-park or timed-wait then the timeout task is cancelled.
            if (initialState == UNPARKED) {
                cancelTimeoutTask();
                setParkPermit(false);
            } else if (initialState == UNBLOCKED) {
                cancelTimeoutTask();
                blockPermit = false;
            }
        } else {
            // not runnable
            return;
        }

        mount();
        try {
            cont.run();
        } finally {
            // always unmount, then dispatch on whether the continuation completed
            unmount();
            if (cont.isDone()) {
                afterDone();
            } else {
                afterYield();
            }
        }
    }

    /**
     * Cancel timeout task when continuing after timed-park or timed-wait.
     * The timeout task may be executing, or may have already completed.
     * Does nothing if there is no timeout task.
     */
    private void cancelTimeoutTask() {
        if (timeoutTask != null) {
            // false: do not interrupt the task if it is already executing
            timeoutTask.cancel(false);
            timeoutTask = null;
        }
    }

    /**
     * Submits the given task to the given executor. If the scheduler is a
     * ForkJoinPool then the task is first adapted to a ForkJoinTask.
     *
     * @param executor the executor to submit to
     * @param task the task to submit
     */
    private void submit(Executor executor, Runnable task) {
        if (executor instanceof ForkJoinPool pool) {
            pool.submit(ForkJoinTask.adapt(task));
        } else {
            executor.execute(task);
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the default scheduler,
     * and calling it on a worker thread, the task will be pushed to the local queue,
     * otherwise it will be pushed to an external submission queue.
     * @param scheduler the scheduler
     * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
     * @throws RejectedExecutionException if the submit is rejected; the failure is
     *         reported with submitFailed before being rethrown
     * @throws OutOfMemoryError if thrown by the submit and retryOnOOME is false
     */
    private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
        boolean done = false;
        while (!done) {
            try {
                // Pin the continuation to prevent the virtual thread from unmounting
                // when submitting a task. For the default scheduler this ensures that
                // the carrier doesn't change when pushing a task. For other schedulers
                // it avoids deadlock that could arise due to carriers and virtual
                // threads contending for a lock.
                if (currentThread().isVirtual()) {
                    Continuation.pin();
                    try {
                        submit(scheduler, runContinuation);
                    } finally {
                        Continuation.unpin();
                    }
                } else {
                    submit(scheduler, runContinuation);
                }
                done = true;
            } catch (RejectedExecutionException ree) {
                submitFailed(ree);
                throw ree;
            } catch (OutOfMemoryError e) {
                if (retryOnOOME) {
                    // back off before the next attempt
                    U.park(false, 100_000_000); // 100ms
                } else {
                    throw e;
                }
            }
        }
    }

    /**
     * Submits the runContinuation task to the given scheduler as an external submit.
     * If OutOfMemoryError is thrown then the submit will be retried until it succeeds
     * (the retry goes through submitRunContinuation(pool, true)).
     * @param pool the pool to submit to
     * @throws RejectedExecutionException
     * @see ForkJoinPool#externalSubmit(ForkJoinTask)
     */
    private void externalSubmitRunContinuation(ForkJoinPool pool) {
        assert Thread.currentThread() instanceof CarrierThread;
        try {
            pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        } catch (OutOfMemoryError e) {
            submitRunContinuation(pool, true);
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the default scheduler,
     * and calling it on a worker thread, the task will be pushed to the local queue,
     * otherwise it will be pushed to an external submission queue.
     * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
     * @throws RejectedExecutionException
     */
    private void submitRunContinuation() {
        submitRunContinuation(scheduler, true);
    }

    /**
     * Lazy submit the runContinuation task if invoked on a carrier thread and its local
     * queue is empty. If not empty, or invoked by another thread, then this method works
     * like submitRunContinuation and just submits the task to the scheduler.
     * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
     * @see ForkJoinPool#lazySubmit(ForkJoinTask)
     */
    private void lazySubmitRunContinuation() {
        if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
            ForkJoinPool pool = ct.getPool();
            try {
                pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
            } catch (RejectedExecutionException ree) {
                submitFailed(ree);
                throw ree;
            } catch (OutOfMemoryError e) {
                // fall back to the regular submit, which retries on OutOfMemoryError
                submitRunContinuation();
            }
        } else {
            submitRunContinuation();
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the default scheduler, and
     * calling it on a virtual thread that uses the default scheduler, the task will be
     * pushed to an external submission queue. This method may throw OutOfMemoryError
     * (the submit is not retried).
     * @throws RejectedExecutionException
     * @throws OutOfMemoryError
     */
    private void externalSubmitRunContinuationOrThrow() {
        if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {
            try {
                ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
            } catch (RejectedExecutionException ree) {
                submitFailed(ree);
                throw ree;
            }
        } else {
            // retryOnOOME is false so that OutOfMemoryError propagates to the caller
            submitRunContinuation(scheduler, false);
        }
    }

    /**
     * If enabled, emits a JFR VirtualThreadSubmitFailedEvent carrying this thread's
     * identifier and the message of the given exception.
     *
     * @param ree the exception thrown by the failed submit
     */
    private void submitFailed(RejectedExecutionException ree) {
        var event = new VirtualThreadSubmitFailedEvent();
        if (event.isEnabled()) {
            event.javaThreadId = threadId();
            event.exceptionMessage = ree.getMessage();
            event.commit();
        }
    }

    /**
     * Runs a task in the context of this virtual thread.
*/
    private void run(Runnable task) {
        assert Thread.currentThread() == this && state == RUNNING;

        // emit JFR event if enabled
        if (VirtualThreadStartEvent.isTurnedOn()) {
            var event = new VirtualThreadStartEvent();
            event.javaThreadId = threadId();
            event.commit();
        }

        Object bindings = Thread.scopedValueBindings();
        try {
            runWith(bindings, task);
        } catch (Throwable exc) {
            // anything thrown by the task is handed to the uncaught exception dispatch
            dispatchUncaughtException(exc);
        } finally {
            // pop any remaining scopes from the stack, this may block
            StackableScope.popAll();

            // emit JFR event if enabled
            if (VirtualThreadEndEvent.isTurnedOn()) {
                var event = new VirtualThreadEndEvent();
                event.javaThreadId = threadId();
                event.commit();
            }
        }
    }

    /**
     * Mounts this virtual thread onto the current platform thread. On
     * return, the current thread is the virtual thread.
     *
     * <p>The steps are: start the mount transition, record the carrier, bring the
     * carrier's interrupted status in line with this thread's, and only then make
     * {@code Thread.currentThread()} return this virtual thread.
     */
    @ChangesCurrentThread
    @ReservedStackAccess
    private void mount() {
        startTransition(/*is_mount*/true);
        // We assume following volatile accesses provide equivalent
        // of acquire ordering, otherwise we need U.loadFence() here.

        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupted status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            synchronized (interruptLock) {
                // need to recheck interrupted status
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread.
*/
    @ChangesCurrentThread
    @ReservedStackAccess
    private void unmount() {
        assert !Thread.holdsLock(interruptLock);

        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        // the carrier's interrupted status is cleared once the connection is broken
        carrier.clearInterrupt();

        // We assume previous volatile accesses provide equivalent
        // of release ordering, otherwise we need U.storeFence() here.
        endTransition(/*is_mount*/false);
    }

    /**
     * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
     * the continuation continues.
     *
     * <p>The yield is bracketed by startTransition (unmount) before and, in a
     * finally block, endTransition (mount) after.
     *
     * @return the value returned by {@code Continuation.yield}; callers in this
     *         class treat {@code false} as "failed to yield"
     */
    @Hidden
    private boolean yieldContinuation() {
        startTransition(/*is_mount*/false);
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            endTransition(/*is_mount*/true);
        }
    }

    /**
     * Invoked in the context of the carrier thread after the Continuation yields when
     * parking, blocking on monitor enter, Object.wait, or Thread.yield.
*/
    private void afterYield() {
        assert carrierThread == null;

        // re-adjust parallelism if the virtual thread yielded when compensating
        if (currentThread() instanceof CarrierThread ct) {
            ct.endBlocking();
        }

        // the transitional state set by the yielding thread selects the branch below
        int s = state();

        // LockSupport.park/parkNanos
        if (s == PARKING || s == TIMED_PARKING) {
            int newState;
            if (s == PARKING) {
                setState(newState = PARKED);
            } else {
                // schedule unpark (timeout is in nanoseconds for timed-park)
                long timeout = this.timeout;
                assert timeout > 0;
                timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
                setState(newState = TIMED_PARKED);
            }

            // may have been unparked while parking
            if (parkPermit && compareAndSetState(newState, UNPARKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Thread.yield
        if (s == YIELDING) {
            setState(YIELDED);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation(ct.getPool());
            } else {
                submitRunContinuation();
            }
            return;
        }

        // blocking on monitorenter
        if (s == BLOCKING) {
            setState(BLOCKED);

            // may have been unblocked while blocking
            if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Object.wait
        if (s == WAITING || s == TIMED_WAITING) {
            int newState;
            // read before the state is published as WAIT/TIMED_WAIT
            boolean interruptible = interruptibleWait;
            if (s == WAITING) {
                setState(newState = WAIT);
            } else {
                // For timed-wait, a timeout task is scheduled to execute. The timeout
                // task will change the thread state to UNBLOCKED and submit the thread
                // to the scheduler. A sequence number is used to ensure that the timeout
                // task only unblocks the thread for this timed-wait. We synchronize with
                // the timeout task to coordinate access to the sequence number and to
                // ensure the timeout task doesn't execute until the thread has got to
                // the TIMED_WAIT state.
                // Note that the timeout is in milliseconds here, whereas timed-park
                // above schedules its timeout in nanoseconds.
                long timeout = this.timeout;
                assert timeout > 0;
                synchronized (timedWaitLock()) {
                    byte seqNo = ++timedWaitSeqNo;
                    timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
                    setState(newState = TIMED_WAIT);
                }
            }

            // may have been notified while in transition to wait state
            if (notified && compareAndSetState(newState, BLOCKED)) {
                // may have even been unblocked already
                if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                    submitRunContinuation();
                }
                return;
            }

            // may have been interrupted while in transition to wait state
            if (interruptible && interrupted && compareAndSetState(newState, UNBLOCKED)) {
                submitRunContinuation();
                return;
            }
            return;
        }

        // no other state is expected after a yield
        assert false;
    }

    /**
     * Invoked after the continuation completes. Equivalent to
     * {@code afterDone(true)}, i.e. the thread container is notified.
     */
    private void afterDone() {
        afterDone(true);
    }

    /**
     * Invoked after the continuation completes (or start failed). Sets the thread
     * state to TERMINATED and notifies anyone waiting for the thread to terminate.
     * References to thread locals are cleared last.
     *
     * @param notifyContainer true if its container should be notified
     */
    private void afterDone(boolean notifyContainer) {
        assert carrierThread == null;
        setState(TERMINATED);

        // notify anyone waiting for this virtual thread to terminate
        CountDownLatch termination = this.termination;
        if (termination != null) {
            assert termination.getCount() == 1;
            termination.countDown();
        }

        // notify container
        if (notifyContainer) {
            threadContainer().remove(this);
        }

        // clear references to thread locals
        clearReferences();
    }

    /**
     * Schedules this {@code VirtualThread} to execute.
*
     * @param container the thread container that the thread is bound and added to
     * @throws IllegalStateException if the container is shutdown or closed
     * @throws IllegalThreadStateException if the thread has already been started
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void start(ThreadContainer container) {
        if (!compareAndSetState(NEW, STARTED)) {
            throw new IllegalThreadStateException("Already started");
        }

        // bind thread to container
        assert threadContainer() == null;
        setThreadContainer(container);

        // start thread
        boolean addedToContainer = false;
        boolean started = false;
        try {
            container.add(this);  // may throw
            addedToContainer = true;

            // scoped values may be inherited
            inheritScopedValueBindings(container);

            // submit task to run thread, using externalSubmit if possible
            externalSubmitRunContinuationOrThrow();
            started = true;
        } finally {
            if (!started) {
                // failed to start: terminate the thread, and only notify the container
                // if the thread had been added to it
                afterDone(addedToContainer);
            }
        }
    }

    /**
     * Schedules this virtual thread to execute in the root thread container.
     */
    @Override
    public void start() {
        start(ThreadContainers.root());
    }

    /**
     * Does nothing; the task is run by the continuation rather than by this method.
     */
    @Override
    public void run() {
        // do nothing
    }

    /**
     * Parks until unparked or interrupted. If already unparked then the parking
     * permit is consumed and this method completes immediately (meaning it doesn't
     * yield). It also completes immediately if the interrupted status is set.
*/
    @Override
    void park() {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        boolean yielded = false;
        setState(PARKING);
        try {
            yielded = yieldContinuation();
        } catch (OutOfMemoryError e) {
            // park on carrier
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (!yielded) {
                // the yield failed (or threw), restore the state before parking on the carrier
                assert state() == PARKING;
                setState(RUNNING);
            }
        }

        // park on the carrier thread when pinned
        if (!yielded) {
            parkOnCarrierThread(false, 0);
        }
    }

    /**
     * Parks up to the given waiting time or until unparked or interrupted.
     * If already unparked then the parking permit is consumed and this method
     * completes immediately (meaning it doesn't yield). It also completes immediately
     * if the interrupted status is set or the waiting time is {@code <= 0}.
     *
     * @param nanos the maximum number of nanoseconds to wait.
*/
    @Override
    void parkNanos(long nanos) {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread for the waiting time
        if (nanos > 0) {
            // used to compute the remaining time if the thread has to park on the carrier
            long startTime = System.nanoTime();

            // park the thread, afterYield will schedule the thread to unpark
            boolean yielded = false;
            timeout = nanos;
            setState(TIMED_PARKING);
            try {
                yielded = yieldContinuation();
            } catch (OutOfMemoryError e) {
                // park on carrier
            } finally {
                assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
                if (!yielded) {
                    assert state() == TIMED_PARKING;
                    setState(RUNNING);
                }
            }

            // park on carrier thread for remaining time when pinned (or OOME)
            if (!yielded) {
                long remainingNanos = nanos - (System.nanoTime() - startTime);
                parkOnCarrierThread(true, remainingNanos);
            }
        }
    }

    /**
     * Parks the current carrier thread up to the given waiting time or until
     * unparked or interrupted. If the virtual thread is interrupted then the
     * interrupted status will be propagated to the carrier thread.
     *
     * <p>The state is PINNED or TIMED_PINNED while parked and is always restored to
     * RUNNING. The carrier is not parked at all if the parking permit is already
     * available, or if this is a timed park with a waiting time {@code <= 0}.
     * The parking permit is consumed before returning.
     *
     * @param timed true for a timed park, false for untimed
     * @param nanos the waiting time in nanoseconds
     */
    private void parkOnCarrierThread(boolean timed, long nanos) {
        assert state() == RUNNING;

        setState(timed ? TIMED_PINNED : PINNED);
        try {
            if (!parkPermit) {
                if (!timed) {
                    U.park(false, 0);
                } else if (nanos > 0) {
                    U.park(false, nanos);
                }
            }
        } finally {
            setState(RUNNING);
        }

        // consume parking permit
        setParkPermit(false);

        // JFR jdk.VirtualThreadPinned event
        postPinnedEvent("LockSupport.park");
    }

    /**
     * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
* Recording the event in the VM avoids having JFR event recorded in Java
     * with the same name, but different ID, to events recorded by the VM.
     *
     * @param op the name of the operation that pinned the thread, e.g.
     *        {@code "LockSupport.park"}
     */
    @Hidden
    private static native void postPinnedEvent(String op);

    /**
     * Re-enables this virtual thread for scheduling. If this virtual thread is parked
     * then its task is scheduled to continue, otherwise its next call to {@code park} or
     * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
     *
     * <p>Nothing more is done when the parking permit was already available or when
     * the caller is this virtual thread itself.
     *
     * @param lazySubmit to use lazySubmit if possible
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    private void unpark(boolean lazySubmit) {
        if (!getAndSetParkPermit(true) && currentThread() != this) {
            int s = state();

            // unparked while parked
            if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
                if (lazySubmit) {
                    lazySubmitRunContinuation();
                } else {
                    submitRunContinuation();
                }
                return;
            }

            // unparked while parked when pinned
            if (s == PINNED || s == TIMED_PINNED) {
                // unpark carrier thread when pinned
                disableSuspendAndPreempt();
                try {
                    synchronized (carrierThreadAccessLock()) {
                        // re-read the carrier and state under the lock before unparking
                        Thread carrier = carrierThread;
                        if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                            U.unpark(carrier);
                        }
                    }
                } finally {
                    enableSuspendAndPreempt();
                }
                return;
            }
        }
    }

    /**
     * Re-enables this virtual thread for scheduling, without using lazy submit.
     *
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void unpark() {
        unpark(false);
    }

    /**
     * Invoked by unblocker thread to unblock this virtual thread. The blocking
     * permit is made available and, if the thread is BLOCKED, its state is changed
     * to UNBLOCKED and its task is submitted to the scheduler.
     */
    private void unblock() {
        assert !Thread.currentThread().isVirtual();
        blockPermit = true;
        if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
            submitRunContinuation();
        }
    }

    /**
     * Invoked by FJP worker thread or STPE thread when park timeout expires.
*/
    private void parkTimeoutExpired() {
        assert !VirtualThread.currentThread().isVirtual();
        // a timeout is handled as an unpark, using lazy submit if possible
        unpark(true);
    }

    /**
     * Invoked by FJP worker thread or STPE thread when wait timeout expires.
     * If the virtual thread is in timed-wait then this method will unblock the thread
     * and submit its task so that it continues and attempts to reenter the monitor.
     * This method does nothing if the thread has been woken by notify or interrupt.
     *
     * @param seqNo the timed-wait sequence number that this timeout task was
     *        scheduled for; the task is ignored if it no longer matches
     */
    private void waitTimeoutExpired(byte seqNo) {
        assert !Thread.currentThread().isVirtual();
        for (;;) {
            boolean unblocked = false;
            synchronized (timedWaitLock()) {
                if (seqNo != timedWaitSeqNo) {
                    // this timeout task is for a past timed-wait
                    return;
                }
                int s = state();
                if (s == TIMED_WAIT) {
                    unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED);
                } else if (s != (TIMED_WAIT | SUSPENDED)) {
                    // notified or interrupted, no longer waiting
                    return;
                }
            }
            if (unblocked) {
                // submit outside of the synchronized block
                lazySubmitRunContinuation();
                return;
            }
            // need to retry when thread is suspended in timed-wait
            Thread.yield();
        }
    }

    /**
     * Attempts to yield the current virtual thread (Thread.yield). The state is
     * YIELDING during the attempt and is restored to RUNNING if the continuation
     * fails to yield.
     */
    void tryYield() {
        assert Thread.currentThread() == this;
        setState(YIELDING);
        boolean yielded = false;
        try {
            yielded = yieldContinuation();  // may throw
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (!yielded) {
                assert state() == YIELDING;
                setState(RUNNING);
            }
        }
    }

    /**
     * Sleep the current thread for the given sleep time (in nanoseconds). If
     * nanos is 0 then the thread will attempt to yield.
     *
     * @implNote This implementation parks the thread for the given sleeping time
     * and will therefore be observed in PARKED state during the sleep.
Parking 979 * will consume the parking permit so this method makes available the parking 980 * permit after the sleep. This may be observed as a spurious, but benign, 981 * wakeup when the thread subsequently attempts to park. 982 * 983 * @param nanos the maximum number of nanoseconds to sleep 984 * @throws InterruptedException if interrupted while sleeping 985 */ 986 void sleepNanos(long nanos) throws InterruptedException { 987 assert Thread.currentThread() == this && nanos >= 0; 988 if (getAndClearInterrupt()) 989 throw new InterruptedException(); 990 if (nanos == 0) { 991 tryYield(); 992 } else { 993 // park for the sleep time 994 try { 995 long remainingNanos = nanos; 996 long startNanos = System.nanoTime(); 997 while (remainingNanos > 0) { 998 parkNanos(remainingNanos); 999 if (getAndClearInterrupt()) { 1000 throw new InterruptedException(); 1001 } 1002 remainingNanos = nanos - (System.nanoTime() - startNanos); 1003 } 1004 } finally { 1005 // may have been unparked while sleeping 1006 setParkPermit(true); 1007 } 1008 } 1009 } 1010 1011 /** 1012 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 1013 * A timeout of {@code 0} means to wait forever. 
*
     * @throws InterruptedException if interrupted while waiting
     * @return true if the thread has terminated
     */
    boolean joinNanos(long nanos) throws InterruptedException {
        // fast path, no termination object needed
        if (state() == TERMINATED)
            return true;

        // ensure termination object exists, then re-check state
        CountDownLatch termination = getTermination();
        if (state() == TERMINATED)
            return true;

        // wait for virtual thread to terminate
        if (nanos == 0) {
            termination.await();
        } else {
            boolean terminated = termination.await(nanos, NANOSECONDS);
            if (!terminated) {
                // waiting time elapsed
                return false;
            }
        }
        assert state() == TERMINATED;
        return true;
    }

    /**
     * Delegates to {@code super.blockedOn(b)} with suspend and preemption of the
     * current thread disabled for the duration of the call.
     */
    @Override
    void blockedOn(Interruptible b) {
        disableSuspendAndPreempt();
        try {
            super.blockedOn(b);
        } finally {
            enableSuspendAndPreempt();
        }
    }

    /**
     * Interrupts this virtual thread.
     *
     * <p> When invoked by another thread, the interrupted status is set while holding
     * {@code interruptLock}. With the lock held, the NIO blocker (if any) and the
     * carrier thread (if any) are also interrupted. After the lock is released the
     * blocker's {@code postInterrupt} is invoked, the thread is unparked, and its task
     * is submitted if it is in the WAIT or TIMED_WAIT state.
     *
     * <p> When invoked by this thread on itself, the interrupted status is set, the
     * carrier thread is interrupted, and the parking permit is made available.
     */
    @Override
    public void interrupt() {
        if (Thread.currentThread() != this) {
            // if current thread is a virtual thread then prevent it from being
            // suspended or unmounted when entering or holding interruptLock
            Interruptible blocker;
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = true;
                    blocker = nioBlocker();
                    if (blocker != null) {
                        blocker.interrupt(this);
                    }

                    // interrupt carrier thread if mounted
                    Thread carrier = carrierThread;
                    if (carrier != null) carrier.setInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }

            // notify blocker after releasing interruptLock
            if (blocker != null) {
                blocker.postInterrupt();
            }

            // make available parking permit, unpark thread if parked
            unpark();

            // if thread is waiting in Object.wait then schedule to try to reenter
            int s = state();
            if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) {
                submitRunContinuation();
            }

        } else {
            // self-interrupt, carrierThread is used without a null check here
            // (presumably because the current thread is mounted - confirm)
            interrupted = true;
            carrierThread.setInterrupt();
            setParkPermit(true);
        }
    }

    /**
     * Returns the value of the {@code interrupted} field.
     */
    @Override
    public boolean isInterrupted() {
        return interrupted;
    }

    /**
     * Returns the interrupted status of this thread, clearing it if set. The status
     * of the carrier thread is cleared at the same time, while holding
     * {@code interruptLock}. Must be invoked by this thread (asserted).
     */
    @Override
    boolean getAndClearInterrupt() {
        assert Thread.currentThread() == this;
        boolean oldValue = interrupted;
        if (oldValue) {
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = false;
                    carrierThread.clearInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }
        return oldValue;
    }

    /**
     * Maps the internal state of this virtual thread, ignoring the SUSPENDED bit,
     * to a {@code Thread.State}. In the RUNNING state, when invoked by another thread
     * and a carrier thread is set, the state of the carrier thread is returned.
     *
     * @throws InternalError if the internal state is not one of the known states
     */
    @Override
    Thread.State threadState() {
        int s = state();
        switch (s & ~SUSPENDED) {
            case NEW:
                return Thread.State.NEW;
            case STARTED:
                // return NEW if thread container not yet set
                if (threadContainer() == null) {
                    return Thread.State.NEW;
                } else {
                    return Thread.State.RUNNABLE;
                }
            case UNPARKED:
            case UNBLOCKED:
            case YIELDED:
                // runnable, not mounted
                return Thread.State.RUNNABLE;
            case RUNNING:
                // if mounted then return state of carrier thread
                if (Thread.currentThread() != this) {
                    disableSuspendAndPreempt();
                    try {
                        synchronized (carrierThreadAccessLock()) {
                            Thread carrierThread = this.carrierThread;
                            if (carrierThread != null) {
                                return carrierThread.threadState();
                            }
                        }
                    } finally {
                        enableSuspendAndPreempt();
                    }
                }
                // runnable, mounted
                return Thread.State.RUNNABLE;
            case PARKING:
            case TIMED_PARKING:
            case WAITING:
            case TIMED_WAITING:
            case YIELDING:
                // runnable, in transition
                return Thread.State.RUNNABLE;
            case PARKED:
            case PINNED:
            case WAIT:
                return Thread.State.WAITING;
            case TIMED_PARKED:
            case TIMED_PINNED:
            case TIMED_WAIT:
                return Thread.State.TIMED_WAITING;
            case BLOCKING:
            case BLOCKED:
                return Thread.State.BLOCKED;
            case TERMINATED:
                return Thread.State.TERMINATED;
            default:
                throw new InternalError();
        }
    }

    /**
     * Returns true if the state is neither NEW nor TERMINATED.
     */
    @Override
    boolean alive() {
        int s = state;
        return (s != NEW && s != TERMINATED);
    }

    /**
     * Returns true if the state is TERMINATED.
     */
    @Override
    boolean isTerminated() {
        return (state == TERMINATED);
    }

    /**
     * Returns the stack trace of this virtual thread. Uses
     * {@code super.asyncGetStackTrace()} when a carrier thread is set, otherwise
     * {@link #tryGetStackTrace()}. Retries, yielding between attempts, until a
     * non-null stack trace is obtained.
     */
    @Override
    StackTraceElement[] asyncGetStackTrace() {
        StackTraceElement[] stackTrace;
        do {
            stackTrace = (carrierThread != null)
                    ? super.asyncGetStackTrace()  // mounted
                    : tryGetStackTrace();         // unmounted
            if (stackTrace == null) {
                Thread.yield();
            }
        } while (stackTrace == null);
        return stackTrace;
    }

    /**
     * Returns the stack trace for this virtual thread if it is unmounted.
     * Returns null if the thread is mounted or in transition.
     *
     * <p> For an unmounted thread, the SUSPENDED bit is set in the state while the
     * stack trace is obtained from the continuation. After the state is restored,
     * the task is resubmitted if the thread was runnable, or if it was unparked,
     * unblocked, notified or interrupted in the meantime.
     */
    private StackTraceElement[] tryGetStackTrace() {
        int initialState = state() & ~SUSPENDED;
        switch (initialState) {
            case NEW, STARTED, TERMINATED -> {
                return new StackTraceElement[0];  // unmounted, empty stack
            }
            case RUNNING, PINNED, TIMED_PINNED -> {
                return null;   // mounted
            }
            case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
                // unmounted, not runnable
            }
            case UNPARKED, UNBLOCKED, YIELDED -> {
                // unmounted, runnable
            }
            case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
                return null;  // in transition
            }
            default -> throw new InternalError("" + initialState);
        }

        // thread is unmounted, prevent it from continuing
        int suspendedState = initialState | SUSPENDED;
        if (!compareAndSetState(initialState, suspendedState)) {
            // state changed (or thread already suspended), caller retries
            return null;
        }

        // get stack trace and restore state
        StackTraceElement[] stack;
        try {
            stack = cont.getStackTrace();
        } finally {
            assert state == suspendedState;
            setState(initialState);
        }
        boolean resubmit = switch (initialState) {
            case UNPARKED, UNBLOCKED, YIELDED -> {
                // resubmit as task may have run while suspended
                yield true;
            }
            case PARKED, TIMED_PARKED -> {
                // resubmit if unparked while suspended
                yield parkPermit && compareAndSetState(initialState, UNPARKED);
            }
            case BLOCKED -> {
                // resubmit if unblocked while suspended
                yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
            }
            case WAIT, TIMED_WAIT -> {
                // resubmit if notified or interrupted while waiting (Object.wait)
                // waitTimeoutExpired will retry if the timeout expired when suspended
                yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
            }
            default -> throw new InternalError();
        };
        if (resubmit) {
            submitRunContinuation();
        }
        return stack;
    }

    /**
     * Returns a string of the form {@code VirtualThread[#<id>,<name>]/<suffix>}, the
     * name and its leading comma are omitted when the name is empty. The suffix is
     * the carrier state and carrier name separated by {@code @} when a carrier thread
     * is set, otherwise the state of this virtual thread. States are in lower case.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("VirtualThread[#");
        sb.append(threadId());
        String name = getName();
        if (!name.isEmpty()) {
            sb.append(",");
            sb.append(name);
        }
        sb.append("]/");

        // add the carrier state and thread name when mounted
        boolean mounted;
        if (Thread.currentThread() == this) {
            mounted = appendCarrierInfo(sb);
        } else {
            disableSuspendAndPreempt();
            try {
                synchronized (carrierThreadAccessLock()) {
                    mounted = appendCarrierInfo(sb);
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }

        // add virtual thread state when not mounted
        if (!mounted) {
            String stateAsString = threadState().toString();
            sb.append(stateAsString.toLowerCase(Locale.ROOT));
        }

        return sb.toString();
    }

    /**
     * Appends the carrier state and thread name to the string buffer if mounted.
1305 * @return true if mounted, false if not mounted 1306 */ 1307 private boolean appendCarrierInfo(StringBuilder sb) { 1308 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1309 Thread carrier = carrierThread; 1310 if (carrier != null) { 1311 String stateAsString = carrier.threadState().toString(); 1312 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1313 sb.append('@'); 1314 sb.append(carrier.getName()); 1315 return true; 1316 } else { 1317 return false; 1318 } 1319 } 1320 1321 @Override 1322 public int hashCode() { 1323 return (int) threadId(); 1324 } 1325 1326 @Override 1327 public boolean equals(Object obj) { 1328 return obj == this; 1329 } 1330 1331 /** 1332 * Returns the termination object, creating it if needed. 1333 */ 1334 private CountDownLatch getTermination() { 1335 CountDownLatch termination = this.termination; 1336 if (termination == null) { 1337 termination = new CountDownLatch(1); 1338 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1339 termination = this.termination; 1340 } 1341 } 1342 return termination; 1343 } 1344 1345 /** 1346 * Returns the lock object to synchronize on when accessing carrierThread. 1347 * The lock prevents carrierThread from being reset to null during unmount. 1348 */ 1349 private Object carrierThreadAccessLock() { 1350 // return interruptLock as unmount has to coordinate with interrupt 1351 return interruptLock; 1352 } 1353 1354 /** 1355 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1356 */ 1357 private Object timedWaitLock() { 1358 // use this object for now to avoid the overhead of introducing another lock 1359 return runContinuation; 1360 } 1361 1362 /** 1363 * Disallow the current thread be suspended or preempted. 1364 */ 1365 private void disableSuspendAndPreempt() { 1366 notifyJvmtiDisableSuspend(true); 1367 Continuation.pin(); 1368 } 1369 1370 /** 1371 * Allow the current thread be suspended or preempted. 
*/
    private void enableSuspendAndPreempt() {
        // reverse order of disableSuspendAndPreempt: unpin first, then re-enable suspend
        Continuation.unpin();
        notifyJvmtiDisableSuspend(false);
    }

    // -- wrappers for get/set of state, parking permit, and carrier thread --

    /**
     * Returns the current state.
     */
    private int state() {
        return state;  // volatile read
    }

    /**
     * Sets the state to the given value.
     */
    private void setState(int newValue) {
        state = newValue;  // volatile write
    }

    /**
     * Atomically sets the state to {@code newValue} if it is {@code expectedValue}.
     * @return true if the state was changed
     */
    private boolean compareAndSetState(int expectedValue, int newValue) {
        return U.compareAndSetInt(this, STATE, expectedValue, newValue);
    }

    /**
     * Atomically sets the on-waiting-list flag to {@code newValue} if it is
     * {@code expectedValue}.
     * @return true if the flag was changed
     */
    private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) {
        return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue);
    }

    /**
     * Sets the parking permit. The field is only written when its value differs
     * from {@code newValue}.
     */
    private void setParkPermit(boolean newValue) {
        // NOTE(review): presumably avoids a redundant write to a shared field - confirm
        if (parkPermit != newValue) {
            parkPermit = newValue;
        }
    }

    /**
     * Sets the parking permit and returns its previous value. The atomic
     * get-and-set is only performed when the permit is observed to differ from
     * {@code newValue}, otherwise {@code newValue} is returned as the previous value.
     */
    private boolean getAndSetParkPermit(boolean newValue) {
        if (parkPermit != newValue) {
            return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
        } else {
            return newValue;
        }
    }

    /**
     * Sets the carrier thread with a plain field assignment.
     */
    private void setCarrierThread(Thread carrier) {
        // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
        this.carrierThread = carrier;
    }

    // The following four methods notify the VM when a "transition" starts and ends.
    // A "mount transition" embodies the steps to transfer control from a platform
    // thread to a virtual thread, changing the thread identity, and starting or
    // resuming the virtual thread's continuation on the carrier.
    // An "unmount transition" embodies the steps to transfer control from a virtual
    // thread to its carrier, suspending the virtual thread's continuation, and
    // restoring the thread identity to the platform thread.
    // The notifications to the VM are necessary in order to coordinate with functions
    // (JVMTI mostly) that disable transitions for one or all virtual threads.
Starting 1424 // a transition may block if transitions are disabled. Ending a transition may 1425 // notify a thread that is waiting to disable transitions. The notifications are 1426 // also used to post JVMTI events for virtual thread start and end. 1427 1428 @IntrinsicCandidate 1429 @JvmtiMountTransition 1430 private native void endFirstTransition(); 1431 1432 @IntrinsicCandidate 1433 @JvmtiMountTransition 1434 private native void startFinalTransition(); 1435 1436 @IntrinsicCandidate 1437 @JvmtiMountTransition 1438 private native void startTransition(boolean is_mount); 1439 1440 @IntrinsicCandidate 1441 @JvmtiMountTransition 1442 private native void endTransition(boolean is_mount); 1443 1444 @IntrinsicCandidate 1445 private static native void notifyJvmtiDisableSuspend(boolean enter); 1446 1447 private static native void registerNatives(); 1448 static { 1449 registerNatives(); 1450 1451 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1452 var group = Thread.virtualThreadGroup(); 1453 } 1454 1455 /** 1456 * Creates the default ForkJoinPool scheduler. 
1457 */ 1458 private static ForkJoinPool createDefaultScheduler() { 1459 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool); 1460 int parallelism, maxPoolSize, minRunnable; 1461 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1462 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1463 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1464 if (parallelismValue != null) { 1465 parallelism = Integer.parseInt(parallelismValue); 1466 } else { 1467 parallelism = Runtime.getRuntime().availableProcessors(); 1468 } 1469 if (maxPoolSizeValue != null) { 1470 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1471 parallelism = Integer.min(parallelism, maxPoolSize); 1472 } else { 1473 maxPoolSize = Integer.max(parallelism, 256); 1474 } 1475 if (minRunnableValue != null) { 1476 minRunnable = Integer.parseInt(minRunnableValue); 1477 } else { 1478 minRunnable = Integer.max(parallelism / 2, 1); 1479 } 1480 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1481 boolean asyncMode = true; // FIFO 1482 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1483 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1484 } 1485 1486 /** 1487 * Schedule a runnable task to run after a delay. 1488 */ 1489 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1490 if (scheduler instanceof ForkJoinPool pool) { 1491 return pool.schedule(command, delay, unit); 1492 } else { 1493 return DelayedTaskSchedulers.schedule(command, delay, unit); 1494 } 1495 } 1496 1497 /** 1498 * Supports scheduling a runnable task to run after a delay. It uses a number 1499 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1500 * work queue used. This class is used when using a custom scheduler. 
1501 */ 1502 private static class DelayedTaskSchedulers { 1503 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1504 1505 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1506 long tid = Thread.currentThread().threadId(); 1507 int index = (int) tid & (INSTANCE.length - 1); 1508 return INSTANCE[index].schedule(command, delay, unit); 1509 } 1510 1511 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1512 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1513 String propValue = System.getProperty(propName); 1514 int queueCount; 1515 if (propValue != null) { 1516 queueCount = Integer.parseInt(propValue); 1517 if (queueCount != Integer.highestOneBit(queueCount)) { 1518 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1519 } 1520 } else { 1521 int ncpus = Runtime.getRuntime().availableProcessors(); 1522 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1523 } 1524 var schedulers = new ScheduledExecutorService[queueCount]; 1525 for (int i = 0; i < queueCount; i++) { 1526 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1527 Executors.newScheduledThreadPool(1, task -> { 1528 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1529 t.setDaemon(true); 1530 return t; 1531 }); 1532 stpe.setRemoveOnCancelPolicy(true); 1533 schedulers[i] = stpe; 1534 } 1535 return schedulers; 1536 } 1537 } 1538 1539 /** 1540 * Schedule virtual threads that are ready to be scheduled after they blocked on 1541 * monitor enter. 
1542 */ 1543 private static void unblockVirtualThreads() { 1544 while (true) { 1545 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1546 while (vthread != null) { 1547 assert vthread.onWaitingList; 1548 VirtualThread nextThread = vthread.next; 1549 1550 // remove from list and unblock 1551 vthread.next = null; 1552 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1553 assert changed; 1554 vthread.unblock(); 1555 1556 vthread = nextThread; 1557 } 1558 } 1559 } 1560 1561 /** 1562 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1563 * if necessary until a list of one or more threads becomes available. 1564 */ 1565 private static native VirtualThread takeVirtualThreadListToUnblock(); 1566 1567 static { 1568 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1569 VirtualThread::unblockVirtualThreads); 1570 unblocker.setDaemon(true); 1571 unblocker.start(); 1572 } 1573 } --- EOF ---