1 /* 2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.lang.reflect.Constructor; 30 import java.util.Locale; 31 import java.util.Objects; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.Executors; 35 import java.util.concurrent.ForkJoinPool; 36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 37 import java.util.concurrent.ForkJoinTask; 38 import java.util.concurrent.Future; 39 import java.util.concurrent.RejectedExecutionException; 40 import java.util.concurrent.ScheduledExecutorService; 41 import java.util.concurrent.ScheduledThreadPoolExecutor; 42 import java.util.concurrent.TimeUnit; 43 import jdk.internal.event.VirtualThreadEndEvent; 44 import jdk.internal.event.VirtualThreadStartEvent; 45 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 46 import jdk.internal.invoke.MhUtil; 47 import jdk.internal.misc.CarrierThread; 48 import jdk.internal.misc.InnocuousThread; 49 import jdk.internal.misc.Unsafe; 50 import jdk.internal.vm.Continuation; 51 import jdk.internal.vm.ContinuationScope; 52 import jdk.internal.vm.StackableScope; 53 import jdk.internal.vm.ThreadContainer; 54 import jdk.internal.vm.ThreadContainers; 55 import jdk.internal.vm.annotation.ChangesCurrentThread; 56 import jdk.internal.vm.annotation.Hidden; 57 import jdk.internal.vm.annotation.IntrinsicCandidate; 58 import jdk.internal.vm.annotation.JvmtiHideEvents; 59 import jdk.internal.vm.annotation.JvmtiMountTransition; 60 import jdk.internal.vm.annotation.ReservedStackAccess; 61 import sun.nio.ch.Interruptible; 62 import static java.util.concurrent.TimeUnit.*; 63 64 /** 65 * A thread that is scheduled by the Java virtual machine rather than the operating system. 66 */ 67 final class VirtualThread extends BaseVirtualThread { 68 private static final Unsafe U = Unsafe.getUnsafe(); 69 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 70 71 private static final Executor DEFAULT_SCHEDULER; 72 private static final boolean USE_CUSTOM_RUNNER; 73 static { 74 // experimental 75 String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass"); 76 if (propValue != null) { 77 DEFAULT_SCHEDULER = createCustomDefaultScheduler(propValue); 78 USE_CUSTOM_RUNNER = true; 79 } else { 80 DEFAULT_SCHEDULER = createDefaultForkJoinPoolScheduler(); 81 USE_CUSTOM_RUNNER = false; 82 } 83 } 84 85 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 86 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 87 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 88 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 89 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList"); 90 91 // scheduler and continuation 92 private final Executor scheduler; 93 private final Continuation cont; 94 private final Runnable runContinuation; 95 96 // virtual thread state, accessed by VM 97 private volatile int state; 98 99 /* 100 * Virtual thread state transitions: 101 * 102 * NEW -> STARTED // Thread.start, schedule to run 103 * STARTED -> TERMINATED // failed to start 104 * STARTED -> RUNNING // first run 105 * RUNNING -> TERMINATED // done 106 * 107 * RUNNING -> PARKING // Thread parking with LockSupport.park 108 * PARKING -> PARKED // cont.yield successful, parked indefinitely 109 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier 110 * PARKED -> UNPARKED // unparked, may be scheduled to continue 111 * PINNED -> RUNNING // unparked, continue execution on same carrier 112 * UNPARKED -> RUNNING // continue execution after park 113 * 114 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 115 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 116 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier 117 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue 118 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 119 * 120 * RUNNING -> BLOCKING // blocking on monitor enter 121 * BLOCKING -> BLOCKED // blocked on monitor enter 122 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue 123 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter 124 * 125 * RUNNING -> WAITING // transitional state during wait on monitor 126 * WAITING -> WAIT // waiting on monitor 127 * WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 128 * WAIT -> UNBLOCKED // timed-out/interrupted 129 * 130 * RUNNING -> TIMED_WAITING // transition state during timed-waiting on monitor 131 * TIMED_WAITING -> TIMED_WAIT // timed-waiting on monitor 132 * TIMED_WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 133 * TIMED_WAIT -> UNBLOCKED // timed-out/interrupted 134 * 135 * RUNNING -> YIELDING // Thread.yield 136 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue 137 * YIELDING -> RUNNING // cont.yield failed 138 * YIELDED -> RUNNING // continue execution after Thread.yield 139 */ 140 private static final int NEW = 0; 141 private static final int STARTED = 1; 142 private static final int RUNNING = 2; // runnable-mounted 143 144 // untimed and timed parking 145 private static final int PARKING = 3; 146 private static final int PARKED = 4; // unmounted 147 private static final int PINNED = 5; // mounted 148 private static final int TIMED_PARKING = 6; 149 private static final int TIMED_PARKED = 7; // unmounted 150 private static final int TIMED_PINNED = 8; // mounted 151 private static final int UNPARKED = 9; // unmounted but runnable 152 153 // Thread.yield 154 private static final int YIELDING = 10; 155 private static final int YIELDED = 11; // unmounted but runnable 156 157 // monitor enter 158 private static final int BLOCKING = 12; 159 private static final int BLOCKED = 13; // unmounted 160 private static final int UNBLOCKED = 14; // unmounted but runnable 161 162 // monitor wait/timed-wait 163 private static final int WAITING = 15; 164 private static final int WAIT = 16; // waiting in Object.wait 165 private static final int TIMED_WAITING = 17; 166 private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait 167 168 private static final int TERMINATED = 99; // final state 169 170 // can be suspended from scheduling when unmounted 171 private static final int SUSPENDED = 1 << 8; 172 173 // parking permit made available by LockSupport.unpark 174 private volatile boolean parkPermit; 175 176 // blocking permit made available by unblocker thread when another thread exits monitor 177 private volatile boolean blockPermit; 178 179 // true when on the list of virtual threads waiting to be unblocked 180 private volatile boolean onWaitingList; 181 182 // next virtual thread on the list of virtual threads waiting to be unblocked 183 private volatile VirtualThread next; 184 185 // notified by Object.notify/notifyAll while waiting in Object.wait 186 private volatile boolean notified; 187 188 // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait 189 private volatile boolean interruptableWait; 190 191 // timed-wait support 192 private byte timedWaitSeqNo; 193 194 // timeout for timed-park and timed-wait, only accessed on current/carrier thread 195 private long timeout; 196 197 // timer task for timed-park and timed-wait, only accessed on current/carrier thread 198 private Future<?> timeoutTask; 199 200 // carrier thread when mounted, accessed by VM 201 private volatile Thread carrierThread; 202 203 // termination object when joining, created lazily if needed 204 private volatile CountDownLatch termination; 205 206 /** 207 * Returns the default scheduler. 208 */ 209 static Executor defaultScheduler() { 210 return DEFAULT_SCHEDULER; 211 } 212 213 /** 214 * Returns true if using a custom default scheduler. 215 */ 216 static boolean isCustomDefaultScheduler() { 217 return USE_CUSTOM_RUNNER; 218 } 219 220 /** 221 * Returns the continuation scope used for virtual threads. 222 */ 223 static ContinuationScope continuationScope() { 224 return VTHREAD_SCOPE; 225 } 226 227 /** 228 * Return the scheduler for this thread. 229 */ 230 Executor scheduler() { 231 return scheduler; 232 } 233 234 /** 235 * Creates a new {@code VirtualThread} to run the given task with the given scheduler. 236 * 237 * @param scheduler the scheduler or null for default scheduler 238 * @param name thread name 239 * @param characteristics characteristics 240 * @param task the task to execute 241 */ 242 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 243 super(name, characteristics, /*bound*/ false); 244 Objects.requireNonNull(task); 245 246 // use default scheduler if not provided 247 if (scheduler == null) { 248 scheduler = DEFAULT_SCHEDULER; 249 } 250 251 this.scheduler = scheduler; 252 this.cont = new VThreadContinuation(this, task); 253 if (USE_CUSTOM_RUNNER || (scheduler != DEFAULT_SCHEDULER)) { 254 this.runContinuation = new CustomRunner(this); 255 } else { 256 this.runContinuation = this::runContinuation; 257 } 258 } 259 260 /** 261 * The continuation that a virtual thread executes. 262 */ 263 private static class VThreadContinuation extends Continuation { 264 VThreadContinuation(VirtualThread vthread, Runnable task) { 265 super(VTHREAD_SCOPE, wrap(vthread, task)); 266 } 267 @Override 268 protected void onPinned(Continuation.Pinned reason) { 269 } 270 private static Runnable wrap(VirtualThread vthread, Runnable task) { 271 return new Runnable() { 272 @Hidden 273 @JvmtiHideEvents 274 public void run() { 275 vthread.notifyJvmtiStart(); // notify JVMTI 276 try { 277 vthread.run(task); 278 } finally { 279 vthread.notifyJvmtiEnd(); // notify JVMTI 280 } 281 } 282 }; 283 } 284 } 285 286 /** 287 * The task to execute when using a custom scheduler. 288 */ 289 private static class CustomRunner implements VirtualThreadTask { 290 private static final VarHandle ATTACHMENT = 291 MhUtil.findVarHandle(MethodHandles.lookup(), "attachment", Object.class); 292 private final VirtualThread vthread; 293 private volatile Object attachment; 294 CustomRunner(VirtualThread vthread) { 295 this.vthread = vthread; 296 } 297 @Override 298 public void run() { 299 vthread.runContinuation(); 300 } 301 @Override 302 public Thread thread() { 303 return vthread; 304 } 305 @Override 306 public Object attach(Object ob) { 307 return ATTACHMENT.getAndSet(this, ob); 308 } 309 @Override 310 public Object attachment() { 311 return attachment; 312 } 313 @Override 314 public String toString() { 315 return vthread.toString(); 316 } 317 } 318 319 /** 320 * Returns the object attached to the virtual thread's task. 321 */ 322 Object currentTaskAttachment() { 323 assert Thread.currentThread() == this; 324 if (runContinuation instanceof CustomRunner runner) { 325 return runner.attachment(); 326 } else { 327 return null; 328 } 329 } 330 331 /** 332 * Runs or continues execution on the current thread. The virtual thread is mounted 333 * on the current thread before the task runs or continues. It unmounts when the 334 * task completes or yields. 335 */ 336 @ChangesCurrentThread // allow mount/unmount to be inlined 337 private void runContinuation() { 338 // the carrier must be a platform thread 339 if (Thread.currentThread().isVirtual()) { 340 throw new WrongThreadException(); 341 } 342 343 // set state to RUNNING 344 int initialState = state(); 345 if (initialState == STARTED || initialState == UNPARKED 346 || initialState == UNBLOCKED || initialState == YIELDED) { 347 // newly started or continue after parking/blocking/Thread.yield 348 if (!compareAndSetState(initialState, RUNNING)) { 349 return; 350 } 351 // consume permit when continuing after parking or blocking. If continue 352 // after a timed-park or timed-wait then the timeout task is cancelled. 353 if (initialState == UNPARKED) { 354 cancelTimeoutTask(); 355 setParkPermit(false); 356 } else if (initialState == UNBLOCKED) { 357 cancelTimeoutTask(); 358 blockPermit = false; 359 } 360 } else { 361 // not runnable 362 return; 363 } 364 365 mount(); 366 try { 367 cont.run(); 368 } finally { 369 unmount(); 370 if (cont.isDone()) { 371 afterDone(); 372 } else { 373 afterYield(); 374 } 375 } 376 } 377 378 /** 379 * Cancel timeout task when continuing after timed-park or timed-wait. 380 * The timeout task may be executing, or may have already completed. 381 */ 382 private void cancelTimeoutTask() { 383 if (timeoutTask != null) { 384 timeoutTask.cancel(false); 385 timeoutTask = null; 386 } 387 } 388 389 /** 390 * Submits the given task to the given executor. If the scheduler is a 391 * ForkJoinPool then the task is first adapted to a ForkJoinTask. 392 */ 393 private void submit(Executor executor, Runnable task) { 394 if (executor instanceof ForkJoinPool pool) { 395 pool.submit(ForkJoinTask.adapt(task)); 396 } else { 397 executor.execute(task); 398 } 399 } 400 401 /** 402 * Submits the runContinuation task to the scheduler. For the default scheduler, 403 * and calling it on a worker thread, the task will be pushed to the local queue, 404 * otherwise it will be pushed to an external submission queue. 405 * @param scheduler the scheduler 406 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 407 * @throws RejectedExecutionException 408 */ 409 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) { 410 boolean done = false; 411 while (!done) { 412 try { 413 // Pin the continuation to prevent the virtual thread from unmounting 414 // when submitting a task. For the default scheduler this ensures that 415 // the carrier doesn't change when pushing a task. For other schedulers 416 // it avoids deadlock that could arise due to carriers and virtual 417 // threads contending for a lock. 418 if (currentThread().isVirtual()) { 419 Continuation.pin(); 420 try { 421 submit(scheduler, runContinuation); 422 } finally { 423 Continuation.unpin(); 424 } 425 } else { 426 submit(scheduler, runContinuation); 427 } 428 done = true; 429 } catch (RejectedExecutionException ree) { 430 submitFailed(ree); 431 throw ree; 432 } catch (OutOfMemoryError e) { 433 if (retryOnOOME) { 434 U.park(false, 100_000_000); // 100ms 435 } else { 436 throw e; 437 } 438 } 439 } 440 } 441 442 /** 443 * Submits the runContinuation task to the given scheduler as an external submit. 444 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 445 * @throws RejectedExecutionException 446 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 447 */ 448 private void externalSubmitRunContinuation(ForkJoinPool pool) { 449 assert Thread.currentThread() instanceof CarrierThread; 450 try { 451 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 452 } catch (RejectedExecutionException ree) { 453 submitFailed(ree); 454 throw ree; 455 } catch (OutOfMemoryError e) { 456 submitRunContinuation(pool, true); 457 } 458 } 459 460 /** 461 * Submits the runContinuation task to the scheduler. For the default scheduler, 462 * and calling it on a worker thread, the task will be pushed to the local queue, 463 * otherwise it will be pushed to an external submission queue. 464 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 465 * @throws RejectedExecutionException 466 */ 467 private void submitRunContinuation() { 468 submitRunContinuation(scheduler, true); 469 } 470 471 /** 472 * Lazy submit the runContinuation task if invoked on a carrier thread and its local 473 * queue is empty. If not empty, or invoked by another thread, then this method works 474 * like submitRunContinuation and just submits the task to the scheduler. 475 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 476 * @throws RejectedExecutionException 477 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 478 */ 479 private void lazySubmitRunContinuation() { 480 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 481 ForkJoinPool pool = ct.getPool(); 482 try { 483 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 484 } catch (RejectedExecutionException ree) { 485 submitFailed(ree); 486 throw ree; 487 } catch (OutOfMemoryError e) { 488 submitRunContinuation(); 489 } 490 } else { 491 submitRunContinuation(); 492 } 493 } 494 495 /** 496 * Submits the runContinuation task to the scheduler. For the default scheduler, and 497 * calling it a virtual thread that uses the default scheduler, the task will be 498 * pushed to an external submission queue. This method may throw OutOfMemoryError. 499 * @throws RejectedExecutionException 500 * @throws OutOfMemoryError 501 */ 502 private void externalSubmitRunContinuationOrThrow() { 503 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) { 504 try { 505 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 506 } catch (RejectedExecutionException ree) { 507 submitFailed(ree); 508 throw ree; 509 } 510 } else { 511 submitRunContinuation(scheduler, false); 512 } 513 } 514 515 /** 516 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 517 */ 518 private void submitFailed(RejectedExecutionException ree) { 519 var event = new VirtualThreadSubmitFailedEvent(); 520 if (event.isEnabled()) { 521 event.javaThreadId = threadId(); 522 event.exceptionMessage = ree.getMessage(); 523 event.commit(); 524 } 525 } 526 527 /** 528 * Runs a task in the context of this virtual thread. 529 */ 530 private void run(Runnable task) { 531 assert Thread.currentThread() == this && state == RUNNING; 532 533 // emit JFR event if enabled 534 if (VirtualThreadStartEvent.isTurnedOn()) { 535 var event = new VirtualThreadStartEvent(); 536 event.javaThreadId = threadId(); 537 event.commit(); 538 } 539 540 Object bindings = Thread.scopedValueBindings(); 541 try { 542 runWith(bindings, task); 543 } catch (Throwable exc) { 544 dispatchUncaughtException(exc); 545 } finally { 546 // pop any remaining scopes from the stack, this may block 547 StackableScope.popAll(); 548 549 // emit JFR event if enabled 550 if (VirtualThreadEndEvent.isTurnedOn()) { 551 var event = new VirtualThreadEndEvent(); 552 event.javaThreadId = threadId(); 553 event.commit(); 554 } 555 } 556 } 557 558 /** 559 * Mounts this virtual thread onto the current platform thread. On 560 * return, the current thread is the virtual thread. 561 */ 562 @ChangesCurrentThread 563 @ReservedStackAccess 564 private void mount() { 565 // notify JVMTI before mount 566 notifyJvmtiMount(/*hide*/true); 567 568 // sets the carrier thread 569 Thread carrier = Thread.currentCarrierThread(); 570 setCarrierThread(carrier); 571 572 // sync up carrier thread interrupt status if needed 573 if (interrupted) { 574 carrier.setInterrupt(); 575 } else if (carrier.isInterrupted()) { 576 synchronized (interruptLock) { 577 // need to recheck interrupt status 578 if (!interrupted) { 579 carrier.clearInterrupt(); 580 } 581 } 582 } 583 584 // set Thread.currentThread() to return this virtual thread 585 carrier.setCurrentThread(this); 586 } 587 588 /** 589 * Unmounts this virtual thread from the carrier. On return, the 590 * current thread is the current platform thread. 591 */ 592 @ChangesCurrentThread 593 @ReservedStackAccess 594 private void unmount() { 595 assert !Thread.holdsLock(interruptLock); 596 597 // set Thread.currentThread() to return the platform thread 598 Thread carrier = this.carrierThread; 599 carrier.setCurrentThread(carrier); 600 601 // break connection to carrier thread, synchronized with interrupt 602 synchronized (interruptLock) { 603 setCarrierThread(null); 604 } 605 carrier.clearInterrupt(); 606 607 // notify JVMTI after unmount 608 notifyJvmtiUnmount(/*hide*/false); 609 } 610 611 /** 612 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 613 * the continuation continues. 614 */ 615 @Hidden 616 private boolean yieldContinuation() { 617 notifyJvmtiUnmount(/*hide*/true); 618 try { 619 return Continuation.yield(VTHREAD_SCOPE); 620 } finally { 621 notifyJvmtiMount(/*hide*/false); 622 } 623 } 624 625 /** 626 * Invoked in the context of the carrier thread after the Continuation yields when 627 * parking, blocking on monitor enter, Object.wait, or Thread.yield. 628 */ 629 private void afterYield() { 630 assert carrierThread == null; 631 632 // re-adjust parallelism if the virtual thread yielded when compensating 633 if (currentThread() instanceof CarrierThread ct) { 634 ct.endBlocking(); 635 } 636 637 int s = state(); 638 639 // LockSupport.park/parkNanos 640 if (s == PARKING || s == TIMED_PARKING) { 641 int newState; 642 if (s == PARKING) { 643 setState(newState = PARKED); 644 } else { 645 // schedule unpark 646 long timeout = this.timeout; 647 assert timeout > 0; 648 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS); 649 setState(newState = TIMED_PARKED); 650 } 651 652 // may have been unparked while parking 653 if (parkPermit && compareAndSetState(newState, UNPARKED)) { 654 // lazy submit if local queue is empty 655 lazySubmitRunContinuation(); 656 } 657 return; 658 } 659 660 // Thread.yield 661 if (s == YIELDING) { 662 setState(YIELDED); 663 664 // external submit if there are no tasks in the local task queue 665 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 666 externalSubmitRunContinuation(ct.getPool()); 667 } else { 668 submitRunContinuation(); 669 } 670 return; 671 } 672 673 // blocking on monitorenter 674 if (s == BLOCKING) { 675 setState(BLOCKED); 676 677 // may have been unblocked while blocking 678 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 679 // lazy submit if local queue is empty 680 lazySubmitRunContinuation(); 681 } 682 return; 683 } 684 685 // Object.wait 686 if (s == WAITING || s == TIMED_WAITING) { 687 int newState; 688 boolean interruptable = interruptableWait; 689 if (s == WAITING) { 690 setState(newState = WAIT); 691 } else { 692 // For timed-wait, a timeout task is scheduled to execute. The timeout 693 // task will change the thread state to UNBLOCKED and submit the thread 694 // to the scheduler. A sequence number is used to ensure that the timeout 695 // task only unblocks the thread for this timed-wait. We synchronize with 696 // the timeout task to coordinate access to the sequence number and to 697 // ensure the timeout task doesn't execute until the thread has got to 698 // the TIMED_WAIT state. 699 long timeout = this.timeout; 700 assert timeout > 0; 701 synchronized (timedWaitLock()) { 702 byte seqNo = ++timedWaitSeqNo; 703 timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS); 704 setState(newState = TIMED_WAIT); 705 } 706 } 707 708 // may have been notified while in transition to wait state 709 if (notified && compareAndSetState(newState, BLOCKED)) { 710 // may have even been unblocked already 711 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 712 submitRunContinuation(); 713 } 714 return; 715 } 716 717 // may have been interrupted while in transition to wait state 718 if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) { 719 submitRunContinuation(); 720 return; 721 } 722 return; 723 } 724 725 assert false; 726 } 727 728 /** 729 * Invoked after the continuation completes. 730 */ 731 private void afterDone() { 732 afterDone(true); 733 } 734 735 /** 736 * Invoked after the continuation completes (or start failed). Sets the thread 737 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 738 * 739 * @param notifyContainer true if its container should be notified 740 */ 741 private void afterDone(boolean notifyContainer) { 742 assert carrierThread == null; 743 setState(TERMINATED); 744 745 // notify anyone waiting for this virtual thread to terminate 746 CountDownLatch termination = this.termination; 747 if (termination != null) { 748 assert termination.getCount() == 1; 749 termination.countDown(); 750 } 751 752 // notify container 753 if (notifyContainer) { 754 threadContainer().remove(this); 755 } 756 757 // clear references to thread locals 758 clearReferences(); 759 } 760 761 /** 762 * Schedules this {@code VirtualThread} to execute. 763 * 764 * @throws IllegalStateException if the container is shutdown or closed 765 * @throws IllegalThreadStateException if the thread has already been started 766 * @throws RejectedExecutionException if the scheduler cannot accept a task 767 */ 768 @Override 769 void start(ThreadContainer container) { 770 if (!compareAndSetState(NEW, STARTED)) { 771 throw new IllegalThreadStateException("Already started"); 772 } 773 774 // bind thread to container 775 assert threadContainer() == null; 776 setThreadContainer(container); 777 778 // start thread 779 boolean addedToContainer = false; 780 boolean started = false; 781 try { 782 container.add(this); // may throw 783 addedToContainer = true; 784 785 // scoped values may be inherited 786 inheritScopedValueBindings(container); 787 788 // submit task to run thread, using externalSubmit if possible 789 externalSubmitRunContinuationOrThrow(); 790 started = true; 791 } finally { 792 if (!started) { 793 afterDone(addedToContainer); 794 } 795 } 796 } 797 798 @Override 799 public void start() { 800 start(ThreadContainers.root()); 801 } 802 803 @Override 804 public void run() { 805 // do nothing 806 } 807 808 /** 809 * Parks until unparked or interrupted. If already unparked then the parking 810 * permit is consumed and this method completes immediately (meaning it doesn't 811 * yield). It also completes immediately if the interrupt status is set. 812 */ 813 @Override 814 void park() { 815 assert Thread.currentThread() == this; 816 817 // complete immediately if parking permit available or interrupted 818 if (getAndSetParkPermit(false) || interrupted) 819 return; 820 821 // park the thread 822 boolean yielded = false; 823 setState(PARKING); 824 try { 825 yielded = yieldContinuation(); 826 } catch (OutOfMemoryError e) { 827 // park on carrier 828 } finally { 829 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 830 if (!yielded) { 831 assert state() == PARKING; 832 setState(RUNNING); 833 } 834 } 835 836 // park on the carrier thread when pinned 837 if (!yielded) { 838 parkOnCarrierThread(false, 0); 839 } 840 } 841 842 /** 843 * Parks up to the given waiting time or until unparked or interrupted. 844 * If already unparked then the parking permit is consumed and this method 845 * completes immediately (meaning it doesn't yield). It also completes immediately 846 * if the interrupt status is set or the waiting time is {@code <= 0}. 847 * 848 * @param nanos the maximum number of nanoseconds to wait. 849 */ 850 @Override 851 void parkNanos(long nanos) { 852 assert Thread.currentThread() == this; 853 854 // complete immediately if parking permit available or interrupted 855 if (getAndSetParkPermit(false) || interrupted) 856 return; 857 858 // park the thread for the waiting time 859 if (nanos > 0) { 860 long startTime = System.nanoTime(); 861 862 // park the thread, afterYield will schedule the thread to unpark 863 boolean yielded = false; 864 timeout = nanos; 865 setState(TIMED_PARKING); 866 try { 867 yielded = yieldContinuation(); 868 } catch (OutOfMemoryError e) { 869 // park on carrier 870 } finally { 871 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 872 if (!yielded) { 873 assert state() == TIMED_PARKING; 874 setState(RUNNING); 875 } 876 } 877 878 // park on carrier thread for remaining time when pinned (or OOME) 879 if (!yielded) { 880 long remainingNanos = nanos - (System.nanoTime() - startTime); 881 parkOnCarrierThread(true, remainingNanos); 882 } 883 } 884 } 885 886 /** 887 * Parks the current carrier thread up to the given waiting time or until 888 * unparked or interrupted. If the virtual thread is interrupted then the 889 * interrupt status will be propagated to the carrier thread. 890 * @param timed true for a timed park, false for untimed 891 * @param nanos the waiting time in nanoseconds 892 */ 893 private void parkOnCarrierThread(boolean timed, long nanos) { 894 assert state() == RUNNING; 895 896 setState(timed ? TIMED_PINNED : PINNED); 897 try { 898 if (!parkPermit) { 899 if (!timed) { 900 U.park(false, 0); 901 } else if (nanos > 0) { 902 U.park(false, nanos); 903 } 904 } 905 } finally { 906 setState(RUNNING); 907 } 908 909 // consume parking permit 910 setParkPermit(false); 911 912 // JFR jdk.VirtualThreadPinned event 913 postPinnedEvent("LockSupport.park"); 914 } 915 916 /** 917 * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event. 918 * Recording the event in the VM avoids having JFR event recorded in Java 919 * with the same name, but different ID, to events recorded by the VM. 920 */ 921 @Hidden 922 private static native void postPinnedEvent(String op); 923 924 /** 925 * Re-enables this virtual thread for scheduling. If this virtual thread is parked 926 * then its task is scheduled to continue, otherwise its next call to {@code park} or 927 * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block. 928 * @throws RejectedExecutionException if the scheduler cannot accept a task 929 */ 930 @Override 931 void unpark() { 932 if (!getAndSetParkPermit(true) && currentThread() != this) { 933 int s = state(); 934 935 // unparked while parked 936 if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) { 937 submitRunContinuation(); 938 return; 939 } 940 941 // unparked while parked when pinned 942 if (s == PINNED || s == TIMED_PINNED) { 943 // unpark carrier thread when pinned 944 disableSuspendAndPreempt(); 945 try { 946 synchronized (carrierThreadAccessLock()) { 947 Thread carrier = carrierThread; 948 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 949 U.unpark(carrier); 950 } 951 } 952 } finally { 953 enableSuspendAndPreempt(); 954 } 955 return; 956 } 957 } 958 } 959 960 /** 961 * Invoked by unblocker thread to unblock this virtual thread. 962 */ 963 private void unblock() { 964 assert !Thread.currentThread().isVirtual(); 965 blockPermit = true; 966 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 967 submitRunContinuation(); 968 } 969 } 970 971 /** 972 * Invoked by FJP worker thread or STPE thread when park timeout expires. 973 */ 974 private void parkTimeoutExpired() { 975 assert !VirtualThread.currentThread().isVirtual(); 976 if (!getAndSetParkPermit(true) 977 && (state() == TIMED_PARKED) 978 && compareAndSetState(TIMED_PARKED, UNPARKED)) { 979 lazySubmitRunContinuation(); 980 } 981 } 982 983 /** 984 * Invoked by FJP worker thread or STPE thread when wait timeout expires. 985 * If the virtual thread is in timed-wait then this method will unblock the thread 986 * and submit its task so that it continues and attempts to reenter the monitor. 987 * This method does nothing if the thread has been woken by notify or interrupt. 988 */ 989 private void waitTimeoutExpired(byte seqNo) { 990 assert !Thread.currentThread().isVirtual(); 991 for (;;) { 992 boolean unblocked = false; 993 synchronized (timedWaitLock()) { 994 if (seqNo != timedWaitSeqNo) { 995 // this timeout task is for a past timed-wait 996 return; 997 } 998 int s = state(); 999 if (s == TIMED_WAIT) { 1000 unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED); 1001 } else if (s != (TIMED_WAIT | SUSPENDED)) { 1002 // notified or interrupted, no longer waiting 1003 return; 1004 } 1005 } 1006 if (unblocked) { 1007 lazySubmitRunContinuation(); 1008 return; 1009 } 1010 // need to retry when thread is suspended in time-wait 1011 Thread.yield(); 1012 } 1013 } 1014 1015 /** 1016 * Attempts to yield the current virtual thread (Thread.yield). 1017 */ 1018 void tryYield() { 1019 assert Thread.currentThread() == this; 1020 setState(YIELDING); 1021 boolean yielded = false; 1022 try { 1023 yielded = yieldContinuation(); // may throw 1024 } finally { 1025 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 1026 if (!yielded) { 1027 assert state() == YIELDING; 1028 setState(RUNNING); 1029 } 1030 } 1031 } 1032 1033 /** 1034 * Sleep the current thread for the given sleep time (in nanoseconds). If 1035 * nanos is 0 then the thread will attempt to yield. 1036 * 1037 * @implNote This implementation parks the thread for the given sleeping time 1038 * and will therefore be observed in PARKED state during the sleep. Parking 1039 * will consume the parking permit so this method makes available the parking 1040 * permit after the sleep. This may be observed as a spurious, but benign, 1041 * wakeup when the thread subsequently attempts to park. 1042 * 1043 * @param nanos the maximum number of nanoseconds to sleep 1044 * @throws InterruptedException if interrupted while sleeping 1045 */ 1046 void sleepNanos(long nanos) throws InterruptedException { 1047 assert Thread.currentThread() == this && nanos >= 0; 1048 if (getAndClearInterrupt()) 1049 throw new InterruptedException(); 1050 if (nanos == 0) { 1051 tryYield(); 1052 } else { 1053 // park for the sleep time 1054 try { 1055 long remainingNanos = nanos; 1056 long startNanos = System.nanoTime(); 1057 while (remainingNanos > 0) { 1058 parkNanos(remainingNanos); 1059 if (getAndClearInterrupt()) { 1060 throw new InterruptedException(); 1061 } 1062 remainingNanos = nanos - (System.nanoTime() - startNanos); 1063 } 1064 } finally { 1065 // may have been unparked while sleeping 1066 setParkPermit(true); 1067 } 1068 } 1069 } 1070 1071 /** 1072 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 1073 * A timeout of {@code 0} means to wait forever. 1074 * 1075 * @throws InterruptedException if interrupted while waiting 1076 * @return true if the thread has terminated 1077 */ 1078 boolean joinNanos(long nanos) throws InterruptedException { 1079 if (state() == TERMINATED) 1080 return true; 1081 1082 // ensure termination object exists, then re-check state 1083 CountDownLatch termination = getTermination(); 1084 if (state() == TERMINATED) 1085 return true; 1086 1087 // wait for virtual thread to terminate 1088 if (nanos == 0) { 1089 termination.await(); 1090 } else { 1091 boolean terminated = termination.await(nanos, NANOSECONDS); 1092 if (!terminated) { 1093 // waiting time elapsed 1094 return false; 1095 } 1096 } 1097 assert state() == TERMINATED; 1098 return true; 1099 } 1100 1101 @Override 1102 void blockedOn(Interruptible b) { 1103 disableSuspendAndPreempt(); 1104 try { 1105 super.blockedOn(b); 1106 } finally { 1107 enableSuspendAndPreempt(); 1108 } 1109 } 1110 1111 @Override 1112 public void interrupt() { 1113 if (Thread.currentThread() != this) { 1114 // if current thread is a virtual thread then prevent it from being 1115 // suspended or unmounted when entering or holding interruptLock 1116 Interruptible blocker; 1117 disableSuspendAndPreempt(); 1118 try { 1119 synchronized (interruptLock) { 1120 interrupted = true; 1121 blocker = nioBlocker(); 1122 if (blocker != null) { 1123 blocker.interrupt(this); 1124 } 1125 1126 // interrupt carrier thread if mounted 1127 Thread carrier = carrierThread; 1128 if (carrier != null) carrier.setInterrupt(); 1129 } 1130 } finally { 1131 enableSuspendAndPreempt(); 1132 } 1133 1134 // notify blocker after releasing interruptLock 1135 if (blocker != null) { 1136 blocker.postInterrupt(); 1137 } 1138 1139 // make available parking permit, unpark thread if parked 1140 unpark(); 1141 1142 // if thread is waiting in Object.wait then schedule to try to reenter 1143 int s = state(); 1144 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1145 submitRunContinuation(); 1146 } 1147 1148 } else { 1149 interrupted = true; 1150 carrierThread.setInterrupt(); 1151 setParkPermit(true); 1152 } 1153 } 1154 1155 @Override 1156 public boolean isInterrupted() { 1157 return interrupted; 1158 } 1159 1160 @Override 1161 boolean getAndClearInterrupt() { 1162 assert Thread.currentThread() == this; 1163 boolean oldValue = interrupted; 1164 if (oldValue) { 1165 disableSuspendAndPreempt(); 1166 try { 1167 synchronized (interruptLock) { 1168 interrupted = false; 1169 carrierThread.clearInterrupt(); 1170 } 1171 } finally { 1172 enableSuspendAndPreempt(); 1173 } 1174 } 1175 return oldValue; 1176 } 1177 1178 @Override 1179 Thread.State threadState() { 1180 int s = state(); 1181 switch (s & ~SUSPENDED) { 1182 case NEW: 1183 return Thread.State.NEW; 1184 case STARTED: 1185 // return NEW if thread container not yet set 1186 if (threadContainer() == null) { 1187 return Thread.State.NEW; 1188 } else { 1189 return Thread.State.RUNNABLE; 1190 } 1191 case UNPARKED: 1192 case UNBLOCKED: 1193 case YIELDED: 1194 // runnable, not mounted 1195 return Thread.State.RUNNABLE; 1196 case RUNNING: 1197 // if mounted then return state of carrier thread 1198 if (Thread.currentThread() != this) { 1199 disableSuspendAndPreempt(); 1200 try { 1201 synchronized (carrierThreadAccessLock()) { 1202 Thread carrierThread = this.carrierThread; 1203 if (carrierThread != null) { 1204 return carrierThread.threadState(); 1205 } 1206 } 1207 } finally { 1208 enableSuspendAndPreempt(); 1209 } 1210 } 1211 // runnable, mounted 1212 return Thread.State.RUNNABLE; 1213 case PARKING: 1214 case TIMED_PARKING: 1215 case WAITING: 1216 case TIMED_WAITING: 1217 case YIELDING: 1218 // runnable, in transition 1219 return Thread.State.RUNNABLE; 1220 case PARKED: 1221 case PINNED: 1222 case WAIT: 1223 return Thread.State.WAITING; 1224 case TIMED_PARKED: 1225 case TIMED_PINNED: 1226 case TIMED_WAIT: 1227 return Thread.State.TIMED_WAITING; 1228 case BLOCKING: 1229 case BLOCKED: 1230 return Thread.State.BLOCKED; 1231 case TERMINATED: 1232 return Thread.State.TERMINATED; 1233 default: 1234 throw new InternalError(); 1235 } 1236 } 1237 1238 @Override 1239 boolean alive() { 1240 int s = state; 1241 return (s != NEW && s != TERMINATED); 1242 } 1243 1244 @Override 1245 boolean isTerminated() { 1246 return (state == TERMINATED); 1247 } 1248 1249 @Override 1250 StackTraceElement[] asyncGetStackTrace() { 1251 StackTraceElement[] stackTrace; 1252 do { 1253 stackTrace = (carrierThread != null) 1254 ? super.asyncGetStackTrace() // mounted 1255 : tryGetStackTrace(); // unmounted 1256 if (stackTrace == null) { 1257 Thread.yield(); 1258 } 1259 } while (stackTrace == null); 1260 return stackTrace; 1261 } 1262 1263 /** 1264 * Returns the stack trace for this virtual thread if it is unmounted. 1265 * Returns null if the thread is mounted or in transition. 1266 */ 1267 private StackTraceElement[] tryGetStackTrace() { 1268 int initialState = state() & ~SUSPENDED; 1269 switch (initialState) { 1270 case NEW, STARTED, TERMINATED -> { 1271 return new StackTraceElement[0]; // unmounted, empty stack 1272 } 1273 case RUNNING, PINNED, TIMED_PINNED -> { 1274 return null; // mounted 1275 } 1276 case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> { 1277 // unmounted, not runnable 1278 } 1279 case UNPARKED, UNBLOCKED, YIELDED -> { 1280 // unmounted, runnable 1281 } 1282 case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> { 1283 return null; // in transition 1284 } 1285 default -> throw new InternalError("" + initialState); 1286 } 1287 1288 // thread is unmounted, prevent it from continuing 1289 int suspendedState = initialState | SUSPENDED; 1290 if (!compareAndSetState(initialState, suspendedState)) { 1291 return null; 1292 } 1293 1294 // get stack trace and restore state 1295 StackTraceElement[] stack; 1296 try { 1297 stack = cont.getStackTrace(); 1298 } finally { 1299 assert state == suspendedState; 1300 setState(initialState); 1301 } 1302 boolean resubmit = switch (initialState) { 1303 case UNPARKED, UNBLOCKED, YIELDED -> { 1304 // resubmit as task may have run while suspended 1305 yield true; 1306 } 1307 case PARKED, TIMED_PARKED -> { 1308 // resubmit if unparked while suspended 1309 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1310 } 1311 case BLOCKED -> { 1312 // resubmit if unblocked while suspended 1313 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED); 1314 } 1315 case WAIT, TIMED_WAIT -> { 1316 // resubmit if notified or interrupted while waiting (Object.wait) 1317 // waitTimeoutExpired will retry if the timed expired when suspended 1318 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED); 1319 } 1320 default -> throw new InternalError(); 1321 }; 1322 if (resubmit) { 1323 submitRunContinuation(); 1324 } 1325 return stack; 1326 } 1327 1328 @Override 1329 public String toString() { 1330 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1331 sb.append(threadId()); 1332 String name = getName(); 1333 if (!name.isEmpty()) { 1334 sb.append(","); 1335 sb.append(name); 1336 } 1337 sb.append("]/"); 1338 1339 // add the carrier state and thread name when mounted 1340 boolean mounted; 1341 if (Thread.currentThread() == this) { 1342 mounted = appendCarrierInfo(sb); 1343 } else { 1344 disableSuspendAndPreempt(); 1345 try { 1346 synchronized (carrierThreadAccessLock()) { 1347 mounted = appendCarrierInfo(sb); 1348 } 1349 } finally { 1350 enableSuspendAndPreempt(); 1351 } 1352 } 1353 1354 // add virtual thread state when not mounted 1355 if (!mounted) { 1356 String stateAsString = threadState().toString(); 1357 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1358 } 1359 1360 return sb.toString(); 1361 } 1362 1363 /** 1364 * Appends the carrier state and thread name to the string buffer if mounted. 1365 * @return true if mounted, false if not mounted 1366 */ 1367 private boolean appendCarrierInfo(StringBuilder sb) { 1368 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1369 Thread carrier = carrierThread; 1370 if (carrier != null) { 1371 String stateAsString = carrier.threadState().toString(); 1372 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1373 sb.append('@'); 1374 sb.append(carrier.getName()); 1375 return true; 1376 } else { 1377 return false; 1378 } 1379 } 1380 1381 @Override 1382 public int hashCode() { 1383 return (int) threadId(); 1384 } 1385 1386 @Override 1387 public boolean equals(Object obj) { 1388 return obj == this; 1389 } 1390 1391 /** 1392 * Returns the termination object, creating it if needed. 1393 */ 1394 private CountDownLatch getTermination() { 1395 CountDownLatch termination = this.termination; 1396 if (termination == null) { 1397 termination = new CountDownLatch(1); 1398 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1399 termination = this.termination; 1400 } 1401 } 1402 return termination; 1403 } 1404 1405 /** 1406 * Returns the lock object to synchronize on when accessing carrierThread. 1407 * The lock prevents carrierThread from being reset to null during unmount. 1408 */ 1409 private Object carrierThreadAccessLock() { 1410 // return interruptLock as unmount has to coordinate with interrupt 1411 return interruptLock; 1412 } 1413 1414 /** 1415 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1416 */ 1417 private Object timedWaitLock() { 1418 // use this object for now to avoid the overhead of introducing another lock 1419 return runContinuation; 1420 } 1421 1422 /** 1423 * Disallow the current thread be suspended or preempted. 1424 */ 1425 private void disableSuspendAndPreempt() { 1426 notifyJvmtiDisableSuspend(true); 1427 Continuation.pin(); 1428 } 1429 1430 /** 1431 * Allow the current thread be suspended or preempted. 1432 */ 1433 private void enableSuspendAndPreempt() { 1434 Continuation.unpin(); 1435 notifyJvmtiDisableSuspend(false); 1436 } 1437 1438 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1439 1440 private int state() { 1441 return state; // volatile read 1442 } 1443 1444 private void setState(int newValue) { 1445 state = newValue; // volatile write 1446 } 1447 1448 private boolean compareAndSetState(int expectedValue, int newValue) { 1449 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1450 } 1451 1452 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1453 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1454 } 1455 1456 private void setParkPermit(boolean newValue) { 1457 if (parkPermit != newValue) { 1458 parkPermit = newValue; 1459 } 1460 } 1461 1462 private boolean getAndSetParkPermit(boolean newValue) { 1463 if (parkPermit != newValue) { 1464 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1465 } else { 1466 return newValue; 1467 } 1468 } 1469 1470 private void setCarrierThread(Thread carrier) { 1471 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1472 this.carrierThread = carrier; 1473 } 1474 1475 // -- JVM TI support -- 1476 1477 @IntrinsicCandidate 1478 @JvmtiMountTransition 1479 private native void notifyJvmtiStart(); 1480 1481 @IntrinsicCandidate 1482 @JvmtiMountTransition 1483 private native void notifyJvmtiEnd(); 1484 1485 @IntrinsicCandidate 1486 @JvmtiMountTransition 1487 private native void notifyJvmtiMount(boolean hide); 1488 1489 @IntrinsicCandidate 1490 @JvmtiMountTransition 1491 private native void notifyJvmtiUnmount(boolean hide); 1492 1493 @IntrinsicCandidate 1494 private static native void notifyJvmtiDisableSuspend(boolean enter); 1495 1496 private static native void registerNatives(); 1497 static { 1498 registerNatives(); 1499 1500 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1501 var group = Thread.virtualThreadGroup(); 1502 } 1503 1504 /** 1505 * Loads a java.util.concurrent.Executor with the given class name to use at the 1506 * default scheduler. The class is public in an exported package, has a public 1507 * no-arg constructor, and is visible to the system class loader. 1508 */ 1509 private static Executor createCustomDefaultScheduler(String cn) { 1510 try { 1511 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); 1512 Constructor<?> ctor = clazz.getConstructor(); 1513 var scheduler = (Executor) ctor.newInstance(); 1514 System.err.println(""" 1515 WARNING: Using custom default scheduler, this is an experimental feature!"""); 1516 return scheduler; 1517 } catch (Exception ex) { 1518 throw new Error(ex); 1519 } 1520 } 1521 1522 /** 1523 * Creates the default ForkJoinPool scheduler. 1524 */ 1525 private static ForkJoinPool createDefaultForkJoinPoolScheduler() { 1526 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool); 1527 int parallelism, maxPoolSize, minRunnable; 1528 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1529 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1530 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1531 if (parallelismValue != null) { 1532 parallelism = Integer.parseInt(parallelismValue); 1533 } else { 1534 parallelism = Runtime.getRuntime().availableProcessors(); 1535 } 1536 if (maxPoolSizeValue != null) { 1537 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1538 parallelism = Integer.min(parallelism, maxPoolSize); 1539 } else { 1540 maxPoolSize = Integer.max(parallelism, 256); 1541 } 1542 if (minRunnableValue != null) { 1543 minRunnable = Integer.parseInt(minRunnableValue); 1544 } else { 1545 minRunnable = Integer.max(parallelism / 2, 1); 1546 } 1547 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1548 boolean asyncMode = true; // FIFO 1549 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1550 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1551 } 1552 1553 /** 1554 * Schedule a runnable task to run after a delay. 1555 */ 1556 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1557 if (scheduler instanceof ForkJoinPool pool) { 1558 return pool.schedule(command, delay, unit); 1559 } else { 1560 return DelayedTaskSchedulers.schedule(command, delay, unit); 1561 } 1562 } 1563 1564 /** 1565 * Supports scheduling a runnable task to run after a delay. It uses a number 1566 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1567 * work queue used. This class is used when using a custom scheduler. 1568 */ 1569 private static class DelayedTaskSchedulers { 1570 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1571 1572 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1573 long tid = Thread.currentThread().threadId(); 1574 int index = (int) tid & (INSTANCE.length - 1); 1575 return INSTANCE[index].schedule(command, delay, unit); 1576 } 1577 1578 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1579 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1580 String propValue = System.getProperty(propName); 1581 int queueCount; 1582 if (propValue != null) { 1583 queueCount = Integer.parseInt(propValue); 1584 if (queueCount != Integer.highestOneBit(queueCount)) { 1585 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1586 } 1587 } else { 1588 int ncpus = Runtime.getRuntime().availableProcessors(); 1589 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1590 } 1591 var schedulers = new ScheduledExecutorService[queueCount]; 1592 for (int i = 0; i < queueCount; i++) { 1593 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1594 Executors.newScheduledThreadPool(1, task -> { 1595 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1596 t.setDaemon(true); 1597 return t; 1598 }); 1599 stpe.setRemoveOnCancelPolicy(true); 1600 schedulers[i] = stpe; 1601 } 1602 return schedulers; 1603 } 1604 } 1605 1606 /** 1607 * Schedule virtual threads that are ready to be scheduled after they blocked on 1608 * monitor enter. 1609 */ 1610 private static void unblockVirtualThreads() { 1611 while (true) { 1612 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1613 while (vthread != null) { 1614 assert vthread.onWaitingList; 1615 VirtualThread nextThread = vthread.next; 1616 1617 // remove from list and unblock 1618 vthread.next = null; 1619 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1620 assert changed; 1621 vthread.unblock(); 1622 1623 vthread = nextThread; 1624 } 1625 } 1626 } 1627 1628 /** 1629 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1630 * if necessary until a list of one or more threads becomes available. 1631 */ 1632 private static native VirtualThread takeVirtualThreadListToUnblock(); 1633 1634 static { 1635 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1636 VirtualThread::unblockVirtualThreads); 1637 unblocker.setDaemon(true); 1638 unblocker.start(); 1639 } 1640 } --- EOF ---