1 /* 2 * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.lang.reflect.Constructor; 28 import java.security.AccessController; 29 import java.security.PrivilegedAction; 30 import java.util.Arrays; 31 import java.util.Locale; 32 import java.util.Objects; 33 import java.util.concurrent.Callable; 34 import java.util.concurrent.CountDownLatch; 35 import java.util.concurrent.Executor; 36 import java.util.concurrent.Executors; 37 import java.util.concurrent.ForkJoinPool; 38 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 39 import java.util.concurrent.ForkJoinTask; 40 import java.util.concurrent.ForkJoinWorkerThread; 41 import java.util.concurrent.Future; 42 import java.util.concurrent.RejectedExecutionException; 43 import java.util.concurrent.ScheduledExecutorService; 44 import java.util.concurrent.ScheduledThreadPoolExecutor; 45 import java.util.concurrent.TimeUnit; 46 import java.util.stream.Stream; 47 import jdk.internal.event.VirtualThreadEndEvent; 48 import jdk.internal.event.VirtualThreadStartEvent; 49 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 50 import jdk.internal.misc.CarrierThread; 51 import jdk.internal.misc.InnocuousThread; 52 import jdk.internal.misc.Unsafe; 53 import jdk.internal.vm.Continuation; 54 import jdk.internal.vm.ContinuationScope; 55 import jdk.internal.vm.StackableScope; 56 import jdk.internal.vm.ThreadContainer; 57 import jdk.internal.vm.ThreadContainers; 58 import jdk.internal.vm.annotation.ChangesCurrentThread; 59 import jdk.internal.vm.annotation.Hidden; 60 import jdk.internal.vm.annotation.IntrinsicCandidate; 61 import jdk.internal.vm.annotation.JvmtiMountTransition; 62 import jdk.internal.vm.annotation.ReservedStackAccess; 63 import sun.nio.ch.Interruptible; 64 import sun.security.action.GetPropertyAction; 65 import static java.util.concurrent.TimeUnit.*; 66 67 /** 68 * A thread that is scheduled by the Java virtual machine rather than the operating system. 69 */ 70 final class VirtualThread extends BaseVirtualThread { 71 private static final Unsafe U = Unsafe.getUnsafe(); 72 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 73 private static final Executor DEFAULT_SCHEDULER = createDefaultScheduler(); 74 private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers(); 75 76 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 77 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 78 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 79 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); 80 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList"); 81 82 // scheduler and continuation 83 private final Executor scheduler; 84 private final Continuation cont; 85 private final Runnable runContinuation; 86 87 // virtual thread state, accessed by VM 88 private volatile int state; 89 90 /* 91 * Virtual thread state transitions: 92 * 93 * NEW -> STARTED // Thread.start, schedule to run 94 * STARTED -> TERMINATED // failed to start 95 * STARTED -> RUNNING // first run 96 * RUNNING -> TERMINATED // done 97 * 98 * RUNNING -> PARKING // Thread parking with LockSupport.park 99 * PARKING -> PARKED // cont.yield successful, parked indefinitely 100 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier 101 * PARKED -> UNPARKED // unparked, may be scheduled to continue 102 * PINNED -> RUNNING // unparked, continue execution on same carrier 103 * UNPARKED -> RUNNING // continue execution after park 104 * 105 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 106 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 107 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier 108 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue 109 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 110 * 111 * RUNNING -> BLOCKING // blocking on monitor enter 112 * BLOCKING -> BLOCKED // blocked on monitor enter 113 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue 114 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter 115 * 116 * RUNNING -> WAITING // transitional state during wait on monitor 117 * WAITING -> WAITED // waiting on monitor 118 * WAITED -> BLOCKED // notified, waiting to be unblocked by monitor owner 119 * WAITED -> UNBLOCKED // timed-out/interrupted 120 * 121 * RUNNING -> TIMED_WAITING // transition state during timed-waiting on monitor 122 * TIMED_WAITING -> TIMED_WAITED // timed-waiting on monitor 123 * TIMED_WAITED -> BLOCKED // notified, waiting to be unblocked by monitor owner 124 * TIMED_WAITED -> UNBLOCKED // timed-out/interrupted 125 * 126 * RUNNING -> YIELDING // Thread.yield 127 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue 128 * YIELDING -> RUNNING // cont.yield failed 129 * YIELDED -> RUNNING // continue execution after Thread.yield 130 */ 131 private static final int NEW = 0; 132 private static final int STARTED = 1; 133 private static final int RUNNING = 2; // runnable-mounted 134 135 // untimed and timed parking 136 private static final int PARKING = 3; 137 private static final int PARKED = 4; // unmounted 138 private static final int PINNED = 5; // mounted 139 private static final int TIMED_PARKING = 6; 140 private static final int TIMED_PARKED = 7; // unmounted 141 private static final int TIMED_PINNED = 8; // mounted 142 private static final int UNPARKED = 9; // unmounted but runnable 143 144 // Thread.yield 145 private static final int YIELDING = 10; 146 private static final int YIELDED = 11; // unmounted but runnable 147 148 // monitor enter 149 private static final int BLOCKING = 12; 150 private static final int BLOCKED = 13; // unmounted 151 private static final int UNBLOCKED = 14; // unmounted but runnable 152 153 // monitor wait/timed-wait 154 private static final int WAITING = 15; 155 private static final int WAIT = 16; // waiting in Object.wait 156 private static final int TIMED_WAITING = 17; 157 private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait 158 159 private static final int TERMINATED = 99; // final state 160 161 // can be suspended from scheduling when unmounted 162 private static final int SUSPENDED = 1 << 8; 163 164 // parking permit 165 private volatile boolean parkPermit; 166 167 // used to mark thread as ready to be unblocked 168 private volatile boolean unblocked; 169 170 // notified by Object.notify/notifyAll while waiting in Object.wait 171 private volatile boolean notified; 172 173 // timed-wait support 174 private long waitTimeout; 175 private byte timedWaitNonce; 176 private volatile Future<?> waitTimeoutTask; 177 178 // a positive value if "responsible thread" blocked on monitor enter, accessed by VM 179 private volatile byte recheckInterval; 180 181 // carrier thread when mounted, accessed by VM 182 private volatile Thread carrierThread; 183 184 // termination object when joining, created lazily if needed 185 private volatile CountDownLatch termination; 186 187 // has the value 1 when on the list of virtual threads waiting to be unblocked 188 private volatile byte onWaitingList; 189 190 // next virtual thread on the list of virtual threads waiting to be unblocked 191 private volatile VirtualThread next; 192 193 /** 194 * Returns the default scheduler. 195 */ 196 static Executor defaultScheduler() { 197 return DEFAULT_SCHEDULER; 198 } 199 200 /** 201 * Returns a stream of the delayed task schedulers used to support timed operations. 202 */ 203 static Stream<ScheduledExecutorService> delayedTaskSchedulers() { 204 return Arrays.stream(DELAYED_TASK_SCHEDULERS); 205 } 206 207 /** 208 * Returns the continuation scope used for virtual threads. 209 */ 210 static ContinuationScope continuationScope() { 211 return VTHREAD_SCOPE; 212 } 213 214 /** 215 * Creates a new {@code VirtualThread} to run the given task with the given 216 * scheduler. If the given scheduler is {@code null} and the current thread 217 * is a platform thread then the newly created virtual thread will use the 218 * default scheduler. If given scheduler is {@code null} and the current 219 * thread is a virtual thread then the current thread's scheduler is used. 220 * 221 * @param scheduler the scheduler or null 222 * @param name thread name 223 * @param characteristics characteristics 224 * @param task the task to execute 225 */ 226 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 227 super(name, characteristics, /*bound*/ false); 228 Objects.requireNonNull(task); 229 230 // choose scheduler if not specified 231 if (scheduler == null) { 232 Thread parent = Thread.currentThread(); 233 if (parent instanceof VirtualThread vparent) { 234 scheduler = vparent.scheduler; 235 } else { 236 scheduler = DEFAULT_SCHEDULER; 237 } 238 } 239 240 this.scheduler = scheduler; 241 this.cont = new VThreadContinuation(this, task); 242 this.runContinuation = this::runContinuation; 243 } 244 245 /** 246 * The continuation that a virtual thread executes. 247 */ 248 private static class VThreadContinuation extends Continuation { 249 VThreadContinuation(VirtualThread vthread, Runnable task) { 250 super(VTHREAD_SCOPE, wrap(vthread, task)); 251 } 252 @Override 253 protected void onPinned(Continuation.Pinned reason) { 254 // emit JFR event 255 virtualThreadPinnedEvent(reason.reasonCode(), reason.reasonString()); 256 } 257 private static Runnable wrap(VirtualThread vthread, Runnable task) { 258 return new Runnable() { 259 @Hidden 260 public void run() { 261 vthread.run(task); 262 } 263 }; 264 } 265 } 266 267 /** 268 * jdk.VirtualThreadPinned is emitted by HotSpot VM when pinned. Call into VM to 269 * emit event to avoid having a JFR event in Java with the same name (but different ID) 270 * to events emitted by the VM. 271 */ 272 private static native void virtualThreadPinnedEvent(int reason, String reasonString); 273 274 /** 275 * Runs or continues execution on the current thread. The virtual thread is mounted 276 * on the current thread before the task runs or continues. It unmounts when the 277 * task completes or yields. 278 */ 279 @ChangesCurrentThread // allow mount/unmount to be inlined 280 private void runContinuation() { 281 // the carrier must be a platform thread 282 if (Thread.currentThread().isVirtual()) { 283 throw new WrongThreadException(); 284 } 285 286 // set state to RUNNING 287 int initialState = state(); 288 if (initialState == STARTED || initialState == UNPARKED 289 || initialState == UNBLOCKED || initialState == YIELDED) { 290 // newly started or continue after parking/blocking/Thread.yield 291 if (!compareAndSetState(initialState, RUNNING)) { 292 return; 293 } 294 // consume parking permit when continuing after parking 295 if (initialState == UNPARKED) { 296 setParkPermit(false); 297 } 298 } else { 299 // not runnable 300 return; 301 } 302 303 mount(); 304 try { 305 cont.run(); 306 } finally { 307 unmount(); 308 if (cont.isDone()) { 309 afterDone(); 310 } else { 311 afterYield(); 312 } 313 } 314 } 315 316 /** 317 * Submits the runContinuation task to the scheduler. For the default scheduler, 318 * and calling it on a worker thread, the task will be pushed to the local queue, 319 * otherwise it will be pushed to an external submission queue. 320 * @param scheduler the scheduler 321 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 322 * @throws RejectedExecutionException 323 */ 324 @ChangesCurrentThread 325 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) { 326 boolean done = false; 327 while (!done) { 328 try { 329 // The scheduler's execute method is invoked in the context of the 330 // carrier thread. For the default scheduler this ensures that the 331 // current thread is a ForkJoinWorkerThread so the task will be pushed 332 // to the local queue. For other schedulers, it avoids deadlock that 333 // would arise due to platform and virtual threads contending for a 334 // lock on the scheduler's submission queue. 335 if (currentThread() instanceof VirtualThread vthread) { 336 vthread.switchToCarrierThread(); 337 try { 338 scheduler.execute(runContinuation); 339 } finally { 340 switchToVirtualThread(vthread); 341 } 342 } else { 343 scheduler.execute(runContinuation); 344 } 345 done = true; 346 } catch (RejectedExecutionException ree) { 347 submitFailed(ree); 348 throw ree; 349 } catch (OutOfMemoryError e) { 350 if (retryOnOOME) { 351 U.park(false, 100_000_000); // 100ms 352 } else { 353 throw e; 354 } 355 } 356 } 357 } 358 359 /** 360 * Submits the runContinuation task to given scheduler with a lazy submit. 361 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 362 * @throws RejectedExecutionException 363 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 364 */ 365 private void lazySubmitRunContinuation(ForkJoinPool pool) { 366 assert Thread.currentThread() instanceof CarrierThread; 367 try { 368 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 369 } catch (RejectedExecutionException ree) { 370 submitFailed(ree); 371 throw ree; 372 } catch (OutOfMemoryError e) { 373 submitRunContinuation(pool, true); 374 } 375 } 376 377 /** 378 * Submits the runContinuation task to the given scheduler as an external submit. 379 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 380 * @throws RejectedExecutionException 381 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 382 */ 383 private void externalSubmitRunContinuation(ForkJoinPool pool) { 384 assert Thread.currentThread() instanceof CarrierThread; 385 try { 386 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 387 } catch (RejectedExecutionException ree) { 388 submitFailed(ree); 389 throw ree; 390 } catch (OutOfMemoryError e) { 391 submitRunContinuation(pool, true); 392 } 393 } 394 395 /** 396 * Submits the runContinuation task to the scheduler. For the default scheduler, 397 * and calling it on a worker thread, the task will be pushed to the local queue, 398 * otherwise it will be pushed to an external submission queue. 399 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 400 * @throws RejectedExecutionException 401 */ 402 private void submitRunContinuation() { 403 submitRunContinuation(scheduler, true); 404 } 405 406 /** 407 * Submits the runContinuation task to the scheduler. For the default scheduler, and 408 * calling it a virtual thread that uses the default scheduler, the task will be 409 * pushed to an external submission queue. This method may throw OutOfMemoryError. 410 * @throws RejectedExecutionException 411 * @throws OutOfMemoryError 412 */ 413 private void externalSubmitRunContinuationOrThrow() { 414 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) { 415 try { 416 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 417 } catch (RejectedExecutionException ree) { 418 submitFailed(ree); 419 throw ree; 420 } 421 } else { 422 submitRunContinuation(scheduler, false); 423 } 424 } 425 426 /** 427 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 428 */ 429 private void submitFailed(RejectedExecutionException ree) { 430 var event = new VirtualThreadSubmitFailedEvent(); 431 if (event.isEnabled()) { 432 event.javaThreadId = threadId(); 433 event.exceptionMessage = ree.getMessage(); 434 event.commit(); 435 } 436 } 437 438 /** 439 * Runs a task in the context of this virtual thread. 440 */ 441 private void run(Runnable task) { 442 assert Thread.currentThread() == this && state == RUNNING; 443 444 // notify JVMTI, may post VirtualThreadStart event 445 notifyJvmtiStart(); 446 447 // emit JFR event if enabled 448 if (VirtualThreadStartEvent.isTurnedOn()) { 449 var event = new VirtualThreadStartEvent(); 450 event.javaThreadId = threadId(); 451 event.commit(); 452 } 453 454 Object bindings = Thread.scopedValueBindings(); 455 try { 456 runWith(bindings, task); 457 } catch (Throwable exc) { 458 dispatchUncaughtException(exc); 459 } finally { 460 try { 461 // pop any remaining scopes from the stack, this may block 462 StackableScope.popAll(); 463 464 // emit JFR event if enabled 465 if (VirtualThreadEndEvent.isTurnedOn()) { 466 var event = new VirtualThreadEndEvent(); 467 event.javaThreadId = threadId(); 468 event.commit(); 469 } 470 471 } finally { 472 // notify JVMTI, may post VirtualThreadEnd event 473 notifyJvmtiEnd(); 474 } 475 } 476 } 477 478 /** 479 * Mounts this virtual thread onto the current platform thread. On 480 * return, the current thread is the virtual thread. 481 */ 482 @ChangesCurrentThread 483 @ReservedStackAccess 484 private void mount() { 485 // notify JVMTI before mount 486 notifyJvmtiMount(/*hide*/true); 487 488 // sets the carrier thread 489 Thread carrier = Thread.currentCarrierThread(); 490 setCarrierThread(carrier); 491 492 // sync up carrier thread interrupt status if needed 493 if (interrupted) { 494 carrier.setInterrupt(); 495 } else if (carrier.isInterrupted()) { 496 synchronized (interruptLock) { 497 // need to recheck interrupt status 498 if (!interrupted) { 499 carrier.clearInterrupt(); 500 } 501 } 502 } 503 504 // set Thread.currentThread() to return this virtual thread 505 carrier.setCurrentThread(this); 506 } 507 508 /** 509 * Unmounts this virtual thread from the carrier. On return, the 510 * current thread is the current platform thread. 511 */ 512 @ChangesCurrentThread 513 @ReservedStackAccess 514 private void unmount() { 515 assert !Thread.holdsLock(interruptLock); 516 517 // set Thread.currentThread() to return the platform thread 518 Thread carrier = this.carrierThread; 519 carrier.setCurrentThread(carrier); 520 521 // break connection to carrier thread, synchronized with interrupt 522 synchronized (interruptLock) { 523 setCarrierThread(null); 524 } 525 carrier.clearInterrupt(); 526 527 // notify JVMTI after unmount 528 notifyJvmtiUnmount(/*hide*/false); 529 } 530 531 /** 532 * Sets the current thread to the current carrier thread. 533 */ 534 @ChangesCurrentThread 535 @JvmtiMountTransition 536 private void switchToCarrierThread() { 537 notifyJvmtiHideFrames(true); 538 Thread carrier = this.carrierThread; 539 assert Thread.currentThread() == this 540 && carrier == Thread.currentCarrierThread(); 541 carrier.setCurrentThread(carrier); 542 setLockId(this.threadId()); // keep lockid of vthread 543 } 544 545 /** 546 * Sets the current thread to the given virtual thread. 547 */ 548 @ChangesCurrentThread 549 @JvmtiMountTransition 550 private static void switchToVirtualThread(VirtualThread vthread) { 551 Thread carrier = vthread.carrierThread; 552 assert carrier == Thread.currentCarrierThread(); 553 carrier.setCurrentThread(vthread); 554 notifyJvmtiHideFrames(false); 555 } 556 557 /** 558 * Executes the given value returning task on the current carrier thread. 559 */ 560 @ChangesCurrentThread 561 <V> V executeOnCarrierThread(Callable<V> task) throws Exception { 562 assert Thread.currentThread() == this; 563 switchToCarrierThread(); 564 try { 565 return task.call(); 566 } finally { 567 switchToVirtualThread(this); 568 } 569 } 570 571 /** 572 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 573 * the continuation continues. 574 */ 575 @Hidden 576 private boolean yieldContinuation() { 577 notifyJvmtiUnmount(/*hide*/true); 578 try { 579 return Continuation.yield(VTHREAD_SCOPE); 580 } finally { 581 notifyJvmtiMount(/*hide*/false); 582 } 583 } 584 585 /** 586 * Invoked after the continuation yields. If parking then it sets the state 587 * and also re-submits the task to continue if unparked while parking. 588 * If yielding due to Thread.yield then it just submits the task to continue. 589 */ 590 private void afterYield() { 591 assert carrierThread == null; 592 593 // re-adjust parallelism if the virtual thread yielded when compensating 594 if (currentThread() instanceof CarrierThread ct) { 595 ct.endBlocking(); 596 } 597 598 int s = state(); 599 600 // LockSupport.park/parkNanos 601 if (s == PARKING || s == TIMED_PARKING) { 602 int newState = (s == PARKING) ? PARKED : TIMED_PARKED; 603 setState(newState); 604 605 // may have been unparked while parking 606 if (parkPermit && compareAndSetState(newState, UNPARKED)) { 607 // lazy submit to continue on the current carrier if possible 608 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 609 lazySubmitRunContinuation(ct.getPool()); 610 } else { 611 submitRunContinuation(); 612 } 613 } 614 return; 615 } 616 617 // Thread.yield 618 if (s == YIELDING) { 619 setState(YIELDED); 620 621 // external submit if there are no tasks in the local task queue 622 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 623 externalSubmitRunContinuation(ct.getPool()); 624 } else { 625 submitRunContinuation(); 626 } 627 return; 628 } 629 630 // blocking on monitorenter 631 if (s == BLOCKING) { 632 setState(BLOCKED); 633 634 // may have been unblocked while blocking 635 if (unblocked && compareAndSetState(BLOCKED, UNBLOCKED)) { 636 unblocked = false; 637 submitRunContinuation(); 638 return; 639 } 640 641 // if thread is the designated responsible thread for a monitor then schedule 642 // it to wakeup so that it can check and recover. See objectMonitor.cpp. 643 int recheckInterval = this.recheckInterval; 644 if (recheckInterval > 0) { 645 assert recheckInterval >= 1 && recheckInterval <= 6; 646 // 4 ^ (recheckInterval - 1) = 1, 4, 16, ... 1024 647 long delay = 1 << (recheckInterval - 1) << (recheckInterval - 1); 648 schedule(this::unblock, delay, MILLISECONDS); 649 } 650 return; 651 } 652 653 // Object.wait 654 if (s == WAITING || s == TIMED_WAITING) { 655 byte nonce; 656 int newState; 657 if (s == WAITING) { 658 nonce = 0; // not used 659 setState(newState = WAIT); 660 } else { 661 // synchronize with timeout task (previous timed-wait may be running) 662 synchronized (timedWaitLock()) { 663 nonce = ++timedWaitNonce; 664 setState(newState = TIMED_WAIT); 665 } 666 } 667 668 // may have been notified while in transition to wait state 669 if (notified && compareAndSetState(newState, BLOCKED)) { 670 // may have even been unblocked already 671 if (unblocked && compareAndSetState(BLOCKED, UNBLOCKED)) { 672 unblocked = false; 673 submitRunContinuation(); 674 } 675 return; 676 } 677 678 // may have been interrupted while in transition to wait state 679 if (interrupted && compareAndSetState(newState, UNBLOCKED)) { 680 submitRunContinuation(); 681 return; 682 } 683 684 // schedule wakeup 685 if (newState == TIMED_WAIT) { 686 assert waitTimeout > 0; 687 waitTimeoutTask = schedule(() -> waitTimeoutExpired(nonce), waitTimeout, MILLISECONDS); 688 } 689 return; 690 } 691 692 assert false; 693 } 694 695 /** 696 * Invoked after the continuation completes. 697 */ 698 private void afterDone() { 699 afterDone(true); 700 } 701 702 /** 703 * Invoked after the continuation completes (or start failed). Sets the thread 704 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 705 * 706 * @param notifyContainer true if its container should be notified 707 */ 708 private void afterDone(boolean notifyContainer) { 709 assert carrierThread == null; 710 setState(TERMINATED); 711 712 // notify anyone waiting for this virtual thread to terminate 713 CountDownLatch termination = this.termination; 714 if (termination != null) { 715 assert termination.getCount() == 1; 716 termination.countDown(); 717 } 718 719 // notify container 720 if (notifyContainer) { 721 threadContainer().onExit(this); 722 } 723 724 // clear references to thread locals 725 clearReferences(); 726 } 727 728 /** 729 * Schedules this {@code VirtualThread} to execute. 730 * 731 * @throws IllegalStateException if the container is shutdown or closed 732 * @throws IllegalThreadStateException if the thread has already been started 733 * @throws RejectedExecutionException if the scheduler cannot accept a task 734 */ 735 @Override 736 void start(ThreadContainer container) { 737 if (!compareAndSetState(NEW, STARTED)) { 738 throw new IllegalThreadStateException("Already started"); 739 } 740 741 // bind thread to container 742 assert threadContainer() == null; 743 setThreadContainer(container); 744 745 // start thread 746 boolean addedToContainer = false; 747 boolean started = false; 748 try { 749 container.onStart(this); // may throw 750 addedToContainer = true; 751 752 // scoped values may be inherited 753 inheritScopedValueBindings(container); 754 755 // submit task to run thread, using externalSubmit if possible 756 externalSubmitRunContinuationOrThrow(); 757 started = true; 758 } finally { 759 if (!started) { 760 afterDone(addedToContainer); 761 } 762 } 763 } 764 765 @Override 766 public void start() { 767 start(ThreadContainers.root()); 768 } 769 770 @Override 771 public void run() { 772 // do nothing 773 } 774 775 /** 776 * Parks until unparked or interrupted. If already unparked then the parking 777 * permit is consumed and this method completes immediately (meaning it doesn't 778 * yield). It also completes immediately if the interrupt status is set. 779 */ 780 @Override 781 void park() { 782 assert Thread.currentThread() == this; 783 784 // complete immediately if parking permit available or interrupted 785 if (getAndSetParkPermit(false) || interrupted) 786 return; 787 788 // park the thread 789 boolean yielded = false; 790 setState(PARKING); 791 try { 792 yielded = yieldContinuation(); // may throw 793 } finally { 794 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 795 if (!yielded) { 796 assert state() == PARKING; 797 setState(RUNNING); 798 } 799 } 800 801 // park on the carrier thread when pinned 802 if (!yielded) { 803 parkOnCarrierThread(false, 0); 804 } 805 } 806 807 /** 808 * Parks up to the given waiting time or until unparked or interrupted. 809 * If already unparked then the parking permit is consumed and this method 810 * completes immediately (meaning it doesn't yield). It also completes immediately 811 * if the interrupt status is set or the waiting time is {@code <= 0}. 812 * 813 * @param nanos the maximum number of nanoseconds to wait. 814 */ 815 @Override 816 void parkNanos(long nanos) { 817 assert Thread.currentThread() == this; 818 819 // complete immediately if parking permit available or interrupted 820 if (getAndSetParkPermit(false) || interrupted) 821 return; 822 823 // park the thread for the waiting time 824 if (nanos > 0) { 825 long startTime = System.nanoTime(); 826 827 boolean yielded = false; 828 Future<?> unparker = scheduleUnpark(nanos); // may throw OOME 829 setState(TIMED_PARKING); 830 try { 831 yielded = yieldContinuation(); // may throw 832 } finally { 833 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 834 if (!yielded) { 835 assert state() == TIMED_PARKING; 836 setState(RUNNING); 837 } 838 cancel(unparker); 839 } 840 841 // park on carrier thread for remaining time when pinned 842 if (!yielded) { 843 long remainingNanos = nanos - (System.nanoTime() - startTime); 844 parkOnCarrierThread(true, remainingNanos); 845 } 846 } 847 } 848 849 /** 850 * Parks the current carrier thread up to the given waiting time or until 851 * unparked or interrupted. If the virtual thread is interrupted then the 852 * interrupt status will be propagated to the carrier thread. 853 * @param timed true for a timed park, false for untimed 854 * @param nanos the waiting time in nanoseconds 855 */ 856 private void parkOnCarrierThread(boolean timed, long nanos) { 857 assert state() == RUNNING; 858 859 setState(timed ? TIMED_PINNED : PINNED); 860 try { 861 if (!parkPermit) { 862 if (!timed) { 863 U.park(false, 0); 864 } else if (nanos > 0) { 865 U.park(false, nanos); 866 } 867 } 868 } finally { 869 setState(RUNNING); 870 } 871 872 // consume parking permit 873 setParkPermit(false); 874 } 875 876 /** 877 * Invoked by parkNanos to schedule this virtual thread to be unparked after 878 * a given delay. 879 */ 880 @ChangesCurrentThread 881 private Future<?> scheduleUnpark(long nanos) { 882 assert Thread.currentThread() == this; 883 // need to switch to current carrier thread to avoid nested parking 884 switchToCarrierThread(); 885 try { 886 return schedule(this::unpark, nanos, NANOSECONDS); 887 } finally { 888 switchToVirtualThread(this); 889 } 890 } 891 892 /** 893 * Invoked by parkNanos to cancel the unpark timer. 894 */ 895 @ChangesCurrentThread 896 private void cancel(Future<?> future) { 897 assert Thread.currentThread() == this; 898 if (!future.isDone()) { 899 // need to switch to current carrier thread to avoid nested parking 900 switchToCarrierThread(); 901 try { 902 future.cancel(false); 903 } finally { 904 switchToVirtualThread(this); 905 } 906 } 907 } 908 909 /** 910 * Re-enables this virtual thread for scheduling. If this virtual thread is parked 911 * then its task is scheduled to continue, otherwise its next call to {@code park} or 912 * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block. 913 * @throws RejectedExecutionException if the scheduler cannot accept a task 914 */ 915 @Override 916 void unpark() { 917 if (!getAndSetParkPermit(true) && currentThread() != this) { 918 int s = state(); 919 920 // unparked while parked 921 if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) { 922 submitRunContinuation(); 923 return; 924 } 925 926 // unparked while parked when pinned 927 if (s == PINNED || s == TIMED_PINNED) { 928 // unpark carrier thread when pinned 929 disableSuspendAndPreempt(); 930 try { 931 synchronized (carrierThreadAccessLock()) { 932 Thread carrier = carrierThread; 933 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 934 U.unpark(carrier); 935 } 936 } 937 } finally { 938 enableSuspendAndPreempt(); 939 } 940 return; 941 } 942 } 943 } 944 945 /** 946 * Invoked by unblocker thread to unblock this virtual thread. 947 */ 948 private void unblock() { 949 assert !Thread.currentThread().isVirtual(); 950 unblocked = true; 951 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 952 unblocked = false; 953 submitRunContinuation(); 954 } 955 } 956 957 /** 958 * Invoked by timer thread when wait timeout for virtual thread has expired. 959 * If the virtual thread is in timed-wait then this method will unblock the thread 960 * and submit its task so that it continues and attempts to reenter the monitor. 961 * This method does nothing if the thread has been woken by notify or interrupt. 962 */ 963 private void waitTimeoutExpired(byte nounce) { 964 assert !Thread.currentThread().isVirtual(); 965 for (;;) { 966 boolean unblocked = false; 967 synchronized (timedWaitLock()) { 968 if (nounce != timedWaitNonce) { 969 // this timeout task is for a past timed-wait 970 return; 971 } 972 int s = state(); 973 if (s == TIMED_WAIT) { 974 unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED); 975 } else if (s != (TIMED_WAIT | SUSPENDED)) { 976 // notified or interrupted, no longer waiting 977 return; 978 } 979 } 980 if (unblocked) { 981 submitRunContinuation(); 982 return; 983 } 984 // need to retry when thread is suspended in time-wait 985 Thread.yield(); 986 } 987 } 988 989 /** 990 * Invoked by Object.wait to cancel the wait timer. 991 */ 992 void cancelWaitTimeout() { 993 assert Thread.currentThread() == this; 994 Future<?> timeoutTask = this.waitTimeoutTask; 995 if (timeoutTask != null) { 996 // Pin the continuation to prevent the virtual thread from unmounting 997 // when there is contention removing the task. This avoids deadlock that 998 // could arise due to carriers and virtual threads contending for a 999 // lock on the delay queue. 1000 Continuation.pin(); 1001 try { 1002 timeoutTask.cancel(false); 1003 } finally { 1004 Continuation.unpin(); 1005 } 1006 } 1007 } 1008 1009 /** 1010 * Attempts to yield the current virtual thread (Thread.yield). 1011 */ 1012 void tryYield() { 1013 assert Thread.currentThread() == this; 1014 setState(YIELDING); 1015 boolean yielded = false; 1016 try { 1017 yielded = yieldContinuation(); // may throw 1018 } finally { 1019 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 1020 if (!yielded) { 1021 assert state() == YIELDING; 1022 setState(RUNNING); 1023 } 1024 } 1025 } 1026 1027 /** 1028 * Sleep the current thread for the given sleep time (in nanoseconds). If 1029 * nanos is 0 then the thread will attempt to yield. 1030 * 1031 * @implNote This implementation parks the thread for the given sleeping time 1032 * and will therefore be observed in PARKED state during the sleep. Parking 1033 * will consume the parking permit so this method makes available the parking 1034 * permit after the sleep. This may be observed as a spurious, but benign, 1035 * wakeup when the thread subsequently attempts to park. 1036 * 1037 * @param nanos the maximum number of nanoseconds to sleep 1038 * @throws InterruptedException if interrupted while sleeping 1039 */ 1040 void sleepNanos(long nanos) throws InterruptedException { 1041 assert Thread.currentThread() == this && nanos >= 0; 1042 if (getAndClearInterrupt()) 1043 throw new InterruptedException(); 1044 if (nanos == 0) { 1045 tryYield(); 1046 } else { 1047 // park for the sleep time 1048 try { 1049 long remainingNanos = nanos; 1050 long startNanos = System.nanoTime(); 1051 while (remainingNanos > 0) { 1052 parkNanos(remainingNanos); 1053 if (getAndClearInterrupt()) { 1054 throw new InterruptedException(); 1055 } 1056 remainingNanos = nanos - (System.nanoTime() - startNanos); 1057 } 1058 } finally { 1059 // may have been unparked while sleeping 1060 setParkPermit(true); 1061 } 1062 } 1063 } 1064 1065 /** 1066 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 1067 * A timeout of {@code 0} means to wait forever. 1068 * 1069 * @throws InterruptedException if interrupted while waiting 1070 * @return true if the thread has terminated 1071 */ 1072 boolean joinNanos(long nanos) throws InterruptedException { 1073 if (state() == TERMINATED) 1074 return true; 1075 1076 // ensure termination object exists, then re-check state 1077 CountDownLatch termination = getTermination(); 1078 if (state() == TERMINATED) 1079 return true; 1080 1081 // wait for virtual thread to terminate 1082 if (nanos == 0) { 1083 termination.await(); 1084 } else { 1085 boolean terminated = termination.await(nanos, NANOSECONDS); 1086 if (!terminated) { 1087 // waiting time elapsed 1088 return false; 1089 } 1090 } 1091 assert state() == TERMINATED; 1092 return true; 1093 } 1094 1095 @Override 1096 void blockedOn(Interruptible b) { 1097 disableSuspendAndPreempt(); 1098 try { 1099 super.blockedOn(b); 1100 } finally { 1101 enableSuspendAndPreempt(); 1102 } 1103 } 1104 1105 @Override 1106 @SuppressWarnings("removal") 1107 public void interrupt() { 1108 if (Thread.currentThread() != this) { 1109 checkAccess(); 1110 1111 // if current thread is a virtual thread then prevent it from being 1112 // suspended or unmounted when entering or holding interruptLock 1113 Interruptible blocker; 1114 disableSuspendAndPreempt(); 1115 try { 1116 synchronized (interruptLock) { 1117 interrupted = true; 1118 blocker = nioBlocker(); 1119 if (blocker != null) { 1120 blocker.interrupt(this); 1121 } 1122 1123 // interrupt carrier thread if mounted 1124 Thread carrier = carrierThread; 1125 if (carrier != null) carrier.setInterrupt(); 1126 } 1127 } finally { 1128 enableSuspendAndPreempt(); 1129 } 1130 1131 // notify blocker after releasing interruptLock 1132 if (blocker != null) { 1133 blocker.postInterrupt(); 1134 } 1135 1136 // make available parking permit, unpark thread if parked 1137 unpark(); 1138 1139 // if thread is waiting in Object.wait then schedule to try to reenter 1140 int s = state(); 1141 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1142 submitRunContinuation(); 1143 } 1144 1145 } else { 1146 interrupted = true; 1147 carrierThread.setInterrupt(); 1148 setParkPermit(true); 1149 } 1150 } 1151 1152 @Override 1153 public boolean isInterrupted() { 1154 return interrupted; 1155 } 1156 1157 @Override 1158 boolean getAndClearInterrupt() { 1159 assert Thread.currentThread() == this; 1160 boolean oldValue = interrupted; 1161 if (oldValue) { 1162 disableSuspendAndPreempt(); 1163 try { 1164 synchronized (interruptLock) { 1165 interrupted = false; 1166 carrierThread.clearInterrupt(); 1167 } 1168 } finally { 1169 enableSuspendAndPreempt(); 1170 } 1171 } 1172 return oldValue; 1173 } 1174 1175 @Override 1176 Thread.State threadState() { 1177 int s = state(); 1178 switch (s & ~SUSPENDED) { 1179 case NEW: 1180 return Thread.State.NEW; 1181 case STARTED: 1182 // return NEW if thread container not yet set 1183 if (threadContainer() == null) { 1184 return Thread.State.NEW; 1185 } else { 1186 return Thread.State.RUNNABLE; 1187 } 1188 case UNPARKED: 1189 case YIELDED: 1190 // runnable, not mounted 1191 return Thread.State.RUNNABLE; 1192 case UNBLOCKED: 1193 // if designated responsible thread for monitor then thread is blocked 1194 if (isResponsibleForMonitor()) { 1195 return Thread.State.BLOCKED; 1196 } else { 1197 return Thread.State.RUNNABLE; 1198 } 1199 case RUNNING: 1200 // if designated responsible thread for monitor then thread is blocked 1201 if (isResponsibleForMonitor()) { 1202 return Thread.State.BLOCKED; 1203 } 1204 // if mounted then return state of carrier thread 1205 if (Thread.currentThread() != this) { 1206 disableSuspendAndPreempt(); 1207 try { 1208 synchronized (carrierThreadAccessLock()) { 1209 Thread carrierThread = this.carrierThread; 1210 if (carrierThread != null) { 1211 return carrierThread.threadState(); 1212 } 1213 } 1214 } finally { 1215 enableSuspendAndPreempt(); 1216 } 1217 } 1218 // runnable, mounted 1219 return Thread.State.RUNNABLE; 1220 case PARKING: 1221 case TIMED_PARKING: 1222 case YIELDING: 1223 case WAITING: 1224 case TIMED_WAITING: 1225 // runnable, in transition 1226 return Thread.State.RUNNABLE; 1227 case PARKED: 1228 case PINNED: 1229 case WAIT: 1230 return State.WAITING; 1231 case TIMED_PARKED: 1232 case TIMED_PINNED: 1233 case TIMED_WAIT: 1234 return State.TIMED_WAITING; 1235 case BLOCKING: 1236 case BLOCKED: 1237 return State.BLOCKED; 1238 case TERMINATED: 1239 return Thread.State.TERMINATED; 1240 default: 1241 throw new InternalError(); 1242 } 1243 } 1244 1245 /** 1246 * Returns true if thread is the designated responsible thread for a monitor. 1247 * See objectMonitor.cpp for details. 1248 */ 1249 private boolean isResponsibleForMonitor() { 1250 return (recheckInterval > 0); 1251 } 1252 1253 @Override 1254 boolean alive() { 1255 int s = state; 1256 return (s != NEW && s != TERMINATED); 1257 } 1258 1259 @Override 1260 boolean isTerminated() { 1261 return (state == TERMINATED); 1262 } 1263 1264 @Override 1265 StackTraceElement[] asyncGetStackTrace() { 1266 StackTraceElement[] stackTrace; 1267 do { 1268 stackTrace = (carrierThread != null) 1269 ? super.asyncGetStackTrace() // mounted 1270 : tryGetStackTrace(); // unmounted 1271 if (stackTrace == null) { 1272 Thread.yield(); 1273 } 1274 } while (stackTrace == null); 1275 return stackTrace; 1276 } 1277 1278 /** 1279 * Returns the stack trace for this virtual thread if it is unmounted. 1280 * Returns null if the thread is mounted or in transition. 1281 */ 1282 private StackTraceElement[] tryGetStackTrace() { 1283 int initialState = state() & ~SUSPENDED; 1284 switch (initialState) { 1285 case NEW, STARTED, TERMINATED -> { 1286 return new StackTraceElement[0]; // unmounted, empty stack 1287 } 1288 case RUNNING, PINNED, TIMED_PINNED -> { 1289 return null; // mounted 1290 } 1291 case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> { 1292 // unmounted, not runnable 1293 } 1294 case UNPARKED, UNBLOCKED, YIELDED -> { 1295 // unmounted, runnable 1296 } 1297 case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> { 1298 return null; // in transition 1299 } 1300 default -> throw new InternalError("" + initialState); 1301 } 1302 1303 // thread is unmounted, prevent it from continuing 1304 int suspendedState = initialState | SUSPENDED; 1305 if (!compareAndSetState(initialState, suspendedState)) { 1306 return null; 1307 } 1308 1309 // get stack trace and restore state 1310 StackTraceElement[] stack; 1311 try { 1312 stack = cont.getStackTrace(); 1313 } finally { 1314 assert state == suspendedState; 1315 setState(initialState); 1316 } 1317 boolean resubmit = switch (initialState) { 1318 case UNPARKED, UNBLOCKED, YIELDED -> { 1319 // resubmit as task may have run while suspended 1320 yield true; 1321 } 1322 case PARKED, TIMED_PARKED -> { 1323 // resubmit if unparked while suspended 1324 yield parkPermit && compareAndSetState(initialState, UNPARKED); 1325 } 1326 case BLOCKED -> { 1327 // resubmit if unblocked while suspended 1328 yield unblocked && compareAndSetState(BLOCKED, UNBLOCKED); 1329 } 1330 case WAIT, TIMED_WAIT -> { 1331 // resubmit if notified or interrupted while waiting (Object.wait) 1332 // waitTimeoutExpired will retry if the timed expired when suspended 1333 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED); 1334 } 1335 default -> throw new InternalError(); 1336 }; 1337 if (resubmit) { 1338 submitRunContinuation(); 1339 } 1340 return stack; 1341 } 1342 1343 @Override 1344 public String toString() { 1345 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1346 sb.append(threadId()); 1347 String name = getName(); 1348 if (!name.isEmpty()) { 1349 sb.append(","); 1350 sb.append(name); 1351 } 1352 sb.append("]/"); 1353 1354 // add the carrier state and thread name when mounted 1355 boolean mounted; 1356 if (Thread.currentThread() == this) { 1357 mounted = appendCarrierInfo(sb); 1358 } else { 1359 disableSuspendAndPreempt(); 1360 try { 1361 synchronized (carrierThreadAccessLock()) { 1362 mounted = appendCarrierInfo(sb); 1363 } 1364 } finally { 1365 enableSuspendAndPreempt(); 1366 } 1367 } 1368 1369 // add virtual thread state when not mounted 1370 if (!mounted) { 1371 String stateAsString = threadState().toString(); 1372 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1373 } 1374 1375 return sb.toString(); 1376 } 1377 1378 /** 1379 * Appends the carrier state and thread name to the string buffer if mounted. 1380 * @return true if mounted, false if not mounted 1381 */ 1382 private boolean appendCarrierInfo(StringBuilder sb) { 1383 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1384 Thread carrier = carrierThread; 1385 if (carrier != null) { 1386 String stateAsString = carrier.threadState().toString(); 1387 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1388 sb.append('@'); 1389 sb.append(carrier.getName()); 1390 return true; 1391 } else { 1392 return false; 1393 } 1394 } 1395 1396 @Override 1397 public int hashCode() { 1398 return (int) threadId(); 1399 } 1400 1401 @Override 1402 public boolean equals(Object obj) { 1403 return obj == this; 1404 } 1405 1406 /** 1407 * Returns the termination object, creating it if needed. 1408 */ 1409 private CountDownLatch getTermination() { 1410 CountDownLatch termination = this.termination; 1411 if (termination == null) { 1412 termination = new CountDownLatch(1); 1413 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1414 termination = this.termination; 1415 } 1416 } 1417 return termination; 1418 } 1419 1420 /** 1421 * Returns the lock object to synchronize on when accessing carrierThread. 1422 * The lock prevents carrierThread from being reset to null during unmount. 1423 */ 1424 private Object carrierThreadAccessLock() { 1425 // return interruptLock as unmount has to coordinate with interrupt 1426 return interruptLock; 1427 } 1428 1429 /** 1430 * Returns a lock object to coordinating timed-wait setup and timeout handling. 1431 */ 1432 private Object timedWaitLock() { 1433 // use this object for now to avoid the overhead of introducing another lock 1434 return runContinuation; 1435 } 1436 1437 /** 1438 * Disallow the current thread be suspended or preempted. 1439 */ 1440 private void disableSuspendAndPreempt() { 1441 notifyJvmtiDisableSuspend(true); 1442 Continuation.pin(); 1443 } 1444 1445 /** 1446 * Allow the current thread be suspended or preempted. 1447 */ 1448 private void enableSuspendAndPreempt() { 1449 Continuation.unpin(); 1450 notifyJvmtiDisableSuspend(false); 1451 } 1452 1453 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1454 1455 private int state() { 1456 return state; // volatile read 1457 } 1458 1459 private void setState(int newValue) { 1460 state = newValue; // volatile write 1461 } 1462 1463 private boolean compareAndSetState(int expectedValue, int newValue) { 1464 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1465 } 1466 1467 private boolean compareAndSetOnWaitingList(byte expectedValue, byte newValue) { 1468 return U.compareAndSetByte(this, ON_WAITING_LIST, expectedValue, newValue); 1469 } 1470 1471 private void setParkPermit(boolean newValue) { 1472 if (parkPermit != newValue) { 1473 parkPermit = newValue; 1474 } 1475 } 1476 1477 private boolean getAndSetParkPermit(boolean newValue) { 1478 if (parkPermit != newValue) { 1479 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1480 } else { 1481 return newValue; 1482 } 1483 } 1484 1485 private void setCarrierThread(Thread carrier) { 1486 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1487 this.carrierThread = carrier; 1488 } 1489 1490 @IntrinsicCandidate 1491 private static native void setLockId(long tid); 1492 1493 // -- JVM TI support -- 1494 1495 @IntrinsicCandidate 1496 @JvmtiMountTransition 1497 private native void notifyJvmtiStart(); 1498 1499 @IntrinsicCandidate 1500 @JvmtiMountTransition 1501 private native void notifyJvmtiEnd(); 1502 1503 @IntrinsicCandidate 1504 @JvmtiMountTransition 1505 private native void notifyJvmtiMount(boolean hide); 1506 1507 @IntrinsicCandidate 1508 @JvmtiMountTransition 1509 private native void notifyJvmtiUnmount(boolean hide); 1510 1511 @IntrinsicCandidate 1512 @JvmtiMountTransition 1513 private static native void notifyJvmtiHideFrames(boolean hide); 1514 1515 @IntrinsicCandidate 1516 private static native void notifyJvmtiDisableSuspend(boolean enter); 1517 1518 private static native void registerNatives(); 1519 static { 1520 registerNatives(); 1521 1522 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1523 var group = Thread.virtualThreadGroup(); 1524 } 1525 1526 /** 1527 * Creates the default scheduler. 1528 * If the system property {@code jdk.virtualThreadScheduler.implClass} is set then 1529 * its value is the name of a class that implements java.util.concurrent.Executor. 1530 * The class is public in an exported package, has a public no-arg constructor, 1531 * and is visible to the system class loader. 1532 * If the system property is not set then the default scheduler will be a 1533 * ForkJoinPool instance. 1534 */ 1535 private static Executor createDefaultScheduler() { 1536 String propName = "jdk.virtualThreadScheduler.implClass"; 1537 String propValue = GetPropertyAction.privilegedGetProperty(propName); 1538 if (propValue != null) { 1539 try { 1540 Class<?> clazz = Class.forName(propValue, true, 1541 ClassLoader.getSystemClassLoader()); 1542 Constructor<?> ctor = clazz.getConstructor(); 1543 return (Executor) ctor.newInstance(); 1544 } catch (Exception ex) { 1545 throw new Error(ex); 1546 } 1547 } else { 1548 return createDefaultForkJoinPoolScheduler(); 1549 } 1550 } 1551 1552 /** 1553 * Creates the default ForkJoinPool scheduler. 1554 */ 1555 @SuppressWarnings("removal") 1556 private static ForkJoinPool createDefaultForkJoinPoolScheduler() { 1557 ForkJoinWorkerThreadFactory factory = pool -> { 1558 PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool); 1559 return AccessController.doPrivileged(pa); 1560 }; 1561 PrivilegedAction<ForkJoinPool> pa = () -> { 1562 int parallelism, maxPoolSize, minRunnable; 1563 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1564 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1565 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1566 if (parallelismValue != null) { 1567 parallelism = Integer.parseInt(parallelismValue); 1568 } else { 1569 parallelism = Runtime.getRuntime().availableProcessors(); 1570 } 1571 if (maxPoolSizeValue != null) { 1572 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1573 parallelism = Integer.min(parallelism, maxPoolSize); 1574 } else { 1575 maxPoolSize = Integer.max(parallelism, 256); 1576 } 1577 if (minRunnableValue != null) { 1578 minRunnable = Integer.parseInt(minRunnableValue); 1579 } else { 1580 minRunnable = Integer.max(parallelism / 2, 1); 1581 } 1582 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1583 boolean asyncMode = true; // FIFO 1584 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1585 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1586 }; 1587 return AccessController.doPrivileged(pa); 1588 } 1589 1590 /** 1591 * Schedule a runnable task to run after a delay. 1592 */ 1593 private static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1594 long tid = Thread.currentThread().threadId(); 1595 int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1); 1596 return DELAYED_TASK_SCHEDULERS[index].schedule(command, delay, unit); 1597 } 1598 1599 /** 1600 * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks. 1601 */ 1602 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1603 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1604 String propValue = GetPropertyAction.privilegedGetProperty(propName); 1605 int queueCount; 1606 if (propValue != null) { 1607 queueCount = Integer.parseInt(propValue); 1608 if (queueCount != Integer.highestOneBit(queueCount)) { 1609 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1610 } 1611 } else { 1612 int ncpus = Runtime.getRuntime().availableProcessors(); 1613 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1614 } 1615 var schedulers = new ScheduledExecutorService[queueCount]; 1616 for (int i = 0; i < queueCount; i++) { 1617 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1618 Executors.newScheduledThreadPool(1, task -> { 1619 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1620 t.setDaemon(true); 1621 return t; 1622 }); 1623 stpe.setRemoveOnCancelPolicy(true); 1624 schedulers[i] = stpe; 1625 } 1626 return schedulers; 1627 } 1628 1629 /** 1630 * Schedule virtual threads that are ready to be scheduled after they blocked on 1631 * monitor enter. 1632 */ 1633 private static void unblockVirtualThreads() { 1634 while (true) { 1635 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1636 while (vthread != null) { 1637 assert vthread.onWaitingList == 1; 1638 VirtualThread nextThread = vthread.next; 1639 1640 // remove from list and unblock 1641 vthread.next = null; 1642 boolean changed = vthread.compareAndSetOnWaitingList((byte) 1, (byte) 0); 1643 assert changed; 1644 vthread.unblock(); 1645 1646 vthread = nextThread; 1647 } 1648 } 1649 } 1650 1651 /** 1652 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1653 * if necessary until a list of one or more threads becomes available. 1654 */ 1655 private static native VirtualThread takeVirtualThreadListToUnblock(); 1656 1657 static { 1658 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1659 VirtualThread::unblockVirtualThreads); 1660 unblocker.setDaemon(true); 1661 unblocker.start(); 1662 } 1663 }