1 /* 2 * Copyright (c) 2018, 2026, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.lang; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.lang.reflect.Constructor; 30 import java.util.Locale; 31 import java.util.Objects; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executors; 34 import java.util.concurrent.ForkJoinPool; 35 import java.util.concurrent.ForkJoinTask; 36 import java.util.concurrent.Future; 37 import java.util.concurrent.LinkedTransferQueue; 38 import java.util.concurrent.RejectedExecutionException; 39 import java.util.concurrent.ScheduledExecutorService; 40 import java.util.concurrent.ScheduledFuture; 41 import java.util.concurrent.ScheduledThreadPoolExecutor; 42 import java.util.concurrent.ThreadFactory; 43 import java.util.concurrent.ThreadPoolExecutor; 44 import java.util.concurrent.TimeUnit; 45 import jdk.internal.event.VirtualThreadEndEvent; 46 import jdk.internal.event.VirtualThreadParkEvent; 47 import jdk.internal.event.VirtualThreadStartEvent; 48 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 49 import jdk.internal.invoke.MhUtil; 50 import jdk.internal.misc.CarrierThread; 51 import jdk.internal.misc.InnocuousThread; 52 import jdk.internal.misc.Unsafe; 53 import jdk.internal.vm.Continuation; 54 import jdk.internal.vm.ContinuationScope; 55 import jdk.internal.vm.StackableScope; 56 import jdk.internal.vm.ThreadContainer; 57 import jdk.internal.vm.ThreadContainers; 58 import jdk.internal.vm.annotation.ChangesCurrentThread; 59 import jdk.internal.vm.annotation.Hidden; 60 import jdk.internal.vm.annotation.IntrinsicCandidate; 61 import jdk.internal.vm.annotation.JvmtiHideEvents; 62 import jdk.internal.vm.annotation.JvmtiMountTransition; 63 import jdk.internal.vm.annotation.ReservedStackAccess; 64 import sun.nio.ch.Interruptible; 65 import static java.util.concurrent.TimeUnit.*; 66 67 /** 68 * A thread that is scheduled by the Java virtual machine rather than the operating system. 
*/
final class VirtualThread extends BaseVirtualThread {
    private static final Unsafe U = Unsafe.getUnsafe();
    private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");

    // BUILTIN_SCHEDULER is the scheduler returned by createBuiltinScheduler.
    // DEFAULT_SCHEDULER is the built-in scheduler unless a custom scheduler class is
    // configured. EXTERNAL_VIEW is the object returned by createExternalView for the
    // built-in scheduler.
    private static final VirtualThreadScheduler BUILTIN_SCHEDULER;
    private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
    private static final VirtualThreadScheduler EXTERNAL_VIEW;
    static {
        // experimental
        String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
        if (propValue != null) {
            // A custom scheduler is configured. It is loaded with the external view of the
            // built-in scheduler and becomes the default scheduler.
            // NOTE(review): the boolean passed to createBuiltinScheduler differs between
            // the two branches; its meaning is defined by that method, not visible here.
            VirtualThreadScheduler builtinScheduler = createBuiltinScheduler(true);
            VirtualThreadScheduler externalView = createExternalView(builtinScheduler);
            VirtualThreadScheduler defaultScheduler = loadCustomScheduler(externalView, propValue);
            BUILTIN_SCHEDULER = builtinScheduler;
            DEFAULT_SCHEDULER = defaultScheduler;
            EXTERNAL_VIEW = externalView;
        } else {
            // no custom scheduler, the built-in scheduler is also the default scheduler
            var builtinScheduler = createBuiltinScheduler(false);
            BUILTIN_SCHEDULER = builtinScheduler;
            DEFAULT_SCHEDULER = builtinScheduler;
            EXTERNAL_VIEW = createExternalView(builtinScheduler);
        }
    }

    // Unsafe field offsets of the instance fields named by the string literals
    private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
    private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
    private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
    private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
    private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");

    // scheduler and continuation
    private final VirtualThreadScheduler scheduler;
    private final Continuation cont;
    private final VirtualThreadTask runContinuation;

    // virtual thread state, accessed by VM
    private volatile int state;

    /*
     * Virtual thread state transitions:
     *
     * NEW -> STARTED // Thread.start, schedule
to run
     * STARTED -> TERMINATED // failed to start
     * STARTED -> RUNNING // first run
     * RUNNING -> TERMINATED // done
     *
     * RUNNING -> PARKING // Thread parking with LockSupport.park
     * PARKING -> PARKED // cont.yield successful, parked indefinitely
     * PARKED -> UNPARKED // unparked, may be scheduled to continue
     * UNPARKED -> RUNNING // continue execution after park
     *
     * PARKING -> RUNNING // cont.yield failed, need to park on carrier
     * RUNNING -> PINNED // park on carrier
     * PINNED -> RUNNING // unparked, continue execution on same carrier
     *
     * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos
     * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked
     * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue
     *
     * TIMED_PARKING -> RUNNING // cont.yield failed, need to park on carrier
     * RUNNING -> TIMED_PINNED // park on carrier
     * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
     *
     * RUNNING -> BLOCKING // blocking on monitor enter
     * BLOCKING -> BLOCKED // blocked on monitor enter
     * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue
     * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter
     *
     * RUNNING -> WAITING // transitional state during wait on monitor
     * WAITING -> WAIT // waiting on monitor
     * WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner
     * WAIT -> UNBLOCKED // interrupted
     *
     * RUNNING -> TIMED_WAITING // transitional state during timed-wait on monitor
     * TIMED_WAITING -> TIMED_WAIT // timed-waiting on monitor
     * TIMED_WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner
     * TIMED_WAIT -> UNBLOCKED // timed-out/interrupted
     *
     * RUNNING -> YIELDING // Thread.yield
     * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue
     * YIELDING -> RUNNING // cont.yield failed
     *
YIELDED -> RUNNING // continue execution after Thread.yield
     */
    // Values of the state field. The *ING states (PARKING, TIMED_PARKING, YIELDING,
    // BLOCKING, WAITING, TIMED_WAITING) are set by the virtual thread before it attempts
    // to yield; afterYield moves them on to the corresponding unmounted state.
    private static final int NEW = 0;         // Thread.start not yet invoked
    private static final int STARTED = 1;     // scheduled to run, first run pending
    private static final int RUNNING = 2;     // runnable-mounted

    // untimed and timed parking
    private static final int PARKING = 3;
    private static final int PARKED = 4;      // unmounted
    private static final int PINNED = 5;      // mounted
    private static final int TIMED_PARKING = 6;
    private static final int TIMED_PARKED = 7;     // unmounted
    private static final int TIMED_PINNED = 8;     // mounted
    private static final int UNPARKED = 9;    // unmounted but runnable

    // Thread.yield
    private static final int YIELDING = 10;
    private static final int YIELDED = 11;         // unmounted but runnable

    // monitor enter
    private static final int BLOCKING = 12;
    private static final int BLOCKED = 13;         // unmounted
    private static final int UNBLOCKED = 14;       // unmounted but runnable

    // monitor wait/timed-wait
    private static final int WAITING = 15;
    private static final int WAIT = 16;            // waiting in Object.wait
    private static final int TIMED_WAITING = 17;
    private static final int TIMED_WAIT = 18;      // waiting in timed-Object.wait

    private static final int TERMINATED = 99;  // final state

    // parking permit made available by LockSupport.unpark
    private volatile boolean parkPermit;

    // blocking permit made available by unblocker thread when another thread exits monitor
    private volatile boolean blockPermit;

    // true when on the list of virtual threads waiting to be unblocked
    private volatile boolean onWaitingList;

    // next virtual thread on the list of virtual threads waiting to be unblocked
    private volatile VirtualThread next;

    // notified by Object.notify/notifyAll while waiting in Object.wait
    private volatile boolean notified;

    // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
200 private volatile boolean interruptibleWait; 201 202 // timed-wait support 203 private byte timedWaitSeqNo; 204 205 // timeout for timed-park and timed-wait, only accessed on current/carrier thread 206 private long timeout; 207 208 // timer task for timed-park and timed-wait, only accessed on current/carrier thread 209 private Future<?> timeoutTask; 210 211 // carrier thread when mounted, accessed by VM 212 private volatile Thread carrierThread; 213 214 // termination object when joining, created lazily if needed 215 private volatile CountDownLatch termination; 216 217 /** 218 * Return the built-in scheduler. 219 * @param trusted true if caller is trusted, false if not trusted 220 */ 221 static VirtualThreadScheduler builtinScheduler(boolean trusted) { 222 return trusted ? BUILTIN_SCHEDULER : EXTERNAL_VIEW; 223 } 224 225 /** 226 * Returns the default scheduler, usually the same as the built-in scheduler. 227 */ 228 static VirtualThreadScheduler defaultScheduler() { 229 return DEFAULT_SCHEDULER; 230 } 231 232 /** 233 * Returns the continuation scope used for virtual threads. 234 */ 235 static ContinuationScope continuationScope() { 236 return VTHREAD_SCOPE; 237 } 238 239 /** 240 * Return the scheduler for this thread. 241 * @param trusted true if caller is trusted, false if not trusted 242 */ 243 VirtualThreadScheduler scheduler(boolean trusted) { 244 if (scheduler == BUILTIN_SCHEDULER && !trusted) { 245 return EXTERNAL_VIEW; 246 } else { 247 return scheduler; 248 } 249 } 250 251 /** 252 * Returns the task to start/continue this virtual thread. 253 */ 254 VirtualThreadTask virtualThreadTask() { 255 return runContinuation; 256 } 257 258 /** 259 * Creates a new {@code VirtualThread} to run the given task with the given scheduler. 
260 * 261 * @param scheduler the scheduler or null for default scheduler 262 * @param preferredCarrier the preferred carrier or null 263 * @param name thread name 264 * @param characteristics characteristics 265 * @param task the task to execute 266 */ 267 VirtualThread(VirtualThreadScheduler scheduler, 268 Thread preferredCarrier, 269 String name, 270 int characteristics, 271 Runnable task) { 272 super(name, characteristics, /*bound*/ false); 273 Objects.requireNonNull(task); 274 275 // use default scheduler if not provided 276 if (scheduler == null) { 277 scheduler = DEFAULT_SCHEDULER; 278 } else if (scheduler == EXTERNAL_VIEW) { 279 throw new UnsupportedOperationException(); 280 } 281 this.scheduler = scheduler; 282 this.cont = new VThreadContinuation(this, task); 283 284 if (scheduler == BUILTIN_SCHEDULER) { 285 this.runContinuation = new BuiltinSchedulerTask(this); 286 } else { 287 this.runContinuation = new CustomSchedulerTask(this, preferredCarrier); 288 } 289 } 290 291 /** 292 * The task to execute when using the built-in scheduler. 293 */ 294 static final class BuiltinSchedulerTask implements VirtualThreadTask { 295 private final VirtualThread vthread; 296 BuiltinSchedulerTask(VirtualThread vthread) { 297 this.vthread = vthread; 298 } 299 @Override 300 public Thread thread() { 301 return vthread; 302 } 303 @Override 304 public void run() { 305 vthread.runContinuation();; 306 } 307 @Override 308 public Thread preferredCarrier() { 309 throw new UnsupportedOperationException(); 310 } 311 @Override 312 public Object attach(Object att) { 313 throw new UnsupportedOperationException(); 314 } 315 @Override 316 public Object attachment() { 317 throw new UnsupportedOperationException(); 318 } 319 } 320 321 /** 322 * The task to execute when using a custom scheduler. 
323 */ 324 static final class CustomSchedulerTask implements VirtualThreadTask { 325 private static final VarHandle ATT = 326 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class); 327 private final VirtualThread vthread; 328 private final Thread preferredCarrier; 329 private volatile Object att; 330 CustomSchedulerTask(VirtualThread vthread, Thread preferredCarrier) { 331 this.vthread = vthread; 332 this.preferredCarrier = preferredCarrier; 333 } 334 @Override 335 public Thread thread() { 336 return vthread; 337 } 338 @Override 339 public void run() { 340 vthread.runContinuation();; 341 } 342 @Override 343 public Thread preferredCarrier() { 344 return preferredCarrier; 345 } 346 @Override 347 public Object attach(Object att) { 348 return ATT.getAndSet(this, att); 349 } 350 @Override 351 public Object attachment() { 352 return att; 353 } 354 } 355 356 /** 357 * The continuation that a virtual thread executes. 358 */ 359 private static class VThreadContinuation extends Continuation { 360 VThreadContinuation(VirtualThread vthread, Runnable task) { 361 super(VTHREAD_SCOPE, wrap(vthread, task)); 362 } 363 @Override 364 protected void onPinned(Continuation.Pinned reason) { 365 } 366 private static Runnable wrap(VirtualThread vthread, Runnable task) { 367 return new Runnable() { 368 @Hidden 369 @JvmtiHideEvents 370 public void run() { 371 vthread.endFirstTransition(); 372 try { 373 vthread.run(task); 374 } finally { 375 vthread.startFinalTransition(); 376 } 377 } 378 }; 379 } 380 } 381 382 /** 383 * Runs or continues execution on the current thread. The virtual thread is mounted 384 * on the current thread before the task runs or continues. It unmounts when the 385 * task completes or yields. 
*/
    @ChangesCurrentThread // allow mount/unmount to be inlined
    private void runContinuation() {
        // the carrier must be a platform thread
        if (Thread.currentThread().isVirtual()) {
            throw new WrongThreadException();
        }

        // set state to RUNNING
        int initialState = state();
        if (initialState == STARTED || initialState == UNPARKED
                || initialState == UNBLOCKED || initialState == YIELDED) {
            // newly started or continue after parking/blocking/Thread.yield
            if (!compareAndSetState(initialState, RUNNING)) {
                // state changed since it was read, do not run
                return;
            }
            // consume permit when continuing after parking or blocking. If continue
            // after a timed-park or timed-wait then the timeout task is cancelled.
            if (initialState == UNPARKED) {
                cancelTimeoutTask();
                setParkPermit(false);
            } else if (initialState == UNBLOCKED) {
                cancelTimeoutTask();
                blockPermit = false;
            }
        } else {
            // not runnable
            return;
        }

        mount();
        try {
            cont.run();
        } finally {
            // always unmount, then either terminate or handle the yield depending on
            // whether the continuation has completed
            unmount();
            if (cont.isDone()) {
                afterDone();
            } else {
                afterYield();
            }
        }
    }

    /**
     * Cancel timeout task when continuing after timed-park or timed-wait.
     * The timeout task may be executing, or may have already completed.
     * Does nothing if there is no timeout task.
     */
    private void cancelTimeoutTask() {
        if (timeoutTask != null) {
            // false: do not interrupt the timeout task if it is already executing
            timeoutTask.cancel(false);
            timeoutTask = null;
        }
    }

    /**
     * Submits the runContinuation task to the scheduler. For the built-in scheduler,
     * the task will be pushed to the local queue if possible, otherwise it will be
     * pushed to an external submission queue.
444 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 445 * @throws RejectedExecutionException 446 */ 447 private void submitRunContinuation(boolean retryOnOOME) { 448 boolean done = false; 449 while (!done) { 450 try { 451 // Pin the continuation to prevent the virtual thread from unmounting 452 // when submitting a task. For the default scheduler this ensures that 453 // the carrier doesn't change when pushing a task. For other schedulers 454 // it avoids deadlock that could arise due to carriers and virtual 455 // threads contending for a lock. 456 if (currentThread().isVirtual()) { 457 Continuation.pin(); 458 try { 459 scheduler.onContinue(runContinuation); 460 } finally { 461 Continuation.unpin(); 462 } 463 } else { 464 scheduler.onContinue(runContinuation); 465 } 466 done = true; 467 } catch (RejectedExecutionException ree) { 468 submitFailed(ree); 469 throw ree; 470 } catch (OutOfMemoryError e) { 471 if (retryOnOOME) { 472 U.park(false, 100_000_000); // 100ms 473 } else { 474 throw e; 475 } 476 } 477 } 478 } 479 480 /** 481 * Submits the runContinuation task to the scheduler. For the default scheduler, 482 * and calling it on a worker thread, the task will be pushed to the local queue, 483 * otherwise it will be pushed to an external submission queue. 484 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 485 * @throws RejectedExecutionException 486 */ 487 private void submitRunContinuation() { 488 submitRunContinuation(true); 489 } 490 491 /** 492 * Invoked from a carrier thread to lazy submit the runContinuation task to the 493 * carrier's local queue if the queue is empty. If not empty, or invoked by a thread 494 * for a custom scheduler, then it just submits the task to the scheduler. 495 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 
496 * @throws RejectedExecutionException 497 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 498 */ 499 private void lazySubmitRunContinuation() { 500 assert !currentThread().isVirtual(); 501 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 502 try { 503 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation)); 504 } catch (RejectedExecutionException ree) { 505 submitFailed(ree); 506 throw ree; 507 } catch (OutOfMemoryError e) { 508 submitRunContinuation(); 509 } 510 } else { 511 submitRunContinuation(); 512 } 513 } 514 515 /** 516 * Invoked from a carrier thread to externally submit the runContinuation task to the 517 * scheduler. If invoked by a thread for a custom scheduler, then it just submits the 518 * task to the scheduler. 519 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 520 * @throws RejectedExecutionException 521 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 522 */ 523 private void externalSubmitRunContinuation() { 524 assert !currentThread().isVirtual(); 525 if (currentThread() instanceof CarrierThread ct) { 526 try { 527 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 528 } catch (RejectedExecutionException ree) { 529 submitFailed(ree); 530 throw ree; 531 } catch (OutOfMemoryError e) { 532 submitRunContinuation(); 533 } 534 } else { 535 submitRunContinuation(); 536 } 537 } 538 539 /** 540 * Invoked from Thread.start to externally submit the runContinuation task to the 541 * scheduler. If this virtual thread is scheduled by the built-in scheduler, 542 * and this method is called from a virtual thread scheduled by the built-in 543 * scheduler, then it uses externalSubmit to ensure that the task is pushed to an 544 * external submission queue rather than the local queue. 
545 * @throws RejectedExecutionException 546 * @throws OutOfMemoryError 547 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 548 */ 549 private void externalSubmitRunContinuationOrThrow() { 550 try { 551 if (currentThread().isVirtual()) { 552 // Pin the continuation to prevent the virtual thread from unmounting 553 // when submitting a task. This avoids deadlock that could arise due to 554 // carriers and virtual threads contending for a lock. 555 Continuation.pin(); 556 try { 557 if (scheduler == BUILTIN_SCHEDULER 558 && currentCarrierThread() instanceof CarrierThread ct) { 559 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 560 } else { 561 scheduler.onStart(runContinuation); 562 } 563 } finally { 564 Continuation.unpin(); 565 } 566 } else { 567 scheduler.onStart(runContinuation); 568 } 569 } catch (RejectedExecutionException ree) { 570 submitFailed(ree); 571 throw ree; 572 } 573 } 574 575 /** 576 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 577 */ 578 private void submitFailed(RejectedExecutionException ree) { 579 var event = new VirtualThreadSubmitFailedEvent(); 580 if (event.isEnabled()) { 581 event.javaThreadId = threadId(); 582 event.exceptionMessage = ree.getMessage(); 583 event.commit(); 584 } 585 } 586 587 /** 588 * Runs a task in the context of this virtual thread. 
* Any exception or error thrown by the task is passed to
     * dispatchUncaughtException. Remaining scopes are popped before returning.
     */
    private void run(Runnable task) {
        assert Thread.currentThread() == this && state == RUNNING;

        // emit JFR event if enabled
        if (VirtualThreadStartEvent.isTurnedOn()) {
            var event = new VirtualThreadStartEvent();
            event.javaThreadId = threadId();
            event.commit();
        }

        Object bindings = Thread.scopedValueBindings();
        try {
            runWith(bindings, task);
        } catch (Throwable exc) {
            dispatchUncaughtException(exc);
        } finally {
            // pop any remaining scopes from the stack, this may block
            StackableScope.popAll();

            // emit JFR event if enabled
            if (VirtualThreadEndEvent.isTurnedOn()) {
                var event = new VirtualThreadEndEvent();
                event.javaThreadId = threadId();
                event.commit();
            }
        }
    }

    /**
     * Mounts this virtual thread onto the current platform thread. On
     * return, the current thread is the virtual thread.
     */
    @ChangesCurrentThread
    @ReservedStackAccess
    private void mount() {
        startTransition(/*mount*/true);
        // We assume following volatile accesses provide equivalent
        // of acquire ordering, otherwise we need U.loadFence() here.

        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupted status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            // carrier has a stale interrupted status; clear it unless this virtual
            // thread was interrupted in the meantime
            synchronized (interruptLock) {
                // need to recheck interrupted status
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread.
*/
    @ChangesCurrentThread
    @ReservedStackAccess
    private void unmount() {
        assert !Thread.holdsLock(interruptLock);

        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        // the carrier's interrupted status is cleared once the connection is broken
        carrier.clearInterrupt();

        // We assume previous volatile accesses provide equivalent
        // of release ordering, otherwise we need U.storeFence() here.
        endTransition(/*mount*/false);
    }

    /**
     * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
     * the continuation continues.
     *
     * @return the value returned by {@code Continuation.yield}; callers treat true as
     *         having yielded and false as having failed to yield
     */
    @Hidden
    private boolean yieldContinuation() {
        startTransition(/*mount*/false);
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            // executed when the yield returns or throws
            endTransition(/*mount*/true);
        }
    }

    /**
     * Invoked in the context of the carrier thread after the Continuation yields when
     * parking, blocking on monitor enter, Object.wait, or Thread.yield.
* Moves the transitional state set by the virtual thread to the corresponding
     * unmounted state, and resubmits the thread if it became runnable in the meantime.
     */
    private void afterYield() {
        assert carrierThread == null;

        // re-adjust parallelism if the virtual thread yielded when compensating
        if (currentThread() instanceof CarrierThread ct) {
            ct.endBlocking();
        }

        int s = state();

        // LockSupport.park/parkNanos
        if (s == PARKING || s == TIMED_PARKING) {
            int newState;
            if (s == PARKING) {
                setState(newState = PARKED);
            } else {
                // schedule unpark
                // the timeout for a timed-park is in nanoseconds
                long timeout = this.timeout;
                assert timeout > 0;
                timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
                setState(newState = TIMED_PARKED);
            }

            // may have been unparked while parking
            if (parkPermit && compareAndSetState(newState, UNPARKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Thread.yield
        if (s == YIELDING) {
            setState(YIELDED);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation();
            } else {
                submitRunContinuation();
            }
            return;
        }

        // blocking on monitorenter
        if (s == BLOCKING) {
            setState(BLOCKED);

            // may have been unblocked while blocking
            if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Object.wait
        if (s == WAITING || s == TIMED_WAITING) {
            int newState;
            boolean blocked;
            // read before the state change below
            boolean interruptible = interruptibleWait;
            if (s == WAITING) {
                setState(newState = WAIT);
                // may have been notified while in transition
                blocked = notified && compareAndSetState(WAIT, BLOCKED);
            } else {
                // For timed-wait, a timeout task is scheduled to execute. The timeout
                // task will change the thread state to UNBLOCKED and submit the thread
                // to the scheduler. A sequence number is used to ensure that the timeout
                // task only unblocks the thread for this timed-wait. We synchronize with
                // the timeout task to coordinate access to the sequence number and to
                // ensure the timeout task doesn't execute until the thread has got to
                // the TIMED_WAIT state.
                // the timeout for a timed-wait is in milliseconds
                long timeout = this.timeout;
                assert timeout > 0;
                synchronized (timedWaitLock()) {
                    byte seqNo = ++timedWaitSeqNo;
                    timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
                    setState(newState = TIMED_WAIT);
                    // May have been notified while in transition. This must be done while
                    // holding the monitor to avoid changing the state of a new timed wait call.
                    blocked = notified && compareAndSetState(TIMED_WAIT, BLOCKED);
                }
            }

            if (blocked) {
                // may have been unblocked already
                if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                    lazySubmitRunContinuation();
                }
            } else {
                // may have been interrupted while in transition to wait state
                if (interruptible && interrupted && compareAndSetState(newState, UNBLOCKED)) {
                    lazySubmitRunContinuation();
                }
            }
            return;
        }

        // no other state is expected after a yield
        assert false;
    }

    /**
     * Invoked after the continuation completes.
     * Equivalent to {@code afterDone(true)}.
     */
    private void afterDone() {
        afterDone(true);
    }

    /**
     * Invoked after the continuation completes (or start failed). Sets the thread
     * state to TERMINATED and notifies anyone waiting for the thread to terminate.
*
     * @param notifyContainer true if its container should be notified
     */
    private void afterDone(boolean notifyContainer) {
        assert carrierThread == null;
        // the state is set to TERMINATED before the termination latch is counted down
        setState(TERMINATED);

        // notify anyone waiting for this virtual thread to terminate
        CountDownLatch termination = this.termination;
        if (termination != null) {
            assert termination.getCount() == 1;
            termination.countDown();
        }

        // notify container
        if (notifyContainer) {
            threadContainer().remove(this);
        }

        // clear references to thread locals
        clearReferences();
    }

    /**
     * Schedules this {@code VirtualThread} to execute.
     *
     * @param container the thread container that this thread is bound and added to
     * @throws IllegalStateException if the container is shutdown or closed
     * @throws IllegalThreadStateException if the thread has already been started
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void start(ThreadContainer container) {
        if (!compareAndSetState(NEW, STARTED)) {
            throw new IllegalThreadStateException("Already started");
        }

        // bind thread to container
        assert threadContainer() == null;
        setThreadContainer(container);

        // start thread
        boolean addedToContainer = false;
        boolean started = false;
        try {
            container.add(this);  // may throw
            addedToContainer = true;

            // scoped values may be inherited
            inheritScopedValueBindings(container);

            // submit task to run thread, using externalSubmit if possible
            externalSubmitRunContinuationOrThrow();
            started = true;
        } finally {
            // start failed: terminate the thread, removing it from the container only
            // if it was added
            if (!started) {
                afterDone(addedToContainer);
            }
        }
    }

    /**
     * Schedules this virtual thread to execute in the root thread container.
     */
    @Override
    public void start() {
        start(ThreadContainers.root());
    }

    /**
     * Does nothing. The task given to the constructor is run by the continuation,
     * not by this method.
     */
    @Override
    public void run() {
        // do nothing
    }

    /**
     * Parks until unparked or interrupted.
If already unparked then the parking
     * permit is consumed and this method completes immediately (meaning it doesn't
     * yield). It also completes immediately if the interrupted status is set.
     * If the continuation cannot yield, or OutOfMemoryError is thrown when attempting
     * to yield, then the carrier thread is parked instead.
     */
    @Override
    void park() {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        boolean yielded = false;
        long eventStartTime = VirtualThreadParkEvent.eventStartTime();
        setState(PARKING);
        try {
            yielded = yieldContinuation();
        } catch (OutOfMemoryError e) {
            // park on carrier
        } finally {
            // if yielded then the thread has since been continued and is RUNNING again,
            // otherwise the state is still PARKING and must be restored to RUNNING
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (yielded) {
                // NOTE(review): Long.MIN_VALUE is passed where parkNanos passes its
                // timeout; presumably it denotes an untimed park, confirm against
                // VirtualThreadParkEvent.offer
                VirtualThreadParkEvent.offer(eventStartTime, Long.MIN_VALUE);
            } else {
                assert state() == PARKING;
                setState(RUNNING);
            }
        }

        // park on the carrier thread when pinned
        if (!yielded) {
            parkOnCarrierThread(false, 0);
        }
    }

    /**
     * Parks up to the given waiting time or until unparked or interrupted.
     * If already unparked then the parking permit is consumed and this method
     * completes immediately (meaning it doesn't yield). It also completes immediately
     * if the interrupted status is set or the waiting time is {@code <= 0}.
     *
     * @param nanos the maximum number of nanoseconds to wait.
*/
    @Override
    void parkNanos(long nanos) {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread for the waiting time
        if (nanos > 0) {
            long startTime = System.nanoTime();

            // park the thread, afterYield will schedule the thread to unpark
            boolean yielded = false;
            long eventStartTime = VirtualThreadParkEvent.eventStartTime();
            // afterYield reads this field to schedule the timeout task
            timeout = nanos;
            setState(TIMED_PARKING);
            try {
                yielded = yieldContinuation();
            } catch (OutOfMemoryError e) {
                // park on carrier
            } finally {
                // if yielded then the thread has since been continued and is RUNNING
                // again, otherwise the state is still TIMED_PARKING
                assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
                if (yielded) {
                    VirtualThreadParkEvent.offer(eventStartTime, nanos);
                } else {
                    assert state() == TIMED_PARKING;
                    setState(RUNNING);
                }
            }

            // park on carrier thread for remaining time when pinned (or OOME)
            if (!yielded) {
                // may be <= 0, in which case parkOnCarrierThread does not park
                long remainingNanos = nanos - (System.nanoTime() - startTime);
                parkOnCarrierThread(true, remainingNanos);
            }
        }
    }

    /**
     * Parks the current carrier thread up to the given waiting time or until
     * unparked or interrupted. If the virtual thread is interrupted then the
     * interrupted status will be propagated to the carrier thread.
     * The carrier is not parked if the parking permit is already available, or if
     * the park is timed and the waiting time is {@code <= 0}. The parking permit is
     * consumed before returning.
     * @param timed true for a timed park, false for untimed
     * @param nanos the waiting time in nanoseconds
     */
    private void parkOnCarrierThread(boolean timed, long nanos) {
        assert state() == RUNNING;

        setState(timed ? TIMED_PINNED : PINNED);
        try {
            if (!parkPermit) {
                if (!timed) {
                    U.park(false, 0);
                } else if (nanos > 0) {
                    U.park(false, nanos);
                }
            }
        } finally {
            setState(RUNNING);
        }

        // consume parking permit
        setParkPermit(false);

        // JFR jdk.VirtualThreadPinned event
        postPinnedEvent("LockSupport.park");
    }

    /**
     * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
     * Recording the event in the VM avoids having JFR event recorded in Java
     * with the same name, but different ID, to events recorded by the VM.
     * @param op the name of the operation, "LockSupport.park" when parking on the carrier
     */
    @Hidden
    private static native void postPinnedEvent(String op);

    /**
     * Re-enables this virtual thread for scheduling. If this virtual thread is parked
     * then its task is scheduled to continue, otherwise its next call to {@code park} or
     * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
     * Does nothing more than set the permit if the permit was already set or the
     * caller is this virtual thread.
     * @param lazySubmit to use lazySubmit if possible
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    private void unpark(boolean lazySubmit) {
        if (!getAndSetParkPermit(true) && currentThread() != this) {
            int s = state();

            // unparked while parked
            if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
                if (lazySubmit) {
                    lazySubmitRunContinuation();
                } else {
                    submitRunContinuation();
                }
                return;
            }

            // unparked while parked when pinned
            if (s == PINNED || s == TIMED_PINNED) {
                // unpark carrier thread when pinned
                disableSuspendAndPreempt();
                try {
                    synchronized (carrierThreadAccessLock()) {
                        // re-read the carrier and the state while holding the lock
                        Thread carrier = carrierThread;
                        if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                            U.unpark(carrier);
                        }
                    }
                } finally {
                    enableSuspendAndPreempt();
                }
                return;
            }
        }
    }

    /**
     * Unparks this virtual thread without using lazy submit.
     */
    @Override
    void unpark() {
        unpark(false);
    }
1039 1040 /** 1041 * Invoked by unblocker thread to unblock this virtual thread. 1042 */ 1043 private void unblock() { 1044 assert !Thread.currentThread().isVirtual(); 1045 blockPermit = true; 1046 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 1047 submitRunContinuation(); 1048 } 1049 } 1050 1051 /** 1052 * Invoked by FJP worker thread or STPE thread when park timeout expires. 1053 */ 1054 private void parkTimeoutExpired() { 1055 assert !VirtualThread.currentThread().isVirtual(); 1056 unpark(true); 1057 } 1058 1059 /** 1060 * Invoked by FJP worker thread or STPE thread when wait timeout expires. 1061 * If the virtual thread is in timed-wait then this method will unblock the thread 1062 * and submit its task so that it continues and attempts to reenter the monitor. 1063 * This method does nothing if the thread has been woken by notify or interrupt. 1064 */ 1065 private void waitTimeoutExpired(byte seqNo) { 1066 assert !Thread.currentThread().isVirtual(); 1067 1068 synchronized (timedWaitLock()) { 1069 if (seqNo != timedWaitSeqNo) { 1070 // this timeout task is for a past timed-wait 1071 return; 1072 } 1073 if (!compareAndSetState(TIMED_WAIT, UNBLOCKED)) { 1074 // already unblocked 1075 return; 1076 } 1077 } 1078 1079 lazySubmitRunContinuation(); 1080 } 1081 1082 /** 1083 * Attempts to yield the current virtual thread (Thread.yield). 1084 */ 1085 void tryYield() { 1086 assert Thread.currentThread() == this; 1087 setState(YIELDING); 1088 boolean yielded = false; 1089 try { 1090 yielded = yieldContinuation(); // may throw 1091 } finally { 1092 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 1093 if (!yielded) { 1094 assert state() == YIELDING; 1095 setState(RUNNING); 1096 } 1097 } 1098 } 1099 1100 /** 1101 * Sleep the current thread for the given sleep time (in nanoseconds). If 1102 * nanos is 0 then the thread will attempt to yield. 
1103 * 1104 * @implNote This implementation parks the thread for the given sleeping time 1105 * and will therefore be observed in PARKED state during the sleep. Parking 1106 * will consume the parking permit so this method makes available the parking 1107 * permit after the sleep. This may be observed as a spurious, but benign, 1108 * wakeup when the thread subsequently attempts to park. 1109 * 1110 * @param nanos the maximum number of nanoseconds to sleep 1111 * @throws InterruptedException if interrupted while sleeping 1112 */ 1113 void sleepNanos(long nanos) throws InterruptedException { 1114 assert Thread.currentThread() == this && nanos >= 0; 1115 if (getAndClearInterrupt()) 1116 throw new InterruptedException(); 1117 if (nanos == 0) { 1118 tryYield(); 1119 } else { 1120 // park for the sleep time 1121 try { 1122 long remainingNanos = nanos; 1123 long startNanos = System.nanoTime(); 1124 while (remainingNanos > 0) { 1125 parkNanos(remainingNanos); 1126 if (getAndClearInterrupt()) { 1127 throw new InterruptedException(); 1128 } 1129 remainingNanos = nanos - (System.nanoTime() - startNanos); 1130 } 1131 } finally { 1132 // may have been unparked while sleeping 1133 setParkPermit(true); 1134 } 1135 } 1136 } 1137 1138 /** 1139 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 1140 * A timeout of {@code 0} means to wait forever. 
1141 * 1142 * @throws InterruptedException if interrupted while waiting 1143 * @return true if the thread has terminated 1144 */ 1145 boolean joinNanos(long nanos) throws InterruptedException { 1146 if (state() == TERMINATED) 1147 return true; 1148 1149 // ensure termination object exists, then re-check state 1150 CountDownLatch termination = getTermination(); 1151 if (state() == TERMINATED) 1152 return true; 1153 1154 // wait for virtual thread to terminate 1155 if (nanos == 0) { 1156 termination.await(); 1157 } else { 1158 boolean terminated = termination.await(nanos, NANOSECONDS); 1159 if (!terminated) { 1160 // waiting time elapsed 1161 return false; 1162 } 1163 } 1164 assert state() == TERMINATED; 1165 return true; 1166 } 1167 1168 @Override 1169 void blockedOn(Interruptible b) { 1170 disableSuspendAndPreempt(); 1171 try { 1172 super.blockedOn(b); 1173 } finally { 1174 enableSuspendAndPreempt(); 1175 } 1176 } 1177 1178 @Override 1179 public void interrupt() { 1180 if (Thread.currentThread() != this) { 1181 // if current thread is a virtual thread then prevent it from being 1182 // suspended or unmounted when entering or holding interruptLock 1183 Interruptible blocker; 1184 disableSuspendAndPreempt(); 1185 try { 1186 synchronized (interruptLock) { 1187 interrupted = true; 1188 blocker = nioBlocker(); 1189 if (blocker != null) { 1190 blocker.interrupt(this); 1191 } 1192 1193 // interrupt carrier thread if mounted 1194 Thread carrier = carrierThread; 1195 if (carrier != null) carrier.setInterrupt(); 1196 } 1197 } finally { 1198 enableSuspendAndPreempt(); 1199 } 1200 1201 // notify blocker after releasing interruptLock 1202 if (blocker != null) { 1203 blocker.postInterrupt(); 1204 } 1205 1206 // make available parking permit, unpark thread if parked 1207 unpark(); 1208 1209 // if thread is waiting in Object.wait then schedule to try to reenter 1210 int s = state(); 1211 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1212 
submitRunContinuation(); 1213 } 1214 1215 } else { 1216 interrupted = true; 1217 carrierThread.setInterrupt(); 1218 setParkPermit(true); 1219 } 1220 } 1221 1222 @Override 1223 public boolean isInterrupted() { 1224 return interrupted; 1225 } 1226 1227 @Override 1228 boolean getAndClearInterrupt() { 1229 assert Thread.currentThread() == this; 1230 boolean oldValue = interrupted; 1231 if (oldValue) { 1232 disableSuspendAndPreempt(); 1233 try { 1234 synchronized (interruptLock) { 1235 interrupted = false; 1236 carrierThread.clearInterrupt(); 1237 } 1238 } finally { 1239 enableSuspendAndPreempt(); 1240 } 1241 } 1242 return oldValue; 1243 } 1244 1245 @Override 1246 Thread.State threadState() { 1247 switch (state()) { 1248 case NEW: 1249 return Thread.State.NEW; 1250 case STARTED: 1251 // return NEW if thread container not yet set 1252 if (threadContainer() == null) { 1253 return Thread.State.NEW; 1254 } else { 1255 return Thread.State.RUNNABLE; 1256 } 1257 case UNPARKED: 1258 case UNBLOCKED: 1259 case YIELDED: 1260 // runnable, not mounted 1261 return Thread.State.RUNNABLE; 1262 case RUNNING: 1263 // if mounted then return state of carrier thread 1264 if (Thread.currentThread() != this) { 1265 disableSuspendAndPreempt(); 1266 try { 1267 synchronized (carrierThreadAccessLock()) { 1268 Thread carrierThread = this.carrierThread; 1269 if (carrierThread != null) { 1270 return carrierThread.threadState(); 1271 } 1272 } 1273 } finally { 1274 enableSuspendAndPreempt(); 1275 } 1276 } 1277 // runnable, mounted 1278 return Thread.State.RUNNABLE; 1279 case PARKING: 1280 case TIMED_PARKING: 1281 case WAITING: 1282 case TIMED_WAITING: 1283 case YIELDING: 1284 // runnable, in transition 1285 return Thread.State.RUNNABLE; 1286 case PARKED: 1287 case PINNED: 1288 case WAIT: 1289 return Thread.State.WAITING; 1290 case TIMED_PARKED: 1291 case TIMED_PINNED: 1292 case TIMED_WAIT: 1293 return Thread.State.TIMED_WAITING; 1294 case BLOCKING: 1295 case BLOCKED: 1296 return Thread.State.BLOCKED; 
1297 case TERMINATED: 1298 return Thread.State.TERMINATED; 1299 default: 1300 throw new InternalError(); 1301 } 1302 } 1303 1304 @Override 1305 boolean alive() { 1306 int s = state; 1307 return (s != NEW && s != TERMINATED); 1308 } 1309 1310 @Override 1311 boolean isTerminated() { 1312 return (state == TERMINATED); 1313 } 1314 1315 @Override 1316 public String toString() { 1317 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1318 sb.append(threadId()); 1319 String name = getName(); 1320 if (!name.isEmpty()) { 1321 sb.append(","); 1322 sb.append(name); 1323 } 1324 sb.append("]/"); 1325 1326 // add the carrier state and thread name when mounted 1327 boolean mounted; 1328 if (Thread.currentThread() == this) { 1329 mounted = appendCarrierInfo(sb); 1330 } else { 1331 disableSuspendAndPreempt(); 1332 try { 1333 synchronized (carrierThreadAccessLock()) { 1334 mounted = appendCarrierInfo(sb); 1335 } 1336 } finally { 1337 enableSuspendAndPreempt(); 1338 } 1339 } 1340 1341 // add virtual thread state when not mounted 1342 if (!mounted) { 1343 String stateAsString = threadState().toString(); 1344 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1345 } 1346 1347 return sb.toString(); 1348 } 1349 1350 /** 1351 * Appends the carrier state and thread name to the string buffer if mounted. 
1352 * @return true if mounted, false if not mounted 1353 */ 1354 private boolean appendCarrierInfo(StringBuilder sb) { 1355 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1356 Thread carrier = carrierThread; 1357 if (carrier != null) { 1358 String stateAsString = carrier.threadState().toString(); 1359 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1360 sb.append('@'); 1361 sb.append(carrier.getName()); 1362 return true; 1363 } else { 1364 return false; 1365 } 1366 } 1367 1368 @Override 1369 public int hashCode() { 1370 return (int) threadId(); 1371 } 1372 1373 @Override 1374 public boolean equals(Object obj) { 1375 return obj == this; 1376 } 1377 1378 /** 1379 * Returns the termination object, creating it if needed. 1380 */ 1381 private CountDownLatch getTermination() { 1382 CountDownLatch termination = this.termination; 1383 if (termination == null) { 1384 termination = new CountDownLatch(1); 1385 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1386 termination = this.termination; 1387 } 1388 } 1389 return termination; 1390 } 1391 1392 /** 1393 * Returns the lock object to synchronize on when accessing carrierThread. 1394 * The lock prevents carrierThread from being reset to null during unmount. 1395 */ 1396 private Object carrierThreadAccessLock() { 1397 // return interruptLock as unmount has to coordinate with interrupt 1398 return interruptLock; 1399 } 1400 1401 /** 1402 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1403 */ 1404 private Object timedWaitLock() { 1405 // use this object for now to avoid the overhead of introducing another lock 1406 return runContinuation; 1407 } 1408 1409 /** 1410 * Disallow the current thread be suspended or preempted. 1411 */ 1412 private void disableSuspendAndPreempt() { 1413 notifyJvmtiDisableSuspend(true); 1414 Continuation.pin(); 1415 } 1416 1417 /** 1418 * Allow the current thread be suspended or preempted. 
1419 */ 1420 private void enableSuspendAndPreempt() { 1421 Continuation.unpin(); 1422 notifyJvmtiDisableSuspend(false); 1423 } 1424 1425 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1426 1427 private int state() { 1428 return state; // volatile read 1429 } 1430 1431 private void setState(int newValue) { 1432 state = newValue; // volatile write 1433 } 1434 1435 private boolean compareAndSetState(int expectedValue, int newValue) { 1436 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1437 } 1438 1439 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1440 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1441 } 1442 1443 private void setParkPermit(boolean newValue) { 1444 if (parkPermit != newValue) { 1445 parkPermit = newValue; 1446 } 1447 } 1448 1449 private boolean getAndSetParkPermit(boolean newValue) { 1450 if (parkPermit != newValue) { 1451 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1452 } else { 1453 return newValue; 1454 } 1455 } 1456 1457 private void setCarrierThread(Thread carrier) { 1458 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1459 this.carrierThread = carrier; 1460 } 1461 1462 // The following four methods notify the VM when a "transition" starts and ends. 1463 // A "mount transition" embodies the steps to transfer control from a platform 1464 // thread to a virtual thread, changing the thread identity, and starting or 1465 // resuming the virtual thread's continuation on the carrier. 1466 // An "unmount transition" embodies the steps to transfer control from a virtual 1467 // thread to its carrier, suspending the virtual thread's continuation, and 1468 // restoring the thread identity to the platform thread. 1469 // The notifications to the VM are necessary in order to coordinate with functions 1470 // (JVMTI mostly) that disable transitions for one or all virtual threads. 
Starting 1471 // a transition may block if transitions are disabled. Ending a transition may 1472 // notify a thread that is waiting to disable transitions. The notifications are 1473 // also used to post JVMTI events for virtual thread start and end. 1474 1475 @IntrinsicCandidate 1476 @JvmtiMountTransition 1477 private native void endFirstTransition(); 1478 1479 @IntrinsicCandidate 1480 @JvmtiMountTransition 1481 private native void startFinalTransition(); 1482 1483 @IntrinsicCandidate 1484 @JvmtiMountTransition 1485 private native void startTransition(boolean mount); 1486 1487 @IntrinsicCandidate 1488 @JvmtiMountTransition 1489 private native void endTransition(boolean mount); 1490 1491 @IntrinsicCandidate 1492 private static native void notifyJvmtiDisableSuspend(boolean enter); 1493 1494 private static native void registerNatives(); 1495 static { 1496 registerNatives(); 1497 1498 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1499 var group = Thread.virtualThreadGroup(); 1500 1501 // ensure event class is initialized 1502 try { 1503 MethodHandles.lookup().ensureInitialized(VirtualThreadParkEvent.class); 1504 } catch (IllegalAccessException e) { 1505 throw new ExceptionInInitializerError(e); 1506 } 1507 } 1508 1509 /** 1510 * Loads a VirtualThreadScheduler with the given class name. The class must be public 1511 * in an exported package, with public one-arg or no-arg constructor, and be visible 1512 * to the system class loader. 
1513 * @param delegate the scheduler that the custom scheduler may delegate to 1514 * @param cn the class name of the custom scheduler 1515 */ 1516 private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) { 1517 VirtualThreadScheduler scheduler; 1518 try { 1519 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); 1520 // 1-arg constructor 1521 try { 1522 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class); 1523 return (VirtualThreadScheduler) ctor.newInstance(delegate); 1524 } catch (NoSuchMethodException e) { 1525 // 0-arg constructor 1526 Constructor<?> ctor = clazz.getConstructor(); 1527 scheduler = (VirtualThreadScheduler) ctor.newInstance(); 1528 } 1529 } catch (Exception ex) { 1530 throw new Error(ex); 1531 } 1532 System.err.println("WARNING: Using custom default scheduler, this is an experimental feature!"); 1533 return scheduler; 1534 } 1535 1536 /** 1537 * Creates the built-in ForkJoinPool scheduler. 
1538 * @param wrapped true if wrapped by a custom default scheduler 1539 */ 1540 private static VirtualThreadScheduler createBuiltinScheduler(boolean wrapped) { 1541 int parallelism, maxPoolSize, minRunnable; 1542 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1543 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1544 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1545 if (parallelismValue != null) { 1546 parallelism = Integer.parseInt(parallelismValue); 1547 } else { 1548 parallelism = Runtime.getRuntime().availableProcessors(); 1549 } 1550 if (maxPoolSizeValue != null) { 1551 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1552 parallelism = Integer.min(parallelism, maxPoolSize); 1553 } else { 1554 maxPoolSize = Integer.max(parallelism, 256); 1555 } 1556 if (minRunnableValue != null) { 1557 minRunnable = Integer.parseInt(minRunnableValue); 1558 } else { 1559 minRunnable = Integer.max(parallelism / 2, 1); 1560 } 1561 if (System.getProperty("jdk.virtualThreadScheduler.threadPoolExecutor") != null) { 1562 return new BuiltinThreadPoolExecutorScheduler(parallelism); 1563 } else { 1564 return new BuiltinForkJoinPoolScheduler(parallelism, maxPoolSize, minRunnable, wrapped); 1565 } 1566 } 1567 1568 /** 1569 * The built-in ForkJoinPool scheduler. 1570 */ 1571 private static class BuiltinForkJoinPoolScheduler 1572 extends ForkJoinPool implements VirtualThreadScheduler { 1573 1574 BuiltinForkJoinPoolScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) { 1575 ForkJoinWorkerThreadFactory factory = wrapped 1576 ? 
ForkJoinPool.defaultForkJoinWorkerThreadFactory 1577 : CarrierThread::new; 1578 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1579 boolean asyncMode = true; // FIFO 1580 super(parallelism, factory, handler, asyncMode, 1581 0, maxPoolSize, minRunnable, pool -> true, 30L, SECONDS); 1582 } 1583 1584 private void adaptAndExecute(Runnable task) { 1585 execute(ForkJoinTask.adapt(task)); 1586 } 1587 1588 @Override 1589 public void onStart(VirtualThreadTask task) { 1590 adaptAndExecute(task); 1591 } 1592 1593 @Override 1594 public void onContinue(VirtualThreadTask task) { 1595 adaptAndExecute(task); 1596 } 1597 1598 @Override 1599 public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) { 1600 return super.schedule(task, delay, unit); 1601 } 1602 } 1603 1604 /** 1605 * Built-in ThreadPoolExecutor scheduler. 1606 */ 1607 private static class BuiltinThreadPoolExecutorScheduler 1608 extends ThreadPoolExecutor implements VirtualThreadScheduler { 1609 1610 BuiltinThreadPoolExecutorScheduler(int maxPoolSize) { 1611 ThreadFactory factory = task -> InnocuousThread.newThread(task); 1612 super(maxPoolSize, maxPoolSize, 1613 0L, SECONDS, 1614 new LinkedTransferQueue<>(), 1615 factory); 1616 } 1617 1618 @Override 1619 public void onStart(VirtualThreadTask task) { 1620 execute(task); 1621 } 1622 1623 @Override 1624 public void onContinue(VirtualThreadTask task) { 1625 execute(task); 1626 } 1627 } 1628 1629 /** 1630 * Wraps the scheduler to avoid leaking a direct reference with 1631 * {@link VirtualThreadScheduler#current()}. 
1632 */ 1633 static VirtualThreadScheduler createExternalView(VirtualThreadScheduler delegate) { 1634 return new VirtualThreadScheduler() { 1635 private void check(VirtualThreadTask task) { 1636 var vthread = (VirtualThread) task.thread(); 1637 VirtualThreadScheduler scheduler = vthread.scheduler; 1638 if (scheduler != this && scheduler != DEFAULT_SCHEDULER) { 1639 throw new IllegalArgumentException(); 1640 } 1641 } 1642 @Override 1643 public void onStart(VirtualThreadTask task) { 1644 check(task); 1645 delegate.onStart(task); 1646 } 1647 @Override 1648 public void onContinue(VirtualThreadTask task) { 1649 check(task); 1650 delegate.onContinue(task); 1651 } 1652 @Override 1653 public String toString() { 1654 return delegate.toString(); 1655 } 1656 }; 1657 } 1658 1659 /** 1660 * Schedule a runnable task to run after a delay. 1661 */ 1662 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1663 return scheduler.schedule(command, delay, unit); 1664 } 1665 1666 /** 1667 * Supports scheduling a runnable task to run after a delay. It uses a number 1668 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1669 * work queue used. This class is used when using a custom scheduler. 
1670 */ 1671 static class DelayedTaskSchedulers { 1672 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1673 1674 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1675 long tid = Thread.currentThread().threadId(); 1676 int index = (int) tid & (INSTANCE.length - 1); 1677 return INSTANCE[index].schedule(command, delay, unit); 1678 } 1679 1680 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1681 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1682 String propValue = System.getProperty(propName); 1683 int queueCount; 1684 if (propValue != null) { 1685 queueCount = Integer.parseInt(propValue); 1686 if (queueCount != Integer.highestOneBit(queueCount)) { 1687 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1688 } 1689 } else { 1690 int ncpus = Runtime.getRuntime().availableProcessors(); 1691 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1692 } 1693 var schedulers = new ScheduledExecutorService[queueCount]; 1694 for (int i = 0; i < queueCount; i++) { 1695 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1696 Executors.newScheduledThreadPool(1, task -> { 1697 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1698 t.setDaemon(true); 1699 return t; 1700 }); 1701 stpe.setRemoveOnCancelPolicy(true); 1702 schedulers[i] = stpe; 1703 } 1704 return schedulers; 1705 } 1706 } 1707 1708 /** 1709 * Schedule virtual threads that are ready to be scheduled after they blocked on 1710 * monitor enter. 
1711 */ 1712 private static void unblockVirtualThreads() { 1713 while (true) { 1714 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1715 while (vthread != null) { 1716 assert vthread.onWaitingList; 1717 VirtualThread nextThread = vthread.next; 1718 1719 // remove from list and unblock 1720 vthread.next = null; 1721 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1722 assert changed; 1723 vthread.unblock(); 1724 1725 vthread = nextThread; 1726 } 1727 } 1728 } 1729 1730 /** 1731 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1732 * if necessary until a list of one or more threads becomes available. 1733 */ 1734 private static native VirtualThread takeVirtualThreadListToUnblock(); 1735 1736 static { 1737 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1738 VirtualThread::unblockVirtualThreads); 1739 unblocker.setDaemon(true); 1740 unblocker.start(); 1741 } 1742 } --- EOF ---