1 /* 2 * Copyright (c) 2018, 2026, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.lang; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.lang.reflect.Constructor; 30 import java.util.Locale; 31 import java.util.Objects; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executors; 34 import java.util.concurrent.ForkJoinPool; 35 import java.util.concurrent.ForkJoinTask; 36 import java.util.concurrent.Future; 37 import java.util.concurrent.LinkedTransferQueue; 38 import java.util.concurrent.RejectedExecutionException; 39 import java.util.concurrent.ScheduledExecutorService; 40 import java.util.concurrent.ScheduledFuture; 41 import java.util.concurrent.ScheduledThreadPoolExecutor; 42 import java.util.concurrent.ThreadFactory; 43 import java.util.concurrent.ThreadPoolExecutor; 44 import java.util.concurrent.TimeUnit; 45 import jdk.internal.event.VirtualThreadEndEvent; 46 import jdk.internal.event.VirtualThreadParkEvent; 47 import jdk.internal.event.VirtualThreadStartEvent; 48 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 49 import jdk.internal.invoke.MhUtil; 50 import jdk.internal.misc.CarrierThread; 51 import jdk.internal.misc.InnocuousThread; 52 import jdk.internal.misc.Unsafe; 53 import jdk.internal.vm.Continuation; 54 import jdk.internal.vm.ContinuationScope; 55 import jdk.internal.vm.StackableScope; 56 import jdk.internal.vm.ThreadContainer; 57 import jdk.internal.vm.ThreadContainers; 58 import jdk.internal.vm.annotation.ChangesCurrentThread; 59 import jdk.internal.vm.annotation.Hidden; 60 import jdk.internal.vm.annotation.IntrinsicCandidate; 61 import jdk.internal.vm.annotation.JvmtiHideEvents; 62 import jdk.internal.vm.annotation.JvmtiMountTransition; 63 import jdk.internal.vm.annotation.ReservedStackAccess; 64 import sun.nio.ch.Interruptible; 65 import static java.util.concurrent.TimeUnit.*; 66 67 /** 68 * A thread that is scheduled by the Java virtual machine rather than the operating system. 
*/
final class VirtualThread extends BaseVirtualThread {
    private static final Unsafe U = Unsafe.getUnsafe();
    private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");

    // BUILTIN_SCHEDULER is the built-in scheduler. DEFAULT_SCHEDULER is the scheduler
    // used when the constructor is not given one. EXTERNAL_VIEW is returned in place
    // of the built-in scheduler to callers that are not trusted.
    private static final VirtualThreadScheduler BUILTIN_SCHEDULER;
    private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
    private static final VirtualThreadScheduler EXTERNAL_VIEW;
    // value of the boolean system property jdk.virtualThreadScheduler.useSTPE
    private static final boolean USE_STPE;
    static {
        // experimental
        String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
        if (propValue != null) {
            // A custom scheduler is configured: it becomes the default scheduler and is
            // given the external view rather than the built-in scheduler itself.
            // NOTE(review): the boolean passed to createBuiltinScheduler differs between
            // the two branches; its meaning is defined by that helper, not visible here.
            VirtualThreadScheduler builtinScheduler = createBuiltinScheduler(true);
            VirtualThreadScheduler externalView = createExternalView(builtinScheduler);
            VirtualThreadScheduler defaultScheduler = loadCustomScheduler(externalView, propValue);
            BUILTIN_SCHEDULER = builtinScheduler;
            DEFAULT_SCHEDULER = defaultScheduler;
            EXTERNAL_VIEW = externalView;
        } else {
            // no custom scheduler: the built-in scheduler is also the default scheduler
            var builtinScheduler = createBuiltinScheduler(false);
            BUILTIN_SCHEDULER = builtinScheduler;
            DEFAULT_SCHEDULER = builtinScheduler;
            EXTERNAL_VIEW = createExternalView(builtinScheduler);
        }
        USE_STPE = Boolean.getBoolean("jdk.virtualThreadScheduler.useSTPE");
    }

    // Unsafe offsets of the instance fields named by the string literals
    private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
    private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
    private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
    private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
    private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");

    // scheduler and continuation
    private final VirtualThreadScheduler scheduler;
    private final Continuation cont;
    // the task handed to the scheduler; its run method invokes runContinuation()
    private final VirtualThreadTask runContinuation;

    // virtual thread state, accessed by VM
    private volatile int state;

    /*
     * Virtual thread state transitions:
     *
     *      NEW -> STARTED         // Thread.start, schedule to run
     *  STARTED -> TERMINATED      // failed to start
     *  STARTED -> RUNNING         // first run
     *  RUNNING -> TERMINATED      // done
     *
     *  RUNNING -> PARKING         // Thread parking with LockSupport.park
     *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
     *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
     * UNPARKED -> RUNNING         // continue execution after park
     *
     *  PARKING -> RUNNING         // cont.yield failed, need to park on carrier
     *  RUNNING -> PINNED          // park on carrier
     *   PINNED -> RUNNING         // unparked, continue execution on same carrier
     *
     *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos
     * TIMED_PARKING -> TIMED_PARKED    // cont.yield successful, timed-parked
     *  TIMED_PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *
     * TIMED_PARKING -> RUNNING         // cont.yield failed, need to park on carrier
     *       RUNNING -> TIMED_PINNED    // park on carrier
     *  TIMED_PINNED -> RUNNING         // unparked, continue execution on same carrier
     *
     *   RUNNING -> BLOCKING       // blocking on monitor enter
     *  BLOCKING -> BLOCKED        // blocked on monitor enter
     *   BLOCKED -> UNBLOCKED      // unblocked, may be scheduled to continue
     * UNBLOCKED -> RUNNING        // continue execution after blocked on monitor enter
     *
     *   RUNNING -> WAITING        // transitional state during wait on monitor
     *   WAITING -> WAIT           // waiting on monitor
     *      WAIT -> BLOCKED        // notified, waiting to be unblocked by monitor owner
     *      WAIT -> UNBLOCKED      // interrupted
     *
     *       RUNNING -> TIMED_WAITING   // transitional state during timed-waiting on monitor
     * TIMED_WAITING -> TIMED_WAIT      // timed-waiting on monitor
     *    TIMED_WAIT -> BLOCKED         // notified, waiting to be unblocked by monitor owner
     *    TIMED_WAIT -> UNBLOCKED       // timed-out/interrupted
     *
     *  RUNNING -> YIELDING        // Thread.yield
     * YIELDING -> YIELDED         // cont.yield successful, may be scheduled to continue
     * YIELDING -> RUNNING         // cont.yield failed
     *  YIELDED -> RUNNING         // continue execution after Thread.yield
     */
    private static final int NEW = 0;
    private static final int STARTED = 1;
    private static final int RUNNING = 2; // runnable-mounted

    // untimed and timed parking
    private static final int PARKING = 3;
    private static final int PARKED = 4; // unmounted
    private static final int PINNED = 5; // mounted
    private static final int TIMED_PARKING = 6;
    private static final int TIMED_PARKED = 7; // unmounted
    private static final int TIMED_PINNED = 8; // mounted
    private static final int UNPARKED = 9; // unmounted but runnable

    // Thread.yield
    private static final int YIELDING = 10;
    private static final int YIELDED = 11; // unmounted but runnable

    // monitor enter
    private static final int BLOCKING = 12;
    private static final int BLOCKED = 13; // unmounted
    private static final int UNBLOCKED = 14; // unmounted but runnable

    // monitor wait/timed-wait
    private static final int WAITING = 15;
    private static final int WAIT = 16; // waiting in Object.wait
    private static final int TIMED_WAITING = 17;
    private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait

    private static final int TERMINATED = 99; // final state

    // parking permit made available by LockSupport.unpark
    private volatile boolean parkPermit;

    // blocking permit made available by unblocker thread when another thread exits monitor
    private volatile boolean blockPermit;

    // true when on the list of virtual threads waiting to be unblocked
    private volatile boolean onWaitingList;

    // next virtual thread on the list of virtual threads waiting to be unblocked
    private volatile VirtualThread next;

    // notified by Object.notify/notifyAll while waiting in Object.wait
    private volatile
boolean notified; 200 201 // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait 202 private volatile boolean interruptibleWait; 203 204 // timed-wait support 205 private byte timedWaitSeqNo; 206 207 // timeout for timed-park and timed-wait, only accessed on current/carrier thread 208 private long timeout; 209 210 // timer task for timed-park and timed-wait, only accessed on current/carrier thread 211 private Future<?> timeoutTask; 212 213 // carrier thread when mounted, accessed by VM 214 private volatile Thread carrierThread; 215 216 // termination object when joining, created lazily if needed 217 private volatile CountDownLatch termination; 218 219 /** 220 * Return the built-in scheduler. 221 * @param trusted true if caller is trusted, false if not trusted 222 */ 223 static VirtualThreadScheduler builtinScheduler(boolean trusted) { 224 return trusted ? BUILTIN_SCHEDULER : EXTERNAL_VIEW; 225 } 226 227 /** 228 * Returns the default scheduler, usually the same as the built-in scheduler. 229 */ 230 static VirtualThreadScheduler defaultScheduler() { 231 return DEFAULT_SCHEDULER; 232 } 233 234 /** 235 * Returns the continuation scope used for virtual threads. 236 */ 237 static ContinuationScope continuationScope() { 238 return VTHREAD_SCOPE; 239 } 240 241 /** 242 * Return the scheduler for this thread. 243 * @param trusted true if caller is trusted, false if not trusted 244 */ 245 VirtualThreadScheduler scheduler(boolean trusted) { 246 if (scheduler == BUILTIN_SCHEDULER && !trusted) { 247 return EXTERNAL_VIEW; 248 } else { 249 return scheduler; 250 } 251 } 252 253 /** 254 * Returns the task to start/continue this virtual thread. 255 */ 256 VirtualThreadTask virtualThreadTask() { 257 return runContinuation; 258 } 259 260 /** 261 * Creates a new {@code VirtualThread} to run the given task with the given scheduler. 
262 * 263 * @param scheduler the scheduler or null for default scheduler 264 * @param preferredCarrier the preferred carrier or null 265 * @param name thread name 266 * @param characteristics characteristics 267 * @param task the task to execute 268 */ 269 VirtualThread(VirtualThreadScheduler scheduler, 270 Thread preferredCarrier, 271 String name, 272 int characteristics, 273 Runnable task) { 274 super(name, characteristics, /*bound*/ false); 275 Objects.requireNonNull(task); 276 277 // use default scheduler if not provided 278 if (scheduler == null) { 279 scheduler = DEFAULT_SCHEDULER; 280 } else if (scheduler == EXTERNAL_VIEW) { 281 throw new UnsupportedOperationException(); 282 } 283 this.scheduler = scheduler; 284 this.cont = new VThreadContinuation(this, task); 285 286 if (scheduler == BUILTIN_SCHEDULER) { 287 this.runContinuation = new BuiltinSchedulerTask(this); 288 } else { 289 this.runContinuation = new CustomSchedulerTask(this, preferredCarrier); 290 } 291 } 292 293 /** 294 * The task to execute when using the built-in scheduler. 295 */ 296 static final class BuiltinSchedulerTask implements VirtualThreadTask { 297 private final VirtualThread vthread; 298 BuiltinSchedulerTask(VirtualThread vthread) { 299 this.vthread = vthread; 300 } 301 @Override 302 public Thread thread() { 303 return vthread; 304 } 305 @Override 306 public void run() { 307 vthread.runContinuation();; 308 } 309 @Override 310 public Thread preferredCarrier() { 311 throw new UnsupportedOperationException(); 312 } 313 @Override 314 public Object attach(Object att) { 315 throw new UnsupportedOperationException(); 316 } 317 @Override 318 public Object attachment() { 319 throw new UnsupportedOperationException(); 320 } 321 } 322 323 /** 324 * The task to execute when using a custom scheduler. 
325 */ 326 static final class CustomSchedulerTask implements VirtualThreadTask { 327 private static final VarHandle ATT = 328 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class); 329 private final VirtualThread vthread; 330 private final Thread preferredCarrier; 331 private volatile Object att; 332 CustomSchedulerTask(VirtualThread vthread, Thread preferredCarrier) { 333 this.vthread = vthread; 334 this.preferredCarrier = preferredCarrier; 335 } 336 @Override 337 public Thread thread() { 338 return vthread; 339 } 340 @Override 341 public void run() { 342 vthread.runContinuation();; 343 } 344 @Override 345 public Thread preferredCarrier() { 346 return preferredCarrier; 347 } 348 @Override 349 public Object attach(Object att) { 350 return ATT.getAndSet(this, att); 351 } 352 @Override 353 public Object attachment() { 354 return att; 355 } 356 } 357 358 /** 359 * The continuation that a virtual thread executes. 360 */ 361 private static class VThreadContinuation extends Continuation { 362 VThreadContinuation(VirtualThread vthread, Runnable task) { 363 super(VTHREAD_SCOPE, wrap(vthread, task)); 364 } 365 @Override 366 protected void onPinned(Continuation.Pinned reason) { 367 } 368 private static Runnable wrap(VirtualThread vthread, Runnable task) { 369 return new Runnable() { 370 @Hidden 371 @JvmtiHideEvents 372 public void run() { 373 vthread.endFirstTransition(); 374 try { 375 vthread.run(task); 376 } finally { 377 vthread.startFinalTransition(); 378 } 379 } 380 }; 381 } 382 } 383 384 /** 385 * Runs or continues execution on the current thread. The virtual thread is mounted 386 * on the current thread before the task runs or continues. It unmounts when the 387 * task completes or yields. 
*/
    @ChangesCurrentThread // allow mount/unmount to be inlined
    private void runContinuation() {
        // the carrier must be a platform thread
        if (Thread.currentThread().isVirtual()) {
            throw new WrongThreadException();
        }

        // set state to RUNNING
        int initialState = state();
        if (initialState == STARTED || initialState == UNPARKED
                || initialState == UNBLOCKED || initialState == YIELDED) {
            // newly started or continue after parking/blocking/Thread.yield
            if (!compareAndSetState(initialState, RUNNING)) {
                // the state changed since it was read, nothing to run
                return;
            }
            // consume permit when continuing after parking or blocking. If continue
            // after a timed-park or timed-wait then the timeout task is cancelled.
            if (initialState == UNPARKED) {
                cancelTimeoutTask();
                setParkPermit(false);
            } else if (initialState == UNBLOCKED) {
                cancelTimeoutTask();
                blockPermit = false;
            }
        } else {
            // not runnable
            return;
        }

        mount();
        try {
            cont.run();
        } finally {
            // always unmount, then dispatch on whether the continuation completed or yielded
            unmount();
            if (cont.isDone()) {
                afterDone();
            } else {
                afterYield();
            }
        }
    }

    /**
     * Cancel timeout task when continuing after timed-park or timed-wait.
     * The timeout task may be executing, or may have already completed.
     * Does nothing if there is no timeout task.
     */
    private void cancelTimeoutTask() {
        if (timeoutTask != null) {
            // cancel(false): do not interrupt the timeout task if it is already running
            timeoutTask.cancel(false);
            timeoutTask = null;
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the built-in scheduler,
     * the task will be pushed to the local queue if possible, otherwise it will be
     * pushed to an external submission queue.
446 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 447 * @throws RejectedExecutionException 448 */ 449 private void submitRunContinuation(boolean retryOnOOME) { 450 boolean done = false; 451 while (!done) { 452 try { 453 // Pin the continuation to prevent the virtual thread from unmounting 454 // when submitting a task. For the default scheduler this ensures that 455 // the carrier doesn't change when pushing a task. For other schedulers 456 // it avoids deadlock that could arise due to carriers and virtual 457 // threads contending for a lock. 458 if (currentThread().isVirtual()) { 459 Continuation.pin(); 460 try { 461 scheduler.onContinue(runContinuation); 462 } finally { 463 Continuation.unpin(); 464 } 465 } else { 466 scheduler.onContinue(runContinuation); 467 } 468 done = true; 469 } catch (RejectedExecutionException ree) { 470 submitFailed(ree); 471 throw ree; 472 } catch (OutOfMemoryError e) { 473 if (retryOnOOME) { 474 U.park(false, 100_000_000); // 100ms 475 } else { 476 throw e; 477 } 478 } 479 } 480 } 481 482 /** 483 * Submits the runContinuation task to the scheduler. For the default scheduler, 484 * and calling it on a worker thread, the task will be pushed to the local queue, 485 * otherwise it will be pushed to an external submission queue. 486 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 487 * @throws RejectedExecutionException 488 */ 489 private void submitRunContinuation() { 490 submitRunContinuation(true); 491 } 492 493 /** 494 * Invoked from a carrier thread to lazy submit the runContinuation task to the 495 * carrier's local queue if the queue is empty. If not empty, or invoked by a thread 496 * for a custom scheduler, then it just submits the task to the scheduler. 497 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 
498 * @throws RejectedExecutionException 499 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 500 */ 501 private void lazySubmitRunContinuation() { 502 assert !currentThread().isVirtual(); 503 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 504 try { 505 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation)); 506 } catch (RejectedExecutionException ree) { 507 submitFailed(ree); 508 throw ree; 509 } catch (OutOfMemoryError e) { 510 submitRunContinuation(); 511 } 512 } else { 513 submitRunContinuation(); 514 } 515 } 516 517 /** 518 * Invoked from a carrier thread to externally submit the runContinuation task to the 519 * scheduler. If invoked by a thread for a custom scheduler, then it just submits the 520 * task to the scheduler. 521 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 522 * @throws RejectedExecutionException 523 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 524 */ 525 private void externalSubmitRunContinuation() { 526 assert !currentThread().isVirtual(); 527 if (currentThread() instanceof CarrierThread ct) { 528 try { 529 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 530 } catch (RejectedExecutionException ree) { 531 submitFailed(ree); 532 throw ree; 533 } catch (OutOfMemoryError e) { 534 submitRunContinuation(); 535 } 536 } else { 537 submitRunContinuation(); 538 } 539 } 540 541 /** 542 * Invoked from Thread.start to externally submit the runContinuation task to the 543 * scheduler. If this virtual thread is scheduled by the built-in scheduler, 544 * and this method is called from a virtual thread scheduled by the built-in 545 * scheduler, then it uses externalSubmit to ensure that the task is pushed to an 546 * external submission queue rather than the local queue. 
547 * @throws RejectedExecutionException 548 * @throws OutOfMemoryError 549 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 550 */ 551 private void externalSubmitRunContinuationOrThrow() { 552 try { 553 if (currentThread().isVirtual()) { 554 // Pin the continuation to prevent the virtual thread from unmounting 555 // when submitting a task. This avoids deadlock that could arise due to 556 // carriers and virtual threads contending for a lock. 557 Continuation.pin(); 558 try { 559 if (scheduler == BUILTIN_SCHEDULER 560 && currentCarrierThread() instanceof CarrierThread ct) { 561 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 562 } else { 563 scheduler.onStart(runContinuation); 564 } 565 } finally { 566 Continuation.unpin(); 567 } 568 } else { 569 scheduler.onStart(runContinuation); 570 } 571 } catch (RejectedExecutionException ree) { 572 submitFailed(ree); 573 throw ree; 574 } 575 } 576 577 /** 578 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 579 */ 580 private void submitFailed(RejectedExecutionException ree) { 581 var event = new VirtualThreadSubmitFailedEvent(); 582 if (event.isEnabled()) { 583 event.javaThreadId = threadId(); 584 event.exceptionMessage = ree.getMessage(); 585 event.commit(); 586 } 587 } 588 589 /** 590 * Runs a task in the context of this virtual thread. 
*/
    private void run(Runnable task) {
        assert Thread.currentThread() == this && state == RUNNING;

        // emit JFR event if enabled
        if (VirtualThreadStartEvent.isTurnedOn()) {
            var event = new VirtualThreadStartEvent();
            event.javaThreadId = threadId();
            event.commit();
        }

        Object bindings = Thread.scopedValueBindings();
        try {
            runWith(bindings, task);
        } catch (Throwable exc) {
            // anything thrown by the task is handed to the uncaught exception dispatch
            dispatchUncaughtException(exc);
        } finally {
            // pop any remaining scopes from the stack, this may block
            StackableScope.popAll();

            // emit JFR event if enabled
            if (VirtualThreadEndEvent.isTurnedOn()) {
                var event = new VirtualThreadEndEvent();
                event.javaThreadId = threadId();
                event.commit();
            }
        }
    }

    /**
     * Mounts this virtual thread onto the current platform thread. On
     * return, the current thread is the virtual thread.
     */
    @ChangesCurrentThread
    @ReservedStackAccess
    private void mount() {
        startTransition(/*mount*/true);
        // We assume following volatile accesses provide equivalent
        // of acquire ordering, otherwise we need U.loadFence() here.

        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupted status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            // the carrier has a stale interrupted status, clear it unless this
            // virtual thread is interrupted in the meantime
            synchronized (interruptLock) {
                // need to recheck interrupted status
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread.
*/
    @ChangesCurrentThread
    @ReservedStackAccess
    private void unmount() {
        assert !Thread.holdsLock(interruptLock);

        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        // the carrier's interrupted status is cleared once the connection is broken
        carrier.clearInterrupt();

        // We assume previous volatile accesses provide equivalent
        // of release ordering, otherwise we need U.storeFence() here.
        endTransition(/*mount*/false);
    }

    /**
     * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
     * the continuation continues.
     *
     * @return the value returned by {@code Continuation.yield}
     */
    @Hidden
    private boolean yieldContinuation() {
        startTransition(/*mount*/false);
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            // runs whether yield returned or threw
            endTransition(/*mount*/true);
        }
    }

    /**
     * Invoked in the context of the carrier thread after the Continuation yields when
     * parking, blocking on monitor enter, Object.wait, or Thread.yield.
*/
    private void afterYield() {
        assert carrierThread == null;

        // re-adjust parallelism if the virtual thread yielded when compensating
        if (currentThread() instanceof CarrierThread ct) {
            ct.endBlocking();
        }

        // the transitional state set by the thread before it yielded
        int s = state();

        // LockSupport.park/parkNanos
        if (s == PARKING || s == TIMED_PARKING) {
            int newState;
            if (s == PARKING) {
                setState(newState = PARKED);
            } else {
                // schedule unpark
                // the timeout is in nanoseconds for a timed-park
                long timeout = this.timeout;
                assert timeout > 0;
                timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
                setState(newState = TIMED_PARKED);
            }

            // may have been unparked while parking
            if (parkPermit && compareAndSetState(newState, UNPARKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Thread.yield
        if (s == YIELDING) {
            setState(YIELDED);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation();
            } else {
                submitRunContinuation();
            }
            return;
        }

        // blocking on monitorenter
        if (s == BLOCKING) {
            setState(BLOCKED);

            // may have been unblocked while blocking
            if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Object.wait
        if (s == WAITING || s == TIMED_WAITING) {
            int newState;
            boolean blocked;
            // read before the state is changed to WAIT/TIMED_WAIT
            boolean interruptible = interruptibleWait;
            if (s == WAITING) {
                setState(newState = WAIT);
                // may have been notified while in transition
                blocked = notified && compareAndSetState(WAIT, BLOCKED);
            } else {
                // For timed-wait, a timeout task is scheduled to execute. The timeout
                // task will change the thread state to UNBLOCKED and submit the thread
                // to the scheduler. A sequence number is used to ensure that the timeout
                // task only unblocks the thread for this timed-wait. We synchronize with
                // the timeout task to coordinate access to the sequence number and to
                // ensure the timeout task doesn't execute until the thread has got to
                // the TIMED_WAIT state.
                // The timeout is in milliseconds for a timed-wait.
                long timeout = this.timeout;
                assert timeout > 0;
                synchronized (timedWaitLock()) {
                    byte seqNo = ++timedWaitSeqNo;
                    timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
                    setState(newState = TIMED_WAIT);
                    // May have been notified while in transition. This must be done while
                    // holding the monitor to avoid changing the state of a new timed wait call.
                    blocked = notified && compareAndSetState(TIMED_WAIT, BLOCKED);
                }
            }

            if (blocked) {
                // may have been unblocked already
                if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                    lazySubmitRunContinuation();
                }
            } else {
                // may have been interrupted while in transition to wait state
                if (interruptible && interrupted && compareAndSetState(newState, UNBLOCKED)) {
                    lazySubmitRunContinuation();
                }
            }
            return;
        }

        // none of the states above: not expected after a yield
        assert false;
    }

    /**
     * Invoked after the continuation completes. Equivalent to
     * {@code afterDone(true)}, so the thread's container is notified.
     */
    private void afterDone() {
        afterDone(true);
    }

    /**
     * Invoked after the continuation completes (or start failed). Sets the thread
     * state to TERMINATED and notifies anyone waiting for the thread to terminate.
805 * 806 * @param notifyContainer true if its container should be notified 807 */ 808 private void afterDone(boolean notifyContainer) { 809 assert carrierThread == null; 810 setState(TERMINATED); 811 812 // notify anyone waiting for this virtual thread to terminate 813 CountDownLatch termination = this.termination; 814 if (termination != null) { 815 assert termination.getCount() == 1; 816 termination.countDown(); 817 } 818 819 // notify container 820 if (notifyContainer) { 821 threadContainer().remove(this); 822 } 823 824 // clear references to thread locals 825 clearReferences(); 826 } 827 828 /** 829 * Schedules this {@code VirtualThread} to execute. 830 * 831 * @throws IllegalStateException if the container is shutdown or closed 832 * @throws IllegalThreadStateException if the thread has already been started 833 * @throws RejectedExecutionException if the scheduler cannot accept a task 834 */ 835 @Override 836 void start(ThreadContainer container) { 837 if (!compareAndSetState(NEW, STARTED)) { 838 throw new IllegalThreadStateException("Already started"); 839 } 840 841 // bind thread to container 842 assert threadContainer() == null; 843 setThreadContainer(container); 844 845 // start thread 846 boolean addedToContainer = false; 847 boolean started = false; 848 try { 849 container.add(this); // may throw 850 addedToContainer = true; 851 852 // scoped values may be inherited 853 inheritScopedValueBindings(container); 854 855 // submit task to run thread, using externalSubmit if possible 856 externalSubmitRunContinuationOrThrow(); 857 started = true; 858 } finally { 859 if (!started) { 860 afterDone(addedToContainer); 861 } 862 } 863 } 864 865 @Override 866 public void start() { 867 start(ThreadContainers.root()); 868 } 869 870 @Override 871 public void run() { 872 // do nothing 873 } 874 875 /** 876 * Parks until unparked or interrupted. 
* If already unparked then the parking
     * permit is consumed and this method completes immediately (meaning it doesn't
     * yield). It also completes immediately if the interrupted status is set.
     * If the continuation cannot yield then the carrier thread is parked instead.
     */
    @Override
    void park() {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        boolean yielded = false;
        long eventStartTime = VirtualThreadParkEvent.eventStartTime();
        setState(PARKING);
        try {
            yielded = yieldContinuation();
        } catch (OutOfMemoryError e) {
            // park on carrier
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (yielded) {
                // Long.MIN_VALUE is passed where parkNanos passes its waiting time
                VirtualThreadParkEvent.offer(eventStartTime, Long.MIN_VALUE);
            } else {
                // did not yield, undo the transition to PARKING
                assert state() == PARKING;
                setState(RUNNING);
            }
        }

        // park on the carrier thread when pinned
        if (!yielded) {
            parkOnCarrierThread(false, 0);
        }
    }

    /**
     * Parks up to the given waiting time or until unparked or interrupted.
     * If already unparked then the parking permit is consumed and this method
     * completes immediately (meaning it doesn't yield). It also completes immediately
     * if the interrupted status is set or the waiting time is {@code <= 0}.
     *
     * @param nanos the maximum number of nanoseconds to wait.
*/
    @Override
    void parkNanos(long nanos) {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        // (the permit is consumed even when nanos <= 0)
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread for the waiting time
        if (nanos > 0) {
            // remember when the park started so that the time spent in a failed
            // yield attempt can be deducted if parking on the carrier
            long startTime = System.nanoTime();

            // park the thread, afterYield will schedule the thread to unpark
            boolean yielded = false;
            long eventStartTime = VirtualThreadParkEvent.eventStartTime();
            timeout = nanos;
            setState(TIMED_PARKING);
            try {
                yielded = yieldContinuation();
            } catch (OutOfMemoryError e) {
                // park on carrier
            } finally {
                // state is RUNNING again if, and only if, the yield succeeded
                assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
                if (yielded) {
                    VirtualThreadParkEvent.offer(eventStartTime, nanos);
                } else {
                    // did not yield, undo the transition to TIMED_PARKING
                    assert state() == TIMED_PARKING;
                    setState(RUNNING);
                }
            }

            // park on carrier thread for remaining time when pinned (or OOME)
            if (!yielded) {
                long remainingNanos = nanos - (System.nanoTime() - startTime);
                parkOnCarrierThread(true, remainingNanos);
            }
        }
    }

    /**
     * Parks the current carrier thread up to the given waiting time or until
     * unparked or interrupted. If the virtual thread is interrupted then the
     * interrupted status will be propagated to the carrier thread.
     * The state is PINNED or TIMED_PINNED while parked and is restored to
     * RUNNING before this method returns. The parking permit is consumed.
     * @param timed true for a timed park, false for untimed
     * @param nanos the waiting time in nanoseconds, a timed park with a
     *        value {@code <= 0} does not park
     */
    private void parkOnCarrierThread(boolean timed, long nanos) {
        assert state() == RUNNING;

        setState(timed ? TIMED_PINNED : PINNED);
        try {
            // skip parking if the permit was made available in the meantime
            if (!parkPermit) {
                if (!timed) {
                    U.park(false, 0);
                } else if (nanos > 0) {
                    U.park(false, nanos);
                }
            }
        } finally {
            setState(RUNNING);
        }

        // consume parking permit
        setParkPermit(false);

        // JFR jdk.VirtualThreadPinned event
        postPinnedEvent("LockSupport.park");
    }

    /**
     * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
     * Recording the event in the VM avoids having JFR event recorded in Java
     * with the same name, but different ID, to events recorded by the VM.
     * @param op the name of the operation that pinned the thread
     */
    @Hidden
    private static native void postPinnedEvent(String op);

    /**
     * Re-enables this virtual thread for scheduling. If this virtual thread is parked
     * then its task is scheduled to continue, otherwise its next call to {@code park} or
     * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
     * This method only sets the parking permit when the permit is already
     * available or when invoked by this virtual thread itself.
     * @param lazySubmit to use lazySubmit if possible
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    private void unpark(boolean lazySubmit) {
        // getAndSetParkPermit returns the previous value of the permit
        if (!getAndSetParkPermit(true) && currentThread() != this) {
            int s = state();

            // unparked while parked
            // (the CAS ensures that only one caller submits the continuation)
            if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
                if (lazySubmit) {
                    lazySubmitRunContinuation();
                } else {
                    submitRunContinuation();
                }
                return;
            }

            // unparked while parked when pinned
            if (s == PINNED || s == TIMED_PINNED) {
                // unpark carrier thread when pinned
                disableSuspendAndPreempt();
                try {
                    synchronized (carrierThreadAccessLock()) {
                        Thread carrier = carrierThread;
                        // re-read the state while holding the lock, it may have changed
                        if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                            U.unpark(carrier);
                        }
                    }
                } finally {
                    enableSuspendAndPreempt();
                }
                return;
            }
        }
    }

    /**
     * Re-enables this virtual thread for scheduling, submitting its task with
     * a regular (not lazy) submit when it is parked.
     */
    @Override
    void unpark() {
        unpark(false);
    }

    /**
     * Invoked by unblocker thread to unblock this virtual thread.
     * Sets the block permit and, if the thread is in the BLOCKED state, changes
     * the state to UNBLOCKED and submits its task to continue.
     */
    private void unblock() {
        assert !Thread.currentThread().isVirtual();
        blockPermit = true;
        if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
            submitRunContinuation();
        }
    }

    /**
     * Invoked by FJP worker thread or STPE thread when park timeout expires.
     * Unparks this virtual thread, using a lazy submit if possible.
     */
    private void parkTimeoutExpired() {
        assert !VirtualThread.currentThread().isVirtual();
        unpark(true);
    }

    /**
     * Invoked by FJP worker thread or STPE thread when wait timeout expires.
     * If the virtual thread is in timed-wait then this method will unblock the thread
     * and submit its task so that it continues and attempts to reenter the monitor.
     * This method does nothing if the thread has been woken by notify or interrupt.
     * @param seqNo the sequence number of the timed-wait that the timeout is for,
     *        the timeout is ignored if it does not match the current sequence number
     */
    private void waitTimeoutExpired(byte seqNo) {
        assert !Thread.currentThread().isVirtual();

        synchronized (timedWaitLock()) {
            if (seqNo != timedWaitSeqNo) {
                // this timeout task is for a past timed-wait
                return;
            }
            if (!compareAndSetState(TIMED_WAIT, UNBLOCKED)) {
                // already unblocked
                return;
            }
        }

        // submit after releasing the lock
        lazySubmitRunContinuation();
    }

    /**
     * Attempts to yield the current virtual thread (Thread.yield).
     * The state is restored to RUNNING if the thread does not yield, including
     * when the attempt to yield fails with an exception or error.
     */
    void tryYield() {
        assert Thread.currentThread() == this;
        setState(YIELDING);
        boolean yielded = false;
        try {
            yielded = yieldContinuation();  // may throw
        } finally {
            // state is RUNNING again if, and only if, the yield succeeded
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (!yielded) {
                assert state() == YIELDING;
                setState(RUNNING);
            }
        }
    }

    /**
     * Sleep the current thread for the given sleep time (in nanoseconds). If
     * nanos is 0 then the thread will attempt to yield.
1105 * 1106 * @implNote This implementation parks the thread for the given sleeping time 1107 * and will therefore be observed in PARKED state during the sleep. Parking 1108 * will consume the parking permit so this method makes available the parking 1109 * permit after the sleep. This may be observed as a spurious, but benign, 1110 * wakeup when the thread subsequently attempts to park. 1111 * 1112 * @param nanos the maximum number of nanoseconds to sleep 1113 * @throws InterruptedException if interrupted while sleeping 1114 */ 1115 void sleepNanos(long nanos) throws InterruptedException { 1116 assert Thread.currentThread() == this && nanos >= 0; 1117 if (getAndClearInterrupt()) 1118 throw new InterruptedException(); 1119 if (nanos == 0) { 1120 tryYield(); 1121 } else { 1122 // park for the sleep time 1123 try { 1124 long remainingNanos = nanos; 1125 long startNanos = System.nanoTime(); 1126 while (remainingNanos > 0) { 1127 parkNanos(remainingNanos); 1128 if (getAndClearInterrupt()) { 1129 throw new InterruptedException(); 1130 } 1131 remainingNanos = nanos - (System.nanoTime() - startNanos); 1132 } 1133 } finally { 1134 // may have been unparked while sleeping 1135 setParkPermit(true); 1136 } 1137 } 1138 } 1139 1140 /** 1141 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 1142 * A timeout of {@code 0} means to wait forever. 
*
     * @throws InterruptedException if interrupted while waiting
     * @return true if the thread has terminated
     */
    boolean joinNanos(long nanos) throws InterruptedException {
        // fast path, avoids creating the termination object
        if (state() == TERMINATED)
            return true;

        // ensure termination object exists, then re-check state
        CountDownLatch termination = getTermination();
        if (state() == TERMINATED)
            return true;

        // wait for virtual thread to terminate
        if (nanos == 0) {
            termination.await();
        } else {
            boolean terminated = termination.await(nanos, NANOSECONDS);
            if (!terminated) {
                // waiting time elapsed
                return false;
            }
        }
        assert state() == TERMINATED;
        return true;
    }

    /**
     * Delegates to the superclass implementation with suspend and preemption
     * of the current thread disabled for the duration of the call.
     */
    @Override
    void blockedOn(Interruptible b) {
        disableSuspendAndPreempt();
        try {
            super.blockedOn(b);
        } finally {
            enableSuspendAndPreempt();
        }
    }

    /**
     * Interrupts this virtual thread. When invoked by another thread, this
     * sets the interrupted status, interrupts the NIO blocker (if any) and the
     * carrier thread (if mounted), makes available the parking permit, and
     * schedules the thread to continue if it is parked or waiting in
     * Object.wait. When invoked by this virtual thread itself, it sets the
     * interrupted status of this thread and its carrier, and makes available
     * the parking permit.
     */
    @Override
    public void interrupt() {
        if (Thread.currentThread() != this) {
            // if current thread is a virtual thread then prevent it from being
            // suspended or unmounted when entering or holding interruptLock
            Interruptible blocker;
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = true;
                    blocker = nioBlocker();
                    if (blocker != null) {
                        blocker.interrupt(this);
                    }

                    // interrupt carrier thread if mounted
                    Thread carrier = carrierThread;
                    if (carrier != null) carrier.setInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }

            // notify blocker after releasing interruptLock
            if (blocker != null) {
                blocker.postInterrupt();
            }

            // make available parking permit, unpark thread if parked
            unpark();

            // if thread is waiting in Object.wait then schedule to try to reenter
            int s = state();
            if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) {
                submitRunContinuation();
            }

        } else {
            // self-interrupt: the thread is running so there is nothing to unpark
            interrupted = true;
            carrierThread.setInterrupt();
            setParkPermit(true);
        }
    }

    /**
     * Returns the interrupted status of this virtual thread without clearing it.
     */
    @Override
    public boolean isInterrupted() {
        return interrupted;
    }

    /**
     * Returns the interrupted status of the current virtual thread and clears
     * it if set. The interrupted status of the carrier thread is cleared at
     * the same time, while holding interruptLock.
     */
    @Override
    boolean getAndClearInterrupt() {
        assert Thread.currentThread() == this;
        boolean oldValue = interrupted;
        if (oldValue) {
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = false;
                    carrierThread.clearInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }
        return oldValue;
    }

    /**
     * Maps the internal state of this virtual thread to a {@code Thread.State}.
     * @throws InternalError if the internal state is not one of the known states
     */
    @Override
    Thread.State threadState() {
        switch (state()) {
            case NEW:
                return Thread.State.NEW;
            case STARTED:
                // return NEW if thread container not yet set
                if (threadContainer() == null) {
                    return Thread.State.NEW;
                } else {
                    return Thread.State.RUNNABLE;
                }
            case UNPARKED:
            case UNBLOCKED:
            case YIELDED:
                // runnable, not mounted
                return Thread.State.RUNNABLE;
            case RUNNING:
                // if mounted then return state of carrier thread
                if (Thread.currentThread() != this) {
                    disableSuspendAndPreempt();
                    try {
                        synchronized (carrierThreadAccessLock()) {
                            Thread carrierThread = this.carrierThread;
                            if (carrierThread != null) {
                                return carrierThread.threadState();
                            }
                        }
                    } finally {
                        enableSuspendAndPreempt();
                    }
                }
                // runnable, mounted
                return Thread.State.RUNNABLE;
            case PARKING:
            case TIMED_PARKING:
            case WAITING:
            case TIMED_WAITING:
            case YIELDING:
                // runnable, in transition
                return Thread.State.RUNNABLE;
            case PARKED:
            case PINNED:
            case WAIT:
                return Thread.State.WAITING;
            case TIMED_PARKED:
            case TIMED_PINNED:
            case TIMED_WAIT:
                return Thread.State.TIMED_WAITING;
            case BLOCKING:
            case BLOCKED:
                return Thread.State.BLOCKED;
            case TERMINATED:
                return Thread.State.TERMINATED;
            default:
                throw new InternalError();
        }
    }

    /**
     * Returns true if this virtual thread has been started and has not terminated,
     * that is, its state is neither NEW nor TERMINATED.
     */
    @Override
    boolean alive() {
        int s = state;
        return (s != NEW && s != TERMINATED);
    }

    /**
     * Returns true if the state of this virtual thread is TERMINATED.
     */
    @Override
    boolean isTerminated() {
        return (state == TERMINATED);
    }

    /**
     * Returns a string of the form {@code VirtualThread[#<id>,<name>]/<suffix>}
     * where the name is omitted when empty. The suffix is the carrier thread's
     * state and name ({@code <state>@<carrier-name>}) when mounted, otherwise
     * the state of this virtual thread. States are in lower case.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("VirtualThread[#");
        sb.append(threadId());
        String name = getName();
        if (!name.isEmpty()) {
            sb.append(",");
            sb.append(name);
        }
        sb.append("]/");

        // add the carrier state and thread name when mounted
        boolean mounted;
        if (Thread.currentThread() == this) {
            // invoked by this thread, no need for the lock
            mounted = appendCarrierInfo(sb);
        } else {
            disableSuspendAndPreempt();
            try {
                synchronized (carrierThreadAccessLock()) {
                    mounted = appendCarrierInfo(sb);
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }

        // add virtual thread state when not mounted
        if (!mounted) {
            String stateAsString = threadState().toString();
            sb.append(stateAsString.toLowerCase(Locale.ROOT));
        }

        return sb.toString();
    }

    /**
     * Appends the carrier state and thread name to the string buffer if mounted.
1354 * @return true if mounted, false if not mounted 1355 */ 1356 private boolean appendCarrierInfo(StringBuilder sb) { 1357 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1358 Thread carrier = carrierThread; 1359 if (carrier != null) { 1360 String stateAsString = carrier.threadState().toString(); 1361 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1362 sb.append('@'); 1363 sb.append(carrier.getName()); 1364 return true; 1365 } else { 1366 return false; 1367 } 1368 } 1369 1370 @Override 1371 public int hashCode() { 1372 return (int) threadId(); 1373 } 1374 1375 @Override 1376 public boolean equals(Object obj) { 1377 return obj == this; 1378 } 1379 1380 /** 1381 * Returns the termination object, creating it if needed. 1382 */ 1383 private CountDownLatch getTermination() { 1384 CountDownLatch termination = this.termination; 1385 if (termination == null) { 1386 termination = new CountDownLatch(1); 1387 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1388 termination = this.termination; 1389 } 1390 } 1391 return termination; 1392 } 1393 1394 /** 1395 * Returns the lock object to synchronize on when accessing carrierThread. 1396 * The lock prevents carrierThread from being reset to null during unmount. 1397 */ 1398 private Object carrierThreadAccessLock() { 1399 // return interruptLock as unmount has to coordinate with interrupt 1400 return interruptLock; 1401 } 1402 1403 /** 1404 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1405 */ 1406 private Object timedWaitLock() { 1407 // use this object for now to avoid the overhead of introducing another lock 1408 return runContinuation; 1409 } 1410 1411 /** 1412 * Disallow the current thread be suspended or preempted. 1413 */ 1414 private void disableSuspendAndPreempt() { 1415 notifyJvmtiDisableSuspend(true); 1416 Continuation.pin(); 1417 } 1418 1419 /** 1420 * Allow the current thread be suspended or preempted. 
*/
    private void enableSuspendAndPreempt() {
        // reverse order of disableSuspendAndPreempt
        Continuation.unpin();
        notifyJvmtiDisableSuspend(false);
    }

    // -- accessors for the state, parking permit, and carrier thread fields --

    /** Reads the state field (volatile read). */
    private int state() {
        return state;
    }

    /** Writes the state field (volatile write). */
    private void setState(int newValue) {
        state = newValue;
    }

    /** Atomically updates the state if it has the expected value. */
    private boolean compareAndSetState(int expectedValue, int newValue) {
        return U.compareAndSetInt(this, STATE, expectedValue, newValue);
    }

    /** Atomically updates the on-waiting-list flag if it has the expected value. */
    private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) {
        return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue);
    }

    /** Sets the parking permit, skipping the write when the value is unchanged. */
    private void setParkPermit(boolean newValue) {
        if (parkPermit == newValue) {
            return;
        }
        parkPermit = newValue;
    }

    /**
     * Sets the parking permit and returns its previous value. The atomic
     * exchange is skipped when the permit already has the given value.
     */
    private boolean getAndSetParkPermit(boolean newValue) {
        if (parkPermit == newValue) {
            // unchanged, so the previous value is the new value
            return newValue;
        }
        return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
    }

    /** Sets the carrier thread field with a plain field write. */
    private void setCarrierThread(Thread carrier) {
        // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
        this.carrierThread = carrier;
    }

    // The following four methods notify the VM when a "transition" starts and ends.
    // A "mount transition" embodies the steps to transfer control from a platform
    // thread to a virtual thread, changing the thread identity, and starting or
    // resuming the virtual thread's continuation on the carrier.
    // An "unmount transition" embodies the steps to transfer control from a virtual
    // thread to its carrier, suspending the virtual thread's continuation, and
    // restoring the thread identity to the platform thread.
    // The notifications to the VM are necessary in order to coordinate with functions
    // (JVMTI mostly) that disable transitions for one or all virtual threads.
// Starting
    // a transition may block if transitions are disabled. Ending a transition may
    // notify a thread that is waiting to disable transitions. The notifications are
    // also used to post JVMTI events for virtual thread start and end.

    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void endFirstTransition();

    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void startFinalTransition();

    // mount is true for a mount transition, false for an unmount transition
    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void startTransition(boolean mount);

    // mount is true for a mount transition, false for an unmount transition
    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void endTransition(boolean mount);

    // invoked with true by disableSuspendAndPreempt and with false by
    // enableSuspendAndPreempt
    @IntrinsicCandidate
    private static native void notifyJvmtiDisableSuspend(boolean enter);

    private static native void registerNatives();
    static {
        // native methods must be registered before any of them is used
        registerNatives();

        // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
        // (the local is otherwise unused)
        var group = Thread.virtualThreadGroup();

        // ensure event class is initialized
        try {
            MethodHandles.lookup().ensureInitialized(VirtualThreadParkEvent.class);
        } catch (IllegalAccessException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Loads a VirtualThreadScheduler with the given class name. The class must be public
     * in an exported package, with public one-arg or no-arg constructor, and be visible
     * to the system class loader.
1515 * @param delegate the scheduler that the custom scheduler may delegate to 1516 * @param cn the class name of the custom scheduler 1517 */ 1518 private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) { 1519 VirtualThreadScheduler scheduler; 1520 try { 1521 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); 1522 // 1-arg constructor 1523 try { 1524 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class); 1525 return (VirtualThreadScheduler) ctor.newInstance(delegate); 1526 } catch (NoSuchMethodException e) { 1527 // 0-arg constructor 1528 Constructor<?> ctor = clazz.getConstructor(); 1529 scheduler = (VirtualThreadScheduler) ctor.newInstance(); 1530 } 1531 } catch (Exception ex) { 1532 throw new Error(ex); 1533 } 1534 System.err.println("WARNING: Using custom default scheduler, this is an experimental feature!"); 1535 return scheduler; 1536 } 1537 1538 /** 1539 * Creates the built-in ForkJoinPool scheduler. 
1540 * @param wrapped true if wrapped by a custom default scheduler 1541 */ 1542 private static VirtualThreadScheduler createBuiltinScheduler(boolean wrapped) { 1543 int parallelism, maxPoolSize, minRunnable; 1544 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1545 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1546 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1547 if (parallelismValue != null) { 1548 parallelism = Integer.parseInt(parallelismValue); 1549 } else { 1550 parallelism = Runtime.getRuntime().availableProcessors(); 1551 } 1552 if (maxPoolSizeValue != null) { 1553 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1554 parallelism = Integer.min(parallelism, maxPoolSize); 1555 } else { 1556 maxPoolSize = Integer.max(parallelism, 256); 1557 } 1558 if (minRunnableValue != null) { 1559 minRunnable = Integer.parseInt(minRunnableValue); 1560 } else { 1561 minRunnable = Integer.max(parallelism / 2, 1); 1562 } 1563 if (Boolean.getBoolean("jdk.virtualThreadScheduler.useTPE")) { 1564 return new BuiltinThreadPoolExecutorScheduler(parallelism); 1565 } else { 1566 return new BuiltinForkJoinPoolScheduler(parallelism, maxPoolSize, minRunnable, wrapped); 1567 } 1568 } 1569 1570 /** 1571 * The built-in ForkJoinPool scheduler. 1572 */ 1573 private static class BuiltinForkJoinPoolScheduler 1574 extends ForkJoinPool implements VirtualThreadScheduler { 1575 1576 BuiltinForkJoinPoolScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) { 1577 ForkJoinWorkerThreadFactory factory = wrapped 1578 ? 
ForkJoinPool.defaultForkJoinWorkerThreadFactory 1579 : CarrierThread::new; 1580 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1581 boolean asyncMode = true; // FIFO 1582 super(parallelism, factory, handler, asyncMode, 1583 0, maxPoolSize, minRunnable, pool -> true, 30L, SECONDS); 1584 } 1585 1586 private void adaptAndExecute(Runnable task) { 1587 execute(ForkJoinTask.adapt(task)); 1588 } 1589 1590 @Override 1591 public void onStart(VirtualThreadTask task) { 1592 adaptAndExecute(task); 1593 } 1594 1595 @Override 1596 public void onContinue(VirtualThreadTask task) { 1597 adaptAndExecute(task); 1598 } 1599 1600 @Override 1601 public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) { 1602 return super.schedule(task, delay, unit); 1603 } 1604 } 1605 1606 /** 1607 * Built-in ThreadPoolExecutor scheduler. 1608 */ 1609 private static class BuiltinThreadPoolExecutorScheduler 1610 extends ThreadPoolExecutor implements VirtualThreadScheduler { 1611 1612 BuiltinThreadPoolExecutorScheduler(int maxPoolSize) { 1613 ThreadFactory factory = task -> { 1614 Thread t = InnocuousThread.newThread(task); 1615 t.setDaemon(true); 1616 return t; 1617 }; 1618 super(maxPoolSize, maxPoolSize, 1619 0L, SECONDS, 1620 new LinkedTransferQueue<>(), 1621 factory); 1622 } 1623 1624 @Override 1625 public void onStart(VirtualThreadTask task) { 1626 execute(task); 1627 } 1628 1629 @Override 1630 public void onContinue(VirtualThreadTask task) { 1631 execute(task); 1632 } 1633 } 1634 1635 /** 1636 * Wraps the scheduler to avoid leaking a direct reference with 1637 * {@link VirtualThreadScheduler#current()}. 
1638 */ 1639 static VirtualThreadScheduler createExternalView(VirtualThreadScheduler delegate) { 1640 return new VirtualThreadScheduler() { 1641 private void check(VirtualThreadTask task) { 1642 var vthread = (VirtualThread) task.thread(); 1643 VirtualThreadScheduler scheduler = vthread.scheduler; 1644 if (scheduler != this && scheduler != DEFAULT_SCHEDULER) { 1645 throw new IllegalArgumentException(); 1646 } 1647 } 1648 @Override 1649 public void onStart(VirtualThreadTask task) { 1650 check(task); 1651 delegate.onStart(task); 1652 } 1653 @Override 1654 public void onContinue(VirtualThreadTask task) { 1655 check(task); 1656 delegate.onContinue(task); 1657 } 1658 @Override 1659 public String toString() { 1660 return delegate.toString(); 1661 } 1662 }; 1663 } 1664 1665 /** 1666 * Schedule a runnable task to run after a delay. 1667 */ 1668 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1669 if (USE_STPE) { 1670 return DelayedTaskSchedulers.schedule(command, delay, unit); 1671 } else { 1672 return scheduler.schedule(command, delay, unit); 1673 } 1674 } 1675 1676 /** 1677 * Supports scheduling a runnable task to run after a delay. It uses a number 1678 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1679 * work queue used. This class is used when using a custom scheduler. 
1680 */ 1681 static class DelayedTaskSchedulers { 1682 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1683 1684 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1685 long tid = Thread.currentThread().threadId(); 1686 int index = (int) tid & (INSTANCE.length - 1); 1687 return INSTANCE[index].schedule(command, delay, unit); 1688 } 1689 1690 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1691 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1692 String propValue = System.getProperty(propName); 1693 int queueCount; 1694 if (propValue != null) { 1695 queueCount = Integer.parseInt(propValue); 1696 if (queueCount != Integer.highestOneBit(queueCount)) { 1697 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1698 } 1699 } else { 1700 int ncpus = Runtime.getRuntime().availableProcessors(); 1701 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1702 } 1703 var schedulers = new ScheduledExecutorService[queueCount]; 1704 for (int i = 0; i < queueCount; i++) { 1705 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1706 Executors.newScheduledThreadPool(1, task -> { 1707 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1708 t.setDaemon(true); 1709 return t; 1710 }); 1711 stpe.setRemoveOnCancelPolicy(true); 1712 schedulers[i] = stpe; 1713 } 1714 return schedulers; 1715 } 1716 } 1717 1718 /** 1719 * Schedule virtual threads that are ready to be scheduled after they blocked on 1720 * monitor enter. 
1721 */ 1722 private static void unblockVirtualThreads() { 1723 while (true) { 1724 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1725 while (vthread != null) { 1726 assert vthread.onWaitingList; 1727 VirtualThread nextThread = vthread.next; 1728 1729 // remove from list and unblock 1730 vthread.next = null; 1731 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1732 assert changed; 1733 vthread.unblock(); 1734 1735 vthread = nextThread; 1736 } 1737 } 1738 } 1739 1740 /** 1741 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1742 * if necessary until a list of one or more threads becomes available. 1743 */ 1744 private static native VirtualThread takeVirtualThreadListToUnblock(); 1745 1746 static { 1747 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1748 VirtualThread::unblockVirtualThreads); 1749 unblocker.setDaemon(true); 1750 unblocker.start(); 1751 } 1752 } --- EOF ---