1 /* 2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.lang.reflect.Constructor; 30 import java.util.Locale; 31 import java.util.Objects; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executors; 34 import java.util.concurrent.ForkJoinPool; 35 import java.util.concurrent.ForkJoinTask; 36 import java.util.concurrent.Future; 37 import java.util.concurrent.RejectedExecutionException; 38 import java.util.concurrent.ScheduledExecutorService; 39 import java.util.concurrent.ScheduledThreadPoolExecutor; 40 import java.util.concurrent.TimeUnit; 41 import jdk.internal.event.VirtualThreadEndEvent; 42 import jdk.internal.event.VirtualThreadParkEvent; 43 import jdk.internal.event.VirtualThreadStartEvent; 44 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 45 import jdk.internal.invoke.MhUtil; 46 import jdk.internal.misc.CarrierThread; 47 import jdk.internal.misc.InnocuousThread; 48 import jdk.internal.misc.Unsafe; 49 import jdk.internal.vm.Continuation; 50 import jdk.internal.vm.ContinuationScope; 51 import jdk.internal.vm.StackableScope; 52 import jdk.internal.vm.ThreadContainer; 53 import jdk.internal.vm.ThreadContainers; 54 import jdk.internal.vm.annotation.ChangesCurrentThread; 55 import jdk.internal.vm.annotation.Hidden; 56 import jdk.internal.vm.annotation.IntrinsicCandidate; 57 import jdk.internal.vm.annotation.JvmtiHideEvents; 58 import jdk.internal.vm.annotation.JvmtiMountTransition; 59 import jdk.internal.vm.annotation.ReservedStackAccess; 60 import sun.nio.ch.Interruptible; 61 import static java.util.concurrent.TimeUnit.*; 62 63 /** 64 * A thread that is scheduled by the Java virtual machine rather than the operating system. 65 */ 66 final class VirtualThread extends BaseVirtualThread { 67 private static final Unsafe U = Unsafe.getUnsafe(); 68 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 69 70 private static final BuiltinScheduler BUILTIN_SCHEDULER; 71 private static final VirtualThreadScheduler DEFAULT_SCHEDULER; 72 private static final VirtualThreadScheduler EXTERNAL_VIEW; 73 static { 74 // experimental 75 String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass"); 76 if (propValue != null) { 77 BuiltinScheduler builtinScheduler = createBuiltinScheduler(true); 78 VirtualThreadScheduler externalView = builtinScheduler.createExternalView(); 79 VirtualThreadScheduler defaultScheduler = loadCustomScheduler(externalView, propValue); 80 BUILTIN_SCHEDULER = builtinScheduler; 81 DEFAULT_SCHEDULER = defaultScheduler; 82 EXTERNAL_VIEW = externalView; 83 } else { 84 var builtinScheduler = createBuiltinScheduler(false); 85 BUILTIN_SCHEDULER = builtinScheduler; 86 DEFAULT_SCHEDULER = builtinScheduler; 87 EXTERNAL_VIEW = builtinScheduler.createExternalView(); 88 } 89 } 90 91 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 92 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 93 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 94 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 95 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList"); 96 97 // scheduler and continuation 98 private final VirtualThreadScheduler scheduler; 99 private final Continuation cont; 100 private final VirtualThreadTask runContinuation; 101 102 // virtual thread state, accessed by VM 103 private volatile int state; 104 105 /* 106 * Virtual thread state transitions: 107 * 108 * NEW -> STARTED // Thread.start, schedule to run 109 * STARTED -> TERMINATED // failed to start 110 * STARTED -> RUNNING // first run 111 * RUNNING -> TERMINATED // done 112 * 113 * RUNNING -> PARKING // Thread parking with LockSupport.park 114 * PARKING -> PARKED // cont.yield successful, parked indefinitely 115 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier 116 * PARKED -> UNPARKED // unparked, may be scheduled to continue 117 * PINNED -> RUNNING // unparked, continue execution on same carrier 118 * UNPARKED -> RUNNING // continue execution after park 119 * 120 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 121 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 122 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier 123 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue 124 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 125 * 126 * RUNNING -> BLOCKING // blocking on monitor enter 127 * BLOCKING -> BLOCKED // blocked on monitor enter 128 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue 129 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter 130 * 131 * RUNNING -> WAITING // transitional state during wait on monitor 132 * WAITING -> WAIT // waiting on monitor 133 * WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 134 * WAIT -> UNBLOCKED // timed-out/interrupted 135 * 136 * RUNNING -> TIMED_WAITING // transition state during timed-waiting on monitor 137 * TIMED_WAITING -> TIMED_WAIT // timed-waiting on monitor 138 * TIMED_WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 139 * TIMED_WAIT -> UNBLOCKED // timed-out/interrupted 140 * 141 * RUNNING -> YIELDING // Thread.yield 142 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue 143 * YIELDING -> RUNNING // cont.yield failed 144 * YIELDED -> RUNNING // continue execution after Thread.yield 145 */ 146 private static final int NEW = 0; 147 private static final int STARTED = 1; 148 private static final int RUNNING = 2; // runnable-mounted 149 150 // untimed and timed parking 151 private static final int PARKING = 3; 152 private static final int PARKED = 4; // unmounted 153 private static final int PINNED = 5; // mounted 154 private static final int TIMED_PARKING = 6; 155 private static final int TIMED_PARKED = 7; // unmounted 156 private static final int TIMED_PINNED = 8; // mounted 157 private static final int UNPARKED = 9; // unmounted but runnable 158 159 // Thread.yield 160 private static final int YIELDING = 10; 161 private static final int YIELDED = 11; // unmounted but runnable 162 163 // monitor enter 164 private static final int BLOCKING = 12; 165 private static final int BLOCKED = 13; // unmounted 166 private static final int UNBLOCKED = 14; // unmounted but runnable 167 168 // monitor wait/timed-wait 169 private static final int WAITING = 15; 170 private static final int WAIT = 16; // waiting in Object.wait 171 private static final int TIMED_WAITING = 17; 172 private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait 173 174 private static final int TERMINATED = 99; // final state 175 176 // can be suspended from scheduling when unmounted 177 private static final int SUSPENDED = 1 << 8; 178 179 // parking permit made available by LockSupport.unpark 180 private volatile boolean parkPermit; 181 182 // blocking permit made available by unblocker thread when another thread exits monitor 183 private volatile boolean blockPermit; 184 185 // true when on the list of virtual threads waiting to be unblocked 186 private volatile boolean onWaitingList; 187 188 // next virtual thread on the list of virtual threads waiting to be unblocked 189 private volatile VirtualThread next; 190 191 // notified by Object.notify/notifyAll while waiting in Object.wait 192 private volatile boolean notified; 193 194 // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait 195 private volatile boolean interruptibleWait; 196 197 // timed-wait support 198 private byte timedWaitSeqNo; 199 200 // timeout for timed-park and timed-wait, only accessed on current/carrier thread 201 private long timeout; 202 203 // timer task for timed-park and timed-wait, only accessed on current/carrier thread 204 private Future<?> timeoutTask; 205 206 // carrier thread when mounted, accessed by VM 207 private volatile Thread carrierThread; 208 209 // termination object when joining, created lazily if needed 210 private volatile CountDownLatch termination; 211 212 /** 213 * Return the built-in scheduler. 214 */ 215 static VirtualThreadScheduler builtinScheduler() { 216 return BUILTIN_SCHEDULER; 217 } 218 219 /** 220 * Returns the default scheduler, usually the same as the built-in scheduler. 221 */ 222 static VirtualThreadScheduler defaultScheduler() { 223 return DEFAULT_SCHEDULER; 224 } 225 226 /** 227 * Returns the continuation scope used for virtual threads. 228 */ 229 static ContinuationScope continuationScope() { 230 return VTHREAD_SCOPE; 231 } 232 233 /** 234 * Return the scheduler for this thread. 235 * @param trusted true if caller is trusted, false if not trusted 236 */ 237 VirtualThreadScheduler scheduler(boolean trusted) { 238 if (scheduler == BUILTIN_SCHEDULER && !trusted) { 239 return EXTERNAL_VIEW; 240 } else { 241 return scheduler; 242 } 243 } 244 245 /** 246 * Creates a new {@code VirtualThread} to run the given task with the given scheduler. 247 * 248 * @param scheduler the scheduler or null for default scheduler 249 * @param name thread name 250 * @param characteristics characteristics 251 * @param task the task to execute 252 */ 253 VirtualThread(VirtualThreadScheduler scheduler, 254 String name, 255 int characteristics, 256 Runnable task, 257 Object att) { 258 super(name, characteristics, /*bound*/ false); 259 Objects.requireNonNull(task); 260 261 // use default scheduler if not provided 262 if (scheduler == null) { 263 scheduler = DEFAULT_SCHEDULER; 264 } else if (scheduler == EXTERNAL_VIEW) { 265 throw new UnsupportedOperationException(); 266 } 267 this.scheduler = scheduler; 268 this.cont = new VThreadContinuation(this, task); 269 270 if (scheduler == BUILTIN_SCHEDULER) { 271 this.runContinuation = new BuiltinSchedulerTask(this); 272 } else { 273 this.runContinuation = new CustomSchedulerTask(this, att); 274 } 275 } 276 277 /** 278 * The task to execute when using the built-in scheduler. 279 */ 280 static final class BuiltinSchedulerTask implements VirtualThreadTask { 281 private final VirtualThread vthread; 282 BuiltinSchedulerTask(VirtualThread vthread) { 283 this.vthread = vthread; 284 } 285 @Override 286 public Object attach(Object att) { 287 throw new UnsupportedOperationException(); 288 } 289 @Override 290 public Object attachment() { 291 throw new UnsupportedOperationException(); 292 } 293 @Override 294 public Thread thread() { 295 return vthread; 296 } 297 @Override 298 public void run() { 299 vthread.runContinuation();; 300 } 301 } 302 303 /** 304 * The task to execute when using a custom scheduler. 305 */ 306 static final class CustomSchedulerTask implements VirtualThreadTask { 307 private static final VarHandle ATT = 308 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class); 309 private final VirtualThread vthread; 310 private volatile Object att; 311 CustomSchedulerTask(VirtualThread vthread, Object att) { 312 this.vthread = vthread; 313 if (att != null) { 314 this.att = att; 315 } 316 } 317 @Override 318 public Object attach(Object att) { 319 return ATT.getAndSet(this, att); 320 } 321 @Override 322 public Object attachment() { 323 return att; 324 } 325 @Override 326 public Thread thread() { 327 return vthread; 328 } 329 @Override 330 public void run() { 331 vthread.runContinuation();; 332 } 333 } 334 335 /** 336 * The continuation that a virtual thread executes. 337 */ 338 private static class VThreadContinuation extends Continuation { 339 VThreadContinuation(VirtualThread vthread, Runnable task) { 340 super(VTHREAD_SCOPE, wrap(vthread, task)); 341 } 342 @Override 343 protected void onPinned(Continuation.Pinned reason) { 344 } 345 private static Runnable wrap(VirtualThread vthread, Runnable task) { 346 return new Runnable() { 347 @Hidden 348 @JvmtiHideEvents 349 public void run() { 350 vthread.notifyJvmtiStart(); // notify JVMTI 351 try { 352 vthread.run(task); 353 } finally { 354 vthread.notifyJvmtiEnd(); // notify JVMTI 355 } 356 } 357 }; 358 } 359 } 360 361 /** 362 * Runs or continues execution on the current thread. The virtual thread is mounted 363 * on the current thread before the task runs or continues. It unmounts when the 364 * task completes or yields. 365 */ 366 @ChangesCurrentThread // allow mount/unmount to be inlined 367 private void runContinuation() { 368 // the carrier must be a platform thread 369 if (Thread.currentThread().isVirtual()) { 370 throw new WrongThreadException(); 371 } 372 373 // set state to RUNNING 374 int initialState = state(); 375 if (initialState == STARTED || initialState == UNPARKED 376 || initialState == UNBLOCKED || initialState == YIELDED) { 377 // newly started or continue after parking/blocking/Thread.yield 378 if (!compareAndSetState(initialState, RUNNING)) { 379 return; 380 } 381 // consume permit when continuing after parking or blocking. If continue 382 // after a timed-park or timed-wait then the timeout task is cancelled. 383 if (initialState == UNPARKED) { 384 cancelTimeoutTask(); 385 setParkPermit(false); 386 } else if (initialState == UNBLOCKED) { 387 cancelTimeoutTask(); 388 blockPermit = false; 389 } 390 } else { 391 // not runnable 392 return; 393 } 394 395 mount(); 396 try { 397 cont.run(); 398 } finally { 399 unmount(); 400 if (cont.isDone()) { 401 afterDone(); 402 } else { 403 afterYield(); 404 } 405 } 406 } 407 408 /** 409 * Cancel timeout task when continuing after timed-park or timed-wait. 410 * The timeout task may be executing, or may have already completed. 411 */ 412 private void cancelTimeoutTask() { 413 if (timeoutTask != null) { 414 timeoutTask.cancel(false); 415 timeoutTask = null; 416 } 417 } 418 419 /** 420 * Submits the runContinuation task to the scheduler. For the built-in scheduler, 421 * the task will be pushed to the local queue if possible, otherwise it will be 422 * pushed to an external submission queue. 423 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 424 * @throws RejectedExecutionException 425 */ 426 private void submitRunContinuation(boolean retryOnOOME) { 427 boolean done = false; 428 while (!done) { 429 try { 430 // Pin the continuation to prevent the virtual thread from unmounting 431 // when submitting a task. For the default scheduler this ensures that 432 // the carrier doesn't change when pushing a task. For other schedulers 433 // it avoids deadlock that could arise due to carriers and virtual 434 // threads contending for a lock. 435 if (currentThread().isVirtual()) { 436 Continuation.pin(); 437 try { 438 scheduler.onContinue(runContinuation); 439 } finally { 440 Continuation.unpin(); 441 } 442 } else { 443 scheduler.onContinue(runContinuation); 444 } 445 done = true; 446 } catch (RejectedExecutionException ree) { 447 submitFailed(ree); 448 throw ree; 449 } catch (OutOfMemoryError e) { 450 if (retryOnOOME) { 451 U.park(false, 100_000_000); // 100ms 452 } else { 453 throw e; 454 } 455 } 456 } 457 } 458 459 /** 460 * Submits the runContinuation task to the scheduler. For the default scheduler, 461 * and calling it on a worker thread, the task will be pushed to the local queue, 462 * otherwise it will be pushed to an external submission queue. 463 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 464 * @throws RejectedExecutionException 465 */ 466 private void submitRunContinuation() { 467 submitRunContinuation(true); 468 } 469 470 /** 471 * Invoked from a carrier thread to lazy submit the runContinuation task to the 472 * carrier's local queue if the queue is empty. If not empty, or invoked by a thread 473 * for a custom scheduler, then it just submits the task to the scheduler. 474 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 475 * @throws RejectedExecutionException 476 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 477 */ 478 private void lazySubmitRunContinuation() { 479 assert !currentThread().isVirtual(); 480 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 481 try { 482 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation)); 483 } catch (RejectedExecutionException ree) { 484 submitFailed(ree); 485 throw ree; 486 } catch (OutOfMemoryError e) { 487 submitRunContinuation(); 488 } 489 } else { 490 submitRunContinuation(); 491 } 492 } 493 494 /** 495 * Invoked from a carrier thread to externally submit the runContinuation task to the 496 * scheduler. If invoked by a thread for a custom scheduler, then it just submits the 497 * task to the scheduler. 498 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 499 * @throws RejectedExecutionException 500 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 501 */ 502 private void externalSubmitRunContinuation() { 503 assert !currentThread().isVirtual(); 504 if (currentThread() instanceof CarrierThread ct) { 505 try { 506 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 507 } catch (RejectedExecutionException ree) { 508 submitFailed(ree); 509 throw ree; 510 } catch (OutOfMemoryError e) { 511 submitRunContinuation(); 512 } 513 } else { 514 submitRunContinuation(); 515 } 516 } 517 518 /** 519 * Invoked from Thread.start to externally submit the runContinuation task to the 520 * scheduler. If this virtual thread is scheduled by the built-in scheduler, 521 * and this method is called from a virtual thread scheduled by the built-in 522 * scheduler, then it uses externalSubmit to ensure that the task is pushed to an 523 * external submission queue rather than the local queue. 524 * @throws RejectedExecutionException 525 * @throws OutOfMemoryError 526 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 527 */ 528 private void externalSubmitRunContinuationOrThrow() { 529 try { 530 if (currentThread().isVirtual()) { 531 // Pin the continuation to prevent the virtual thread from unmounting 532 // when submitting a task. This avoids deadlock that could arise due to 533 // carriers and virtual threads contending for a lock. 534 Continuation.pin(); 535 try { 536 if (scheduler == BUILTIN_SCHEDULER 537 && currentCarrierThread() instanceof CarrierThread ct) { 538 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 539 } else { 540 scheduler.onStart(runContinuation); 541 } 542 } finally { 543 Continuation.unpin(); 544 } 545 } else { 546 scheduler.onStart(runContinuation); 547 } 548 } catch (RejectedExecutionException ree) { 549 submitFailed(ree); 550 throw ree; 551 } 552 } 553 554 /** 555 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 556 */ 557 private void submitFailed(RejectedExecutionException ree) { 558 var event = new VirtualThreadSubmitFailedEvent(); 559 if (event.isEnabled()) { 560 event.javaThreadId = threadId(); 561 event.exceptionMessage = ree.getMessage(); 562 event.commit(); 563 } 564 } 565 566 /** 567 * Runs a task in the context of this virtual thread. 568 */ 569 private void run(Runnable task) { 570 assert Thread.currentThread() == this && state == RUNNING; 571 572 // emit JFR event if enabled 573 if (VirtualThreadStartEvent.isTurnedOn()) { 574 var event = new VirtualThreadStartEvent(); 575 event.javaThreadId = threadId(); 576 event.commit(); 577 } 578 579 Object bindings = Thread.scopedValueBindings(); 580 try { 581 runWith(bindings, task); 582 } catch (Throwable exc) { 583 dispatchUncaughtException(exc); 584 } finally { 585 // pop any remaining scopes from the stack, this may block 586 StackableScope.popAll(); 587 588 // emit JFR event if enabled 589 if (VirtualThreadEndEvent.isTurnedOn()) { 590 var event = new VirtualThreadEndEvent(); 591 event.javaThreadId = threadId(); 592 event.commit(); 593 } 594 } 595 } 596 597 /** 598 * Mounts this virtual thread onto the current platform thread. On 599 * return, the current thread is the virtual thread. 600 */ 601 @ChangesCurrentThread 602 @ReservedStackAccess 603 private void mount() { 604 // notify JVMTI before mount 605 notifyJvmtiMount(/*hide*/true); 606 607 // sets the carrier thread 608 Thread carrier = Thread.currentCarrierThread(); 609 setCarrierThread(carrier); 610 611 // sync up carrier thread interrupted status if needed 612 if (interrupted) { 613 carrier.setInterrupt(); 614 } else if (carrier.isInterrupted()) { 615 synchronized (interruptLock) { 616 // need to recheck interrupted status 617 if (!interrupted) { 618 carrier.clearInterrupt(); 619 } 620 } 621 } 622 623 // set Thread.currentThread() to return this virtual thread 624 carrier.setCurrentThread(this); 625 } 626 627 /** 628 * Unmounts this virtual thread from the carrier. On return, the 629 * current thread is the current platform thread. 630 */ 631 @ChangesCurrentThread 632 @ReservedStackAccess 633 private void unmount() { 634 assert !Thread.holdsLock(interruptLock); 635 636 // set Thread.currentThread() to return the platform thread 637 Thread carrier = this.carrierThread; 638 carrier.setCurrentThread(carrier); 639 640 // break connection to carrier thread, synchronized with interrupt 641 synchronized (interruptLock) { 642 setCarrierThread(null); 643 } 644 carrier.clearInterrupt(); 645 646 // notify JVMTI after unmount 647 notifyJvmtiUnmount(/*hide*/false); 648 } 649 650 /** 651 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 652 * the continuation continues. 653 */ 654 @Hidden 655 private boolean yieldContinuation() { 656 notifyJvmtiUnmount(/*hide*/true); 657 try { 658 return Continuation.yield(VTHREAD_SCOPE); 659 } finally { 660 notifyJvmtiMount(/*hide*/false); 661 } 662 } 663 664 /** 665 * Invoked in the context of the carrier thread after the Continuation yields when 666 * parking, blocking on monitor enter, Object.wait, or Thread.yield. 667 */ 668 private void afterYield() { 669 assert carrierThread == null; 670 671 // re-adjust parallelism if the virtual thread yielded when compensating 672 if (currentThread() instanceof CarrierThread ct) { 673 ct.endBlocking(); 674 } 675 676 int s = state(); 677 678 // LockSupport.park/parkNanos 679 if (s == PARKING || s == TIMED_PARKING) { 680 int newState; 681 if (s == PARKING) { 682 setState(newState = PARKED); 683 } else { 684 // schedule unpark 685 long timeout = this.timeout; 686 assert timeout > 0; 687 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS); 688 setState(newState = TIMED_PARKED); 689 } 690 691 // may have been unparked while parking 692 if (parkPermit && compareAndSetState(newState, UNPARKED)) { 693 // lazy submit if local queue is empty 694 lazySubmitRunContinuation(); 695 } 696 return; 697 } 698 699 // Thread.yield 700 if (s == YIELDING) { 701 setState(YIELDED); 702 703 // external submit if there are no tasks in the local task queue 704 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 705 externalSubmitRunContinuation(); 706 } else { 707 submitRunContinuation(); 708 } 709 return; 710 } 711 712 // blocking on monitorenter 713 if (s == BLOCKING) { 714 setState(BLOCKED); 715 716 // may have been unblocked while blocking 717 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 718 // lazy submit if local queue is empty 719 lazySubmitRunContinuation(); 720 } 721 return; 722 } 723 724 // Object.wait 725 if (s == WAITING || s == TIMED_WAITING) { 726 int newState; 727 boolean interruptible = interruptibleWait; 728 if (s == WAITING) { 729 setState(newState = WAIT); 730 } else { 731 // For timed-wait, a timeout task is scheduled to execute. The timeout 732 // task will change the thread state to UNBLOCKED and submit the thread 733 // to the scheduler. A sequence number is used to ensure that the timeout 734 // task only unblocks the thread for this timed-wait. We synchronize with 735 // the timeout task to coordinate access to the sequence number and to 736 // ensure the timeout task doesn't execute until the thread has got to 737 // the TIMED_WAIT state. 738 long timeout = this.timeout; 739 assert timeout > 0; 740 synchronized (timedWaitLock()) { 741 byte seqNo = ++timedWaitSeqNo; 742 timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS); 743 setState(newState = TIMED_WAIT); 744 } 745 } 746 747 // may have been notified while in transition to wait state 748 if (notified && compareAndSetState(newState, BLOCKED)) { 749 // may have even been unblocked already 750 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 751 submitRunContinuation(); 752 } 753 return; 754 } 755 756 // may have been interrupted while in transition to wait state 757 if (interruptible && interrupted && compareAndSetState(newState, UNBLOCKED)) { 758 submitRunContinuation(); 759 return; 760 } 761 return; 762 } 763 764 assert false; 765 } 766 767 /** 768 * Invoked after the continuation completes. 769 */ 770 private void afterDone() { 771 afterDone(true); 772 } 773 774 /** 775 * Invoked after the continuation completes (or start failed). Sets the thread 776 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 777 * 778 * @param notifyContainer true if its container should be notified 779 */ 780 private void afterDone(boolean notifyContainer) { 781 assert carrierThread == null; 782 setState(TERMINATED); 783 784 // notify anyone waiting for this virtual thread to terminate 785 CountDownLatch termination = this.termination; 786 if (termination != null) { 787 assert termination.getCount() == 1; 788 termination.countDown(); 789 } 790 791 // notify container 792 if (notifyContainer) { 793 threadContainer().remove(this); 794 } 795 796 // clear references to thread locals 797 clearReferences(); 798 } 799 800 /** 801 * Schedules this {@code VirtualThread} to execute. 802 * 803 * @throws IllegalStateException if the container is shutdown or closed 804 * @throws IllegalThreadStateException if the thread has already been started 805 * @throws RejectedExecutionException if the scheduler cannot accept a task 806 */ 807 @Override 808 void start(ThreadContainer container) { 809 if (!compareAndSetState(NEW, STARTED)) { 810 throw new IllegalThreadStateException("Already started"); 811 } 812 813 // bind thread to container 814 assert threadContainer() == null; 815 setThreadContainer(container); 816 817 // start thread 818 boolean addedToContainer = false; 819 boolean started = false; 820 try { 821 container.add(this); // may throw 822 addedToContainer = true; 823 824 // scoped values may be inherited 825 inheritScopedValueBindings(container); 826 827 // submit task to run thread, using externalSubmit if possible 828 externalSubmitRunContinuationOrThrow(); 829 started = true; 830 } finally { 831 if (!started) { 832 afterDone(addedToContainer); 833 } 834 } 835 } 836 837 @Override 838 public void start() { 839 start(ThreadContainers.root()); 840 } 841 842 @Override 843 public void run() { 844 // do nothing 845 } 846 847 /** 848 * Parks until unparked or interrupted. If already unparked then the parking 849 * permit is consumed and this method completes immediately (meaning it doesn't 850 * yield). It also completes immediately if the interrupted status is set. 851 */ 852 @Override 853 void park() { 854 assert Thread.currentThread() == this; 855 856 // complete immediately if parking permit available or interrupted 857 if (getAndSetParkPermit(false) || interrupted) 858 return; 859 860 // park the thread 861 boolean yielded = false; 862 long eventStartTime = VirtualThreadParkEvent.eventStartTime(); 863 setState(PARKING); 864 try { 865 yielded = yieldContinuation(); 866 } catch (OutOfMemoryError e) { 867 // park on carrier 868 } finally { 869 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 870 if (yielded) { 871 VirtualThreadParkEvent.offer(eventStartTime, Long.MIN_VALUE); 872 } else { 873 assert state() == PARKING; 874 setState(RUNNING); 875 } 876 } 877 878 // park on the carrier thread when pinned 879 if (!yielded) { 880 parkOnCarrierThread(false, 0); 881 } 882 } 883 884 /** 885 * Parks up to the given waiting time or until unparked or interrupted. 886 * If already unparked then the parking permit is consumed and this method 887 * completes immediately (meaning it doesn't yield). It also completes immediately 888 * if the interrupted status is set or the waiting time is {@code <= 0}. 889 * 890 * @param nanos the maximum number of nanoseconds to wait. 891 */ 892 @Override 893 void parkNanos(long nanos) { 894 assert Thread.currentThread() == this; 895 896 // complete immediately if parking permit available or interrupted 897 if (getAndSetParkPermit(false) || interrupted) 898 return; 899 900 // park the thread for the waiting time 901 if (nanos > 0) { 902 long startTime = System.nanoTime(); 903 904 // park the thread, afterYield will schedule the thread to unpark 905 boolean yielded = false; 906 long eventStartTime = VirtualThreadParkEvent.eventStartTime(); 907 timeout = nanos; 908 setState(TIMED_PARKING); 909 try { 910 yielded = yieldContinuation(); 911 } catch (OutOfMemoryError e) { 912 // park on carrier 913 } finally { 914 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 915 if (yielded) { 916 VirtualThreadParkEvent.offer(eventStartTime, nanos); 917 } else { 918 assert state() == TIMED_PARKING; 919 setState(RUNNING); 920 } 921 } 922 923 // park on carrier thread for remaining time when pinned (or OOME) 924 if (!yielded) { 925 long remainingNanos = nanos - (System.nanoTime() - startTime); 926 parkOnCarrierThread(true, remainingNanos); 927 } 928 } 929 } 930 931 /** 932 * Parks the current carrier thread up to the given waiting time or until 933 * unparked or interrupted. If the virtual thread is interrupted then the 934 * interrupted status will be propagated to the carrier thread. 935 * @param timed true for a timed park, false for untimed 936 * @param nanos the waiting time in nanoseconds 937 */ 938 private void parkOnCarrierThread(boolean timed, long nanos) { 939 assert state() == RUNNING; 940 941 setState(timed ? TIMED_PINNED : PINNED); 942 try { 943 if (!parkPermit) { 944 if (!timed) { 945 U.park(false, 0); 946 } else if (nanos > 0) { 947 U.park(false, nanos); 948 } 949 } 950 } finally { 951 setState(RUNNING); 952 } 953 954 // consume parking permit 955 setParkPermit(false); 956 957 // JFR jdk.VirtualThreadPinned event 958 postPinnedEvent("LockSupport.park"); 959 } 960 961 /** 962 * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event. 963 * Recording the event in the VM avoids having JFR event recorded in Java 964 * with the same name, but different ID, to events recorded by the VM. 965 */ 966 @Hidden 967 private static native void postPinnedEvent(String op); 968 969 /** 970 * Re-enables this virtual thread for scheduling. If this virtual thread is parked 971 * then its task is scheduled to continue, otherwise its next call to {@code park} or 972 * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block. 973 * @throws RejectedExecutionException if the scheduler cannot accept a task 974 */ 975 @Override 976 void unpark() { 977 if (!getAndSetParkPermit(true) && currentThread() != this) { 978 int s = state(); 979 980 // unparked while parked 981 if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) { 982 submitRunContinuation(); 983 return; 984 } 985 986 // unparked while parked when pinned 987 if (s == PINNED || s == TIMED_PINNED) { 988 // unpark carrier thread when pinned 989 disableSuspendAndPreempt(); 990 try { 991 synchronized (carrierThreadAccessLock()) { 992 Thread carrier = carrierThread; 993 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 994 U.unpark(carrier); 995 } 996 } 997 } finally { 998 enableSuspendAndPreempt(); 999 } 1000 return; 1001 } 1002 } 1003 } 1004 1005 /** 1006 * Invoked by unblocker thread to unblock this virtual thread. 1007 */ 1008 private void unblock() { 1009 assert !Thread.currentThread().isVirtual(); 1010 blockPermit = true; 1011 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 1012 submitRunContinuation(); 1013 } 1014 } 1015 1016 /** 1017 * Invoked by FJP worker thread or STPE thread when park timeout expires. 1018 */ 1019 private void parkTimeoutExpired() { 1020 assert !VirtualThread.currentThread().isVirtual(); 1021 if (!getAndSetParkPermit(true) 1022 && (state() == TIMED_PARKED) 1023 && compareAndSetState(TIMED_PARKED, UNPARKED)) { 1024 lazySubmitRunContinuation(); 1025 } 1026 } 1027 1028 /** 1029 * Invoked by FJP worker thread or STPE thread when wait timeout expires. 1030 * If the virtual thread is in timed-wait then this method will unblock the thread 1031 * and submit its task so that it continues and attempts to reenter the monitor. 1032 * This method does nothing if the thread has been woken by notify or interrupt. 1033 */ 1034 private void waitTimeoutExpired(byte seqNo) { 1035 assert !Thread.currentThread().isVirtual(); 1036 for (;;) { 1037 boolean unblocked = false; 1038 synchronized (timedWaitLock()) { 1039 if (seqNo != timedWaitSeqNo) { 1040 // this timeout task is for a past timed-wait 1041 return; 1042 } 1043 int s = state(); 1044 if (s == TIMED_WAIT) { 1045 unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED); 1046 } else if (s != (TIMED_WAIT | SUSPENDED)) { 1047 // notified or interrupted, no longer waiting 1048 return; 1049 } 1050 } 1051 if (unblocked) { 1052 lazySubmitRunContinuation(); 1053 return; 1054 } 1055 // need to retry when thread is suspended in time-wait 1056 Thread.yield(); 1057 } 1058 } 1059 1060 /** 1061 * Attempts to yield the current virtual thread (Thread.yield). 1062 */ 1063 void tryYield() { 1064 assert Thread.currentThread() == this; 1065 setState(YIELDING); 1066 boolean yielded = false; 1067 try { 1068 yielded = yieldContinuation(); // may throw 1069 } finally { 1070 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 1071 if (!yielded) { 1072 assert state() == YIELDING; 1073 setState(RUNNING); 1074 } 1075 } 1076 } 1077 1078 /** 1079 * Sleep the current thread for the given sleep time (in nanoseconds). If 1080 * nanos is 0 then the thread will attempt to yield. 1081 * 1082 * @implNote This implementation parks the thread for the given sleeping time 1083 * and will therefore be observed in PARKED state during the sleep. Parking 1084 * will consume the parking permit so this method makes available the parking 1085 * permit after the sleep. This may be observed as a spurious, but benign, 1086 * wakeup when the thread subsequently attempts to park. 1087 * 1088 * @param nanos the maximum number of nanoseconds to sleep 1089 * @throws InterruptedException if interrupted while sleeping 1090 */ 1091 void sleepNanos(long nanos) throws InterruptedException { 1092 assert Thread.currentThread() == this && nanos >= 0; 1093 if (getAndClearInterrupt()) 1094 throw new InterruptedException(); 1095 if (nanos == 0) { 1096 tryYield(); 1097 } else { 1098 // park for the sleep time 1099 try { 1100 long remainingNanos = nanos; 1101 long startNanos = System.nanoTime(); 1102 while (remainingNanos > 0) { 1103 parkNanos(remainingNanos); 1104 if (getAndClearInterrupt()) { 1105 throw new InterruptedException(); 1106 } 1107 remainingNanos = nanos - (System.nanoTime() - startNanos); 1108 } 1109 } finally { 1110 // may have been unparked while sleeping 1111 setParkPermit(true); 1112 } 1113 } 1114 } 1115 1116 /** 1117 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 1118 * A timeout of {@code 0} means to wait forever. 1119 * 1120 * @throws InterruptedException if interrupted while waiting 1121 * @return true if the thread has terminated 1122 */ 1123 boolean joinNanos(long nanos) throws InterruptedException { 1124 if (state() == TERMINATED) 1125 return true; 1126 1127 // ensure termination object exists, then re-check state 1128 CountDownLatch termination = getTermination(); 1129 if (state() == TERMINATED) 1130 return true; 1131 1132 // wait for virtual thread to terminate 1133 if (nanos == 0) { 1134 termination.await(); 1135 } else { 1136 boolean terminated = termination.await(nanos, NANOSECONDS); 1137 if (!terminated) { 1138 // waiting time elapsed 1139 return false; 1140 } 1141 } 1142 assert state() == TERMINATED; 1143 return true; 1144 } 1145 1146 @Override 1147 void blockedOn(Interruptible b) { 1148 disableSuspendAndPreempt(); 1149 try { 1150 super.blockedOn(b); 1151 } finally { 1152 enableSuspendAndPreempt(); 1153 } 1154 } 1155 1156 @Override 1157 public void interrupt() { 1158 if (Thread.currentThread() != this) { 1159 // if current thread is a virtual thread then prevent it from being 1160 // suspended or unmounted when entering or holding interruptLock 1161 Interruptible blocker; 1162 disableSuspendAndPreempt(); 1163 try { 1164 synchronized (interruptLock) { 1165 interrupted = true; 1166 blocker = nioBlocker(); 1167 if (blocker != null) { 1168 blocker.interrupt(this); 1169 } 1170 1171 // interrupt carrier thread if mounted 1172 Thread carrier = carrierThread; 1173 if (carrier != null) carrier.setInterrupt(); 1174 } 1175 } finally { 1176 enableSuspendAndPreempt(); 1177 } 1178 1179 // notify blocker after releasing interruptLock 1180 if (blocker != null) { 1181 blocker.postInterrupt(); 1182 } 1183 1184 // make available parking permit, unpark thread if parked 1185 unpark(); 1186 1187 // if thread is waiting in Object.wait then schedule to try to reenter 1188 int s = state(); 1189 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1190 submitRunContinuation(); 1191 } 1192 1193 } else { 1194 interrupted = true; 1195 carrierThread.setInterrupt(); 1196 setParkPermit(true); 1197 } 1198 } 1199 1200 @Override 1201 public boolean isInterrupted() { 1202 return interrupted; 1203 } 1204 1205 @Override 1206 boolean getAndClearInterrupt() { 1207 assert Thread.currentThread() == this; 1208 boolean oldValue = interrupted; 1209 if (oldValue) { 1210 disableSuspendAndPreempt(); 1211 try { 1212 synchronized (interruptLock) { 1213 interrupted = false; 1214 carrierThread.clearInterrupt(); 1215 } 1216 } finally { 1217 enableSuspendAndPreempt(); 1218 } 1219 } 1220 return oldValue; 1221 } 1222 1223 @Override 1224 Thread.State threadState() { 1225 int s = state(); 1226 switch (s & ~SUSPENDED) { 1227 case NEW: 1228 return Thread.State.NEW; 1229 case STARTED: 1230 // return NEW if thread container not yet set 1231 if (threadContainer() == null) { 1232 return Thread.State.NEW; 1233 } else { 1234 return Thread.State.RUNNABLE; 1235 } 1236 case UNPARKED: 1237 case UNBLOCKED: 1238 case YIELDED: 1239 // runnable, not mounted 1240 return Thread.State.RUNNABLE; 1241 case RUNNING: 1242 // if mounted then return state of carrier thread 1243 if (Thread.currentThread() != this) { 1244 disableSuspendAndPreempt(); 1245 try { 1246 synchronized (carrierThreadAccessLock()) { 1247 Thread carrierThread = this.carrierThread; 1248 if (carrierThread != null) { 1249 return carrierThread.threadState(); 1250 } 1251 } 1252 } finally { 1253 enableSuspendAndPreempt(); 1254 } 1255 } 1256 // runnable, mounted 1257 return Thread.State.RUNNABLE; 1258 case PARKING: 1259 case TIMED_PARKING: 1260 case WAITING: 1261 case TIMED_WAITING: 1262 case YIELDING: 1263 // runnable, in transition 1264 return Thread.State.RUNNABLE; 1265 case PARKED: 1266 case PINNED: 1267 case WAIT: 1268 return Thread.State.WAITING; 1269 case TIMED_PARKED: 1270 case TIMED_PINNED: 1271 case TIMED_WAIT: 1272 return Thread.State.TIMED_WAITING; 1273 case BLOCKING: 1274 case BLOCKED: 1275 return Thread.State.BLOCKED; 1276 case TERMINATED: 1277 return Thread.State.TERMINATED; 1278 default: 1279 throw new InternalError(); 1280 } 1281 } 1282 1283 @Override 1284 boolean alive() { 1285 int s = state; 1286 return (s != NEW && s != TERMINATED); 1287 } 1288 1289 @Override 1290 boolean isTerminated() { 1291 return (state == TERMINATED); 1292 } 1293 1294 @Override 1295 StackTraceElement[] asyncGetStackTrace() { 1296 StackTraceElement[] stackTrace; 1297 do { 1298 stackTrace = (carrierThread != null) 1299 ? super.asyncGetStackTrace() // mounted 1300 : tryGetStackTrace(); // unmounted 1301 if (stackTrace == null) { 1302 Thread.yield(); 1303 } 1304 } while (stackTrace == null); 1305 return stackTrace; 1306 } 1307 1308 /** 1309 * Returns the stack trace for this virtual thread if it is unmounted. 1310 * Returns null if the thread is mounted or in transition. 1311 */ 1312 private StackTraceElement[] tryGetStackTrace() { 1313 int initialState = state() & ~SUSPENDED; 1314 switch (initialState) { 1315 case NEW, STARTED, TERMINATED -> { 1316 return new StackTraceElement[0]; // unmounted, empty stack 1317 } 1318 case RUNNING, PINNED, TIMED_PINNED -> { 1319 return null; // mounted 1320 } 1321 case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> { 1322 // unmounted, not runnable 1323 } 1324 case UNPARKED, UNBLOCKED, YIELDED -> { 1325 // unmounted, runnable 1326 } 1327 case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> { 1328 return null; // in transition 1329 } 1330 default -> throw new InternalError("" + initialState); 1331 } 1332 1333 // thread is unmounted, prevent it from continuing 1334 int suspendedState = initialState | SUSPENDED; 1335 if (!compareAndSetState(initialState, suspendedState)) { 1336 return null; 1337 } 1338 1339 // get stack trace and restore state 1340 StackTraceElement[] stack; 1341 try { 1342 stack = cont.getStackTrace(); 1343 } finally { 1344 assert state == suspendedState; 1345 setState(initialState); 1346 } 1347 boolean resubmit = switch (initialState) { 1348 case UNPARKED, UNBLOCKED, YIELDED -> { 1349 // resubmit as task may have run while suspended 1350 yield true; 1351 } 1352 case PARKED, TIMED_PARKED -> { 1353 // resubmit if unparked while suspended 1354 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1355 } 1356 case BLOCKED -> { 1357 // resubmit if unblocked while suspended 1358 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED); 1359 } 1360 case WAIT, TIMED_WAIT -> { 1361 // resubmit if notified or interrupted while waiting (Object.wait) 1362 // waitTimeoutExpired will retry if the timed expired when suspended 1363 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED); 1364 } 1365 default -> throw new InternalError(); 1366 }; 1367 if (resubmit) { 1368 submitRunContinuation(); 1369 } 1370 return stack; 1371 } 1372 1373 @Override 1374 public String toString() { 1375 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1376 sb.append(threadId()); 1377 String name = getName(); 1378 if (!name.isEmpty()) { 1379 sb.append(","); 1380 sb.append(name); 1381 } 1382 sb.append("]/"); 1383 1384 // add the carrier state and thread name when mounted 1385 boolean mounted; 1386 if (Thread.currentThread() == this) { 1387 mounted = appendCarrierInfo(sb); 1388 } else { 1389 disableSuspendAndPreempt(); 1390 try { 1391 synchronized (carrierThreadAccessLock()) { 1392 mounted = appendCarrierInfo(sb); 1393 } 1394 } finally { 1395 enableSuspendAndPreempt(); 1396 } 1397 } 1398 1399 // add virtual thread state when not mounted 1400 if (!mounted) { 1401 String stateAsString = threadState().toString(); 1402 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1403 } 1404 1405 return sb.toString(); 1406 } 1407 1408 /** 1409 * Appends the carrier state and thread name to the string buffer if mounted. 1410 * @return true if mounted, false if not mounted 1411 */ 1412 private boolean appendCarrierInfo(StringBuilder sb) { 1413 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1414 Thread carrier = carrierThread; 1415 if (carrier != null) { 1416 String stateAsString = carrier.threadState().toString(); 1417 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1418 sb.append('@'); 1419 sb.append(carrier.getName()); 1420 return true; 1421 } else { 1422 return false; 1423 } 1424 } 1425 1426 @Override 1427 public int hashCode() { 1428 return (int) threadId(); 1429 } 1430 1431 @Override 1432 public boolean equals(Object obj) { 1433 return obj == this; 1434 } 1435 1436 /** 1437 * Returns the termination object, creating it if needed. 1438 */ 1439 private CountDownLatch getTermination() { 1440 CountDownLatch termination = this.termination; 1441 if (termination == null) { 1442 termination = new CountDownLatch(1); 1443 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1444 termination = this.termination; 1445 } 1446 } 1447 return termination; 1448 } 1449 1450 /** 1451 * Returns the lock object to synchronize on when accessing carrierThread. 1452 * The lock prevents carrierThread from being reset to null during unmount. 1453 */ 1454 private Object carrierThreadAccessLock() { 1455 // return interruptLock as unmount has to coordinate with interrupt 1456 return interruptLock; 1457 } 1458 1459 /** 1460 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1461 */ 1462 private Object timedWaitLock() { 1463 // use this object for now to avoid the overhead of introducing another lock 1464 return runContinuation; 1465 } 1466 1467 /** 1468 * Disallow the current thread be suspended or preempted. 1469 */ 1470 private void disableSuspendAndPreempt() { 1471 notifyJvmtiDisableSuspend(true); 1472 Continuation.pin(); 1473 } 1474 1475 /** 1476 * Allow the current thread be suspended or preempted. 1477 */ 1478 private void enableSuspendAndPreempt() { 1479 Continuation.unpin(); 1480 notifyJvmtiDisableSuspend(false); 1481 } 1482 1483 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1484 1485 private int state() { 1486 return state; // volatile read 1487 } 1488 1489 private void setState(int newValue) { 1490 state = newValue; // volatile write 1491 } 1492 1493 private boolean compareAndSetState(int expectedValue, int newValue) { 1494 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1495 } 1496 1497 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1498 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1499 } 1500 1501 private void setParkPermit(boolean newValue) { 1502 if (parkPermit != newValue) { 1503 parkPermit = newValue; 1504 } 1505 } 1506 1507 private boolean getAndSetParkPermit(boolean newValue) { 1508 if (parkPermit != newValue) { 1509 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1510 } else { 1511 return newValue; 1512 } 1513 } 1514 1515 private void setCarrierThread(Thread carrier) { 1516 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1517 this.carrierThread = carrier; 1518 } 1519 1520 // -- JVM TI support -- 1521 1522 @IntrinsicCandidate 1523 @JvmtiMountTransition 1524 private native void notifyJvmtiStart(); 1525 1526 @IntrinsicCandidate 1527 @JvmtiMountTransition 1528 private native void notifyJvmtiEnd(); 1529 1530 @IntrinsicCandidate 1531 @JvmtiMountTransition 1532 private native void notifyJvmtiMount(boolean hide); 1533 1534 @IntrinsicCandidate 1535 @JvmtiMountTransition 1536 private native void notifyJvmtiUnmount(boolean hide); 1537 1538 @IntrinsicCandidate 1539 private static native void notifyJvmtiDisableSuspend(boolean enter); 1540 1541 private static native void registerNatives(); 1542 static { 1543 registerNatives(); 1544 1545 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1546 var group = Thread.virtualThreadGroup(); 1547 1548 // ensure event class is initialized 1549 try { 1550 MethodHandles.lookup().ensureInitialized(VirtualThreadParkEvent.class); 1551 } catch (IllegalAccessException e) { 1552 throw new ExceptionInInitializerError(e); 1553 } 1554 } 1555 1556 /** 1557 * Loads a VirtualThreadScheduler with the given class name. The class must be public 1558 * in an exported package, with public one-arg or no-arg constructor, and be visible 1559 * to the system class loader. 1560 * @param delegate the scheduler that the custom scheduler may delegate to 1561 * @param cn the class name of the custom scheduler 1562 */ 1563 private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) { 1564 VirtualThreadScheduler scheduler; 1565 try { 1566 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); 1567 // 1-arg constructor 1568 try { 1569 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class); 1570 return (VirtualThreadScheduler) ctor.newInstance(delegate); 1571 } catch (NoSuchMethodException e) { 1572 // 0-arg constructor 1573 Constructor<?> ctor = clazz.getConstructor(); 1574 scheduler = (VirtualThreadScheduler) ctor.newInstance(); 1575 } 1576 } catch (Exception ex) { 1577 throw new Error(ex); 1578 } 1579 System.err.println("WARNING: Using custom default scheduler, this is an experimental feature!"); 1580 return scheduler; 1581 } 1582 1583 /** 1584 * Creates the built-in ForkJoinPool scheduler. 1585 * @param wrapped true if wrapped by a custom default scheduler 1586 */ 1587 private static BuiltinScheduler createBuiltinScheduler(boolean wrapped) { 1588 int parallelism, maxPoolSize, minRunnable; 1589 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1590 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1591 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1592 if (parallelismValue != null) { 1593 parallelism = Integer.parseInt(parallelismValue); 1594 } else { 1595 parallelism = Runtime.getRuntime().availableProcessors(); 1596 } 1597 if (maxPoolSizeValue != null) { 1598 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1599 parallelism = Integer.min(parallelism, maxPoolSize); 1600 } else { 1601 maxPoolSize = Integer.max(parallelism, 256); 1602 } 1603 if (minRunnableValue != null) { 1604 minRunnable = Integer.parseInt(minRunnableValue); 1605 } else { 1606 minRunnable = Integer.max(parallelism / 2, 1); 1607 } 1608 return new BuiltinScheduler(parallelism, maxPoolSize, minRunnable, wrapped); 1609 } 1610 1611 /** 1612 * The built-in ForkJoinPool scheduler. 1613 */ 1614 private static class BuiltinScheduler 1615 extends ForkJoinPool implements VirtualThreadScheduler { 1616 1617 BuiltinScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) { 1618 ForkJoinWorkerThreadFactory factory = wrapped 1619 ? ForkJoinPool.defaultForkJoinWorkerThreadFactory 1620 : CarrierThread::new; 1621 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1622 boolean asyncMode = true; // FIFO 1623 super(parallelism, factory, handler, asyncMode, 1624 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1625 } 1626 1627 private void adaptAndExecute(Runnable task) { 1628 execute(ForkJoinTask.adapt(task)); 1629 } 1630 1631 @Override 1632 public void onStart(VirtualThreadTask task) { 1633 adaptAndExecute(task); 1634 } 1635 1636 @Override 1637 public void onContinue(VirtualThreadTask task) { 1638 adaptAndExecute(task); 1639 } 1640 1641 /** 1642 * Wraps the scheduler to avoid leaking a direct reference with 1643 * {@link VirtualThreadScheduler#current()}. 1644 */ 1645 VirtualThreadScheduler createExternalView() { 1646 BuiltinScheduler builtin = this; 1647 return new VirtualThreadScheduler() { 1648 private void execute(VirtualThreadTask task) { 1649 var vthread = (VirtualThread) task.thread(); 1650 VirtualThreadScheduler scheduler = vthread.scheduler; 1651 if (scheduler == this || scheduler == DEFAULT_SCHEDULER) { 1652 builtin.adaptAndExecute(task); 1653 } else { 1654 throw new IllegalArgumentException(); 1655 } 1656 } 1657 @Override 1658 public void onStart(VirtualThreadTask task) { 1659 execute(task); 1660 } 1661 @Override 1662 public void onContinue(VirtualThreadTask task) { 1663 execute(task); 1664 } 1665 @Override 1666 public String toString() { 1667 return builtin.toString(); 1668 } 1669 }; 1670 } 1671 } 1672 1673 /** 1674 * Schedule a runnable task to run after a delay. 1675 */ 1676 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1677 if (scheduler == BUILTIN_SCHEDULER) { 1678 return BUILTIN_SCHEDULER.schedule(command, delay, unit); 1679 } else { 1680 return DelayedTaskSchedulers.schedule(command, delay, unit); 1681 } 1682 } 1683 1684 /** 1685 * Supports scheduling a runnable task to run after a delay. It uses a number 1686 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1687 * work queue used. This class is used when using a custom scheduler. 1688 */ 1689 private static class DelayedTaskSchedulers { 1690 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1691 1692 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1693 long tid = Thread.currentThread().threadId(); 1694 int index = (int) tid & (INSTANCE.length - 1); 1695 return INSTANCE[index].schedule(command, delay, unit); 1696 } 1697 1698 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1699 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1700 String propValue = System.getProperty(propName); 1701 int queueCount; 1702 if (propValue != null) { 1703 queueCount = Integer.parseInt(propValue); 1704 if (queueCount != Integer.highestOneBit(queueCount)) { 1705 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1706 } 1707 } else { 1708 int ncpus = Runtime.getRuntime().availableProcessors(); 1709 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1710 } 1711 var schedulers = new ScheduledExecutorService[queueCount]; 1712 for (int i = 0; i < queueCount; i++) { 1713 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1714 Executors.newScheduledThreadPool(1, task -> { 1715 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1716 t.setDaemon(true); 1717 return t; 1718 }); 1719 stpe.setRemoveOnCancelPolicy(true); 1720 schedulers[i] = stpe; 1721 } 1722 return schedulers; 1723 } 1724 } 1725 1726 /** 1727 * Schedule virtual threads that are ready to be scheduled after they blocked on 1728 * monitor enter. 1729 */ 1730 private static void unblockVirtualThreads() { 1731 while (true) { 1732 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1733 while (vthread != null) { 1734 assert vthread.onWaitingList; 1735 VirtualThread nextThread = vthread.next; 1736 1737 // remove from list and unblock 1738 vthread.next = null; 1739 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1740 assert changed; 1741 vthread.unblock(); 1742 1743 vthread = nextThread; 1744 } 1745 } 1746 } 1747 1748 /** 1749 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1750 * if necessary until a list of one or more threads becomes available. 1751 */ 1752 private static native VirtualThread takeVirtualThreadListToUnblock(); 1753 1754 static { 1755 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1756 VirtualThread::unblockVirtualThreads); 1757 unblocker.setDaemon(true); 1758 unblocker.start(); 1759 } 1760 } --- EOF ---