1 /* 2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.lang; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.lang.reflect.Constructor; 30 import java.util.Locale; 31 import java.util.Objects; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executors; 34 import java.util.concurrent.ForkJoinPool; 35 import java.util.concurrent.ForkJoinTask; 36 import java.util.concurrent.Future; 37 import java.util.concurrent.RejectedExecutionException; 38 import java.util.concurrent.ScheduledExecutorService; 39 import java.util.concurrent.ScheduledThreadPoolExecutor; 40 import java.util.concurrent.TimeUnit; 41 import jdk.internal.event.VirtualThreadEndEvent; 42 import jdk.internal.event.VirtualThreadParkEvent; 43 import jdk.internal.event.VirtualThreadStartEvent; 44 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 45 import jdk.internal.invoke.MhUtil; 46 import jdk.internal.misc.CarrierThread; 47 import jdk.internal.misc.InnocuousThread; 48 import jdk.internal.misc.Unsafe; 49 import jdk.internal.vm.Continuation; 50 import jdk.internal.vm.ContinuationScope; 51 import jdk.internal.vm.StackableScope; 52 import jdk.internal.vm.ThreadContainer; 53 import jdk.internal.vm.ThreadContainers; 54 import jdk.internal.vm.annotation.ChangesCurrentThread; 55 import jdk.internal.vm.annotation.Hidden; 56 import jdk.internal.vm.annotation.IntrinsicCandidate; 57 import jdk.internal.vm.annotation.JvmtiHideEvents; 58 import jdk.internal.vm.annotation.JvmtiMountTransition; 59 import jdk.internal.vm.annotation.ReservedStackAccess; 60 import sun.nio.ch.Interruptible; 61 import static java.util.concurrent.TimeUnit.*; 62 63 /** 64 * A thread that is scheduled by the Java virtual machine rather than the operating system. 
*/
final class VirtualThread extends BaseVirtualThread {
    // Unsafe instance, used below to look up the offsets of instance fields
    private static final Unsafe U = Unsafe.getUnsafe();
    // continuation scope used for all virtual threads, exposed by continuationScope()
    private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");

    // BUILTIN_SCHEDULER: the built-in scheduler, always created
    // DEFAULT_SCHEDULER: the scheduler returned by defaultScheduler(); the built-in
    //                    scheduler unless a custom scheduler is configured below
    // EXTERNAL_VIEW:     object created by BuiltinScheduler.createExternalView(); returned
    //                    by scheduler(boolean) to untrusted callers in place of the
    //                    built-in scheduler
    private static final BuiltinScheduler BUILTIN_SCHEDULER;
    private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
    private static final VirtualThreadScheduler EXTERNAL_VIEW;
    static {
        // experimental: when the system property is set, its value is passed to
        // loadCustomScheduler (together with the external view) to obtain the
        // default scheduler
        String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
        if (propValue != null) {
            BuiltinScheduler builtinScheduler = createBuiltinScheduler(true);
            VirtualThreadScheduler externalView = builtinScheduler.createExternalView();
            VirtualThreadScheduler defaultScheduler = loadCustomScheduler(externalView, propValue);
            BUILTIN_SCHEDULER = builtinScheduler;
            DEFAULT_SCHEDULER = defaultScheduler;
            EXTERNAL_VIEW = externalView;
        } else {
            // no custom scheduler: the built-in scheduler is also the default scheduler
            var builtinScheduler = createBuiltinScheduler(false);
            BUILTIN_SCHEDULER = builtinScheduler;
            DEFAULT_SCHEDULER = builtinScheduler;
            EXTERNAL_VIEW = builtinScheduler.createExternalView();
        }
    }

    // offsets of the named instance fields, obtained from Unsafe
    // NOTE(review): presumably used for CAS/atomic access by helpers outside this view
    private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
    private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
    private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
    private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
    private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");

    // scheduler and continuation
    private final VirtualThreadScheduler scheduler;
    private final Continuation cont;
    // the task handed to the scheduler; its run method invokes runContinuation()
    private final VirtualThreadTask runContinuation;

    // virtual thread state, accessed by VM
    private volatile int state;

    /*
     * Virtual thread state transitions:
     *
     *      NEW -> STARTED         // Thread.start, schedule to run
     *  STARTED -> TERMINATED      // failed to start
     *  STARTED -> RUNNING         // first run
     *  RUNNING -> TERMINATED      // done
     *
     *  RUNNING -> PARKING         // Thread parking with LockSupport.park
     *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
     *  PARKING -> PINNED          // cont.yield failed, parked indefinitely on carrier
     *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *   PINNED -> RUNNING         // unparked, continue execution on same carrier
     * UNPARKED -> RUNNING         // continue execution after park
     *
     *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos
     * TIMED_PARKING -> TIMED_PARKED    // cont.yield successful, timed-parked
     * TIMED_PARKING -> TIMED_PINNED    // cont.yield failed, timed-parked on carrier
     *  TIMED_PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *  TIMED_PINNED -> RUNNING         // unparked, continue execution on same carrier
     *
     *   RUNNING -> BLOCKING       // blocking on monitor enter
     *  BLOCKING -> BLOCKED        // blocked on monitor enter
     *   BLOCKED -> UNBLOCKED      // unblocked, may be scheduled to continue
     * UNBLOCKED -> RUNNING        // continue execution after blocked on monitor enter
     *
     *   RUNNING -> WAITING        // transitional state during wait on monitor
     *   WAITING -> WAIT           // waiting on monitor
     *      WAIT -> BLOCKED        // notified, waiting to be unblocked by monitor owner
     *      WAIT -> UNBLOCKED      // timed-out/interrupted
     *
     *       RUNNING -> TIMED_WAITING   // transition state during timed-waiting on monitor
     * TIMED_WAITING -> TIMED_WAIT      // timed-waiting on monitor
     *    TIMED_WAIT -> BLOCKED         // notified, waiting to be unblocked by monitor owner
     *    TIMED_WAIT -> UNBLOCKED       // timed-out/interrupted
     *
     *  RUNNING -> YIELDING        // Thread.yield
     * YIELDING -> YIELDED         // cont.yield successful, may be scheduled to continue
     * YIELDING -> RUNNING         // cont.yield failed
     *  YIELDED -> RUNNING         // continue execution after Thread.yield
     */
    private static final int NEW      = 0;
    private static final int STARTED  = 1;
    private static final int RUNNING  = 2;     // runnable-mounted

    // untimed and timed parking
    private static final int PARKING       = 3;
    private static final int PARKED        = 4;     // unmounted
    private static final int PINNED        = 5;     // mounted
    private static final int TIMED_PARKING = 6;
    private static final int TIMED_PARKED  = 7;     // unmounted
    private static final int TIMED_PINNED  = 8;     // mounted
    private static final int UNPARKED      = 9;     // unmounted but runnable

    // Thread.yield
    private static final int YIELDING = 10;
    private static final int YIELDED  = 11;         // unmounted but runnable

    // monitor enter
    private static final int BLOCKING  = 12;
    private static final int BLOCKED   = 13;        // unmounted
    private static final int UNBLOCKED = 14;        // unmounted but runnable

    // monitor wait/timed-wait
    private static final int WAITING       = 15;
    private static final int WAIT          = 16;    // waiting in Object.wait
    private static final int TIMED_WAITING = 17;
    private static final int TIMED_WAIT    = 18;    // waiting in timed-Object.wait

    private static final int TERMINATED = 99;  // final state

    // can be suspended from scheduling when unmounted; a bit flag that is OR'ed
    // with a state value (waitTimeoutExpired tests for TIMED_WAIT | SUSPENDED)
    private static final int SUSPENDED = 1 << 8;

    // parking permit made available by LockSupport.unpark
    private volatile boolean parkPermit;

    // blocking permit made available by unblocker thread when another thread exits monitor
    private volatile boolean blockPermit;

    // true when on the list of virtual threads waiting to be unblocked
    private volatile boolean onWaitingList;

    // next virtual thread on the list of virtual threads waiting to be unblocked
    private volatile VirtualThread next;

    // notified by Object.notify/notifyAll while waiting in Object.wait
    private volatile boolean notified;

    // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
    private volatile boolean interruptibleWait;

    // timed-wait support: sequence number incremented by afterYield for each timed-wait
    // and compared by waitTimeoutExpired, both while holding timedWaitLock()
    private byte timedWaitSeqNo;

    // timeout for timed-park and timed-wait, only accessed on current/carrier thread
    // (afterYield schedules it in NANOSECONDS for timed-park, MILLISECONDS for timed-wait)
    private long timeout;

    // timer task for timed-park and timed-wait, only accessed on current/carrier thread
    private Future<?> timeoutTask;

    // carrier thread when mounted, accessed by VM
    private volatile Thread carrierThread;

    // termination object when joining, created lazily if needed
    private volatile CountDownLatch termination;

    /**
     * Returns the built-in scheduler.
     *
     * @return the built-in scheduler (not its external view)
     */
    static VirtualThreadScheduler builtinScheduler() {
        return BUILTIN_SCHEDULER;
    }

    /**
     * Returns the default scheduler, usually the same as the built-in scheduler.
     * It differs only when the {@code jdk.virtualThreadScheduler.implClass} system
     * property was set when this class was initialized.
     *
     * @return the default scheduler
     */
    static VirtualThreadScheduler defaultScheduler() {
        return DEFAULT_SCHEDULER;
    }

    /**
     * Returns the continuation scope used for virtual threads.
     *
     * @return the continuation scope shared by all virtual threads
     */
    static ContinuationScope continuationScope() {
        return VTHREAD_SCOPE;
    }

    /**
     * Return the scheduler for this thread.
     * An untrusted caller is given the external view rather than the built-in
     * scheduler itself; any other scheduler is returned as is.
     *
     * @param trusted true if caller is trusted, false if not trusted
     * @return this thread's scheduler, or the external view of the built-in scheduler
     */
    VirtualThreadScheduler scheduler(boolean trusted) {
        if (scheduler == BUILTIN_SCHEDULER && !trusted) {
            return EXTERNAL_VIEW;
        } else {
            return scheduler;
        }
    }

    /**
     * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
247 * 248 * @param scheduler the scheduler or null for default scheduler 249 * @param preferredCarrier the preferred carrier or null 250 * @param name thread name 251 * @param characteristics characteristics 252 * @param task the task to execute 253 */ 254 VirtualThread(VirtualThreadScheduler scheduler, 255 Thread preferredCarrier, 256 String name, 257 int characteristics, 258 Runnable task, 259 Object att) { 260 super(name, characteristics, /*bound*/ false); 261 Objects.requireNonNull(task); 262 263 // use default scheduler if not provided 264 if (scheduler == null) { 265 scheduler = DEFAULT_SCHEDULER; 266 } else if (scheduler == EXTERNAL_VIEW) { 267 throw new UnsupportedOperationException(); 268 } 269 this.scheduler = scheduler; 270 this.cont = new VThreadContinuation(this, task); 271 272 if (scheduler == BUILTIN_SCHEDULER) { 273 this.runContinuation = new BuiltinSchedulerTask(this); 274 } else { 275 this.runContinuation = new CustomSchedulerTask(this, preferredCarrier, att); 276 } 277 } 278 279 /** 280 * The task to execute when using the built-in scheduler. 281 */ 282 static final class BuiltinSchedulerTask implements VirtualThreadTask { 283 private final VirtualThread vthread; 284 BuiltinSchedulerTask(VirtualThread vthread) { 285 this.vthread = vthread; 286 } 287 @Override 288 public Thread thread() { 289 return vthread; 290 } 291 @Override 292 public void run() { 293 vthread.runContinuation();; 294 } 295 @Override 296 public Thread preferredCarrier() { 297 throw new UnsupportedOperationException(); 298 } 299 @Override 300 public Object attach(Object att) { 301 throw new UnsupportedOperationException(); 302 } 303 @Override 304 public Object attachment() { 305 throw new UnsupportedOperationException(); 306 } 307 } 308 309 /** 310 * The task to execute when using a custom scheduler. 
311 */ 312 static final class CustomSchedulerTask implements VirtualThreadTask { 313 private static final VarHandle ATT = 314 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class); 315 private final VirtualThread vthread; 316 private final Thread preferredCarrier; 317 private volatile Object att; 318 CustomSchedulerTask(VirtualThread vthread, Thread preferredCarrier, Object att) { 319 this.vthread = vthread; 320 this.preferredCarrier = preferredCarrier; 321 if (att != null) { 322 this.att = att; 323 } 324 } 325 @Override 326 public Thread thread() { 327 return vthread; 328 } 329 @Override 330 public void run() { 331 vthread.runContinuation();; 332 } 333 @Override 334 public Thread preferredCarrier() { 335 return preferredCarrier; 336 } 337 @Override 338 public Object attach(Object att) { 339 return ATT.getAndSet(this, att); 340 } 341 @Override 342 public Object attachment() { 343 return att; 344 } 345 } 346 347 /** 348 * The continuation that a virtual thread executes. 349 */ 350 private static class VThreadContinuation extends Continuation { 351 VThreadContinuation(VirtualThread vthread, Runnable task) { 352 super(VTHREAD_SCOPE, wrap(vthread, task)); 353 } 354 @Override 355 protected void onPinned(Continuation.Pinned reason) { 356 } 357 private static Runnable wrap(VirtualThread vthread, Runnable task) { 358 return new Runnable() { 359 @Hidden 360 @JvmtiHideEvents 361 public void run() { 362 vthread.endFirstTransition(); 363 try { 364 vthread.run(task); 365 } finally { 366 vthread.startFinalTransition(); 367 } 368 } 369 }; 370 } 371 } 372 373 /** 374 * Runs or continues execution on the current thread. The virtual thread is mounted 375 * on the current thread before the task runs or continues. It unmounts when the 376 * task completes or yields. 
*
     * <p> This method returns without running anything if the thread state is not one
     * of STARTED, UNPARKED, UNBLOCKED or YIELDED, or if the attempt to change that
     * state to RUNNING fails.
     *
     * @throws WrongThreadException if the current thread is a virtual thread
     */
    @ChangesCurrentThread // allow mount/unmount to be inlined
    private void runContinuation() {
        // the carrier must be a platform thread
        if (Thread.currentThread().isVirtual()) {
            throw new WrongThreadException();
        }

        // set state to RUNNING
        int initialState = state();
        if (initialState == STARTED || initialState == UNPARKED
                || initialState == UNBLOCKED || initialState == YIELDED) {
            // newly started or continue after parking/blocking/Thread.yield
            if (!compareAndSetState(initialState, RUNNING)) {
                // state changed since it was read, nothing to run
                return;
            }
            // consume permit when continuing after parking or blocking. If continue
            // after a timed-park or timed-wait then the timeout task is cancelled.
            if (initialState == UNPARKED) {
                cancelTimeoutTask();
                setParkPermit(false);
            } else if (initialState == UNBLOCKED) {
                cancelTimeoutTask();
                blockPermit = false;
            }
        } else {
            // not runnable
            return;
        }

        mount();
        try {
            cont.run();
        } finally {
            // always unmount first; then either terminate (continuation done) or
            // handle the yield (parking, blocking, waiting or Thread.yield)
            unmount();
            if (cont.isDone()) {
                afterDone();
            } else {
                afterYield();
            }
        }
    }

    /**
     * Cancel timeout task when continuing after timed-park or timed-wait.
     * The timeout task may be executing, or may have already completed.
     * Does nothing if there is no timeout task.
     */
    private void cancelTimeoutTask() {
        if (timeoutTask != null) {
            // false => do not interrupt the timeout task if it is already running
            timeoutTask.cancel(false);
            timeoutTask = null;
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the built-in scheduler,
     * the task will be pushed to the local queue if possible, otherwise it will be
     * pushed to an external submission queue.
435 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 436 * @throws RejectedExecutionException 437 */ 438 private void submitRunContinuation(boolean retryOnOOME) { 439 boolean done = false; 440 while (!done) { 441 try { 442 // Pin the continuation to prevent the virtual thread from unmounting 443 // when submitting a task. For the default scheduler this ensures that 444 // the carrier doesn't change when pushing a task. For other schedulers 445 // it avoids deadlock that could arise due to carriers and virtual 446 // threads contending for a lock. 447 if (currentThread().isVirtual()) { 448 Continuation.pin(); 449 try { 450 scheduler.onContinue(runContinuation); 451 } finally { 452 Continuation.unpin(); 453 } 454 } else { 455 scheduler.onContinue(runContinuation); 456 } 457 done = true; 458 } catch (RejectedExecutionException ree) { 459 submitFailed(ree); 460 throw ree; 461 } catch (OutOfMemoryError e) { 462 if (retryOnOOME) { 463 U.park(false, 100_000_000); // 100ms 464 } else { 465 throw e; 466 } 467 } 468 } 469 } 470 471 /** 472 * Submits the runContinuation task to the scheduler. For the default scheduler, 473 * and calling it on a worker thread, the task will be pushed to the local queue, 474 * otherwise it will be pushed to an external submission queue. 475 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 476 * @throws RejectedExecutionException 477 */ 478 private void submitRunContinuation() { 479 submitRunContinuation(true); 480 } 481 482 /** 483 * Invoked from a carrier thread to lazy submit the runContinuation task to the 484 * carrier's local queue if the queue is empty. If not empty, or invoked by a thread 485 * for a custom scheduler, then it just submits the task to the scheduler. 486 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 
487 * @throws RejectedExecutionException 488 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 489 */ 490 private void lazySubmitRunContinuation() { 491 assert !currentThread().isVirtual(); 492 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 493 try { 494 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation)); 495 } catch (RejectedExecutionException ree) { 496 submitFailed(ree); 497 throw ree; 498 } catch (OutOfMemoryError e) { 499 submitRunContinuation(); 500 } 501 } else { 502 submitRunContinuation(); 503 } 504 } 505 506 /** 507 * Invoked from a carrier thread to externally submit the runContinuation task to the 508 * scheduler. If invoked by a thread for a custom scheduler, then it just submits the 509 * task to the scheduler. 510 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 511 * @throws RejectedExecutionException 512 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 513 */ 514 private void externalSubmitRunContinuation() { 515 assert !currentThread().isVirtual(); 516 if (currentThread() instanceof CarrierThread ct) { 517 try { 518 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 519 } catch (RejectedExecutionException ree) { 520 submitFailed(ree); 521 throw ree; 522 } catch (OutOfMemoryError e) { 523 submitRunContinuation(); 524 } 525 } else { 526 submitRunContinuation(); 527 } 528 } 529 530 /** 531 * Invoked from Thread.start to externally submit the runContinuation task to the 532 * scheduler. If this virtual thread is scheduled by the built-in scheduler, 533 * and this method is called from a virtual thread scheduled by the built-in 534 * scheduler, then it uses externalSubmit to ensure that the task is pushed to an 535 * external submission queue rather than the local queue. 
536 * @throws RejectedExecutionException 537 * @throws OutOfMemoryError 538 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 539 */ 540 private void externalSubmitRunContinuationOrThrow() { 541 try { 542 if (currentThread().isVirtual()) { 543 // Pin the continuation to prevent the virtual thread from unmounting 544 // when submitting a task. This avoids deadlock that could arise due to 545 // carriers and virtual threads contending for a lock. 546 Continuation.pin(); 547 try { 548 if (scheduler == BUILTIN_SCHEDULER 549 && currentCarrierThread() instanceof CarrierThread ct) { 550 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 551 } else { 552 scheduler.onStart(runContinuation); 553 } 554 } finally { 555 Continuation.unpin(); 556 } 557 } else { 558 scheduler.onStart(runContinuation); 559 } 560 } catch (RejectedExecutionException ree) { 561 submitFailed(ree); 562 throw ree; 563 } 564 } 565 566 /** 567 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 568 */ 569 private void submitFailed(RejectedExecutionException ree) { 570 var event = new VirtualThreadSubmitFailedEvent(); 571 if (event.isEnabled()) { 572 event.javaThreadId = threadId(); 573 event.exceptionMessage = ree.getMessage(); 574 event.commit(); 575 } 576 } 577 578 /** 579 * Runs a task in the context of this virtual thread. 
* Any exception or error thrown by the task is passed to
     * {@code dispatchUncaughtException}. JFR start and end events are emitted
     * when they are turned on.
     *
     * @param task the task to run
     */
    private void run(Runnable task) {
        assert Thread.currentThread() == this && state == RUNNING;

        // emit JFR event if enabled
        if (VirtualThreadStartEvent.isTurnedOn()) {
            var event = new VirtualThreadStartEvent();
            event.javaThreadId = threadId();
            event.commit();
        }

        Object bindings = Thread.scopedValueBindings();
        try {
            runWith(bindings, task);
        } catch (Throwable exc) {
            // the task failed, hand the exception to the uncaught exception handling
            dispatchUncaughtException(exc);
        } finally {
            // pop any remaining scopes from the stack, this may block
            StackableScope.popAll();

            // emit JFR event if enabled
            if (VirtualThreadEndEvent.isTurnedOn()) {
                var event = new VirtualThreadEndEvent();
                event.javaThreadId = threadId();
                event.commit();
            }
        }
    }

    /**
     * Mounts this virtual thread onto the current platform thread. On
     * return, the current thread is the virtual thread.
     *
     * <p> The carrier's interrupted status is brought in line with this virtual
     * thread's interrupted status before the current thread is switched.
     */
    @ChangesCurrentThread
    @ReservedStackAccess
    private void mount() {
        startTransition(/*is_mount*/true);
        // We assume following volatile accesses provide equivalent
        // of acquire ordering, otherwise we need U.loadFence() here.

        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupted status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            // carrier is interrupted but this thread is not: clear the carrier's
            // status, re-checking under interruptLock
            synchronized (interruptLock) {
                // need to recheck interrupted status
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread.
* The carrier's interrupted status is cleared as part of unmounting.
     */
    @ChangesCurrentThread
    @ReservedStackAccess
    private void unmount() {
        assert !Thread.holdsLock(interruptLock);

        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        // the carrier's interrupted status mirrored this thread's while mounted (see mount)
        carrier.clearInterrupt();

        // We assume previous volatile accesses provide equivalent
        // of release ordering, otherwise we need U.storeFence() here.
        endTransition(/*is_mount*/false);
    }

    /**
     * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
     * the continuation continues.
     *
     * @return the value returned by {@code Continuation.yield}; callers treat
     *         {@code true} as "yielded" and {@code false} as "could not yield"
     */
    @Hidden
    private boolean yieldContinuation() {
        // the yield is bracketed by an unmount transition start and a mount transition end
        startTransition(/*is_mount*/false);
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            endTransition(/*is_mount*/true);
        }
    }

    /**
     * Invoked in the context of the carrier thread after the Continuation yields when
     * parking, blocking on monitor enter, Object.wait, or Thread.yield.
* Moves the thread from its transitional state (PARKING, TIMED_PARKING, YIELDING,
     * BLOCKING, WAITING, TIMED_WAITING) to the corresponding unmounted state, and
     * resubmits the thread if it was made runnable again in the meantime.
     */
    private void afterYield() {
        assert carrierThread == null;

        // re-adjust parallelism if the virtual thread yielded when compensating
        if (currentThread() instanceof CarrierThread ct) {
            ct.endBlocking();
        }

        int s = state();

        // LockSupport.park/parkNanos
        if (s == PARKING || s == TIMED_PARKING) {
            int newState;
            if (s == PARKING) {
                setState(newState = PARKED);
            } else {
                // schedule unpark
                long timeout = this.timeout;
                assert timeout > 0;
                timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
                setState(newState = TIMED_PARKED);
            }

            // may have been unparked while parking
            if (parkPermit && compareAndSetState(newState, UNPARKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Thread.yield
        if (s == YIELDING) {
            setState(YIELDED);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation();
            } else {
                submitRunContinuation();
            }
            return;
        }

        // blocking on monitorenter
        if (s == BLOCKING) {
            setState(BLOCKED);

            // may have been unblocked while blocking
            if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Object.wait
        if (s == WAITING || s == TIMED_WAITING) {
            int newState;
            // read before publishing the WAIT/TIMED_WAIT state below
            boolean interruptible = interruptibleWait;
            if (s == WAITING) {
                setState(newState = WAIT);
            } else {
                // For timed-wait, a timeout task is scheduled to execute. The timeout
                // task will change the thread state to UNBLOCKED and submit the thread
                // to the scheduler. A sequence number is used to ensure that the timeout
                // task only unblocks the thread for this timed-wait. We synchronize with
                // the timeout task to coordinate access to the sequence number and to
                // ensure the timeout task doesn't execute until the thread has got to
                // the TIMED_WAIT state.
                long timeout = this.timeout;
                assert timeout > 0;
                synchronized (timedWaitLock()) {
                    byte seqNo = ++timedWaitSeqNo;
                    // note: the timeout is in milliseconds here (nanoseconds for timed-park)
                    timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
                    setState(newState = TIMED_WAIT);
                }
            }

            // may have been notified while in transition to wait state
            if (notified && compareAndSetState(newState, BLOCKED)) {
                // may have even been unblocked already
                if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                    submitRunContinuation();
                }
                return;
            }

            // may have been interrupted while in transition to wait state
            if (interruptible && interrupted && compareAndSetState(newState, UNBLOCKED)) {
                submitRunContinuation();
                return;
            }
            return;
        }

        // not reached: the thread must be in one of the transitional states handled above
        assert false;
    }

    /**
     * Invoked after the continuation completes. Terminates the thread and notifies
     * its container.
     */
    private void afterDone() {
        afterDone(true);
    }

    /**
     * Invoked after the continuation completes (or start failed). Sets the thread
     * state to TERMINATED and notifies anyone waiting for the thread to terminate.
     *
     * @param notifyContainer true if its container should be notified
     */
    private void afterDone(boolean notifyContainer) {
        assert carrierThread == null;
        setState(TERMINATED);

        // notify anyone waiting for this virtual thread to terminate
        // (the latch only exists if it was created, see the termination field)
        CountDownLatch termination = this.termination;
        if (termination != null) {
            assert termination.getCount() == 1;
            termination.countDown();
        }

        // notify container
        if (notifyContainer) {
            threadContainer().remove(this);
        }

        // clear references to thread locals
        clearReferences();
    }

    /**
     * Schedules this {@code VirtualThread} to execute.
816 * 817 * @throws IllegalStateException if the container is shutdown or closed 818 * @throws IllegalThreadStateException if the thread has already been started 819 * @throws RejectedExecutionException if the scheduler cannot accept a task 820 */ 821 @Override 822 void start(ThreadContainer container) { 823 if (!compareAndSetState(NEW, STARTED)) { 824 throw new IllegalThreadStateException("Already started"); 825 } 826 827 // bind thread to container 828 assert threadContainer() == null; 829 setThreadContainer(container); 830 831 // start thread 832 boolean addedToContainer = false; 833 boolean started = false; 834 try { 835 container.add(this); // may throw 836 addedToContainer = true; 837 838 // scoped values may be inherited 839 inheritScopedValueBindings(container); 840 841 // submit task to run thread, using externalSubmit if possible 842 externalSubmitRunContinuationOrThrow(); 843 started = true; 844 } finally { 845 if (!started) { 846 afterDone(addedToContainer); 847 } 848 } 849 } 850 851 @Override 852 public void start() { 853 start(ThreadContainers.root()); 854 } 855 856 @Override 857 public void run() { 858 // do nothing 859 } 860 861 /** 862 * Parks until unparked or interrupted. If already unparked then the parking 863 * permit is consumed and this method completes immediately (meaning it doesn't 864 * yield). It also completes immediately if the interrupted status is set. 
* If the continuation cannot yield, or OutOfMemoryError is thrown while
     * yielding, then the thread parks on its carrier thread instead.
     */
    @Override
    void park() {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        boolean yielded = false;
        // captured before yielding, passed to the park event once the thread continues
        long eventStartTime = VirtualThreadParkEvent.eventStartTime();
        setState(PARKING);
        try {
            yielded = yieldContinuation();
        } catch (OutOfMemoryError e) {
            // park on carrier
        } finally {
            // a successful yield returns in RUNNING state, otherwise still PARKING
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (yielded) {
                VirtualThreadParkEvent.offer(eventStartTime, Long.MIN_VALUE);
            } else {
                assert state() == PARKING;
                setState(RUNNING);
            }
        }

        // park on the carrier thread when pinned
        if (!yielded) {
            parkOnCarrierThread(false, 0);
        }
    }

    /**
     * Parks up to the given waiting time or until unparked or interrupted.
     * If already unparked then the parking permit is consumed and this method
     * completes immediately (meaning it doesn't yield). It also completes immediately
     * if the interrupted status is set or the waiting time is {@code <= 0}.
     *
     * @param nanos the maximum number of nanoseconds to wait.
*/
    @Override
    void parkNanos(long nanos) {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread for the waiting time
        if (nanos > 0) {
            // used to compute the remaining time if the thread has to park on its carrier
            long startTime = System.nanoTime();

            // park the thread, afterYield will schedule the thread to unpark
            boolean yielded = false;
            long eventStartTime = VirtualThreadParkEvent.eventStartTime();
            // read by afterYield when scheduling the timeout task
            timeout = nanos;
            setState(TIMED_PARKING);
            try {
                yielded = yieldContinuation();
            } catch (OutOfMemoryError e) {
                // park on carrier
            } finally {
                // a successful yield returns in RUNNING state, otherwise still TIMED_PARKING
                assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
                if (yielded) {
                    VirtualThreadParkEvent.offer(eventStartTime, nanos);
                } else {
                    assert state() == TIMED_PARKING;
                    setState(RUNNING);
                }
            }

            // park on carrier thread for remaining time when pinned (or OOME)
            if (!yielded) {
                long remainingNanos = nanos - (System.nanoTime() - startTime);
                parkOnCarrierThread(true, remainingNanos);
            }
        }
    }

    /**
     * Parks the current carrier thread up to the given waiting time or until
     * unparked or interrupted. If the virtual thread is interrupted then the
     * interrupted status will be propagated to the carrier thread.
     * The thread is in PINNED or TIMED_PINNED state while parked and the parking
     * permit is consumed before returning.
     * @param timed true for a timed park, false for untimed
     * @param nanos the waiting time in nanoseconds
     */
    private void parkOnCarrierThread(boolean timed, long nanos) {
        assert state() == RUNNING;

        setState(timed ? TIMED_PINNED : PINNED);
        try {
            // skip parking if the permit became available in the meantime
            if (!parkPermit) {
                if (!timed) {
                    U.park(false, 0);
                } else if (nanos > 0) {
                    // timed park with no time remaining does not park at all
                    U.park(false, nanos);
                }
            }
        } finally {
            setState(RUNNING);
        }

        // consume parking permit
        setParkPermit(false);

        // JFR jdk.VirtualThreadPinned event
        postPinnedEvent("LockSupport.park");
    }

    /**
     * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
     * Recording the event in the VM avoids having JFR event recorded in Java
     * with the same name, but different ID, to events recorded by the VM.
     *
     * @param op the operation recorded with the event, e.g. "LockSupport.park"
     */
    @Hidden
    private static native void postPinnedEvent(String op);

    /**
     * Re-enables this virtual thread for scheduling. If this virtual thread is parked
     * then its task is scheduled to continue, otherwise its next call to {@code park} or
     * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void unpark() {
        // nothing more to do if the permit was already available or the thread
        // is unparking itself
        if (!getAndSetParkPermit(true) && currentThread() != this) {
            int s = state();

            // unparked while parked
            if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
                submitRunContinuation();
                return;
            }

            // unparked while parked when pinned
            if (s == PINNED || s == TIMED_PINNED) {
                // unpark carrier thread when pinned
                disableSuspendAndPreempt();
                try {
                    synchronized (carrierThreadAccessLock()) {
                        // re-read the carrier and state under the lock before unparking
                        Thread carrier = carrierThread;
                        if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                            U.unpark(carrier);
                        }
                    }
                } finally {
                    enableSuspendAndPreempt();
                }
                return;
            }
        }
    }

    /**
     * Invoked by unblocker thread to unblock this virtual thread.
*/
    private void unblock() {
        assert !Thread.currentThread().isVirtual();
        // set the permit before examining the state
        blockPermit = true;
        if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
            submitRunContinuation();
        }
    }

    /**
     * Invoked by FJP worker thread or STPE thread when park timeout expires.
     * Makes the parking permit available and, if the permit was not already
     * available and the thread is still in TIMED_PARKED state, changes the state
     * to UNPARKED and submits the task to continue.
     */
    private void parkTimeoutExpired() {
        assert !VirtualThread.currentThread().isVirtual();
        if (!getAndSetParkPermit(true)
                && (state() == TIMED_PARKED)
                && compareAndSetState(TIMED_PARKED, UNPARKED)) {
            lazySubmitRunContinuation();
        }
    }

    /**
     * Invoked by FJP worker thread or STPE thread when wait timeout expires.
     * If the virtual thread is in timed-wait then this method will unblock the thread
     * and submit its task so that it continues and attempts to reenter the monitor.
     * This method does nothing if the thread has been woken by notify or interrupt.
     * @param seqNo the sequence number of the timed-wait that this timeout is for,
     *        the timeout is ignored if it does not match the current sequence number
     */
    private void waitTimeoutExpired(byte seqNo) {
        assert !Thread.currentThread().isVirtual();
        for (;;) {
            boolean unblocked = false;
            synchronized (timedWaitLock()) {
                if (seqNo != timedWaitSeqNo) {
                    // this timeout task is for a past timed-wait
                    return;
                }
                int s = state();
                if (s == TIMED_WAIT) {
                    unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED);
                } else if (s != (TIMED_WAIT | SUSPENDED)) {
                    // notified or interrupted, no longer waiting
                    return;
                }
            }
            // submit after releasing the lock
            if (unblocked) {
                lazySubmitRunContinuation();
                return;
            }
            // need to retry when thread is suspended in time-wait
            Thread.yield();
        }
    }

    /**
     * Attempts to yield the current virtual thread (Thread.yield).
1076 */ 1077 void tryYield() { 1078 assert Thread.currentThread() == this; 1079 setState(YIELDING); 1080 boolean yielded = false; 1081 try { 1082 yielded = yieldContinuation(); // may throw 1083 } finally { 1084 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 1085 if (!yielded) { 1086 assert state() == YIELDING; 1087 setState(RUNNING); 1088 } 1089 } 1090 } 1091 1092 /** 1093 * Sleep the current thread for the given sleep time (in nanoseconds). If 1094 * nanos is 0 then the thread will attempt to yield. 1095 * 1096 * @implNote This implementation parks the thread for the given sleeping time 1097 * and will therefore be observed in PARKED state during the sleep. Parking 1098 * will consume the parking permit so this method makes available the parking 1099 * permit after the sleep. This may be observed as a spurious, but benign, 1100 * wakeup when the thread subsequently attempts to park. 1101 * 1102 * @param nanos the maximum number of nanoseconds to sleep 1103 * @throws InterruptedException if interrupted while sleeping 1104 */ 1105 void sleepNanos(long nanos) throws InterruptedException { 1106 assert Thread.currentThread() == this && nanos >= 0; 1107 if (getAndClearInterrupt()) 1108 throw new InterruptedException(); 1109 if (nanos == 0) { 1110 tryYield(); 1111 } else { 1112 // park for the sleep time 1113 try { 1114 long remainingNanos = nanos; 1115 long startNanos = System.nanoTime(); 1116 while (remainingNanos > 0) { 1117 parkNanos(remainingNanos); 1118 if (getAndClearInterrupt()) { 1119 throw new InterruptedException(); 1120 } 1121 remainingNanos = nanos - (System.nanoTime() - startNanos); 1122 } 1123 } finally { 1124 // may have been unparked while sleeping 1125 setParkPermit(true); 1126 } 1127 } 1128 } 1129 1130 /** 1131 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 1132 * A timeout of {@code 0} means to wait forever. 
*
     * @throws InterruptedException if interrupted while waiting
     * @return true if the thread has terminated
     */
    boolean joinNanos(long nanos) throws InterruptedException {
        // fast path, already terminated
        if (state() == TERMINATED)
            return true;

        // ensure termination object exists, then re-check state
        CountDownLatch termination = getTermination();
        if (state() == TERMINATED)
            return true;

        // wait for virtual thread to terminate
        if (nanos == 0) {
            termination.await();
        } else {
            boolean terminated = termination.await(nanos, NANOSECONDS);
            if (!terminated) {
                // waiting time elapsed
                return false;
            }
        }
        assert state() == TERMINATED;
        return true;
    }

    /**
     * Invokes {@code super.blockedOn} with suspend and preemption disabled for
     * the duration of the call.
     */
    @Override
    void blockedOn(Interruptible b) {
        disableSuspendAndPreempt();
        try {
            super.blockedOn(b);
        } finally {
            enableSuspendAndPreempt();
        }
    }

    /**
     * Interrupts this virtual thread. When invoked by another thread, this sets
     * the interrupted status, interrupts the blocker (if any) and the carrier
     * thread (if mounted) while holding interruptLock, and then unparks the
     * thread or schedules it to continue if it is waiting in Object.wait. When
     * invoked by this thread on itself, it sets the interrupted status, interrupts
     * the carrier thread, and makes the parking permit available.
     */
    @Override
    public void interrupt() {
        if (Thread.currentThread() != this) {
            // if current thread is a virtual thread then prevent it from being
            // suspended or unmounted when entering or holding interruptLock
            Interruptible blocker;
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = true;
                    blocker = nioBlocker();
                    if (blocker != null) {
                        blocker.interrupt(this);
                    }

                    // interrupt carrier thread if mounted
                    Thread carrier = carrierThread;
                    if (carrier != null) carrier.setInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }

            // notify blocker after releasing interruptLock
            if (blocker != null) {
                blocker.postInterrupt();
            }

            // make available parking permit, unpark thread if parked
            unpark();

            // if thread is waiting in Object.wait then schedule to try to reenter
            int s = state();
            if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) {
                submitRunContinuation();
            }

        } else {
            // thread is interrupting itself
            interrupted = true;
            carrierThread.setInterrupt();
            setParkPermit(true);
        }
    }

    /**
     * Returns the interrupted status of this virtual thread.
     */
    @Override
    public boolean isInterrupted() {
        return interrupted;
    }

    /**
     * Returns the interrupted status and clears it if set. Must be invoked by
     * this thread. If the status was set then it is cleared, along with the
     * carrier thread's interrupted status, while holding interruptLock.
     */
    @Override
    boolean getAndClearInterrupt() {
        assert Thread.currentThread() == this;
        boolean oldValue = interrupted;
        if (oldValue) {
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = false;
                    carrierThread.clearInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }
        return oldValue;
    }

    /**
     * Maps the internal state (ignoring the SUSPENDED bit) to a Thread.State.
     * @throws InternalError if the internal state is not recognized
     */
    @Override
    Thread.State threadState() {
        int s = state();
        switch (s & ~SUSPENDED) {
            case NEW:
                return Thread.State.NEW;
            case STARTED:
                // return NEW if thread container not yet set
                if (threadContainer() == null) {
                    return Thread.State.NEW;
                } else {
                    return Thread.State.RUNNABLE;
                }
            case UNPARKED:
            case UNBLOCKED:
            case YIELDED:
                // runnable, not mounted
                return Thread.State.RUNNABLE;
            case RUNNING:
                // if mounted then return state of carrier thread
                if (Thread.currentThread() != this) {
                    disableSuspendAndPreempt();
                    try {
                        synchronized (carrierThreadAccessLock()) {
                            Thread carrierThread = this.carrierThread;
                            if (carrierThread != null) {
                                return carrierThread.threadState();
                            }
                        }
                    } finally {
                        enableSuspendAndPreempt();
                    }
                }
                // runnable, mounted
                return Thread.State.RUNNABLE;
            case PARKING:
            case TIMED_PARKING:
            case WAITING:
            case TIMED_WAITING:
            case YIELDING:
                // runnable, in transition
                return Thread.State.RUNNABLE;
            case PARKED:
            case PINNED:
            case WAIT:
                return Thread.State.WAITING;
            case TIMED_PARKED:
            case TIMED_PINNED:
            case TIMED_WAIT:
                return Thread.State.TIMED_WAITING;
            case BLOCKING:
            case BLOCKED:
                return Thread.State.BLOCKED;
            case TERMINATED:
                return Thread.State.TERMINATED;
            default:
                throw new InternalError();
        }
    }

    /**
     * Returns true if the state is neither NEW nor TERMINATED.
     */
    @Override
    boolean alive() {
        int s = state;
        return (s != NEW && s != TERMINATED);
    }

    /**
     * Returns true if the state is TERMINATED.
     */
    @Override
    boolean isTerminated() {
        return (state == TERMINATED);
    }

    /**
     * Returns the stack trace of this virtual thread. Uses
     * {@code super.asyncGetStackTrace()} when a carrier thread is set and
     * {@code tryGetStackTrace()} otherwise, yielding and retrying for as long
     * as the attempt returns null.
     */
    @Override
    StackTraceElement[] asyncGetStackTrace() {
        StackTraceElement[] stackTrace;
        do {
            stackTrace = (carrierThread != null)
                    ? super.asyncGetStackTrace()  // mounted
                    : tryGetStackTrace();         // unmounted
            if (stackTrace == null) {
                Thread.yield();
            }
        } while (stackTrace == null);
        return stackTrace;
    }

    /**
     * Returns the stack trace for this virtual thread if it is unmounted.
     * Returns null if the thread is mounted or in transition.
     * Also returns null if the state changes before the thread can be marked
     * as suspended. While the stack is being captured, the SUSPENDED bit is set
     * in the state; afterwards the original state is restored and the task is
     * resubmitted if needed.
     */
    private StackTraceElement[] tryGetStackTrace() {
        int initialState = state() & ~SUSPENDED;
        switch (initialState) {
            case NEW, STARTED, TERMINATED -> {
                return new StackTraceElement[0];  // unmounted, empty stack
            }
            case RUNNING, PINNED, TIMED_PINNED -> {
                return null;   // mounted
            }
            case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
                // unmounted, not runnable
            }
            case UNPARKED, UNBLOCKED, YIELDED -> {
                // unmounted, runnable
            }
            case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
                return null;  // in transition
            }
            default -> throw new InternalError("" + initialState);
        }

        // thread is unmounted, prevent it from continuing
        int suspendedState = initialState | SUSPENDED;
        if (!compareAndSetState(initialState, suspendedState)) {
            return null;
        }

        // get stack trace and restore state
        StackTraceElement[] stack;
        try {
            stack = cont.getStackTrace();
        } finally {
            assert state == suspendedState;
            setState(initialState);
        }
        // decide if the task needs to be submitted now that the state is restored
        boolean resubmit = switch (initialState) {
            case UNPARKED, UNBLOCKED, YIELDED -> {
                // resubmit as task may have run while suspended
                yield true;
            }
            case PARKED, TIMED_PARKED -> {
                // resubmit if unparked while suspended
                yield parkPermit && compareAndSetState(initialState, UNPARKED);
            }
            case BLOCKED -> {
                // resubmit if unblocked while suspended
                yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
            }
            case WAIT, TIMED_WAIT -> {
                // resubmit if notified or interrupted while waiting (Object.wait)
                // waitTimeoutExpired will retry if the timeout expired when suspended
                yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
            }
            default -> throw new InternalError();
        };
        if (resubmit) {
            submitRunContinuation();
        }
        return stack;
    }

    /**
     * Returns a string of the form {@code VirtualThread[#id,name]/...}, where the
     * name is omitted when empty. The suffix is the carrier state and carrier
     * thread name when mounted, otherwise the lower-cased thread state of this
     * virtual thread.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("VirtualThread[#");
        sb.append(threadId());
        String name = getName();
        if (!name.isEmpty()) {
            sb.append(",");
            sb.append(name);
        }
        sb.append("]/");

        // add the carrier state and thread name when mounted
        boolean mounted;
        if (Thread.currentThread() == this) {
            mounted = appendCarrierInfo(sb);
        } else {
            disableSuspendAndPreempt();
            try {
                synchronized (carrierThreadAccessLock()) {
                    mounted = appendCarrierInfo(sb);
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }

        // add virtual thread state when not mounted
        if (!mounted) {
            String stateAsString = threadState().toString();
            sb.append(stateAsString.toLowerCase(Locale.ROOT));
        }

        return sb.toString();
    }

    /**
     * Appends the carrier state and thread name to the string buffer if mounted.
1424 * @return true if mounted, false if not mounted 1425 */ 1426 private boolean appendCarrierInfo(StringBuilder sb) { 1427 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1428 Thread carrier = carrierThread; 1429 if (carrier != null) { 1430 String stateAsString = carrier.threadState().toString(); 1431 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1432 sb.append('@'); 1433 sb.append(carrier.getName()); 1434 return true; 1435 } else { 1436 return false; 1437 } 1438 } 1439 1440 @Override 1441 public int hashCode() { 1442 return (int) threadId(); 1443 } 1444 1445 @Override 1446 public boolean equals(Object obj) { 1447 return obj == this; 1448 } 1449 1450 /** 1451 * Returns the termination object, creating it if needed. 1452 */ 1453 private CountDownLatch getTermination() { 1454 CountDownLatch termination = this.termination; 1455 if (termination == null) { 1456 termination = new CountDownLatch(1); 1457 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1458 termination = this.termination; 1459 } 1460 } 1461 return termination; 1462 } 1463 1464 /** 1465 * Returns the lock object to synchronize on when accessing carrierThread. 1466 * The lock prevents carrierThread from being reset to null during unmount. 1467 */ 1468 private Object carrierThreadAccessLock() { 1469 // return interruptLock as unmount has to coordinate with interrupt 1470 return interruptLock; 1471 } 1472 1473 /** 1474 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1475 */ 1476 private Object timedWaitLock() { 1477 // use this object for now to avoid the overhead of introducing another lock 1478 return runContinuation; 1479 } 1480 1481 /** 1482 * Disallow the current thread be suspended or preempted. 1483 */ 1484 private void disableSuspendAndPreempt() { 1485 notifyJvmtiDisableSuspend(true); 1486 Continuation.pin(); 1487 } 1488 1489 /** 1490 * Allow the current thread be suspended or preempted. 
*/
    private void enableSuspendAndPreempt() {
        // reverse order of disableSuspendAndPreempt: unpin, then re-enable suspend
        Continuation.unpin();
        notifyJvmtiDisableSuspend(false);
    }

    // -- wrappers for get/set of state, parking permit, and carrier thread --

    /** Reads the state field. */
    private int state() {
        return this.state;  // volatile read
    }

    /** Writes the state field. */
    private void setState(int newValue) {
        this.state = newValue;  // volatile write
    }

    /** Atomically sets the state to newValue if it is currently expectedValue. */
    private boolean compareAndSetState(int expectedValue, int newValue) {
        boolean swapped = U.compareAndSetInt(this, STATE, expectedValue, newValue);
        return swapped;
    }

    /** Atomically updates the on-waiting-list flag if it has the expected value. */
    private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) {
        boolean swapped = U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue);
        return swapped;
    }

    /** Sets the parking permit, the write is skipped if the value is unchanged. */
    private void setParkPermit(boolean newValue) {
        if (parkPermit == newValue) {
            return;
        }
        parkPermit = newValue;
    }

    /**
     * Sets the parking permit and returns its previous value. The atomic
     * get-and-set is skipped when the permit already has the requested value.
     */
    private boolean getAndSetParkPermit(boolean newValue) {
        if (parkPermit == newValue) {
            // already has the requested value, so the old value equals newValue
            return newValue;
        }
        return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
    }

    /** Sets the carrier thread field. */
    private void setCarrierThread(Thread carrier) {
        // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
        this.carrierThread = carrier;
    }

    // The following four methods notify the VM when a "transition" starts and ends.
    // A "mount transition" embodies the steps to transfer control from a platform
    // thread to a virtual thread, changing the thread identity, and starting or
    // resuming the virtual thread's continuation on the carrier.
    // An "unmount transition" embodies the steps to transfer control from a virtual
    // thread to its carrier, suspending the virtual thread's continuation, and
    // restoring the thread identity to the platform thread.
    // The notifications to the VM are necessary in order to coordinate with functions
    // (JVMTI mostly) that disable transitions for one or all virtual threads.
// Starting a transition may block if transitions are disabled. Ending a
    // transition may notify a thread that is waiting to disable transitions.
    // The notifications are also used to post JVMTI events for virtual thread
    // start and end.

    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void endFirstTransition();

    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void startFinalTransition();

    // is_mount selects between a mount and an unmount transition
    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void startTransition(boolean is_mount);

    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void endTransition(boolean is_mount);

    // invoked with true by disableSuspendAndPreempt and with false by
    // enableSuspendAndPreempt
    @IntrinsicCandidate
    private static native void notifyJvmtiDisableSuspend(boolean enter);

    private static native void registerNatives();
    static {
        registerNatives();

        // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
        // (the local is unused, the call is made for its side effect)
        var group = Thread.virtualThreadGroup();

        // ensure event class is initialized
        try {
            MethodHandles.lookup().ensureInitialized(VirtualThreadParkEvent.class);
        } catch (IllegalAccessException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Loads a VirtualThreadScheduler with the given class name. The class must be public
     * in an exported package, with public one-arg or no-arg constructor, and be visible
     * to the system class loader.
1585 * @param delegate the scheduler that the custom scheduler may delegate to 1586 * @param cn the class name of the custom scheduler 1587 */ 1588 private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) { 1589 VirtualThreadScheduler scheduler; 1590 try { 1591 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); 1592 // 1-arg constructor 1593 try { 1594 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class); 1595 return (VirtualThreadScheduler) ctor.newInstance(delegate); 1596 } catch (NoSuchMethodException e) { 1597 // 0-arg constructor 1598 Constructor<?> ctor = clazz.getConstructor(); 1599 scheduler = (VirtualThreadScheduler) ctor.newInstance(); 1600 } 1601 } catch (Exception ex) { 1602 throw new Error(ex); 1603 } 1604 System.err.println("WARNING: Using custom default scheduler, this is an experimental feature!"); 1605 return scheduler; 1606 } 1607 1608 /** 1609 * Creates the built-in ForkJoinPool scheduler. 
1610 * @param wrapped true if wrapped by a custom default scheduler 1611 */ 1612 private static BuiltinScheduler createBuiltinScheduler(boolean wrapped) { 1613 int parallelism, maxPoolSize, minRunnable; 1614 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1615 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1616 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1617 if (parallelismValue != null) { 1618 parallelism = Integer.parseInt(parallelismValue); 1619 } else { 1620 parallelism = Runtime.getRuntime().availableProcessors(); 1621 } 1622 if (maxPoolSizeValue != null) { 1623 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1624 parallelism = Integer.min(parallelism, maxPoolSize); 1625 } else { 1626 maxPoolSize = Integer.max(parallelism, 256); 1627 } 1628 if (minRunnableValue != null) { 1629 minRunnable = Integer.parseInt(minRunnableValue); 1630 } else { 1631 minRunnable = Integer.max(parallelism / 2, 1); 1632 } 1633 return new BuiltinScheduler(parallelism, maxPoolSize, minRunnable, wrapped); 1634 } 1635 1636 /** 1637 * The built-in ForkJoinPool scheduler. 1638 */ 1639 private static class BuiltinScheduler 1640 extends ForkJoinPool implements VirtualThreadScheduler { 1641 1642 BuiltinScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) { 1643 ForkJoinWorkerThreadFactory factory = wrapped 1644 ? 
ForkJoinPool.defaultForkJoinWorkerThreadFactory 1645 : CarrierThread::new; 1646 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1647 boolean asyncMode = true; // FIFO 1648 super(parallelism, factory, handler, asyncMode, 1649 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1650 } 1651 1652 private void adaptAndExecute(Runnable task) { 1653 execute(ForkJoinTask.adapt(task)); 1654 } 1655 1656 @Override 1657 public void onStart(VirtualThreadTask task) { 1658 adaptAndExecute(task); 1659 } 1660 1661 @Override 1662 public void onContinue(VirtualThreadTask task) { 1663 adaptAndExecute(task); 1664 } 1665 1666 /** 1667 * Wraps the scheduler to avoid leaking a direct reference with 1668 * {@link VirtualThreadScheduler#current()}. 1669 */ 1670 VirtualThreadScheduler createExternalView() { 1671 BuiltinScheduler builtin = this; 1672 return new VirtualThreadScheduler() { 1673 private void execute(VirtualThreadTask task) { 1674 var vthread = (VirtualThread) task.thread(); 1675 VirtualThreadScheduler scheduler = vthread.scheduler; 1676 if (scheduler == this || scheduler == DEFAULT_SCHEDULER) { 1677 builtin.adaptAndExecute(task); 1678 } else { 1679 throw new IllegalArgumentException(); 1680 } 1681 } 1682 @Override 1683 public void onStart(VirtualThreadTask task) { 1684 execute(task); 1685 } 1686 @Override 1687 public void onContinue(VirtualThreadTask task) { 1688 execute(task); 1689 } 1690 @Override 1691 public String toString() { 1692 return builtin.toString(); 1693 } 1694 }; 1695 } 1696 } 1697 1698 /** 1699 * Schedule a runnable task to run after a delay. 1700 */ 1701 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1702 if (scheduler == BUILTIN_SCHEDULER) { 1703 return BUILTIN_SCHEDULER.schedule(command, delay, unit); 1704 } else { 1705 return DelayedTaskSchedulers.schedule(command, delay, unit); 1706 } 1707 } 1708 1709 /** 1710 * Supports scheduling a runnable task to run after a delay. 
It uses a number 1711 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1712 * work queue used. This class is used when using a custom scheduler. 1713 */ 1714 private static class DelayedTaskSchedulers { 1715 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1716 1717 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1718 long tid = Thread.currentThread().threadId(); 1719 int index = (int) tid & (INSTANCE.length - 1); 1720 return INSTANCE[index].schedule(command, delay, unit); 1721 } 1722 1723 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1724 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1725 String propValue = System.getProperty(propName); 1726 int queueCount; 1727 if (propValue != null) { 1728 queueCount = Integer.parseInt(propValue); 1729 if (queueCount != Integer.highestOneBit(queueCount)) { 1730 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1731 } 1732 } else { 1733 int ncpus = Runtime.getRuntime().availableProcessors(); 1734 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1735 } 1736 var schedulers = new ScheduledExecutorService[queueCount]; 1737 for (int i = 0; i < queueCount; i++) { 1738 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1739 Executors.newScheduledThreadPool(1, task -> { 1740 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1741 t.setDaemon(true); 1742 return t; 1743 }); 1744 stpe.setRemoveOnCancelPolicy(true); 1745 schedulers[i] = stpe; 1746 } 1747 return schedulers; 1748 } 1749 } 1750 1751 /** 1752 * Schedule virtual threads that are ready to be scheduled after they blocked on 1753 * monitor enter. 
1754 */ 1755 private static void unblockVirtualThreads() { 1756 while (true) { 1757 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1758 while (vthread != null) { 1759 assert vthread.onWaitingList; 1760 VirtualThread nextThread = vthread.next; 1761 1762 // remove from list and unblock 1763 vthread.next = null; 1764 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1765 assert changed; 1766 vthread.unblock(); 1767 1768 vthread = nextThread; 1769 } 1770 } 1771 } 1772 1773 /** 1774 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1775 * if necessary until a list of one or more threads becomes available. 1776 */ 1777 private static native VirtualThread takeVirtualThreadListToUnblock(); 1778 1779 static { 1780 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1781 VirtualThread::unblockVirtualThreads); 1782 unblocker.setDaemon(true); 1783 unblocker.start(); 1784 } 1785 } --- EOF ---