1 /* 2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.lang.reflect.Constructor; 30 import java.util.Locale; 31 import java.util.Objects; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executors; 34 import java.util.concurrent.ForkJoinPool; 35 import java.util.concurrent.ForkJoinTask; 36 import java.util.concurrent.Future; 37 import java.util.concurrent.RejectedExecutionException; 38 import java.util.concurrent.ScheduledExecutorService; 39 import java.util.concurrent.ScheduledThreadPoolExecutor; 40 import java.util.concurrent.TimeUnit; 41 import jdk.internal.event.VirtualThreadEndEvent; 42 import jdk.internal.event.VirtualThreadParkEvent; 43 import jdk.internal.event.VirtualThreadStartEvent; 44 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 45 import jdk.internal.invoke.MhUtil; 46 import jdk.internal.misc.CarrierThread; 47 import jdk.internal.misc.InnocuousThread; 48 import jdk.internal.misc.Unsafe; 49 import jdk.internal.vm.Continuation; 50 import jdk.internal.vm.ContinuationScope; 51 import jdk.internal.vm.StackableScope; 52 import jdk.internal.vm.ThreadContainer; 53 import jdk.internal.vm.ThreadContainers; 54 import jdk.internal.vm.annotation.ChangesCurrentThread; 55 import jdk.internal.vm.annotation.Hidden; 56 import jdk.internal.vm.annotation.IntrinsicCandidate; 57 import jdk.internal.vm.annotation.JvmtiHideEvents; 58 import jdk.internal.vm.annotation.JvmtiMountTransition; 59 import jdk.internal.vm.annotation.ReservedStackAccess; 60 import sun.nio.ch.Interruptible; 61 import static java.util.concurrent.TimeUnit.*; 62 63 /** 64 * A thread that is scheduled by the Java virtual machine rather than the operating system. 65 */ 66 final class VirtualThread extends BaseVirtualThread { 67 private static final Unsafe U = Unsafe.getUnsafe(); 68 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 69 70 private static final ForkJoinPool BUILTIN_SCHEDULER; 71 private static final VirtualThreadScheduler DEFAULT_SCHEDULER; 72 static { 73 // experimental 74 String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass"); 75 if (propValue != null) { 76 var builtinScheduler = createBuiltinDefaultScheduler(true); 77 VirtualThreadScheduler defaultScheduler = builtinScheduler.externalView(); 78 for (String cn: propValue.split(",")) { 79 defaultScheduler = loadCustomScheduler(defaultScheduler, cn); 80 } 81 BUILTIN_SCHEDULER = builtinScheduler; 82 DEFAULT_SCHEDULER = defaultScheduler; 83 } else { 84 var builtinScheduler = createBuiltinDefaultScheduler(false); 85 BUILTIN_SCHEDULER = builtinScheduler; 86 DEFAULT_SCHEDULER = builtinScheduler; 87 } 88 } 89 90 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 91 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 92 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 93 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 94 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList"); 95 96 // scheduler and continuation 97 private final VirtualThreadScheduler scheduler; 98 private final Continuation cont; 99 private final VirtualThreadTask runContinuation; 100 101 // virtual thread state, accessed by VM 102 private volatile int state; 103 104 /* 105 * Virtual thread state transitions: 106 * 107 * NEW -> STARTED // Thread.start, schedule to run 108 * STARTED -> TERMINATED // failed to start 109 * STARTED -> RUNNING // first run 110 * RUNNING -> TERMINATED // done 111 * 112 * RUNNING -> PARKING // Thread parking with LockSupport.park 113 * PARKING -> PARKED // cont.yield successful, parked indefinitely 114 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier 115 * PARKED -> UNPARKED // unparked, may be scheduled to continue 116 * PINNED -> RUNNING // unparked, continue execution on same carrier 117 * UNPARKED -> RUNNING // continue execution after park 118 * 119 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 120 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 121 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier 122 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue 123 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 124 * 125 * RUNNING -> BLOCKING // blocking on monitor enter 126 * BLOCKING -> BLOCKED // blocked on monitor enter 127 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue 128 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter 129 * 130 * RUNNING -> WAITING // transitional state during wait on monitor 131 * WAITING -> WAIT // waiting on monitor 132 * WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 133 * WAIT -> UNBLOCKED // timed-out/interrupted 134 * 135 * RUNNING -> TIMED_WAITING // transition state during timed-waiting on monitor 136 * TIMED_WAITING -> TIMED_WAIT // timed-waiting on monitor 137 * TIMED_WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 138 * TIMED_WAIT -> UNBLOCKED // timed-out/interrupted 139 * 140 * RUNNING -> YIELDING // Thread.yield 141 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue 142 * YIELDING -> RUNNING // cont.yield failed 143 * YIELDED -> RUNNING // continue execution after Thread.yield 144 */ 145 private static final int NEW = 0; 146 private static final int STARTED = 1; 147 private static final int RUNNING = 2; // runnable-mounted 148 149 // untimed and timed parking 150 private static final int PARKING = 3; 151 private static final int PARKED = 4; // unmounted 152 private static final int PINNED = 5; // mounted 153 private static final int TIMED_PARKING = 6; 154 private static final int TIMED_PARKED = 7; // unmounted 155 private static final int TIMED_PINNED = 8; // mounted 156 private static final int UNPARKED = 9; // unmounted but runnable 157 158 // Thread.yield 159 private static final int YIELDING = 10; 160 private static final int YIELDED = 11; // unmounted but runnable 161 162 // monitor enter 163 private static final int BLOCKING = 12; 164 private static final int BLOCKED = 13; // unmounted 165 private static final int UNBLOCKED = 14; // unmounted but runnable 166 167 // monitor wait/timed-wait 168 private static final int WAITING = 15; 169 private static final int WAIT = 16; // waiting in Object.wait 170 private static final int TIMED_WAITING = 17; 171 private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait 172 173 private static final int TERMINATED = 99; // final state 174 175 // can be suspended from scheduling when unmounted 176 private static final int SUSPENDED = 1 << 8; 177 178 // parking permit made available by LockSupport.unpark 179 private volatile boolean parkPermit; 180 181 // blocking permit made available by unblocker thread when another thread exits monitor 182 private volatile boolean blockPermit; 183 184 // true when on the list of virtual threads waiting to be unblocked 185 private volatile boolean onWaitingList; 186 187 // next virtual thread on the list of virtual threads waiting to be unblocked 188 private volatile VirtualThread next; 189 190 // notified by Object.notify/notifyAll while waiting in Object.wait 191 private volatile boolean notified; 192 193 // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait 194 private volatile boolean interruptibleWait; 195 196 // timed-wait support 197 private byte timedWaitSeqNo; 198 199 // timeout for timed-park and timed-wait, only accessed on current/carrier thread 200 private long timeout; 201 202 // timer task for timed-park and timed-wait, only accessed on current/carrier thread 203 private Future<?> timeoutTask; 204 205 // carrier thread when mounted, accessed by VM 206 private volatile Thread carrierThread; 207 208 // termination object when joining, created lazily if needed 209 private volatile CountDownLatch termination; 210 211 /** 212 * Returns the default scheduler. 213 */ 214 static VirtualThreadScheduler defaultScheduler() { 215 return DEFAULT_SCHEDULER; 216 } 217 218 /** 219 * Returns true if using a custom default scheduler. 220 */ 221 static boolean isCustomDefaultScheduler() { 222 return DEFAULT_SCHEDULER != BUILTIN_SCHEDULER; 223 } 224 225 /** 226 * Returns the continuation scope used for virtual threads. 227 */ 228 static ContinuationScope continuationScope() { 229 return VTHREAD_SCOPE; 230 } 231 232 /** 233 * Return the scheduler for this thread. 234 * @param revealBuiltin true to reveal the built-in scheduler, false to hide 235 */ 236 VirtualThreadScheduler scheduler(boolean revealBuiltin) { 237 if (scheduler instanceof BuiltinDefaultScheduler builtin && !revealBuiltin) { 238 return builtin.externalView(); 239 } else { 240 return scheduler; 241 } 242 } 243 244 /** 245 * Creates a new {@code VirtualThread} to run the given task with the given scheduler. 246 * 247 * @param scheduler the scheduler or null for default scheduler 248 * @param name thread name 249 * @param characteristics characteristics 250 * @param task the task to execute 251 */ 252 VirtualThread(VirtualThreadScheduler scheduler, 253 String name, 254 int characteristics, 255 Runnable task) { 256 super(name, characteristics, /*bound*/ false); 257 Objects.requireNonNull(task); 258 259 // use default scheduler if not provided 260 if (scheduler == null) { 261 scheduler = DEFAULT_SCHEDULER; 262 } 263 264 this.scheduler = scheduler; 265 this.cont = new VThreadContinuation(this, task); 266 267 if (scheduler == BUILTIN_SCHEDULER) { 268 this.runContinuation = new BuiltinSchedulerTask(this); 269 } else { 270 this.runContinuation = new CustomSchedulerTask(this); 271 } 272 } 273 274 /** 275 * The task to execute when using the built-in scheduler. 276 */ 277 static final class BuiltinSchedulerTask implements VirtualThreadTask { 278 private final VirtualThread vthread; 279 BuiltinSchedulerTask(VirtualThread vthread) { 280 this.vthread = vthread; 281 } 282 @Override 283 public Object attach(Object att) { 284 throw new UnsupportedOperationException(); 285 } 286 @Override 287 public Object attachment() { 288 throw new UnsupportedOperationException(); 289 } 290 @Override 291 public Thread thread() { 292 return vthread; 293 } 294 @Override 295 public void run() { 296 vthread.runContinuation();; 297 } 298 } 299 300 /** 301 * The task to execute when using a custom scheduler. 302 */ 303 static final class CustomSchedulerTask implements VirtualThreadTask { 304 private static final VarHandle ATT = 305 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class); 306 private final VirtualThread vthread; 307 private volatile Object att; 308 CustomSchedulerTask(VirtualThread vthread) { 309 this.vthread = vthread; 310 } 311 @Override 312 public Object attach(Object att) { 313 return ATT.getAndSet(this, att); 314 } 315 @Override 316 public Object attachment() { 317 return att; 318 } 319 @Override 320 public Thread thread() { 321 return vthread; 322 } 323 @Override 324 public void run() { 325 vthread.runContinuation();; 326 } 327 } 328 329 /** 330 * The continuation that a virtual thread executes. 331 */ 332 private static class VThreadContinuation extends Continuation { 333 VThreadContinuation(VirtualThread vthread, Runnable task) { 334 super(VTHREAD_SCOPE, wrap(vthread, task)); 335 } 336 @Override 337 protected void onPinned(Continuation.Pinned reason) { 338 } 339 private static Runnable wrap(VirtualThread vthread, Runnable task) { 340 return new Runnable() { 341 @Hidden 342 @JvmtiHideEvents 343 public void run() { 344 vthread.notifyJvmtiStart(); // notify JVMTI 345 try { 346 vthread.run(task); 347 } finally { 348 vthread.notifyJvmtiEnd(); // notify JVMTI 349 } 350 } 351 }; 352 } 353 } 354 355 /** 356 * Runs or continues execution on the current thread. The virtual thread is mounted 357 * on the current thread before the task runs or continues. It unmounts when the 358 * task completes or yields. 359 */ 360 @ChangesCurrentThread // allow mount/unmount to be inlined 361 private void runContinuation() { 362 // the carrier must be a platform thread 363 if (Thread.currentThread().isVirtual()) { 364 throw new WrongThreadException(); 365 } 366 367 // set state to RUNNING 368 int initialState = state(); 369 if (initialState == STARTED || initialState == UNPARKED 370 || initialState == UNBLOCKED || initialState == YIELDED) { 371 // newly started or continue after parking/blocking/Thread.yield 372 if (!compareAndSetState(initialState, RUNNING)) { 373 return; 374 } 375 // consume permit when continuing after parking or blocking. If continue 376 // after a timed-park or timed-wait then the timeout task is cancelled. 377 if (initialState == UNPARKED) { 378 cancelTimeoutTask(); 379 setParkPermit(false); 380 } else if (initialState == UNBLOCKED) { 381 cancelTimeoutTask(); 382 blockPermit = false; 383 } 384 } else { 385 // not runnable 386 return; 387 } 388 389 mount(); 390 try { 391 cont.run(); 392 } finally { 393 unmount(); 394 if (cont.isDone()) { 395 afterDone(); 396 } else { 397 afterYield(); 398 } 399 } 400 } 401 402 /** 403 * Cancel timeout task when continuing after timed-park or timed-wait. 404 * The timeout task may be executing, or may have already completed. 405 */ 406 private void cancelTimeoutTask() { 407 if (timeoutTask != null) { 408 timeoutTask.cancel(false); 409 timeoutTask = null; 410 } 411 } 412 413 /** 414 * Submits the runContinuation task to the scheduler. For the built-in scheduler, 415 * the task will be pushed to the local queue if possible, otherwise it will be 416 * pushed to an external submission queue. 417 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 418 * @throws RejectedExecutionException 419 */ 420 private void submitRunContinuation(boolean retryOnOOME) { 421 boolean done = false; 422 while (!done) { 423 try { 424 // Pin the continuation to prevent the virtual thread from unmounting 425 // when submitting a task. For the default scheduler this ensures that 426 // the carrier doesn't change when pushing a task. For other schedulers 427 // it avoids deadlock that could arise due to carriers and virtual 428 // threads contending for a lock. 429 if (currentThread().isVirtual()) { 430 Continuation.pin(); 431 try { 432 scheduler.onContinue(runContinuation); 433 } finally { 434 Continuation.unpin(); 435 } 436 } else { 437 scheduler.onContinue(runContinuation); 438 } 439 done = true; 440 } catch (RejectedExecutionException ree) { 441 submitFailed(ree); 442 throw ree; 443 } catch (OutOfMemoryError e) { 444 if (retryOnOOME) { 445 U.park(false, 100_000_000); // 100ms 446 } else { 447 throw e; 448 } 449 } 450 } 451 } 452 453 /** 454 * Submits the runContinuation task to the scheduler. For the default scheduler, 455 * and calling it on a worker thread, the task will be pushed to the local queue, 456 * otherwise it will be pushed to an external submission queue. 457 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 458 * @throws RejectedExecutionException 459 */ 460 private void submitRunContinuation() { 461 submitRunContinuation(true); 462 } 463 464 /** 465 * Invoked from a carrier thread to lazy submit the runContinuation task to the 466 * carrier's local queue if the queue is empty. If not empty, or invoked by a thread 467 * for a custom scheduler, then it just submits the task to the scheduler. 468 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 469 * @throws RejectedExecutionException 470 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 471 */ 472 private void lazySubmitRunContinuation() { 473 assert !currentThread().isVirtual(); 474 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 475 try { 476 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation)); 477 } catch (RejectedExecutionException ree) { 478 submitFailed(ree); 479 throw ree; 480 } catch (OutOfMemoryError e) { 481 submitRunContinuation(); 482 } 483 } else { 484 submitRunContinuation(); 485 } 486 } 487 488 /** 489 * Invoked from a carrier thread to externally submit the runContinuation task to the 490 * scheduler. If invoked by a thread for a custom scheduler, then it just submits the 491 * task to the scheduler. 492 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 493 * @throws RejectedExecutionException 494 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 495 */ 496 private void externalSubmitRunContinuation() { 497 assert !currentThread().isVirtual(); 498 if (currentThread() instanceof CarrierThread ct) { 499 try { 500 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 501 } catch (RejectedExecutionException ree) { 502 submitFailed(ree); 503 throw ree; 504 } catch (OutOfMemoryError e) { 505 submitRunContinuation(); 506 } 507 } else { 508 submitRunContinuation(); 509 } 510 } 511 512 /** 513 * Invoked from Thread.start to externally submit the runContinuation task to the 514 * scheduler. If this virtual thread is scheduled by the built-in scheduler, 515 * and this method is called from a virtual thread scheduled by the built-in 516 * scheduler, then it uses externalSubmit to ensure that the task is pushed to an 517 * external submission queue rather than the local queue. 518 * @throws RejectedExecutionException 519 * @throws OutOfMemoryError 520 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 521 */ 522 private void externalSubmitRunContinuationOrThrow() { 523 try { 524 if (currentThread().isVirtual()) { 525 // Pin the continuation to prevent the virtual thread from unmounting 526 // when submitting a task. This avoids deadlock that could arise due to 527 // carriers and virtual threads contending for a lock. 528 Continuation.pin(); 529 try { 530 if (scheduler == BUILTIN_SCHEDULER 531 && currentCarrierThread() instanceof CarrierThread ct) { 532 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 533 } else { 534 scheduler.onStart(runContinuation); 535 } 536 } finally { 537 Continuation.unpin(); 538 } 539 } else { 540 scheduler.onStart(runContinuation); 541 } 542 } catch (RejectedExecutionException ree) { 543 submitFailed(ree); 544 throw ree; 545 } 546 } 547 548 /** 549 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 550 */ 551 private void submitFailed(RejectedExecutionException ree) { 552 var event = new VirtualThreadSubmitFailedEvent(); 553 if (event.isEnabled()) { 554 event.javaThreadId = threadId(); 555 event.exceptionMessage = ree.getMessage(); 556 event.commit(); 557 } 558 } 559 560 /** 561 * Runs a task in the context of this virtual thread. 562 */ 563 private void run(Runnable task) { 564 assert Thread.currentThread() == this && state == RUNNING; 565 566 // emit JFR event if enabled 567 if (VirtualThreadStartEvent.isTurnedOn()) { 568 var event = new VirtualThreadStartEvent(); 569 event.javaThreadId = threadId(); 570 event.commit(); 571 } 572 573 Object bindings = Thread.scopedValueBindings(); 574 try { 575 runWith(bindings, task); 576 } catch (Throwable exc) { 577 dispatchUncaughtException(exc); 578 } finally { 579 // pop any remaining scopes from the stack, this may block 580 StackableScope.popAll(); 581 582 // emit JFR event if enabled 583 if (VirtualThreadEndEvent.isTurnedOn()) { 584 var event = new VirtualThreadEndEvent(); 585 event.javaThreadId = threadId(); 586 event.commit(); 587 } 588 } 589 } 590 591 /** 592 * Mounts this virtual thread onto the current platform thread. On 593 * return, the current thread is the virtual thread. 594 */ 595 @ChangesCurrentThread 596 @ReservedStackAccess 597 private void mount() { 598 // notify JVMTI before mount 599 notifyJvmtiMount(/*hide*/true); 600 601 // sets the carrier thread 602 Thread carrier = Thread.currentCarrierThread(); 603 setCarrierThread(carrier); 604 605 // sync up carrier thread interrupted status if needed 606 if (interrupted) { 607 carrier.setInterrupt(); 608 } else if (carrier.isInterrupted()) { 609 synchronized (interruptLock) { 610 // need to recheck interrupted status 611 if (!interrupted) { 612 carrier.clearInterrupt(); 613 } 614 } 615 } 616 617 // set Thread.currentThread() to return this virtual thread 618 carrier.setCurrentThread(this); 619 } 620 621 /** 622 * Unmounts this virtual thread from the carrier. On return, the 623 * current thread is the current platform thread. 624 */ 625 @ChangesCurrentThread 626 @ReservedStackAccess 627 private void unmount() { 628 assert !Thread.holdsLock(interruptLock); 629 630 // set Thread.currentThread() to return the platform thread 631 Thread carrier = this.carrierThread; 632 carrier.setCurrentThread(carrier); 633 634 // break connection to carrier thread, synchronized with interrupt 635 synchronized (interruptLock) { 636 setCarrierThread(null); 637 } 638 carrier.clearInterrupt(); 639 640 // notify JVMTI after unmount 641 notifyJvmtiUnmount(/*hide*/false); 642 } 643 644 /** 645 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 646 * the continuation continues. 647 */ 648 @Hidden 649 private boolean yieldContinuation() { 650 notifyJvmtiUnmount(/*hide*/true); 651 try { 652 return Continuation.yield(VTHREAD_SCOPE); 653 } finally { 654 notifyJvmtiMount(/*hide*/false); 655 } 656 } 657 658 /** 659 * Invoked in the context of the carrier thread after the Continuation yields when 660 * parking, blocking on monitor enter, Object.wait, or Thread.yield. 661 */ 662 private void afterYield() { 663 assert carrierThread == null; 664 665 // re-adjust parallelism if the virtual thread yielded when compensating 666 if (currentThread() instanceof CarrierThread ct) { 667 ct.endBlocking(); 668 } 669 670 int s = state(); 671 672 // LockSupport.park/parkNanos 673 if (s == PARKING || s == TIMED_PARKING) { 674 int newState; 675 if (s == PARKING) { 676 setState(newState = PARKED); 677 } else { 678 // schedule unpark 679 long timeout = this.timeout; 680 assert timeout > 0; 681 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS); 682 setState(newState = TIMED_PARKED); 683 } 684 685 // may have been unparked while parking 686 if (parkPermit && compareAndSetState(newState, UNPARKED)) { 687 // lazy submit if local queue is empty 688 lazySubmitRunContinuation(); 689 } 690 return; 691 } 692 693 // Thread.yield 694 if (s == YIELDING) { 695 setState(YIELDED); 696 697 // external submit if there are no tasks in the local task queue 698 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 699 externalSubmitRunContinuation(); 700 } else { 701 submitRunContinuation(); 702 } 703 return; 704 } 705 706 // blocking on monitorenter 707 if (s == BLOCKING) { 708 setState(BLOCKED); 709 710 // may have been unblocked while blocking 711 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 712 // lazy submit if local queue is empty 713 lazySubmitRunContinuation(); 714 } 715 return; 716 } 717 718 // Object.wait 719 if (s == WAITING || s == TIMED_WAITING) { 720 int newState; 721 boolean interruptible = interruptibleWait; 722 if (s == WAITING) { 723 setState(newState = WAIT); 724 } else { 725 // For timed-wait, a timeout task is scheduled to execute. The timeout 726 // task will change the thread state to UNBLOCKED and submit the thread 727 // to the scheduler. A sequence number is used to ensure that the timeout 728 // task only unblocks the thread for this timed-wait. We synchronize with 729 // the timeout task to coordinate access to the sequence number and to 730 // ensure the timeout task doesn't execute until the thread has got to 731 // the TIMED_WAIT state. 732 long timeout = this.timeout; 733 assert timeout > 0; 734 synchronized (timedWaitLock()) { 735 byte seqNo = ++timedWaitSeqNo; 736 timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS); 737 setState(newState = TIMED_WAIT); 738 } 739 } 740 741 // may have been notified while in transition to wait state 742 if (notified && compareAndSetState(newState, BLOCKED)) { 743 // may have even been unblocked already 744 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 745 submitRunContinuation(); 746 } 747 return; 748 } 749 750 // may have been interrupted while in transition to wait state 751 if (interruptible && interrupted && compareAndSetState(newState, UNBLOCKED)) { 752 submitRunContinuation(); 753 return; 754 } 755 return; 756 } 757 758 assert false; 759 } 760 761 /** 762 * Invoked after the continuation completes. 763 */ 764 private void afterDone() { 765 afterDone(true); 766 } 767 768 /** 769 * Invoked after the continuation completes (or start failed). Sets the thread 770 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 771 * 772 * @param notifyContainer true if its container should be notified 773 */ 774 private void afterDone(boolean notifyContainer) { 775 assert carrierThread == null; 776 setState(TERMINATED); 777 778 // notify anyone waiting for this virtual thread to terminate 779 CountDownLatch termination = this.termination; 780 if (termination != null) { 781 assert termination.getCount() == 1; 782 termination.countDown(); 783 } 784 785 // notify container 786 if (notifyContainer) { 787 threadContainer().remove(this); 788 } 789 790 // clear references to thread locals 791 clearReferences(); 792 } 793 794 /** 795 * Schedules this {@code VirtualThread} to execute. 796 * 797 * @throws IllegalStateException if the container is shutdown or closed 798 * @throws IllegalThreadStateException if the thread has already been started 799 * @throws RejectedExecutionException if the scheduler cannot accept a task 800 */ 801 @Override 802 void start(ThreadContainer container) { 803 if (!compareAndSetState(NEW, STARTED)) { 804 throw new IllegalThreadStateException("Already started"); 805 } 806 807 // bind thread to container 808 assert threadContainer() == null; 809 setThreadContainer(container); 810 811 // start thread 812 boolean addedToContainer = false; 813 boolean started = false; 814 try { 815 container.add(this); // may throw 816 addedToContainer = true; 817 818 // scoped values may be inherited 819 inheritScopedValueBindings(container); 820 821 // submit task to run thread, using externalSubmit if possible 822 externalSubmitRunContinuationOrThrow(); 823 started = true; 824 } finally { 825 if (!started) { 826 afterDone(addedToContainer); 827 } 828 } 829 } 830 831 @Override 832 public void start() { 833 start(ThreadContainers.root()); 834 } 835 836 @Override 837 public void run() { 838 // do nothing 839 } 840 841 /** 842 * Parks until unparked or interrupted. If already unparked then the parking 843 * permit is consumed and this method completes immediately (meaning it doesn't 844 * yield). It also completes immediately if the interrupted status is set. 845 */ 846 @Override 847 void park() { 848 assert Thread.currentThread() == this; 849 850 // complete immediately if parking permit available or interrupted 851 if (getAndSetParkPermit(false) || interrupted) 852 return; 853 854 // park the thread 855 boolean yielded = false; 856 long eventStartTime = VirtualThreadParkEvent.eventStartTime(); 857 setState(PARKING); 858 try { 859 yielded = yieldContinuation(); 860 } catch (OutOfMemoryError e) { 861 // park on carrier 862 } finally { 863 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 864 if (yielded) { 865 VirtualThreadParkEvent.offer(eventStartTime, Long.MIN_VALUE); 866 } else { 867 assert state() == PARKING; 868 setState(RUNNING); 869 } 870 } 871 872 // park on the carrier thread when pinned 873 if (!yielded) { 874 parkOnCarrierThread(false, 0); 875 } 876 } 877 878 /** 879 * Parks up to the given waiting time or until unparked or interrupted. 880 * If already unparked then the parking permit is consumed and this method 881 * completes immediately (meaning it doesn't yield). It also completes immediately 882 * if the interrupted status is set or the waiting time is {@code <= 0}. 883 * 884 * @param nanos the maximum number of nanoseconds to wait. 885 */ 886 @Override 887 void parkNanos(long nanos) { 888 assert Thread.currentThread() == this; 889 890 // complete immediately if parking permit available or interrupted 891 if (getAndSetParkPermit(false) || interrupted) 892 return; 893 894 // park the thread for the waiting time 895 if (nanos > 0) { 896 long startTime = System.nanoTime(); 897 898 // park the thread, afterYield will schedule the thread to unpark 899 boolean yielded = false; 900 long eventStartTime = VirtualThreadParkEvent.eventStartTime(); 901 timeout = nanos; 902 setState(TIMED_PARKING); 903 try { 904 yielded = yieldContinuation(); 905 } catch (OutOfMemoryError e) { 906 // park on carrier 907 } finally { 908 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 909 if (yielded) { 910 VirtualThreadParkEvent.offer(eventStartTime, nanos); 911 } else { 912 assert state() == TIMED_PARKING; 913 setState(RUNNING); 914 } 915 } 916 917 // park on carrier thread for remaining time when pinned (or OOME) 918 if (!yielded) { 919 long remainingNanos = nanos - (System.nanoTime() - startTime); 920 parkOnCarrierThread(true, remainingNanos); 921 } 922 } 923 } 924 925 /** 926 * Parks the current carrier thread up to the given waiting time or until 927 * unparked or interrupted. If the virtual thread is interrupted then the 928 * interrupted status will be propagated to the carrier thread. 929 * @param timed true for a timed park, false for untimed 930 * @param nanos the waiting time in nanoseconds 931 */ 932 private void parkOnCarrierThread(boolean timed, long nanos) { 933 assert state() == RUNNING; 934 935 setState(timed ? TIMED_PINNED : PINNED); 936 try { 937 if (!parkPermit) { 938 if (!timed) { 939 U.park(false, 0); 940 } else if (nanos > 0) { 941 U.park(false, nanos); 942 } 943 } 944 } finally { 945 setState(RUNNING); 946 } 947 948 // consume parking permit 949 setParkPermit(false); 950 951 // JFR jdk.VirtualThreadPinned event 952 postPinnedEvent("LockSupport.park"); 953 } 954 955 /** 956 * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event. 957 * Recording the event in the VM avoids having JFR event recorded in Java 958 * with the same name, but different ID, to events recorded by the VM. 959 */ 960 @Hidden 961 private static native void postPinnedEvent(String op); 962 963 /** 964 * Re-enables this virtual thread for scheduling. If this virtual thread is parked 965 * then its task is scheduled to continue, otherwise its next call to {@code park} or 966 * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block. 967 * @throws RejectedExecutionException if the scheduler cannot accept a task 968 */ 969 @Override 970 void unpark() { 971 if (!getAndSetParkPermit(true) && currentThread() != this) { 972 int s = state(); 973 974 // unparked while parked 975 if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) { 976 submitRunContinuation(); 977 return; 978 } 979 980 // unparked while parked when pinned 981 if (s == PINNED || s == TIMED_PINNED) { 982 // unpark carrier thread when pinned 983 disableSuspendAndPreempt(); 984 try { 985 synchronized (carrierThreadAccessLock()) { 986 Thread carrier = carrierThread; 987 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 988 U.unpark(carrier); 989 } 990 } 991 } finally { 992 enableSuspendAndPreempt(); 993 } 994 return; 995 } 996 } 997 } 998 999 /** 1000 * Invoked by unblocker thread to unblock this virtual thread. 1001 */ 1002 private void unblock() { 1003 assert !Thread.currentThread().isVirtual(); 1004 blockPermit = true; 1005 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 1006 submitRunContinuation(); 1007 } 1008 } 1009 1010 /** 1011 * Invoked by FJP worker thread or STPE thread when park timeout expires. 1012 */ 1013 private void parkTimeoutExpired() { 1014 assert !VirtualThread.currentThread().isVirtual(); 1015 if (!getAndSetParkPermit(true) 1016 && (state() == TIMED_PARKED) 1017 && compareAndSetState(TIMED_PARKED, UNPARKED)) { 1018 lazySubmitRunContinuation(); 1019 } 1020 } 1021 1022 /** 1023 * Invoked by FJP worker thread or STPE thread when wait timeout expires. 1024 * If the virtual thread is in timed-wait then this method will unblock the thread 1025 * and submit its task so that it continues and attempts to reenter the monitor. 1026 * This method does nothing if the thread has been woken by notify or interrupt. 1027 */ 1028 private void waitTimeoutExpired(byte seqNo) { 1029 assert !Thread.currentThread().isVirtual(); 1030 for (;;) { 1031 boolean unblocked = false; 1032 synchronized (timedWaitLock()) { 1033 if (seqNo != timedWaitSeqNo) { 1034 // this timeout task is for a past timed-wait 1035 return; 1036 } 1037 int s = state(); 1038 if (s == TIMED_WAIT) { 1039 unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED); 1040 } else if (s != (TIMED_WAIT | SUSPENDED)) { 1041 // notified or interrupted, no longer waiting 1042 return; 1043 } 1044 } 1045 if (unblocked) { 1046 lazySubmitRunContinuation(); 1047 return; 1048 } 1049 // need to retry when thread is suspended in time-wait 1050 Thread.yield(); 1051 } 1052 } 1053 1054 /** 1055 * Attempts to yield the current virtual thread (Thread.yield). 1056 */ 1057 void tryYield() { 1058 assert Thread.currentThread() == this; 1059 setState(YIELDING); 1060 boolean yielded = false; 1061 try { 1062 yielded = yieldContinuation(); // may throw 1063 } finally { 1064 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 1065 if (!yielded) { 1066 assert state() == YIELDING; 1067 setState(RUNNING); 1068 } 1069 } 1070 } 1071 1072 /** 1073 * Sleep the current thread for the given sleep time (in nanoseconds). If 1074 * nanos is 0 then the thread will attempt to yield. 1075 * 1076 * @implNote This implementation parks the thread for the given sleeping time 1077 * and will therefore be observed in PARKED state during the sleep. Parking 1078 * will consume the parking permit so this method makes available the parking 1079 * permit after the sleep. This may be observed as a spurious, but benign, 1080 * wakeup when the thread subsequently attempts to park. 1081 * 1082 * @param nanos the maximum number of nanoseconds to sleep 1083 * @throws InterruptedException if interrupted while sleeping 1084 */ 1085 void sleepNanos(long nanos) throws InterruptedException { 1086 assert Thread.currentThread() == this && nanos >= 0; 1087 if (getAndClearInterrupt()) 1088 throw new InterruptedException(); 1089 if (nanos == 0) { 1090 tryYield(); 1091 } else { 1092 // park for the sleep time 1093 try { 1094 long remainingNanos = nanos; 1095 long startNanos = System.nanoTime(); 1096 while (remainingNanos > 0) { 1097 parkNanos(remainingNanos); 1098 if (getAndClearInterrupt()) { 1099 throw new InterruptedException(); 1100 } 1101 remainingNanos = nanos - (System.nanoTime() - startNanos); 1102 } 1103 } finally { 1104 // may have been unparked while sleeping 1105 setParkPermit(true); 1106 } 1107 } 1108 } 1109 1110 /** 1111 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 1112 * A timeout of {@code 0} means to wait forever. 1113 * 1114 * @throws InterruptedException if interrupted while waiting 1115 * @return true if the thread has terminated 1116 */ 1117 boolean joinNanos(long nanos) throws InterruptedException { 1118 if (state() == TERMINATED) 1119 return true; 1120 1121 // ensure termination object exists, then re-check state 1122 CountDownLatch termination = getTermination(); 1123 if (state() == TERMINATED) 1124 return true; 1125 1126 // wait for virtual thread to terminate 1127 if (nanos == 0) { 1128 termination.await(); 1129 } else { 1130 boolean terminated = termination.await(nanos, NANOSECONDS); 1131 if (!terminated) { 1132 // waiting time elapsed 1133 return false; 1134 } 1135 } 1136 assert state() == TERMINATED; 1137 return true; 1138 } 1139 1140 @Override 1141 void blockedOn(Interruptible b) { 1142 disableSuspendAndPreempt(); 1143 try { 1144 super.blockedOn(b); 1145 } finally { 1146 enableSuspendAndPreempt(); 1147 } 1148 } 1149 1150 @Override 1151 public void interrupt() { 1152 if (Thread.currentThread() != this) { 1153 // if current thread is a virtual thread then prevent it from being 1154 // suspended or unmounted when entering or holding interruptLock 1155 Interruptible blocker; 1156 disableSuspendAndPreempt(); 1157 try { 1158 synchronized (interruptLock) { 1159 interrupted = true; 1160 blocker = nioBlocker(); 1161 if (blocker != null) { 1162 blocker.interrupt(this); 1163 } 1164 1165 // interrupt carrier thread if mounted 1166 Thread carrier = carrierThread; 1167 if (carrier != null) carrier.setInterrupt(); 1168 } 1169 } finally { 1170 enableSuspendAndPreempt(); 1171 } 1172 1173 // notify blocker after releasing interruptLock 1174 if (blocker != null) { 1175 blocker.postInterrupt(); 1176 } 1177 1178 // make available parking permit, unpark thread if parked 1179 unpark(); 1180 1181 // if thread is waiting in Object.wait then schedule to try to reenter 1182 int s = state(); 1183 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1184 submitRunContinuation(); 1185 } 1186 1187 } else { 1188 interrupted = true; 1189 carrierThread.setInterrupt(); 1190 setParkPermit(true); 1191 } 1192 } 1193 1194 @Override 1195 public boolean isInterrupted() { 1196 return interrupted; 1197 } 1198 1199 @Override 1200 boolean getAndClearInterrupt() { 1201 assert Thread.currentThread() == this; 1202 boolean oldValue = interrupted; 1203 if (oldValue) { 1204 disableSuspendAndPreempt(); 1205 try { 1206 synchronized (interruptLock) { 1207 interrupted = false; 1208 carrierThread.clearInterrupt(); 1209 } 1210 } finally { 1211 enableSuspendAndPreempt(); 1212 } 1213 } 1214 return oldValue; 1215 } 1216 1217 @Override 1218 Thread.State threadState() { 1219 int s = state(); 1220 switch (s & ~SUSPENDED) { 1221 case NEW: 1222 return Thread.State.NEW; 1223 case STARTED: 1224 // return NEW if thread container not yet set 1225 if (threadContainer() == null) { 1226 return Thread.State.NEW; 1227 } else { 1228 return Thread.State.RUNNABLE; 1229 } 1230 case UNPARKED: 1231 case UNBLOCKED: 1232 case YIELDED: 1233 // runnable, not mounted 1234 return Thread.State.RUNNABLE; 1235 case RUNNING: 1236 // if mounted then return state of carrier thread 1237 if (Thread.currentThread() != this) { 1238 disableSuspendAndPreempt(); 1239 try { 1240 synchronized (carrierThreadAccessLock()) { 1241 Thread carrierThread = this.carrierThread; 1242 if (carrierThread != null) { 1243 return carrierThread.threadState(); 1244 } 1245 } 1246 } finally { 1247 enableSuspendAndPreempt(); 1248 } 1249 } 1250 // runnable, mounted 1251 return Thread.State.RUNNABLE; 1252 case PARKING: 1253 case TIMED_PARKING: 1254 case WAITING: 1255 case TIMED_WAITING: 1256 case YIELDING: 1257 // runnable, in transition 1258 return Thread.State.RUNNABLE; 1259 case PARKED: 1260 case PINNED: 1261 case WAIT: 1262 return Thread.State.WAITING; 1263 case TIMED_PARKED: 1264 case TIMED_PINNED: 1265 case TIMED_WAIT: 1266 return Thread.State.TIMED_WAITING; 1267 case BLOCKING: 1268 case BLOCKED: 1269 return Thread.State.BLOCKED; 1270 case TERMINATED: 1271 return Thread.State.TERMINATED; 1272 default: 1273 throw new InternalError(); 1274 } 1275 } 1276 1277 @Override 1278 boolean alive() { 1279 int s = state; 1280 return (s != NEW && s != TERMINATED); 1281 } 1282 1283 @Override 1284 boolean isTerminated() { 1285 return (state == TERMINATED); 1286 } 1287 1288 @Override 1289 StackTraceElement[] asyncGetStackTrace() { 1290 StackTraceElement[] stackTrace; 1291 do { 1292 stackTrace = (carrierThread != null) 1293 ? super.asyncGetStackTrace() // mounted 1294 : tryGetStackTrace(); // unmounted 1295 if (stackTrace == null) { 1296 Thread.yield(); 1297 } 1298 } while (stackTrace == null); 1299 return stackTrace; 1300 } 1301 1302 /** 1303 * Returns the stack trace for this virtual thread if it is unmounted. 1304 * Returns null if the thread is mounted or in transition. 1305 */ 1306 private StackTraceElement[] tryGetStackTrace() { 1307 int initialState = state() & ~SUSPENDED; 1308 switch (initialState) { 1309 case NEW, STARTED, TERMINATED -> { 1310 return new StackTraceElement[0]; // unmounted, empty stack 1311 } 1312 case RUNNING, PINNED, TIMED_PINNED -> { 1313 return null; // mounted 1314 } 1315 case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> { 1316 // unmounted, not runnable 1317 } 1318 case UNPARKED, UNBLOCKED, YIELDED -> { 1319 // unmounted, runnable 1320 } 1321 case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> { 1322 return null; // in transition 1323 } 1324 default -> throw new InternalError("" + initialState); 1325 } 1326 1327 // thread is unmounted, prevent it from continuing 1328 int suspendedState = initialState | SUSPENDED; 1329 if (!compareAndSetState(initialState, suspendedState)) { 1330 return null; 1331 } 1332 1333 // get stack trace and restore state 1334 StackTraceElement[] stack; 1335 try { 1336 stack = cont.getStackTrace(); 1337 } finally { 1338 assert state == suspendedState; 1339 setState(initialState); 1340 } 1341 boolean resubmit = switch (initialState) { 1342 case UNPARKED, UNBLOCKED, YIELDED -> { 1343 // resubmit as task may have run while suspended 1344 yield true; 1345 } 1346 case PARKED, TIMED_PARKED -> { 1347 // resubmit if unparked while suspended 1348 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1349 } 1350 case BLOCKED -> { 1351 // resubmit if unblocked while suspended 1352 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED); 1353 } 1354 case WAIT, TIMED_WAIT -> { 1355 // resubmit if notified or interrupted while waiting (Object.wait) 1356 // waitTimeoutExpired will retry if the timed expired when suspended 1357 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED); 1358 } 1359 default -> throw new InternalError(); 1360 }; 1361 if (resubmit) { 1362 submitRunContinuation(); 1363 } 1364 return stack; 1365 } 1366 1367 @Override 1368 public String toString() { 1369 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1370 sb.append(threadId()); 1371 String name = getName(); 1372 if (!name.isEmpty()) { 1373 sb.append(","); 1374 sb.append(name); 1375 } 1376 sb.append("]/"); 1377 1378 // add the carrier state and thread name when mounted 1379 boolean mounted; 1380 if (Thread.currentThread() == this) { 1381 mounted = appendCarrierInfo(sb); 1382 } else { 1383 disableSuspendAndPreempt(); 1384 try { 1385 synchronized (carrierThreadAccessLock()) { 1386 mounted = appendCarrierInfo(sb); 1387 } 1388 } finally { 1389 enableSuspendAndPreempt(); 1390 } 1391 } 1392 1393 // add virtual thread state when not mounted 1394 if (!mounted) { 1395 String stateAsString = threadState().toString(); 1396 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1397 } 1398 1399 return sb.toString(); 1400 } 1401 1402 /** 1403 * Appends the carrier state and thread name to the string buffer if mounted. 1404 * @return true if mounted, false if not mounted 1405 */ 1406 private boolean appendCarrierInfo(StringBuilder sb) { 1407 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1408 Thread carrier = carrierThread; 1409 if (carrier != null) { 1410 String stateAsString = carrier.threadState().toString(); 1411 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1412 sb.append('@'); 1413 sb.append(carrier.getName()); 1414 return true; 1415 } else { 1416 return false; 1417 } 1418 } 1419 1420 @Override 1421 public int hashCode() { 1422 return (int) threadId(); 1423 } 1424 1425 @Override 1426 public boolean equals(Object obj) { 1427 return obj == this; 1428 } 1429 1430 /** 1431 * Returns the termination object, creating it if needed. 1432 */ 1433 private CountDownLatch getTermination() { 1434 CountDownLatch termination = this.termination; 1435 if (termination == null) { 1436 termination = new CountDownLatch(1); 1437 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1438 termination = this.termination; 1439 } 1440 } 1441 return termination; 1442 } 1443 1444 /** 1445 * Returns the lock object to synchronize on when accessing carrierThread. 1446 * The lock prevents carrierThread from being reset to null during unmount. 1447 */ 1448 private Object carrierThreadAccessLock() { 1449 // return interruptLock as unmount has to coordinate with interrupt 1450 return interruptLock; 1451 } 1452 1453 /** 1454 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1455 */ 1456 private Object timedWaitLock() { 1457 // use this object for now to avoid the overhead of introducing another lock 1458 return runContinuation; 1459 } 1460 1461 /** 1462 * Disallow the current thread be suspended or preempted. 1463 */ 1464 private void disableSuspendAndPreempt() { 1465 notifyJvmtiDisableSuspend(true); 1466 Continuation.pin(); 1467 } 1468 1469 /** 1470 * Allow the current thread be suspended or preempted. 1471 */ 1472 private void enableSuspendAndPreempt() { 1473 Continuation.unpin(); 1474 notifyJvmtiDisableSuspend(false); 1475 } 1476 1477 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1478 1479 private int state() { 1480 return state; // volatile read 1481 } 1482 1483 private void setState(int newValue) { 1484 state = newValue; // volatile write 1485 } 1486 1487 private boolean compareAndSetState(int expectedValue, int newValue) { 1488 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1489 } 1490 1491 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1492 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1493 } 1494 1495 private void setParkPermit(boolean newValue) { 1496 if (parkPermit != newValue) { 1497 parkPermit = newValue; 1498 } 1499 } 1500 1501 private boolean getAndSetParkPermit(boolean newValue) { 1502 if (parkPermit != newValue) { 1503 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1504 } else { 1505 return newValue; 1506 } 1507 } 1508 1509 private void setCarrierThread(Thread carrier) { 1510 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1511 this.carrierThread = carrier; 1512 } 1513 1514 // -- JVM TI support -- 1515 1516 @IntrinsicCandidate 1517 @JvmtiMountTransition 1518 private native void notifyJvmtiStart(); 1519 1520 @IntrinsicCandidate 1521 @JvmtiMountTransition 1522 private native void notifyJvmtiEnd(); 1523 1524 @IntrinsicCandidate 1525 @JvmtiMountTransition 1526 private native void notifyJvmtiMount(boolean hide); 1527 1528 @IntrinsicCandidate 1529 @JvmtiMountTransition 1530 private native void notifyJvmtiUnmount(boolean hide); 1531 1532 @IntrinsicCandidate 1533 private static native void notifyJvmtiDisableSuspend(boolean enter); 1534 1535 private static native void registerNatives(); 1536 static { 1537 registerNatives(); 1538 1539 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1540 var group = Thread.virtualThreadGroup(); 1541 1542 // ensure event class is initialized 1543 try { 1544 MethodHandles.lookup().ensureInitialized(VirtualThreadParkEvent.class); 1545 } catch (IllegalAccessException e) { 1546 throw new ExceptionInInitializerError(e); 1547 } 1548 } 1549 1550 /** 1551 * Loads a VirtualThreadScheduler with the given class name. The class must be public 1552 * in an exported package, with public one-arg or no-arg constructor, and be visible 1553 * to the system class loader. 1554 * @param delegate the scheduler that the custom scheduler may delegate to 1555 * @param cn the class name of the custom scheduler 1556 */ 1557 private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) { 1558 try { 1559 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); 1560 VirtualThreadScheduler scheduler; 1561 try { 1562 // 1-arg constructor 1563 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class); 1564 scheduler = (VirtualThreadScheduler) ctor.newInstance(delegate); 1565 } catch (NoSuchMethodException e) { 1566 // 0-arg constructor 1567 Constructor<?> ctor = clazz.getConstructor(); 1568 scheduler = (VirtualThreadScheduler) ctor.newInstance(); 1569 } 1570 System.err.println(""" 1571 WARNING: Using custom default scheduler, this is an experimental feature!"""); 1572 return scheduler; 1573 } catch (Exception ex) { 1574 throw new Error(ex); 1575 } 1576 } 1577 1578 /** 1579 * Creates the built-in ForkJoinPool scheduler. 1580 * @param wrapped true if wrapped by a custom default scheduler 1581 */ 1582 private static BuiltinDefaultScheduler createBuiltinDefaultScheduler(boolean wrapped) { 1583 int parallelism, maxPoolSize, minRunnable; 1584 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1585 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1586 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1587 if (parallelismValue != null) { 1588 parallelism = Integer.parseInt(parallelismValue); 1589 } else { 1590 parallelism = Runtime.getRuntime().availableProcessors(); 1591 } 1592 if (maxPoolSizeValue != null) { 1593 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1594 parallelism = Integer.min(parallelism, maxPoolSize); 1595 } else { 1596 maxPoolSize = Integer.max(parallelism, 256); 1597 } 1598 if (minRunnableValue != null) { 1599 minRunnable = Integer.parseInt(minRunnableValue); 1600 } else { 1601 minRunnable = Integer.max(parallelism / 2, 1); 1602 } 1603 return new BuiltinDefaultScheduler(parallelism, maxPoolSize, minRunnable, wrapped); 1604 } 1605 1606 /** 1607 * The built-in ForkJoinPool scheduler. 1608 */ 1609 private static class BuiltinDefaultScheduler 1610 extends ForkJoinPool implements VirtualThreadScheduler { 1611 1612 private static final StableValue<VirtualThreadScheduler> VIEW = StableValue.of(); 1613 1614 BuiltinDefaultScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) { 1615 ForkJoinWorkerThreadFactory factory = wrapped 1616 ? ForkJoinPool.defaultForkJoinWorkerThreadFactory 1617 : CarrierThread::new; 1618 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1619 boolean asyncMode = true; // FIFO 1620 super(parallelism, factory, handler, asyncMode, 1621 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1622 } 1623 1624 private void adaptAndExecute(Runnable task) { 1625 execute(ForkJoinTask.adapt(task)); 1626 } 1627 1628 @Override 1629 public void onStart(VirtualThreadTask task) { 1630 adaptAndExecute(task); 1631 } 1632 1633 @Override 1634 public void onContinue(VirtualThreadTask task) { 1635 adaptAndExecute(task); 1636 } 1637 1638 /** 1639 * Wraps the scheduler to avoid leaking a direct reference. 1640 */ 1641 VirtualThreadScheduler externalView() { 1642 BuiltinDefaultScheduler builtin = this; 1643 return VIEW.orElseSet(() -> { 1644 return new VirtualThreadScheduler() { 1645 private void execute(VirtualThreadTask task) { 1646 var vthread = (VirtualThread) task.thread(); 1647 VirtualThreadScheduler scheduler = vthread.scheduler; 1648 if (scheduler == this || scheduler == DEFAULT_SCHEDULER) { 1649 builtin.adaptAndExecute(task); 1650 } else { 1651 throw new IllegalArgumentException(); 1652 } 1653 } 1654 @Override 1655 public void onStart(VirtualThreadTask task) { 1656 execute(task); 1657 } 1658 @Override 1659 public void onContinue(VirtualThreadTask task) { 1660 execute(task); 1661 } 1662 }; 1663 }); 1664 } 1665 } 1666 1667 /** 1668 * Schedule a runnable task to run after a delay. 1669 */ 1670 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1671 if (scheduler instanceof ForkJoinPool pool) { 1672 return pool.schedule(command, delay, unit); 1673 } else { 1674 return DelayedTaskSchedulers.schedule(command, delay, unit); 1675 } 1676 } 1677 1678 /** 1679 * Supports scheduling a runnable task to run after a delay. It uses a number 1680 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1681 * work queue used. This class is used when using a custom scheduler. 1682 */ 1683 private static class DelayedTaskSchedulers { 1684 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1685 1686 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1687 long tid = Thread.currentThread().threadId(); 1688 int index = (int) tid & (INSTANCE.length - 1); 1689 return INSTANCE[index].schedule(command, delay, unit); 1690 } 1691 1692 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1693 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1694 String propValue = System.getProperty(propName); 1695 int queueCount; 1696 if (propValue != null) { 1697 queueCount = Integer.parseInt(propValue); 1698 if (queueCount != Integer.highestOneBit(queueCount)) { 1699 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1700 } 1701 } else { 1702 int ncpus = Runtime.getRuntime().availableProcessors(); 1703 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1704 } 1705 var schedulers = new ScheduledExecutorService[queueCount]; 1706 for (int i = 0; i < queueCount; i++) { 1707 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1708 Executors.newScheduledThreadPool(1, task -> { 1709 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1710 t.setDaemon(true); 1711 return t; 1712 }); 1713 stpe.setRemoveOnCancelPolicy(true); 1714 schedulers[i] = stpe; 1715 } 1716 return schedulers; 1717 } 1718 } 1719 1720 /** 1721 * Schedule virtual threads that are ready to be scheduled after they blocked on 1722 * monitor enter. 1723 */ 1724 private static void unblockVirtualThreads() { 1725 while (true) { 1726 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1727 while (vthread != null) { 1728 assert vthread.onWaitingList; 1729 VirtualThread nextThread = vthread.next; 1730 1731 // remove from list and unblock 1732 vthread.next = null; 1733 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1734 assert changed; 1735 vthread.unblock(); 1736 1737 vthread = nextThread; 1738 } 1739 } 1740 } 1741 1742 /** 1743 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1744 * if necessary until a list of one or more threads becomes available. 1745 */ 1746 private static native VirtualThread takeVirtualThreadListToUnblock(); 1747 1748 static { 1749 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1750 VirtualThread::unblockVirtualThreads); 1751 unblocker.setDaemon(true); 1752 unblocker.start(); 1753 } 1754 } --- EOF ---