1 /* 2 * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.lang.reflect.Constructor; 28 import java.security.AccessController; 29 import java.security.PrivilegedAction; 30 import java.util.Arrays; 31 import java.util.Locale; 32 import java.util.Objects; 33 import java.util.concurrent.Callable; 34 import java.util.concurrent.CountDownLatch; 35 import java.util.concurrent.Executor; 36 import java.util.concurrent.Executors; 37 import java.util.concurrent.ForkJoinPool; 38 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 39 import java.util.concurrent.ForkJoinTask; 40 import java.util.concurrent.ForkJoinWorkerThread; 41 import java.util.concurrent.Future; 42 import java.util.concurrent.RejectedExecutionException; 43 import java.util.concurrent.ScheduledExecutorService; 44 import java.util.concurrent.ScheduledThreadPoolExecutor; 45 import java.util.concurrent.TimeUnit; 46 import java.util.stream.Stream; 47 import jdk.internal.event.VirtualThreadEndEvent; 48 import jdk.internal.event.VirtualThreadStartEvent; 49 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 50 import jdk.internal.misc.CarrierThread; 51 import jdk.internal.misc.InnocuousThread; 52 import jdk.internal.misc.Unsafe; 53 import jdk.internal.vm.Continuation; 54 import jdk.internal.vm.ContinuationScope; 55 import jdk.internal.vm.StackableScope; 56 import jdk.internal.vm.ThreadContainer; 57 import jdk.internal.vm.ThreadContainers; 58 import jdk.internal.vm.annotation.ChangesCurrentThread; 59 import jdk.internal.vm.annotation.Hidden; 60 import jdk.internal.vm.annotation.IntrinsicCandidate; 61 import jdk.internal.vm.annotation.JvmtiMountTransition; 62 import jdk.internal.vm.annotation.ReservedStackAccess; 63 import sun.nio.ch.Interruptible; 64 import sun.security.action.GetPropertyAction; 65 import static java.util.concurrent.TimeUnit.*; 66 67 /** 68 * A thread that is scheduled by the Java virtual machine rather than the operating system. 69 */ 70 final class VirtualThread extends BaseVirtualThread { 71 private static final Unsafe U = Unsafe.getUnsafe(); 72 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 73 private static final Executor DEFAULT_SCHEDULER = createDefaultScheduler(); 74 private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers(); 75 76 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 77 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 78 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 79 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 80 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList"); 81 82 // scheduler and continuation 83 private final Executor scheduler; 84 private final Continuation cont; 85 private final Runnable runContinuation; 86 87 // virtual thread state, accessed by VM 88 private volatile int state; 89 90 /* 91 * Virtual thread state transitions: 92 * 93 * NEW -> STARTED // Thread.start, schedule to run 94 * STARTED -> TERMINATED // failed to start 95 * STARTED -> RUNNING // first run 96 * RUNNING -> TERMINATED // done 97 * 98 * RUNNING -> PARKING // Thread parking with LockSupport.park 99 * PARKING -> PARKED // cont.yield successful, parked indefinitely 100 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier 101 * PARKED -> UNPARKED // unparked, may be scheduled to continue 102 * PINNED -> RUNNING // unparked, continue execution on same carrier 103 * UNPARKED -> RUNNING // continue execution after park 104 * 105 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 106 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 107 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier 108 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue 109 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 110 * 111 * RUNNING -> BLOCKING // blocking on monitor enter 112 * BLOCKING -> BLOCKED // blocked on monitor enter 113 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue 114 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter 115 * 116 * RUNNING -> WAITING // transitional state during wait on monitor 117 * WAITING -> WAITED // waiting on monitor 118 * WAITED -> BLOCKED // notified, waiting to be unblocked by monitor owner 119 * WAITED -> UNBLOCKED // timed-out/interrupted 120 * 121 * RUNNING -> TIMED_WAITING // transition state during timed-waiting on monitor 122 * TIMED_WAITING -> TIMED_WAITED // timed-waiting on monitor 123 * TIMED_WAITED -> BLOCKED // notified, waiting to be unblocked by monitor owner 124 * TIMED_WAITED -> UNBLOCKED // timed-out/interrupted 125 * 126 * RUNNING -> YIELDING // Thread.yield 127 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue 128 * YIELDING -> RUNNING // cont.yield failed 129 * YIELDED -> RUNNING // continue execution after Thread.yield 130 */ 131 private static final int NEW = 0; 132 private static final int STARTED = 1; 133 private static final int RUNNING = 2; // runnable-mounted 134 135 // untimed and timed parking 136 private static final int PARKING = 3; 137 private static final int PARKED = 4; // unmounted 138 private static final int PINNED = 5; // mounted 139 private static final int TIMED_PARKING = 6; 140 private static final int TIMED_PARKED = 7; // unmounted 141 private static final int TIMED_PINNED = 8; // mounted 142 private static final int UNPARKED = 9; // unmounted but runnable 143 144 // Thread.yield 145 private static final int YIELDING = 10; 146 private static final int YIELDED = 11; // unmounted but runnable 147 148 // monitor enter 149 private static final int BLOCKING = 12; 150 private static final int BLOCKED = 13; // unmounted 151 private static final int UNBLOCKED = 14; // unmounted but runnable 152 153 // monitor wait/timed-wait 154 private static final int WAITING = 15; 155 private static final int WAIT = 16; // waiting in Object.wait 156 private static final int TIMED_WAITING = 17; 157 private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait 158 159 private static final int TERMINATED = 99; // final state 160 161 // can be suspended from scheduling when unmounted 162 private static final int SUSPENDED = 1 << 8; 163 164 // parking permit made available by LockSupport.unpark 165 private volatile boolean parkPermit; 166 167 // blocking permit made available by unblocker thread when another thread exits monitor 168 private volatile boolean blockPermit; 169 170 // true when on the list of virtual threads waiting to be unblocked 171 private volatile boolean onWaitingList; 172 173 // next virtual thread on the list of virtual threads waiting to be unblocked 174 private volatile VirtualThread next; 175 176 // notified by Object.notify/notifyAll while waiting in Object.wait 177 private volatile boolean notified; 178 179 // timed-wait support 180 private long waitTimeout; 181 private byte timedWaitNonce; 182 private volatile Future<?> waitTimeoutTask; 183 184 // carrier thread when mounted, accessed by VM 185 private volatile Thread carrierThread; 186 187 // termination object when joining, created lazily if needed 188 private volatile CountDownLatch termination; 189 190 /** 191 * Returns the default scheduler. 192 */ 193 static Executor defaultScheduler() { 194 return DEFAULT_SCHEDULER; 195 } 196 197 /** 198 * Returns a stream of the delayed task schedulers used to support timed operations. 199 */ 200 static Stream<ScheduledExecutorService> delayedTaskSchedulers() { 201 return Arrays.stream(DELAYED_TASK_SCHEDULERS); 202 } 203 204 /** 205 * Returns the continuation scope used for virtual threads. 206 */ 207 static ContinuationScope continuationScope() { 208 return VTHREAD_SCOPE; 209 } 210 211 /** 212 * Creates a new {@code VirtualThread} to run the given task with the given 213 * scheduler. If the given scheduler is {@code null} and the current thread 214 * is a platform thread then the newly created virtual thread will use the 215 * default scheduler. If given scheduler is {@code null} and the current 216 * thread is a virtual thread then the current thread's scheduler is used. 217 * 218 * @param scheduler the scheduler or null 219 * @param name thread name 220 * @param characteristics characteristics 221 * @param task the task to execute 222 */ 223 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 224 super(name, characteristics, /*bound*/ false); 225 Objects.requireNonNull(task); 226 227 // choose scheduler if not specified 228 if (scheduler == null) { 229 Thread parent = Thread.currentThread(); 230 if (parent instanceof VirtualThread vparent) { 231 scheduler = vparent.scheduler; 232 } else { 233 scheduler = DEFAULT_SCHEDULER; 234 } 235 } 236 237 this.scheduler = scheduler; 238 this.cont = new VThreadContinuation(this, task); 239 this.runContinuation = this::runContinuation; 240 } 241 242 /** 243 * The continuation that a virtual thread executes. 244 */ 245 private static class VThreadContinuation extends Continuation { 246 VThreadContinuation(VirtualThread vthread, Runnable task) { 247 super(VTHREAD_SCOPE, wrap(vthread, task)); 248 } 249 @Override 250 protected void onPinned(Continuation.Pinned reason) { 251 // emit JFR event 252 virtualThreadPinnedEvent(reason.reasonCode(), reason.reasonString()); 253 } 254 private static Runnable wrap(VirtualThread vthread, Runnable task) { 255 return new Runnable() { 256 @Hidden 257 public void run() { 258 vthread.run(task); 259 } 260 }; 261 } 262 } 263 264 /** 265 * jdk.VirtualThreadPinned is emitted by HotSpot VM when pinned. Call into VM to 266 * emit event to avoid having a JFR event in Java with the same name (but different ID) 267 * to events emitted by the VM. 268 */ 269 private static native void virtualThreadPinnedEvent(int reason, String reasonString); 270 271 /** 272 * Runs or continues execution on the current thread. The virtual thread is mounted 273 * on the current thread before the task runs or continues. It unmounts when the 274 * task completes or yields. 275 */ 276 @ChangesCurrentThread // allow mount/unmount to be inlined 277 private void runContinuation() { 278 // the carrier must be a platform thread 279 if (Thread.currentThread().isVirtual()) { 280 throw new WrongThreadException(); 281 } 282 283 // set state to RUNNING 284 int initialState = state(); 285 if (initialState == STARTED || initialState == UNPARKED 286 || initialState == UNBLOCKED || initialState == YIELDED) { 287 // newly started or continue after parking/blocking/Thread.yield 288 if (!compareAndSetState(initialState, RUNNING)) { 289 return; 290 } 291 // consume permit when continuing after parking or blocking 292 if (initialState == UNPARKED) { 293 setParkPermit(false); 294 } if (initialState == UNBLOCKED) { 295 blockPermit = false; 296 } 297 } else { 298 // not runnable 299 return; 300 } 301 302 mount(); 303 try { 304 cont.run(); 305 } finally { 306 unmount(); 307 if (cont.isDone()) { 308 afterDone(); 309 } else { 310 afterYield(); 311 } 312 } 313 } 314 315 /** 316 * Submits the runContinuation task to the scheduler. For the default scheduler, 317 * and calling it on a worker thread, the task will be pushed to the local queue, 318 * otherwise it will be pushed to an external submission queue. 319 * @param scheduler the scheduler 320 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 321 * @throws RejectedExecutionException 322 */ 323 @ChangesCurrentThread 324 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) { 325 boolean done = false; 326 while (!done) { 327 try { 328 // Pin the continuation to prevent the virtual thread from unmounting 329 // when submitting a task. For the default scheduler this ensures that 330 // the carrier doesn't change when pushing a task. For other schedulers 331 // it avoids deadlock that could arise due to carriers and virtual 332 // threads contending for a lock. 333 if (currentThread().isVirtual()) { 334 Continuation.pin(); 335 try { 336 scheduler.execute(runContinuation); 337 } finally { 338 Continuation.unpin(); 339 } 340 } else { 341 scheduler.execute(runContinuation); 342 } 343 done = true; 344 } catch (RejectedExecutionException ree) { 345 submitFailed(ree); 346 throw ree; 347 } catch (OutOfMemoryError e) { 348 if (retryOnOOME) { 349 U.park(false, 100_000_000); // 100ms 350 } else { 351 throw e; 352 } 353 } 354 } 355 } 356 357 /** 358 * Submits the runContinuation task to given scheduler with a lazy submit. 359 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 360 * @throws RejectedExecutionException 361 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 362 */ 363 private void lazySubmitRunContinuation(ForkJoinPool pool) { 364 assert Thread.currentThread() instanceof CarrierThread; 365 try { 366 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 367 } catch (RejectedExecutionException ree) { 368 submitFailed(ree); 369 throw ree; 370 } catch (OutOfMemoryError e) { 371 submitRunContinuation(pool, true); 372 } 373 } 374 375 /** 376 * Submits the runContinuation task to the given scheduler as an external submit. 377 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 378 * @throws RejectedExecutionException 379 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 380 */ 381 private void externalSubmitRunContinuation(ForkJoinPool pool) { 382 assert Thread.currentThread() instanceof CarrierThread; 383 try { 384 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 385 } catch (RejectedExecutionException ree) { 386 submitFailed(ree); 387 throw ree; 388 } catch (OutOfMemoryError e) { 389 submitRunContinuation(pool, true); 390 } 391 } 392 393 /** 394 * Submits the runContinuation task to the scheduler. For the default scheduler, 395 * and calling it on a worker thread, the task will be pushed to the local queue, 396 * otherwise it will be pushed to an external submission queue. 397 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 398 * @throws RejectedExecutionException 399 */ 400 private void submitRunContinuation() { 401 submitRunContinuation(scheduler, true); 402 } 403 404 /** 405 * Submits the runContinuation task to the scheduler. For the default scheduler, and 406 * calling it a virtual thread that uses the default scheduler, the task will be 407 * pushed to an external submission queue. This method may throw OutOfMemoryError. 408 * @throws RejectedExecutionException 409 * @throws OutOfMemoryError 410 */ 411 private void externalSubmitRunContinuationOrThrow() { 412 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) { 413 try { 414 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 415 } catch (RejectedExecutionException ree) { 416 submitFailed(ree); 417 throw ree; 418 } 419 } else { 420 submitRunContinuation(scheduler, false); 421 } 422 } 423 424 /** 425 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 426 */ 427 private void submitFailed(RejectedExecutionException ree) { 428 var event = new VirtualThreadSubmitFailedEvent(); 429 if (event.isEnabled()) { 430 event.javaThreadId = threadId(); 431 event.exceptionMessage = ree.getMessage(); 432 event.commit(); 433 } 434 } 435 436 /** 437 * Runs a task in the context of this virtual thread. 438 */ 439 private void run(Runnable task) { 440 assert Thread.currentThread() == this && state == RUNNING; 441 442 // notify JVMTI, may post VirtualThreadStart event 443 notifyJvmtiStart(); 444 445 // emit JFR event if enabled 446 if (VirtualThreadStartEvent.isTurnedOn()) { 447 var event = new VirtualThreadStartEvent(); 448 event.javaThreadId = threadId(); 449 event.commit(); 450 } 451 452 Object bindings = Thread.scopedValueBindings(); 453 try { 454 runWith(bindings, task); 455 } catch (Throwable exc) { 456 dispatchUncaughtException(exc); 457 } finally { 458 try { 459 // pop any remaining scopes from the stack, this may block 460 StackableScope.popAll(); 461 462 // emit JFR event if enabled 463 if (VirtualThreadEndEvent.isTurnedOn()) { 464 var event = new VirtualThreadEndEvent(); 465 event.javaThreadId = threadId(); 466 event.commit(); 467 } 468 469 } finally { 470 // notify JVMTI, may post VirtualThreadEnd event 471 notifyJvmtiEnd(); 472 } 473 } 474 } 475 476 /** 477 * Mounts this virtual thread onto the current platform thread. On 478 * return, the current thread is the virtual thread. 479 */ 480 @ChangesCurrentThread 481 @ReservedStackAccess 482 private void mount() { 483 // notify JVMTI before mount 484 notifyJvmtiMount(/*hide*/true); 485 486 // sets the carrier thread 487 Thread carrier = Thread.currentCarrierThread(); 488 setCarrierThread(carrier); 489 490 // sync up carrier thread interrupt status if needed 491 if (interrupted) { 492 carrier.setInterrupt(); 493 } else if (carrier.isInterrupted()) { 494 synchronized (interruptLock) { 495 // need to recheck interrupt status 496 if (!interrupted) { 497 carrier.clearInterrupt(); 498 } 499 } 500 } 501 502 // set Thread.currentThread() to return this virtual thread 503 carrier.setCurrentThread(this); 504 } 505 506 /** 507 * Unmounts this virtual thread from the carrier. On return, the 508 * current thread is the current platform thread. 509 */ 510 @ChangesCurrentThread 511 @ReservedStackAccess 512 private void unmount() { 513 assert !Thread.holdsLock(interruptLock); 514 515 // set Thread.currentThread() to return the platform thread 516 Thread carrier = this.carrierThread; 517 carrier.setCurrentThread(carrier); 518 519 // break connection to carrier thread, synchronized with interrupt 520 synchronized (interruptLock) { 521 setCarrierThread(null); 522 } 523 carrier.clearInterrupt(); 524 525 // notify JVMTI after unmount 526 notifyJvmtiUnmount(/*hide*/false); 527 } 528 529 /** 530 * Sets the current thread to the current carrier thread. 531 */ 532 @ChangesCurrentThread 533 @JvmtiMountTransition 534 private void switchToCarrierThread() { 535 notifyJvmtiHideFrames(true); 536 Thread carrier = this.carrierThread; 537 assert Thread.currentThread() == this 538 && carrier == Thread.currentCarrierThread(); 539 carrier.setCurrentThread(carrier); 540 Thread.setCurrentLockId(this.threadId()); // keep lock ID of virtual thread 541 } 542 543 /** 544 * Sets the current thread to the given virtual thread. 545 */ 546 @ChangesCurrentThread 547 @JvmtiMountTransition 548 private static void switchToVirtualThread(VirtualThread vthread) { 549 Thread carrier = vthread.carrierThread; 550 assert carrier == Thread.currentCarrierThread(); 551 carrier.setCurrentThread(vthread); 552 notifyJvmtiHideFrames(false); 553 } 554 555 /** 556 * Executes the given value returning task on the current carrier thread. 557 */ 558 @ChangesCurrentThread 559 <V> V executeOnCarrierThread(Callable<V> task) throws Exception { 560 assert Thread.currentThread() == this; 561 switchToCarrierThread(); 562 try { 563 return task.call(); 564 } finally { 565 switchToVirtualThread(this); 566 } 567 } 568 569 /** 570 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 571 * the continuation continues. 572 */ 573 @Hidden 574 private boolean yieldContinuation() { 575 notifyJvmtiUnmount(/*hide*/true); 576 try { 577 return Continuation.yield(VTHREAD_SCOPE); 578 } finally { 579 notifyJvmtiMount(/*hide*/false); 580 } 581 } 582 583 /** 584 * Invoked after the continuation yields. If parking then it sets the state 585 * and also re-submits the task to continue if unparked while parking. 586 * If yielding due to Thread.yield then it just submits the task to continue. 587 */ 588 private void afterYield() { 589 assert carrierThread == null; 590 591 // re-adjust parallelism if the virtual thread yielded when compensating 592 if (currentThread() instanceof CarrierThread ct) { 593 ct.endBlocking(); 594 } 595 596 int s = state(); 597 598 // LockSupport.park/parkNanos 599 if (s == PARKING || s == TIMED_PARKING) { 600 int newState = (s == PARKING) ? PARKED : TIMED_PARKED; 601 setState(newState); 602 603 // may have been unparked while parking 604 if (parkPermit && compareAndSetState(newState, UNPARKED)) { 605 // lazy submit to continue on the current carrier if possible 606 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 607 lazySubmitRunContinuation(ct.getPool()); 608 } else { 609 submitRunContinuation(); 610 } 611 } 612 return; 613 } 614 615 // Thread.yield 616 if (s == YIELDING) { 617 setState(YIELDED); 618 619 // external submit if there are no tasks in the local task queue 620 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 621 externalSubmitRunContinuation(ct.getPool()); 622 } else { 623 submitRunContinuation(); 624 } 625 return; 626 } 627 628 // blocking on monitorenter 629 if (s == BLOCKING) { 630 setState(BLOCKED); 631 632 // may have been unblocked while blocking 633 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 634 submitRunContinuation(); 635 } 636 return; 637 } 638 639 // Object.wait 640 if (s == WAITING || s == TIMED_WAITING) { 641 byte nonce; 642 int newState; 643 if (s == WAITING) { 644 nonce = 0; // not used 645 setState(newState = WAIT); 646 } else { 647 // synchronize with timeout task (previous timed-wait may be running) 648 synchronized (timedWaitLock()) { 649 nonce = ++timedWaitNonce; 650 setState(newState = TIMED_WAIT); 651 } 652 } 653 654 // may have been notified while in transition to wait state 655 if (notified && compareAndSetState(newState, BLOCKED)) { 656 // may have even been unblocked already 657 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 658 submitRunContinuation(); 659 } 660 return; 661 } 662 663 // may have been interrupted while in transition to wait state 664 if (interrupted && compareAndSetState(newState, UNBLOCKED)) { 665 submitRunContinuation(); 666 return; 667 } 668 669 // schedule wakeup 670 if (newState == TIMED_WAIT) { 671 assert waitTimeout > 0; 672 waitTimeoutTask = schedule(() -> waitTimeoutExpired(nonce), waitTimeout, MILLISECONDS); 673 } 674 return; 675 } 676 677 assert false; 678 } 679 680 /** 681 * Invoked after the continuation completes. 682 */ 683 private void afterDone() { 684 afterDone(true); 685 } 686 687 /** 688 * Invoked after the continuation completes (or start failed). Sets the thread 689 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 690 * 691 * @param notifyContainer true if its container should be notified 692 */ 693 private void afterDone(boolean notifyContainer) { 694 assert carrierThread == null; 695 setState(TERMINATED); 696 697 // notify anyone waiting for this virtual thread to terminate 698 CountDownLatch termination = this.termination; 699 if (termination != null) { 700 assert termination.getCount() == 1; 701 termination.countDown(); 702 } 703 704 // notify container 705 if (notifyContainer) { 706 threadContainer().onExit(this); 707 } 708 709 // clear references to thread locals 710 clearReferences(); 711 } 712 713 /** 714 * Schedules this {@code VirtualThread} to execute. 715 * 716 * @throws IllegalStateException if the container is shutdown or closed 717 * @throws IllegalThreadStateException if the thread has already been started 718 * @throws RejectedExecutionException if the scheduler cannot accept a task 719 */ 720 @Override 721 void start(ThreadContainer container) { 722 if (!compareAndSetState(NEW, STARTED)) { 723 throw new IllegalThreadStateException("Already started"); 724 } 725 726 // bind thread to container 727 assert threadContainer() == null; 728 setThreadContainer(container); 729 730 // start thread 731 boolean addedToContainer = false; 732 boolean started = false; 733 try { 734 container.onStart(this); // may throw 735 addedToContainer = true; 736 737 // scoped values may be inherited 738 inheritScopedValueBindings(container); 739 740 // submit task to run thread, using externalSubmit if possible 741 externalSubmitRunContinuationOrThrow(); 742 started = true; 743 } finally { 744 if (!started) { 745 afterDone(addedToContainer); 746 } 747 } 748 } 749 750 @Override 751 public void start() { 752 start(ThreadContainers.root()); 753 } 754 755 @Override 756 public void run() { 757 // do nothing 758 } 759 760 /** 761 * Parks until unparked or interrupted. If already unparked then the parking 762 * permit is consumed and this method completes immediately (meaning it doesn't 763 * yield). It also completes immediately if the interrupt status is set. 764 */ 765 @Override 766 void park() { 767 assert Thread.currentThread() == this; 768 769 // complete immediately if parking permit available or interrupted 770 if (getAndSetParkPermit(false) || interrupted) 771 return; 772 773 // park the thread 774 boolean yielded = false; 775 setState(PARKING); 776 try { 777 yielded = yieldContinuation(); 778 } catch (OutOfMemoryError e) { 779 // park on carrier 780 } finally { 781 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 782 if (!yielded) { 783 assert state() == PARKING; 784 setState(RUNNING); 785 } 786 } 787 788 // park on the carrier thread when pinned 789 if (!yielded) { 790 parkOnCarrierThread(false, 0); 791 } 792 } 793 794 /** 795 * Parks up to the given waiting time or until unparked or interrupted. 796 * If already unparked then the parking permit is consumed and this method 797 * completes immediately (meaning it doesn't yield). It also completes immediately 798 * if the interrupt status is set or the waiting time is {@code <= 0}. 799 * 800 * @param nanos the maximum number of nanoseconds to wait. 801 */ 802 @Override 803 void parkNanos(long nanos) { 804 assert Thread.currentThread() == this; 805 806 // complete immediately if parking permit available or interrupted 807 if (getAndSetParkPermit(false) || interrupted) 808 return; 809 810 // park the thread for the waiting time 811 if (nanos > 0) { 812 long startTime = System.nanoTime(); 813 814 boolean yielded = false; 815 Future<?> unparker = null; 816 try { 817 unparker = scheduleUnpark(nanos); 818 } catch (OutOfMemoryError e) { 819 // park on carrier 820 } 821 if (unparker != null) { 822 setState(TIMED_PARKING); 823 try { 824 yielded = yieldContinuation(); 825 } catch (OutOfMemoryError e) { 826 // park on carrier 827 } finally { 828 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 829 if (!yielded) { 830 assert state() == TIMED_PARKING; 831 setState(RUNNING); 832 } 833 cancel(unparker); 834 } 835 } 836 837 // park on carrier thread for remaining time when pinned (or OOME) 838 if (!yielded) { 839 long remainingNanos = nanos - (System.nanoTime() - startTime); 840 parkOnCarrierThread(true, remainingNanos); 841 } 842 } 843 } 844 845 /** 846 * Parks the current carrier thread up to the given waiting time or until 847 * unparked or interrupted. If the virtual thread is interrupted then the 848 * interrupt status will be propagated to the carrier thread. 849 * @param timed true for a timed park, false for untimed 850 * @param nanos the waiting time in nanoseconds 851 */ 852 private void parkOnCarrierThread(boolean timed, long nanos) { 853 assert state() == RUNNING; 854 855 setState(timed ? TIMED_PINNED : PINNED); 856 try { 857 if (!parkPermit) { 858 if (!timed) { 859 U.park(false, 0); 860 } else if (nanos > 0) { 861 U.park(false, nanos); 862 } 863 } 864 } finally { 865 setState(RUNNING); 866 } 867 868 // consume parking permit 869 setParkPermit(false); 870 } 871 872 /** 873 * Invoked by parkNanos to schedule this virtual thread to be unparked after 874 * a given delay. 875 */ 876 @ChangesCurrentThread 877 private Future<?> scheduleUnpark(long nanos) { 878 assert Thread.currentThread() == this; 879 // need to switch to current carrier thread to avoid nested parking 880 switchToCarrierThread(); 881 try { 882 return schedule(this::unpark, nanos, NANOSECONDS); 883 } finally { 884 switchToVirtualThread(this); 885 } 886 } 887 888 /** 889 * Invoked by parkNanos to cancel the unpark timer. 890 */ 891 @ChangesCurrentThread 892 private void cancel(Future<?> future) { 893 assert Thread.currentThread() == this; 894 if (!future.isDone()) { 895 // need to switch to current carrier thread to avoid nested parking 896 switchToCarrierThread(); 897 try { 898 future.cancel(false); 899 } finally { 900 switchToVirtualThread(this); 901 } 902 } 903 } 904 905 /** 906 * Re-enables this virtual thread for scheduling. If this virtual thread is parked 907 * then its task is scheduled to continue, otherwise its next call to {@code park} or 908 * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block. 909 * @throws RejectedExecutionException if the scheduler cannot accept a task 910 */ 911 @Override 912 void unpark() { 913 if (!getAndSetParkPermit(true) && currentThread() != this) { 914 int s = state(); 915 916 // unparked while parked 917 if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) { 918 submitRunContinuation(); 919 return; 920 } 921 922 // unparked while parked when pinned 923 if (s == PINNED || s == TIMED_PINNED) { 924 // unpark carrier thread when pinned 925 disableSuspendAndPreempt(); 926 try { 927 synchronized (carrierThreadAccessLock()) { 928 Thread carrier = carrierThread; 929 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 930 U.unpark(carrier); 931 } 932 } 933 } finally { 934 enableSuspendAndPreempt(); 935 } 936 return; 937 } 938 } 939 } 940 941 /** 942 * Invoked by unblocker thread to unblock this virtual thread. 943 */ 944 private void unblock() { 945 assert !Thread.currentThread().isVirtual(); 946 blockPermit = true; 947 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 948 submitRunContinuation(); 949 } 950 } 951 952 /** 953 * Invoked by timer thread when wait timeout for virtual thread has expired. 954 * If the virtual thread is in timed-wait then this method will unblock the thread 955 * and submit its task so that it continues and attempts to reenter the monitor. 956 * This method does nothing if the thread has been woken by notify or interrupt. 957 */ 958 private void waitTimeoutExpired(byte nounce) { 959 assert !Thread.currentThread().isVirtual(); 960 for (;;) { 961 boolean unblocked = false; 962 synchronized (timedWaitLock()) { 963 if (nounce != timedWaitNonce) { 964 // this timeout task is for a past timed-wait 965 return; 966 } 967 int s = state(); 968 if (s == TIMED_WAIT) { 969 unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED); 970 } else if (s != (TIMED_WAIT | SUSPENDED)) { 971 // notified or interrupted, no longer waiting 972 return; 973 } 974 } 975 if (unblocked) { 976 submitRunContinuation(); 977 return; 978 } 979 // need to retry when thread is suspended in time-wait 980 Thread.yield(); 981 } 982 } 983 984 /** 985 * Invoked by Object.wait to cancel the wait timer. 986 */ 987 void cancelWaitTimeout() { 988 assert Thread.currentThread() == this; 989 Future<?> timeoutTask = this.waitTimeoutTask; 990 if (timeoutTask != null) { 991 // Pin the continuation to prevent the virtual thread from unmounting 992 // when there is contention removing the task. This avoids deadlock that 993 // could arise due to carriers and virtual threads contending for a 994 // lock on the delay queue. 995 Continuation.pin(); 996 try { 997 timeoutTask.cancel(false); 998 } finally { 999 Continuation.unpin(); 1000 } 1001 } 1002 } 1003 1004 /** 1005 * Attempts to yield the current virtual thread (Thread.yield). 1006 */ 1007 void tryYield() { 1008 assert Thread.currentThread() == this; 1009 setState(YIELDING); 1010 boolean yielded = false; 1011 try { 1012 yielded = yieldContinuation(); // may throw 1013 } finally { 1014 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 1015 if (!yielded) { 1016 assert state() == YIELDING; 1017 setState(RUNNING); 1018 } 1019 } 1020 } 1021 1022 /** 1023 * Sleep the current thread for the given sleep time (in nanoseconds). If 1024 * nanos is 0 then the thread will attempt to yield. 1025 * 1026 * @implNote This implementation parks the thread for the given sleeping time 1027 * and will therefore be observed in PARKED state during the sleep. Parking 1028 * will consume the parking permit so this method makes available the parking 1029 * permit after the sleep. This may be observed as a spurious, but benign, 1030 * wakeup when the thread subsequently attempts to park. 1031 * 1032 * @param nanos the maximum number of nanoseconds to sleep 1033 * @throws InterruptedException if interrupted while sleeping 1034 */ 1035 void sleepNanos(long nanos) throws InterruptedException { 1036 assert Thread.currentThread() == this && nanos >= 0; 1037 if (getAndClearInterrupt()) 1038 throw new InterruptedException(); 1039 if (nanos == 0) { 1040 tryYield(); 1041 } else { 1042 // park for the sleep time 1043 try { 1044 long remainingNanos = nanos; 1045 long startNanos = System.nanoTime(); 1046 while (remainingNanos > 0) { 1047 parkNanos(remainingNanos); 1048 if (getAndClearInterrupt()) { 1049 throw new InterruptedException(); 1050 } 1051 remainingNanos = nanos - (System.nanoTime() - startNanos); 1052 } 1053 } finally { 1054 // may have been unparked while sleeping 1055 setParkPermit(true); 1056 } 1057 } 1058 } 1059 1060 /** 1061 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 1062 * A timeout of {@code 0} means to wait forever. 1063 * 1064 * @throws InterruptedException if interrupted while waiting 1065 * @return true if the thread has terminated 1066 */ 1067 boolean joinNanos(long nanos) throws InterruptedException { 1068 if (state() == TERMINATED) 1069 return true; 1070 1071 // ensure termination object exists, then re-check state 1072 CountDownLatch termination = getTermination(); 1073 if (state() == TERMINATED) 1074 return true; 1075 1076 // wait for virtual thread to terminate 1077 if (nanos == 0) { 1078 termination.await(); 1079 } else { 1080 boolean terminated = termination.await(nanos, NANOSECONDS); 1081 if (!terminated) { 1082 // waiting time elapsed 1083 return false; 1084 } 1085 } 1086 assert state() == TERMINATED; 1087 return true; 1088 } 1089 1090 @Override 1091 void blockedOn(Interruptible b) { 1092 disableSuspendAndPreempt(); 1093 try { 1094 super.blockedOn(b); 1095 } finally { 1096 enableSuspendAndPreempt(); 1097 } 1098 } 1099 1100 @Override 1101 @SuppressWarnings("removal") 1102 public void interrupt() { 1103 if (Thread.currentThread() != this) { 1104 checkAccess(); 1105 1106 // if current thread is a virtual thread then prevent it from being 1107 // suspended or unmounted when entering or holding interruptLock 1108 Interruptible blocker; 1109 disableSuspendAndPreempt(); 1110 try { 1111 synchronized (interruptLock) { 1112 interrupted = true; 1113 blocker = nioBlocker(); 1114 if (blocker != null) { 1115 blocker.interrupt(this); 1116 } 1117 1118 // interrupt carrier thread if mounted 1119 Thread carrier = carrierThread; 1120 if (carrier != null) carrier.setInterrupt(); 1121 } 1122 } finally { 1123 enableSuspendAndPreempt(); 1124 } 1125 1126 // notify blocker after releasing interruptLock 1127 if (blocker != null) { 1128 blocker.postInterrupt(); 1129 } 1130 1131 // make available parking permit, unpark thread if parked 1132 unpark(); 1133 1134 // if thread is waiting in Object.wait then schedule to try to reenter 1135 int s = state(); 1136 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1137 submitRunContinuation(); 1138 } 1139 1140 } else { 1141 interrupted = true; 1142 carrierThread.setInterrupt(); 1143 setParkPermit(true); 1144 } 1145 } 1146 1147 @Override 1148 public boolean isInterrupted() { 1149 return interrupted; 1150 } 1151 1152 @Override 1153 boolean getAndClearInterrupt() { 1154 assert Thread.currentThread() == this; 1155 boolean oldValue = interrupted; 1156 if (oldValue) { 1157 disableSuspendAndPreempt(); 1158 try { 1159 synchronized (interruptLock) { 1160 interrupted = false; 1161 carrierThread.clearInterrupt(); 1162 } 1163 } finally { 1164 enableSuspendAndPreempt(); 1165 } 1166 } 1167 return oldValue; 1168 } 1169 1170 @Override 1171 Thread.State threadState() { 1172 int s = state(); 1173 switch (s & ~SUSPENDED) { 1174 case NEW: 1175 return Thread.State.NEW; 1176 case STARTED: 1177 // return NEW if thread container not yet set 1178 if (threadContainer() == null) { 1179 return Thread.State.NEW; 1180 } else { 1181 return Thread.State.RUNNABLE; 1182 } 1183 case UNPARKED: 1184 case UNBLOCKED: 1185 case YIELDED: 1186 // runnable, not mounted 1187 return Thread.State.RUNNABLE; 1188 case RUNNING: 1189 // if mounted then return state of carrier thread 1190 if (Thread.currentThread() != this) { 1191 disableSuspendAndPreempt(); 1192 try { 1193 synchronized (carrierThreadAccessLock()) { 1194 Thread carrierThread = this.carrierThread; 1195 if (carrierThread != null) { 1196 return carrierThread.threadState(); 1197 } 1198 } 1199 } finally { 1200 enableSuspendAndPreempt(); 1201 } 1202 } 1203 // runnable, mounted 1204 return Thread.State.RUNNABLE; 1205 case PARKING: 1206 case TIMED_PARKING: 1207 case BLOCKING: 1208 case WAITING: 1209 case TIMED_WAITING: 1210 case YIELDING: 1211 // runnable, in transition 1212 return Thread.State.RUNNABLE; 1213 case PARKED: 1214 case PINNED: 1215 case WAIT: 1216 return Thread.State.WAITING; 1217 case TIMED_PARKED: 1218 case TIMED_PINNED: 1219 case TIMED_WAIT: 1220 return Thread.State.TIMED_WAITING; 1221 case BLOCKED: 1222 return Thread.State.BLOCKED; 1223 case TERMINATED: 1224 return Thread.State.TERMINATED; 1225 default: 1226 throw new InternalError(); 1227 } 1228 } 1229 1230 @Override 1231 boolean alive() { 1232 int s = state; 1233 return (s != NEW && s != TERMINATED); 1234 } 1235 1236 @Override 1237 boolean isTerminated() { 1238 return (state == TERMINATED); 1239 } 1240 1241 @Override 1242 StackTraceElement[] asyncGetStackTrace() { 1243 StackTraceElement[] stackTrace; 1244 do { 1245 stackTrace = (carrierThread != null) 1246 ? super.asyncGetStackTrace() // mounted 1247 : tryGetStackTrace(); // unmounted 1248 if (stackTrace == null) { 1249 Thread.yield(); 1250 } 1251 } while (stackTrace == null); 1252 return stackTrace; 1253 } 1254 1255 /** 1256 * Returns the stack trace for this virtual thread if it is unmounted. 1257 * Returns null if the thread is mounted or in transition. 1258 */ 1259 private StackTraceElement[] tryGetStackTrace() { 1260 int initialState = state() & ~SUSPENDED; 1261 switch (initialState) { 1262 case NEW, STARTED, TERMINATED -> { 1263 return new StackTraceElement[0]; // unmounted, empty stack 1264 } 1265 case RUNNING, PINNED, TIMED_PINNED -> { 1266 return null; // mounted 1267 } 1268 case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> { 1269 // unmounted, not runnable 1270 } 1271 case UNPARKED, UNBLOCKED, YIELDED -> { 1272 // unmounted, runnable 1273 } 1274 case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> { 1275 return null; // in transition 1276 } 1277 default -> throw new InternalError("" + initialState); 1278 } 1279 1280 // thread is unmounted, prevent it from continuing 1281 int suspendedState = initialState | SUSPENDED; 1282 if (!compareAndSetState(initialState, suspendedState)) { 1283 return null; 1284 } 1285 1286 // get stack trace and restore state 1287 StackTraceElement[] stack; 1288 try { 1289 stack = cont.getStackTrace(); 1290 } finally { 1291 assert state == suspendedState; 1292 setState(initialState); 1293 } 1294 boolean resubmit = switch (initialState) { 1295 case UNPARKED, UNBLOCKED, YIELDED -> { 1296 // resubmit as task may have run while suspended 1297 yield true; 1298 } 1299 case PARKED, TIMED_PARKED -> { 1300 // resubmit if unparked while suspended 1301 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1302 } 1303 case BLOCKED -> { 1304 // resubmit if unblocked while suspended 1305 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED); 1306 } 1307 case WAIT, TIMED_WAIT -> { 1308 // resubmit if notified or interrupted while waiting (Object.wait) 1309 // waitTimeoutExpired will retry if the timed expired when suspended 1310 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED); 1311 } 1312 default -> throw new InternalError(); 1313 }; 1314 if (resubmit) { 1315 submitRunContinuation(); 1316 } 1317 return stack; 1318 } 1319 1320 @Override 1321 public String toString() { 1322 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1323 sb.append(threadId()); 1324 String name = getName(); 1325 if (!name.isEmpty()) { 1326 sb.append(","); 1327 sb.append(name); 1328 } 1329 sb.append("]/"); 1330 1331 // add the carrier state and thread name when mounted 1332 boolean mounted; 1333 if (Thread.currentThread() == this) { 1334 mounted = appendCarrierInfo(sb); 1335 } else { 1336 disableSuspendAndPreempt(); 1337 try { 1338 synchronized (carrierThreadAccessLock()) { 1339 mounted = appendCarrierInfo(sb); 1340 } 1341 } finally { 1342 enableSuspendAndPreempt(); 1343 } 1344 } 1345 1346 // add virtual thread state when not mounted 1347 if (!mounted) { 1348 String stateAsString = threadState().toString(); 1349 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1350 } 1351 1352 return sb.toString(); 1353 } 1354 1355 /** 1356 * Appends the carrier state and thread name to the string buffer if mounted. 1357 * @return true if mounted, false if not mounted 1358 */ 1359 private boolean appendCarrierInfo(StringBuilder sb) { 1360 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1361 Thread carrier = carrierThread; 1362 if (carrier != null) { 1363 String stateAsString = carrier.threadState().toString(); 1364 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1365 sb.append('@'); 1366 sb.append(carrier.getName()); 1367 return true; 1368 } else { 1369 return false; 1370 } 1371 } 1372 1373 @Override 1374 public int hashCode() { 1375 return (int) threadId(); 1376 } 1377 1378 @Override 1379 public boolean equals(Object obj) { 1380 return obj == this; 1381 } 1382 1383 /** 1384 * Returns the termination object, creating it if needed. 1385 */ 1386 private CountDownLatch getTermination() { 1387 CountDownLatch termination = this.termination; 1388 if (termination == null) { 1389 termination = new CountDownLatch(1); 1390 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1391 termination = this.termination; 1392 } 1393 } 1394 return termination; 1395 } 1396 1397 /** 1398 * Returns the lock object to synchronize on when accessing carrierThread. 1399 * The lock prevents carrierThread from being reset to null during unmount. 1400 */ 1401 private Object carrierThreadAccessLock() { 1402 // return interruptLock as unmount has to coordinate with interrupt 1403 return interruptLock; 1404 } 1405 1406 /** 1407 * Returns a lock object to coordinating timed-wait setup and timeout handling. 1408 */ 1409 private Object timedWaitLock() { 1410 // use this object for now to avoid the overhead of introducing another lock 1411 return runContinuation; 1412 } 1413 1414 /** 1415 * Disallow the current thread be suspended or preempted. 1416 */ 1417 private void disableSuspendAndPreempt() { 1418 notifyJvmtiDisableSuspend(true); 1419 Continuation.pin(); 1420 } 1421 1422 /** 1423 * Allow the current thread be suspended or preempted. 1424 */ 1425 private void enableSuspendAndPreempt() { 1426 Continuation.unpin(); 1427 notifyJvmtiDisableSuspend(false); 1428 } 1429 1430 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1431 1432 private int state() { 1433 return state; // volatile read 1434 } 1435 1436 private void setState(int newValue) { 1437 state = newValue; // volatile write 1438 } 1439 1440 private boolean compareAndSetState(int expectedValue, int newValue) { 1441 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1442 } 1443 1444 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1445 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1446 } 1447 1448 private void setParkPermit(boolean newValue) { 1449 if (parkPermit != newValue) { 1450 parkPermit = newValue; 1451 } 1452 } 1453 1454 private boolean getAndSetParkPermit(boolean newValue) { 1455 if (parkPermit != newValue) { 1456 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1457 } else { 1458 return newValue; 1459 } 1460 } 1461 1462 private void setCarrierThread(Thread carrier) { 1463 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1464 this.carrierThread = carrier; 1465 } 1466 1467 // -- JVM TI support -- 1468 1469 @IntrinsicCandidate 1470 @JvmtiMountTransition 1471 private native void notifyJvmtiStart(); 1472 1473 @IntrinsicCandidate 1474 @JvmtiMountTransition 1475 private native void notifyJvmtiEnd(); 1476 1477 @IntrinsicCandidate 1478 @JvmtiMountTransition 1479 private native void notifyJvmtiMount(boolean hide); 1480 1481 @IntrinsicCandidate 1482 @JvmtiMountTransition 1483 private native void notifyJvmtiUnmount(boolean hide); 1484 1485 @IntrinsicCandidate 1486 @JvmtiMountTransition 1487 private static native void notifyJvmtiHideFrames(boolean hide); 1488 1489 @IntrinsicCandidate 1490 private static native void notifyJvmtiDisableSuspend(boolean enter); 1491 1492 private static native void registerNatives(); 1493 static { 1494 registerNatives(); 1495 1496 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1497 var group = Thread.virtualThreadGroup(); 1498 } 1499 1500 /** 1501 * Creates the default scheduler. 1502 * If the system property {@code jdk.virtualThreadScheduler.implClass} is set then 1503 * its value is the name of a class that implements java.util.concurrent.Executor. 1504 * The class is public in an exported package, has a public no-arg constructor, 1505 * and is visible to the system class loader. 1506 * If the system property is not set then the default scheduler will be a 1507 * ForkJoinPool instance. 1508 */ 1509 private static Executor createDefaultScheduler() { 1510 String propName = "jdk.virtualThreadScheduler.implClass"; 1511 String propValue = GetPropertyAction.privilegedGetProperty(propName); 1512 if (propValue != null) { 1513 try { 1514 Class<?> clazz = Class.forName(propValue, true, 1515 ClassLoader.getSystemClassLoader()); 1516 Constructor<?> ctor = clazz.getConstructor(); 1517 var scheduler = (Executor) ctor.newInstance(); 1518 System.err.println(""" 1519 WARNING: Using custom scheduler, this is an experimental feature."""); 1520 return scheduler; 1521 } catch (Exception ex) { 1522 throw new Error(ex); 1523 } 1524 } else { 1525 return createDefaultForkJoinPoolScheduler(); 1526 } 1527 } 1528 1529 /** 1530 * Creates the default ForkJoinPool scheduler. 1531 */ 1532 @SuppressWarnings("removal") 1533 private static ForkJoinPool createDefaultForkJoinPoolScheduler() { 1534 ForkJoinWorkerThreadFactory factory = pool -> { 1535 PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool); 1536 return AccessController.doPrivileged(pa); 1537 }; 1538 PrivilegedAction<ForkJoinPool> pa = () -> { 1539 int parallelism, maxPoolSize, minRunnable; 1540 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1541 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1542 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1543 if (parallelismValue != null) { 1544 parallelism = Integer.parseInt(parallelismValue); 1545 } else { 1546 parallelism = Runtime.getRuntime().availableProcessors(); 1547 } 1548 if (maxPoolSizeValue != null) { 1549 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1550 parallelism = Integer.min(parallelism, maxPoolSize); 1551 } else { 1552 maxPoolSize = Integer.max(parallelism, 256); 1553 } 1554 if (minRunnableValue != null) { 1555 minRunnable = Integer.parseInt(minRunnableValue); 1556 } else { 1557 minRunnable = Integer.max(parallelism / 2, 1); 1558 } 1559 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1560 boolean asyncMode = true; // FIFO 1561 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1562 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1563 }; 1564 return AccessController.doPrivileged(pa); 1565 } 1566 1567 /** 1568 * Schedule a runnable task to run after a delay. 1569 */ 1570 private static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1571 long tid = Thread.currentThread().threadId(); 1572 int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1); 1573 return DELAYED_TASK_SCHEDULERS[index].schedule(command, delay, unit); 1574 } 1575 1576 /** 1577 * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks. 1578 */ 1579 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1580 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1581 String propValue = GetPropertyAction.privilegedGetProperty(propName); 1582 int queueCount; 1583 if (propValue != null) { 1584 queueCount = Integer.parseInt(propValue); 1585 if (queueCount != Integer.highestOneBit(queueCount)) { 1586 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1587 } 1588 } else { 1589 int ncpus = Runtime.getRuntime().availableProcessors(); 1590 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1591 } 1592 var schedulers = new ScheduledExecutorService[queueCount]; 1593 for (int i = 0; i < queueCount; i++) { 1594 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1595 Executors.newScheduledThreadPool(1, task -> { 1596 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1597 t.setDaemon(true); 1598 return t; 1599 }); 1600 stpe.setRemoveOnCancelPolicy(true); 1601 schedulers[i] = stpe; 1602 } 1603 return schedulers; 1604 } 1605 1606 /** 1607 * Schedule virtual threads that are ready to be scheduled after they blocked on 1608 * monitor enter. 1609 */ 1610 private static void unblockVirtualThreads() { 1611 while (true) { 1612 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1613 while (vthread != null) { 1614 assert vthread.onWaitingList; 1615 VirtualThread nextThread = vthread.next; 1616 1617 // remove from list and unblock 1618 vthread.next = null; 1619 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1620 assert changed; 1621 vthread.unblock(); 1622 1623 vthread = nextThread; 1624 } 1625 } 1626 } 1627 1628 /** 1629 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1630 * if necessary until a list of one or more threads becomes available. 1631 */ 1632 private static native VirtualThread takeVirtualThreadListToUnblock(); 1633 1634 static { 1635 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1636 VirtualThread::unblockVirtualThreads); 1637 unblocker.setDaemon(true); 1638 unblocker.start(); 1639 } 1640 }