1 /* 2 * Copyright (c) 2018, 2026, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.lang; 26 27 import java.util.Locale; 28 import java.util.Objects; 29 import java.util.concurrent.CountDownLatch; 30 import java.util.concurrent.Executor; 31 import java.util.concurrent.Executors; 32 import java.util.concurrent.ForkJoinPool; 33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 34 import java.util.concurrent.ForkJoinTask; 35 import java.util.concurrent.Future; 36 import java.util.concurrent.RejectedExecutionException; 37 import java.util.concurrent.ScheduledExecutorService; 38 import java.util.concurrent.ScheduledThreadPoolExecutor; 39 import java.util.concurrent.TimeUnit; 40 import jdk.internal.event.VirtualThreadEndEvent; 41 import jdk.internal.event.VirtualThreadStartEvent; 42 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 43 import jdk.internal.misc.CarrierThread; 44 import jdk.internal.misc.InnocuousThread; 45 import jdk.internal.misc.Unsafe; 46 import jdk.internal.vm.Continuation; 47 import jdk.internal.vm.ContinuationScope; 48 import jdk.internal.vm.StackableScope; 49 import jdk.internal.vm.ThreadContainer; 50 import jdk.internal.vm.ThreadContainers; 51 import jdk.internal.vm.annotation.ChangesCurrentThread; 52 import jdk.internal.vm.annotation.Hidden; 53 import jdk.internal.vm.annotation.IntrinsicCandidate; 54 import jdk.internal.vm.annotation.JvmtiHideEvents; 55 import jdk.internal.vm.annotation.JvmtiMountTransition; 56 import jdk.internal.vm.annotation.ReservedStackAccess; 57 import sun.nio.ch.Interruptible; 58 import static java.util.concurrent.TimeUnit.*; 59 60 /** 61 * A thread that is scheduled by the Java virtual machine rather than the operating system. 
*/
final class VirtualThread extends BaseVirtualThread {
    // Unsafe is used to obtain the field offsets below and to park/unpark threads
    private static final Unsafe U = Unsafe.getUnsafe();
    // the continuation scope passed to the continuation of every virtual thread
    private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
    // the scheduler used when none is specified and the creating thread is not a virtual thread
    private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();

    // offsets of the instance fields named by the string arguments, as reported by Unsafe
    private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
    private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
    private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
    private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
    private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");

    // scheduler and continuation
    private final Executor scheduler;
    private final Continuation cont;
    private final Runnable runContinuation;

    // virtual thread state, accessed by VM
    private volatile int state;

    /*
     * Virtual thread state transitions:
     *
     *      NEW -> STARTED         // Thread.start, schedule to run
     *  STARTED -> TERMINATED      // failed to start
     *  STARTED -> RUNNING         // first run
     *  RUNNING -> TERMINATED      // done
     *
     *  RUNNING -> PARKING         // Thread parking with LockSupport.park
     *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
     *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
     * UNPARKED -> RUNNING         // continue execution after park
     *
     *  PARKING -> RUNNING         // cont.yield failed, need to park on carrier
     *  RUNNING -> PINNED          // park on carrier
     *   PINNED -> RUNNING         // unparked, continue execution on same carrier
     *
     *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos
     * TIMED_PARKING -> TIMED_PARKED    // cont.yield successful, timed-parked
     *  TIMED_PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *
     * TIMED_PARKING -> RUNNING         // cont.yield failed, need to park on carrier
     *       RUNNING -> TIMED_PINNED    // park on carrier
     *  TIMED_PINNED -> RUNNING         // unparked, continue execution on same carrier
     *
     *   RUNNING -> BLOCKING       // blocking on monitor enter
     *  BLOCKING -> BLOCKED        // blocked on monitor enter
     *   BLOCKED -> UNBLOCKED      // unblocked, may be scheduled to continue
     * UNBLOCKED -> RUNNING        // continue execution after blocked on monitor enter
     *
     *   RUNNING -> WAITING        // transitional state during wait on monitor
     *   WAITING -> WAIT           // waiting on monitor
     *      WAIT -> BLOCKED        // notified, waiting to be unblocked by monitor owner
     *      WAIT -> UNBLOCKED      // interrupted
     *
     *       RUNNING -> TIMED_WAITING   // transitional state during timed-waiting on monitor
     * TIMED_WAITING -> TIMED_WAIT      // timed-waiting on monitor
     *    TIMED_WAIT -> BLOCKED         // notified, waiting to be unblocked by monitor owner
     *    TIMED_WAIT -> UNBLOCKED       // timed-out/interrupted
     *
     *  RUNNING -> YIELDING        // Thread.yield
     * YIELDING -> YIELDED         // cont.yield successful, may be scheduled to continue
     * YIELDING -> RUNNING         // cont.yield failed
     *  YIELDED -> RUNNING         // continue execution after Thread.yield
     */
    private static final int NEW = 0;
    private static final int STARTED = 1;
    private static final int RUNNING = 2; // runnable-mounted

    // untimed and timed parking
    private static final int PARKING = 3;
    private static final int PARKED = 4; // unmounted
    private static final int PINNED = 5; // mounted
    private static final int TIMED_PARKING = 6;
    private static final int TIMED_PARKED = 7; // unmounted
    private static final int TIMED_PINNED = 8; // mounted
    private static final int UNPARKED = 9; // unmounted but runnable

    // Thread.yield
    private static final int YIELDING = 10;
    private static final int YIELDED = 11; // unmounted but runnable

    // monitor enter
    private static final int BLOCKING = 12;
    private static final int BLOCKED = 13; // unmounted
    private static final int UNBLOCKED = 14; // unmounted but runnable

    // monitor wait/timed-wait
    private static final int WAITING = 15;
    private static final int WAIT = 16; // waiting in Object.wait
    private static final int TIMED_WAITING = 17;
    private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait

    private static final int TERMINATED = 99; // final state

    // parking permit made available by LockSupport.unpark
    private volatile boolean parkPermit;

    // blocking permit made available by unblocker thread when another thread exits monitor
    private volatile boolean blockPermit;

    // true when on the list of virtual threads waiting to be unblocked
    private volatile boolean onWaitingList;

    // next virtual thread on the list of virtual threads waiting to be unblocked
    private volatile VirtualThread next;

    // notified by Object.notify/notifyAll while waiting in Object.wait
    private volatile boolean notified;

    // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
    private volatile boolean interruptibleWait;

    // timed-wait support, incremented for each timed-wait so that a stale timeout
    // task can be recognized (see waitTimeoutExpired)
    private byte timedWaitSeqNo;

    // timeout for timed-park and timed-wait, only accessed on current/carrier thread
    // (passed to the timer as nanoseconds for timed-park and milliseconds for timed-wait)
    private long timeout;

    // timer task for timed-park and timed-wait, only accessed on current/carrier thread
    private Future<?> timeoutTask;

    // carrier thread when mounted, accessed by VM
    private volatile Thread carrierThread;

    // termination object when joining, created lazily if needed
    private volatile CountDownLatch termination;

    /**
     * Returns the default scheduler.
     */
    static Executor defaultScheduler() {
        return DEFAULT_SCHEDULER;
    }

    /**
     * Returns the continuation scope used for virtual threads.
*/
    static ContinuationScope continuationScope() {
        return VTHREAD_SCOPE;
    }

    /**
     * Creates a new {@code VirtualThread} to run the given task with the given
     * scheduler. If the given scheduler is {@code null} and the current thread
     * is a platform thread then the newly created virtual thread will use the
     * default scheduler. If the given scheduler is {@code null} and the current
     * thread is a virtual thread then the current thread's scheduler is used.
     *
     * @param scheduler the scheduler or null
     * @param name thread name
     * @param characteristics characteristics
     * @param task the task to execute
     * @throws NullPointerException if task is null
     */
    VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
        super(name, characteristics, /*bound*/ false);
        Objects.requireNonNull(task);

        // choose scheduler if not specified
        if (scheduler == null) {
            Thread parent = Thread.currentThread();
            if (parent instanceof VirtualThread vparent) {
                // inherit the scheduler of the creating virtual thread
                scheduler = vparent.scheduler;
            } else {
                scheduler = DEFAULT_SCHEDULER;
            }
        }

        this.scheduler = scheduler;
        this.cont = new VThreadContinuation(this, task);
        // the task that is submitted to the scheduler each time this thread needs to run
        this.runContinuation = this::runContinuation;
    }

    /**
     * The continuation that a virtual thread executes. The task is wrapped so that
     * {@code endFirstTransition} is invoked before the task runs and
     * {@code startFinalTransition} is invoked after it completes, normally or not.
     */
    private static class VThreadContinuation extends Continuation {
        VThreadContinuation(VirtualThread vthread, Runnable task) {
            super(VTHREAD_SCOPE, wrap(vthread, task));
        }
        @Override
        protected void onPinned(Continuation.Pinned reason) {
            // no action is taken here when the continuation is pinned
        }
        /**
         * Returns the Runnable that the continuation executes: it runs the given
         * task in the context of the given virtual thread, bracketed by the
         * first/final transition calls.
         */
        private static Runnable wrap(VirtualThread vthread, Runnable task) {
            return new Runnable() {
                @Hidden
                @JvmtiHideEvents
                public void run() {
                    vthread.endFirstTransition();
                    try {
                        vthread.run(task);
                    } finally {
                        vthread.startFinalTransition();
                    }
                }
            };
        }
    }

    /**
     * Runs or continues execution on the current thread.
The virtual thread is mounted
     * on the current thread before the task runs or continues. It unmounts when the
     * task completes or yields. This method returns without doing anything if the
     * thread is not in a runnable state (STARTED, UNPARKED, UNBLOCKED or YIELDED).
     *
     * @throws WrongThreadException if the current thread is a virtual thread
     */
    @ChangesCurrentThread // allow mount/unmount to be inlined
    private void runContinuation() {
        // the carrier must be a platform thread
        if (Thread.currentThread().isVirtual()) {
            throw new WrongThreadException();
        }

        // set state to RUNNING
        int initialState = state();
        if (initialState == STARTED || initialState == UNPARKED
                || initialState == UNBLOCKED || initialState == YIELDED) {
            // newly started or continue after parking/blocking/Thread.yield
            if (!compareAndSetState(initialState, RUNNING)) {
                // the state is no longer initialState, so do not run
                return;
            }
            // consume permit when continuing after parking or blocking. If continue
            // after a timed-park or timed-wait then the timeout task is cancelled.
            if (initialState == UNPARKED) {
                cancelTimeoutTask();
                setParkPermit(false);
            } else if (initialState == UNBLOCKED) {
                cancelTimeoutTask();
                blockPermit = false;
            }
        } else {
            // not runnable
            return;
        }

        mount();
        try {
            cont.run();
        } finally {
            // always unmount, then dispatch on whether the continuation completed or yielded
            unmount();
            if (cont.isDone()) {
                afterDone();
            } else {
                afterYield();
            }
        }
    }

    /**
     * Cancel timeout task when continuing after timed-park or timed-wait.
     * The timeout task may be executing, or may have already completed.
     * Does nothing if there is no timeout task.
     */
    private void cancelTimeoutTask() {
        if (timeoutTask != null) {
            // cancel(false): a task that is already executing is not interrupted
            timeoutTask.cancel(false);
            timeoutTask = null;
        }
    }

    /**
     * Submits the given task to the given executor. If the scheduler is a
     * ForkJoinPool then the task is first adapted to a ForkJoinTask.
322 */ 323 private void submit(Executor executor, Runnable task) { 324 if (executor instanceof ForkJoinPool pool) { 325 pool.submit(ForkJoinTask.adapt(task)); 326 } else { 327 executor.execute(task); 328 } 329 } 330 331 /** 332 * Submits the runContinuation task to the scheduler. For the default scheduler, 333 * and calling it on a worker thread, the task will be pushed to the local queue, 334 * otherwise it will be pushed to an external submission queue. 335 * @param scheduler the scheduler 336 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 337 * @throws RejectedExecutionException 338 */ 339 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) { 340 boolean done = false; 341 while (!done) { 342 try { 343 // Pin the continuation to prevent the virtual thread from unmounting 344 // when submitting a task. For the default scheduler this ensures that 345 // the carrier doesn't change when pushing a task. For other schedulers 346 // it avoids deadlock that could arise due to carriers and virtual 347 // threads contending for a lock. 348 if (currentThread().isVirtual()) { 349 Continuation.pin(); 350 try { 351 submit(scheduler, runContinuation); 352 } finally { 353 Continuation.unpin(); 354 } 355 } else { 356 submit(scheduler, runContinuation); 357 } 358 done = true; 359 } catch (RejectedExecutionException ree) { 360 submitFailed(ree); 361 throw ree; 362 } catch (OutOfMemoryError e) { 363 if (retryOnOOME) { 364 U.park(false, 100_000_000); // 100ms 365 } else { 366 throw e; 367 } 368 } 369 } 370 } 371 372 /** 373 * Submits the runContinuation task to the given scheduler as an external submit. 374 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 
375 * @throws RejectedExecutionException 376 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 377 */ 378 private void externalSubmitRunContinuation(ForkJoinPool pool) { 379 assert Thread.currentThread() instanceof CarrierThread; 380 try { 381 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 382 } catch (RejectedExecutionException ree) { 383 submitFailed(ree); 384 throw ree; 385 } catch (OutOfMemoryError e) { 386 submitRunContinuation(pool, true); 387 } 388 } 389 390 /** 391 * Submits the runContinuation task to the scheduler. For the default scheduler, 392 * and calling it on a worker thread, the task will be pushed to the local queue, 393 * otherwise it will be pushed to an external submission queue. 394 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 395 * @throws RejectedExecutionException 396 */ 397 private void submitRunContinuation() { 398 submitRunContinuation(scheduler, true); 399 } 400 401 /** 402 * Lazy submit the runContinuation task if invoked on a carrier thread and its local 403 * queue is empty. If not empty, or invoked by another thread, then this method works 404 * like submitRunContinuation and just submits the task to the scheduler. 405 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 406 * @throws RejectedExecutionException 407 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 408 */ 409 private void lazySubmitRunContinuation() { 410 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 411 ForkJoinPool pool = ct.getPool(); 412 try { 413 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 414 } catch (RejectedExecutionException ree) { 415 submitFailed(ree); 416 throw ree; 417 } catch (OutOfMemoryError e) { 418 submitRunContinuation(); 419 } 420 } else { 421 submitRunContinuation(); 422 } 423 } 424 425 /** 426 * Submits the runContinuation task to the scheduler. 
If this virtual thread uses the
     * default scheduler, and the current carrier thread is a {@code CarrierThread},
     * then the task is pushed to the external submission queue of that carrier's
     * pool. Otherwise the task is submitted to this thread's scheduler without
     * retrying on OutOfMemoryError. This method may throw OutOfMemoryError.
     * @throws RejectedExecutionException
     * @throws OutOfMemoryError
     */
    private void externalSubmitRunContinuationOrThrow() {
        if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {
            try {
                ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
            } catch (RejectedExecutionException ree) {
                submitFailed(ree);
                throw ree;
            }
        } else {
            // retryOnOOME is false so that OutOfMemoryError propagates to the caller
            submitRunContinuation(scheduler, false);
        }
    }

    /**
     * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
     *
     * @param ree the exception thrown by the failed submit, its message is recorded
     */
    private void submitFailed(RejectedExecutionException ree) {
        var event = new VirtualThreadSubmitFailedEvent();
        if (event.isEnabled()) {
            event.javaThreadId = threadId();
            event.exceptionMessage = ree.getMessage();
            event.commit();
        }
    }

    /**
     * Runs a task in the context of this virtual thread. Anything thrown by the
     * task is passed to {@code dispatchUncaughtException} rather than propagated.
     *
     * @param task the task to run
     */
    private void run(Runnable task) {
        assert Thread.currentThread() == this && state == RUNNING;

        // emit JFR event if enabled
        if (VirtualThreadStartEvent.isTurnedOn()) {
            var event = new VirtualThreadStartEvent();
            event.javaThreadId = threadId();
            event.commit();
        }

        Object bindings = Thread.scopedValueBindings();
        try {
            runWith(bindings, task);
        } catch (Throwable exc) {
            dispatchUncaughtException(exc);
        } finally {
            // pop any remaining scopes from the stack, this may block
            StackableScope.popAll();

            // emit JFR event if enabled
            if (VirtualThreadEndEvent.isTurnedOn()) {
                var event = new VirtualThreadEndEvent();
                event.javaThreadId = threadId();
                event.commit();
            }
        }
    }

    /**
     * Mounts this virtual thread onto the current platform thread.
On return, the current thread is the virtual thread.
     */
    @ChangesCurrentThread
    @ReservedStackAccess
    private void mount() {
        startTransition(/*mount*/true);
        // We assume following volatile accesses provide equivalent
        // of acquire ordering, otherwise we need U.loadFence() here.

        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupted status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            // the carrier's status is only cleared while holding interruptLock, which
            // interrupt() also holds when it sets the interrupted status
            synchronized (interruptLock) {
                // need to recheck interrupted status
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread.
     */
    @ChangesCurrentThread
    @ReservedStackAccess
    private void unmount() {
        assert !Thread.holdsLock(interruptLock);

        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        // clear the carrier's interrupt status once the carrier field has been cleared
        carrier.clearInterrupt();

        // We assume previous volatile accesses provide equivalent
        // of release ordering, otherwise we need U.storeFence() here.
        endTransition(/*mount*/false);
    }

    /**
     * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
     * the continuation continues.
*/
    @Hidden
    private boolean yieldContinuation() {
        startTransition(/*mount*/false);
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            // runs whether the yield returned or threw
            endTransition(/*mount*/true);
        }
    }

    /**
     * Invoked in the context of the carrier thread after the Continuation yields when
     * parking, blocking on monitor enter, Object.wait, or Thread.yield.
     *
     * Moves the thread from its transitional state (PARKING, TIMED_PARKING, YIELDING,
     * BLOCKING, WAITING or TIMED_WAITING) to the corresponding unmounted state. For the
     * timed cases a timeout task is scheduled. The runContinuation task is submitted
     * when the thread yielded with Thread.yield, or when it was unparked, unblocked,
     * or interrupted while in the transitional state.
     */
    private void afterYield() {
        assert carrierThread == null;

        // re-adjust parallelism if the virtual thread yielded when compensating
        if (currentThread() instanceof CarrierThread ct) {
            ct.endBlocking();
        }

        int s = state();

        // LockSupport.park/parkNanos
        if (s == PARKING || s == TIMED_PARKING) {
            int newState;
            if (s == PARKING) {
                setState(newState = PARKED);
            } else {
                // schedule unpark
                long timeout = this.timeout;
                assert timeout > 0;
                timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
                setState(newState = TIMED_PARKED);
            }

            // may have been unparked while parking
            if (parkPermit && compareAndSetState(newState, UNPARKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Thread.yield
        if (s == YIELDING) {
            setState(YIELDED);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation(ct.getPool());
            } else {
                submitRunContinuation();
            }
            return;
        }

        // blocking on monitorenter
        if (s == BLOCKING) {
            setState(BLOCKED);

            // may have been unblocked while blocking
            if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Object.wait
        if (s == WAITING || s == TIMED_WAITING) {
            int newState;
            boolean blocked;
            // read before the state is changed to WAIT/TIMED_WAIT
            boolean interruptible = interruptibleWait;
            if (s == WAITING) {
                setState(newState = WAIT);
                // may have been notified while in transition
                blocked = notified && compareAndSetState(WAIT, BLOCKED);
            } else {
                // For timed-wait, a timeout task is scheduled to execute. The timeout
                // task will change the thread state to UNBLOCKED and submit the thread
                // to the scheduler. A sequence number is used to ensure that the timeout
                // task only unblocks the thread for this timed-wait. We synchronize with
                // the timeout task to coordinate access to the sequence number and to
                // ensure the timeout task doesn't execute until the thread has got to
                // the TIMED_WAIT state.
                long timeout = this.timeout;
                assert timeout > 0;
                synchronized (timedWaitLock()) {
                    byte seqNo = ++timedWaitSeqNo;
                    // note: the timeout is in milliseconds here, nanoseconds for timed-park
                    timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
                    setState(newState = TIMED_WAIT);
                    // May have been notified while in transition. This must be done while
                    // holding the monitor to avoid changing the state of a new timed wait call.
                    blocked = notified && compareAndSetState(TIMED_WAIT, BLOCKED);
                }
            }

            if (blocked) {
                // may have been unblocked already
                if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                    lazySubmitRunContinuation();
                }
            } else {
                // may have been interrupted while in transition to wait state
                if (interruptible && interrupted && compareAndSetState(newState, UNBLOCKED)) {
                    lazySubmitRunContinuation();
                }
            }
            return;
        }

        // none of the states handled above: not expected
        assert false;
    }

    /**
     * Invoked after the continuation completes. Equivalent to {@code afterDone(true)},
     * meaning the thread's container is notified.
     */
    private void afterDone() {
        afterDone(true);
    }

    /**
     * Invoked after the continuation completes (or start failed). Sets the thread
     * state to TERMINATED and notifies anyone waiting for the thread to terminate.
*
     * @param notifyContainer true if its container should be notified
     */
    private void afterDone(boolean notifyContainer) {
        assert carrierThread == null;
        setState(TERMINATED);

        // notify anyone waiting for this virtual thread to terminate
        // (the latch is created lazily so it may be null)
        CountDownLatch termination = this.termination;
        if (termination != null) {
            assert termination.getCount() == 1;
            termination.countDown();
        }

        // notify container
        if (notifyContainer) {
            threadContainer().remove(this);
        }

        // clear references to thread locals
        clearReferences();
    }

    /**
     * Schedules this {@code VirtualThread} to execute. If adding the thread to the
     * container or submitting it to the scheduler fails then the thread is moved
     * to the TERMINATED state before the exception propagates.
     *
     * @param container the thread container to add this thread to
     * @throws IllegalStateException if the container is shutdown or closed
     * @throws IllegalThreadStateException if the thread has already been started
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void start(ThreadContainer container) {
        if (!compareAndSetState(NEW, STARTED)) {
            throw new IllegalThreadStateException("Already started");
        }

        // bind thread to container
        assert threadContainer() == null;
        setThreadContainer(container);

        // start thread
        boolean addedToContainer = false;
        boolean started = false;
        try {
            container.add(this);  // may throw
            addedToContainer = true;

            // scoped values may be inherited
            inheritScopedValueBindings(container);

            // submit task to run thread, using externalSubmit if possible
            externalSubmitRunContinuationOrThrow();
            started = true;
        } finally {
            if (!started) {
                // failed to start: terminate, and only have the container remove
                // this thread if it was actually added
                afterDone(addedToContainer);
            }
        }
    }

    /**
     * Schedules this virtual thread to execute in the thread container returned
     * by {@code ThreadContainers.root()}.
     */
    @Override
    public void start() {
        start(ThreadContainers.root());
    }

    /**
     * Does nothing. The task given to the constructor is executed by
     * {@code run(Runnable)}, not by this method.
     */
    @Override
    public void run() {
        // do nothing
    }

    /**
     * Parks until unparked or interrupted.
If already unparked then the parking
     * permit is consumed and this method completes immediately (meaning it doesn't
     * yield). It also completes immediately if the interrupted status is set.
     * If the continuation cannot yield then the carrier thread is parked instead.
     */
    @Override
    void park() {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        boolean yielded = false;
        setState(PARKING);
        try {
            yielded = yieldContinuation();
        } catch (OutOfMemoryError e) {
            // park on carrier
            // (the error is deliberately not rethrown, yielded remains false)
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (!yielded) {
                // the yield did not happen so the state is still PARKING, restore RUNNING
                assert state() == PARKING;
                setState(RUNNING);
            }
        }

        // park on the carrier thread when pinned
        if (!yielded) {
            parkOnCarrierThread(false, 0);
        }
    }

    /**
     * Parks up to the given waiting time or until unparked or interrupted.
     * If already unparked then the parking permit is consumed and this method
     * completes immediately (meaning it doesn't yield). It also completes immediately
     * if the interrupted status is set or the waiting time is {@code <= 0}.
     *
     * @param nanos the maximum number of nanoseconds to wait.
*/
    @Override
    void parkNanos(long nanos) {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread for the waiting time
        if (nanos > 0) {
            long startTime = System.nanoTime();

            // park the thread, afterYield will schedule the thread to unpark
            boolean yielded = false;
            // afterYield reads this field to schedule the timeout task
            timeout = nanos;
            setState(TIMED_PARKING);
            try {
                yielded = yieldContinuation();
            } catch (OutOfMemoryError e) {
                // park on carrier
                // (the error is deliberately not rethrown, yielded remains false)
            } finally {
                assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
                if (!yielded) {
                    // the yield did not happen so the state is still TIMED_PARKING
                    assert state() == TIMED_PARKING;
                    setState(RUNNING);
                }
            }

            // park on carrier thread for remaining time when pinned (or OOME)
            if (!yielded) {
                // subtract the time already spent attempting to yield
                long remainingNanos = nanos - (System.nanoTime() - startTime);
                parkOnCarrierThread(true, remainingNanos);
            }
        }
    }

    /**
     * Parks the current carrier thread up to the given waiting time or until
     * unparked or interrupted. If the virtual thread is interrupted then the
     * interrupted status will be propagated to the carrier thread.
     * The state is PINNED or TIMED_PINNED while parked and RUNNING on return. The
     * parking permit is consumed, and a pinned event is posted, before returning.
     * @param timed true for a timed park, false for untimed
     * @param nanos the waiting time in nanoseconds, a timed park with a value
     *        {@code <= 0} does not park
     */
    private void parkOnCarrierThread(boolean timed, long nanos) {
        assert state() == RUNNING;

        setState(timed ? TIMED_PINNED : PINNED);
        try {
            // don't park if the permit was made available in the meantime
            if (!parkPermit) {
                if (!timed) {
                    U.park(false, 0);
                } else if (nanos > 0) {
                    U.park(false, nanos);
                }
            }
        } finally {
            setState(RUNNING);
        }

        // consume parking permit
        setParkPermit(false);

        // JFR jdk.VirtualThreadPinned event
        postPinnedEvent("LockSupport.park");
    }

    /**
     * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
* Recording the event in the VM avoids having JFR event recorded in Java
     * with the same name, but different ID, to events recorded by the VM.
     *
     * @param op the name of the operation that pinned the thread
     */
    @Hidden
    private static native void postPinnedEvent(String op);

    /**
     * Re-enables this virtual thread for scheduling. If this virtual thread is parked
     * then its task is scheduled to continue, otherwise its next call to {@code park} or
     * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
     * If this virtual thread is parked on its carrier (pinned) then the carrier
     * thread is unparked instead.
     * @param lazySubmit to use lazySubmit if possible
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    private void unpark(boolean lazySubmit) {
        // nothing more to do if the permit was already available or if invoked
        // by this virtual thread itself
        if (!getAndSetParkPermit(true) && currentThread() != this) {
            int s = state();

            // unparked while parked
            if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
                if (lazySubmit) {
                    lazySubmitRunContinuation();
                } else {
                    submitRunContinuation();
                }
                return;
            }

            // unparked while parked when pinned
            if (s == PINNED || s == TIMED_PINNED) {
                // unpark carrier thread when pinned
                disableSuspendAndPreempt();
                try {
                    synchronized (carrierThreadAccessLock()) {
                        Thread carrier = carrierThread;
                        // re-read the state while holding the lock, the thread may
                        // no longer be pinned
                        if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                            U.unpark(carrier);
                        }
                    }
                } finally {
                    enableSuspendAndPreempt();
                }
                return;
            }
        }
    }

    /**
     * Re-enables this virtual thread for scheduling, see {@link #unpark(boolean)}.
     * This variant does not use lazySubmit.
     */
    @Override
    void unpark() {
        unpark(false);
    }

    /**
     * Invoked by unblocker thread to unblock this virtual thread.
     * Makes the blocking permit available and, if the thread is BLOCKED, changes
     * its state to UNBLOCKED and submits its task to the scheduler.
     */
    private void unblock() {
        assert !Thread.currentThread().isVirtual();
        blockPermit = true;
        if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
            submitRunContinuation();
        }
    }

    /**
     * Invoked by FJP worker thread or STPE thread when park timeout expires.
*/
    private void parkTimeoutExpired() {
        assert !VirtualThread.currentThread().isVirtual();
        // unpark, using lazySubmit if possible
        unpark(true);
    }

    /**
     * Invoked by FJP worker thread or STPE thread when wait timeout expires.
     * If the virtual thread is in timed-wait then this method will unblock the thread
     * and submit its task so that it continues and attempts to reenter the monitor.
     * This method does nothing if the thread has been woken by notify or interrupt.
     *
     * @param seqNo the sequence number of the timed-wait that scheduled this timeout
     */
    private void waitTimeoutExpired(byte seqNo) {
        assert !Thread.currentThread().isVirtual();

        // synchronizes with afterYield, which schedules the timeout task and sets
        // the state to TIMED_WAIT while holding the same lock
        synchronized (timedWaitLock()) {
            if (seqNo != timedWaitSeqNo) {
                // this timeout task is for a past timed-wait
                return;
            }
            if (!compareAndSetState(TIMED_WAIT, UNBLOCKED)) {
                // already notified (or interrupted)
                return;
            }
        }

        // submit after releasing the lock
        lazySubmitRunContinuation();
    }

    /**
     * Attempts to yield the current virtual thread (Thread.yield).
     * The state is YIELDING while attempting to yield. If the continuation does not
     * yield, or yielding throws, then the state is restored to RUNNING.
     */
    void tryYield() {
        assert Thread.currentThread() == this;
        setState(YIELDING);
        boolean yielded = false;
        try {
            yielded = yieldContinuation();  // may throw
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (!yielded) {
                assert state() == YIELDING;
                setState(RUNNING);
            }
        }
    }

    /**
     * Sleep the current thread for the given sleep time (in nanoseconds). If
     * nanos is 0 then the thread will attempt to yield.
     *
     * @implNote This implementation parks the thread for the given sleeping time
     * and will therefore be observed in PARKED state during the sleep. Parking
     * will consume the parking permit so this method makes available the parking
     * permit after the sleep. This may be observed as a spurious, but benign,
     * wakeup when the thread subsequently attempts to park.
*
     * @param nanos the maximum number of nanoseconds to sleep
     * @throws InterruptedException if interrupted while sleeping
     */
    void sleepNanos(long nanos) throws InterruptedException {
        assert Thread.currentThread() == this && nanos >= 0;
        // an interrupt that is already pending is consumed and reported immediately
        if (getAndClearInterrupt())
            throw new InterruptedException();
        if (nanos == 0) {
            tryYield();
        } else {
            // park for the sleep time
            try {
                long remainingNanos = nanos;
                long startNanos = System.nanoTime();
                // parkNanos may return early so loop until the full time has elapsed
                while (remainingNanos > 0) {
                    parkNanos(remainingNanos);
                    if (getAndClearInterrupt()) {
                        throw new InterruptedException();
                    }
                    remainingNanos = nanos - (System.nanoTime() - startNanos);
                }
            } finally {
                // may have been unparked while sleeping
                setParkPermit(true);
            }
        }
    }

    /**
     * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate.
     * A timeout of {@code 0} means to wait forever.
     *
     * @param nanos the maximum time to wait in nanoseconds, {@code 0} to wait forever
     * @return true if the thread has terminated, false if the waiting time elapsed
     * @throws InterruptedException if interrupted while waiting
     */
    boolean joinNanos(long nanos) throws InterruptedException {
        if (state() == TERMINATED)
            return true;

        // ensure termination object exists, then re-check state
        CountDownLatch termination = getTermination();
        if (state() == TERMINATED)
            return true;

        // wait for virtual thread to terminate
        if (nanos == 0) {
            termination.await();
        } else {
            boolean terminated = termination.await(nanos, NANOSECONDS);
            if (!terminated) {
                // waiting time elapsed
                return false;
            }
        }
        assert state() == TERMINATED;
        return true;
    }

    /**
     * Delegates to {@code super.blockedOn} with suspend and preempt disabled for
     * the duration of the call.
     */
    @Override
    void blockedOn(Interruptible b) {
        disableSuspendAndPreempt();
        try {
            super.blockedOn(b);
        } finally {
            enableSuspendAndPreempt();
        }
    }

    /**
     * Interrupts this virtual thread. When invoked by another thread this sets the
     * interrupt status, interrupts the NIO blocker (if any) and the carrier thread
     * (if any), makes the parking permit available, and schedules the thread to
     * continue when it is parked or waiting in {@code Object.wait}. When invoked by
     * this thread it sets the interrupt status of this thread and its carrier and
     * makes the parking permit available.
     */
    @Override
    public void interrupt() {
        if (Thread.currentThread() != this) {
            // if current thread is a virtual thread then prevent it from being
            // suspended or unmounted when entering or holding interruptLock
            Interruptible blocker;
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = true;
                    blocker = nioBlocker();
                    if (blocker != null) {
                        blocker.interrupt(this);
                    }

                    // interrupt carrier thread if mounted
                    Thread carrier = carrierThread;
                    if (carrier != null) carrier.setInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }

            // notify blocker after releasing interruptLock
            if (blocker != null) {
                blocker.postInterrupt();
            }

            // make available parking permit, unpark thread if parked
            unpark();

            // if thread is waiting in Object.wait then schedule to try to reenter
            int s = state();
            if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) {
                submitRunContinuation();
            }

        } else {
            // current thread is interrupting itself, interruptLock is not acquired
            // NOTE(review): carrierThread is dereferenced without a null check,
            // presumably it is always set while this thread is running - confirm
            interrupted = true;
            carrierThread.setInterrupt();
            setParkPermit(true);
        }
    }

    /**
     * Returns the interrupt status of this virtual thread without clearing it.
     */
    @Override
    public boolean isInterrupted() {
        return interrupted;
    }

    /**
     * Clears the interrupt status of this virtual thread, and of its carrier thread,
     * if set. Must be invoked on this virtual thread.
     *
     * @return the interrupt status read before clearing
     */
    @Override
    boolean getAndClearInterrupt() {
        assert Thread.currentThread() == this;
        boolean oldValue = interrupted;
        // the lock is only acquired when there is an interrupt status to clear
        if (oldValue) {
            disableSuspendAndPreempt();
            try {
                synchronized (interruptLock) {
                    interrupted = false;
                    carrierThread.clearInterrupt();
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }
        return oldValue;
    }

    /**
     * Maps the internal state of this virtual thread to a {@code Thread.State}.
     * For a thread in the RUNNING state that has a carrier, and when queried by
     * another thread, the state of the carrier thread is returned.
     *
     * @throws InternalError if the internal state is not one of the known states
     */
    @Override
    Thread.State threadState() {
        switch (state()) {
            case NEW:
                return Thread.State.NEW;
            case STARTED:
                // return NEW if thread container not yet set
                if (threadContainer() == null) {
                    return Thread.State.NEW;
                } else {
                    return Thread.State.RUNNABLE;
                }
            case UNPARKED:
            case UNBLOCKED:
            case YIELDED:
                // runnable, not mounted
                return Thread.State.RUNNABLE;
            case RUNNING:
                // if mounted then return state of carrier thread
                if (Thread.currentThread() != this) {
                    disableSuspendAndPreempt();
                    try {
                        synchronized (carrierThreadAccessLock()) {
                            Thread carrierThread = this.carrierThread;
                            if (carrierThread != null) {
                                return carrierThread.threadState();
                            }
                        }
                    } finally {
                        enableSuspendAndPreempt();
                    }
                }
                // runnable, mounted
                return Thread.State.RUNNABLE;
            case PARKING:
            case TIMED_PARKING:
            case WAITING:
            case TIMED_WAITING:
            case YIELDING:
                // runnable, in transition
                return Thread.State.RUNNABLE;
            case PARKED:
            case PINNED:
            case WAIT:
                return Thread.State.WAITING;
            case TIMED_PARKED:
            case TIMED_PINNED:
            case TIMED_WAIT:
                return Thread.State.TIMED_WAITING;
            case BLOCKING:
            case BLOCKED:
                return Thread.State.BLOCKED;
            case TERMINATED:
                return Thread.State.TERMINATED;
            default:
                throw new InternalError();
        }
    }

    /**
     * Returns true if this virtual thread is in neither the NEW nor the TERMINATED state.
     */
    @Override
    boolean alive() {
        int s = state;
        return (s != NEW && s != TERMINATED);
    }

    /**
     * Returns true if this virtual thread is in the TERMINATED state.
     */
    @Override
    boolean isTerminated() {
        return (state == TERMINATED);
    }

    /**
     * Returns a string of the form {@code VirtualThread[#<id>,<name>]/<suffix>}, the
     * name (and the comma before it) is omitted when empty. The suffix is the carrier
     * thread's state and name, separated by {@code @}, when there is a carrier thread,
     * otherwise it is the state of this virtual thread in lower case.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("VirtualThread[#");
        sb.append(threadId());
        String name = getName();
        if (!name.isEmpty()) {
            sb.append(",");
            sb.append(name);
        }
        sb.append("]/");

        // add the carrier state and thread name when mounted
        boolean mounted;
        if (Thread.currentThread() == this) {
            mounted = appendCarrierInfo(sb);
        } else {
            // another thread, need the lock to access carrierThread
            disableSuspendAndPreempt();
            try {
                synchronized (carrierThreadAccessLock()) {
                    mounted = appendCarrierInfo(sb);
                }
            } finally {
                enableSuspendAndPreempt();
            }
        }

        // add virtual thread state when not mounted
        if (!mounted) {
            String stateAsString = threadState().toString();
            sb.append(stateAsString.toLowerCase(Locale.ROOT));
        }

        return sb.toString();
    }

    /**
     * Appends the carrier state and thread name to the string buffer if mounted.
     * The caller must be this virtual thread or hold the carrier thread access lock.
     * @return true if mounted, false if not mounted
     */
    private boolean appendCarrierInfo(StringBuilder sb) {
        assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock());
        Thread carrier = carrierThread;
        if (carrier != null) {
            String stateAsString = carrier.threadState().toString();
            sb.append(stateAsString.toLowerCase(Locale.ROOT));
            sb.append('@');
            sb.append(carrier.getName());
            return true;
        } else {
            return false;
        }
    }

    /**
     * Returns the thread identifier narrowed to an {@code int}.
     */
    @Override
    public int hashCode() {
        return (int) threadId();
    }

    /**
     * Returns true only if {@code obj} is this object (identity comparison).
     */
    @Override
    public boolean equals(Object obj) {
        return obj == this;
    }

    /**
     * Returns the termination object, creating it if needed.
     */
    private CountDownLatch getTermination() {
        CountDownLatch termination = this.termination;
        if (termination == null) {
            termination = new CountDownLatch(1);
            // if the CAS fails then another thread installed the object, use that one
            if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
                termination = this.termination;
            }
        }
        return termination;
    }

    /**
     * Returns the lock object to synchronize on when accessing carrierThread.
     * The lock prevents carrierThread from being reset to null during unmount.
     */
    private Object carrierThreadAccessLock() {
        // return interruptLock as unmount has to coordinate with interrupt
        return interruptLock;
    }

    /**
     * Returns a lock object for coordinating timed-wait setup and timeout handling.
*/
    private Object timedWaitLock() {
        // use this object for now to avoid the overhead of introducing another lock
        return runContinuation;
    }

    /**
     * Disallow the current thread be suspended or preempted.
     * Every call must be paired with a call to {@link #enableSuspendAndPreempt()},
     * which undoes the two steps in the reverse order.
     */
    private void disableSuspendAndPreempt() {
        notifyJvmtiDisableSuspend(true);
        Continuation.pin();
    }

    /**
     * Allow the current thread be suspended or preempted.
     */
    private void enableSuspendAndPreempt() {
        Continuation.unpin();
        notifyJvmtiDisableSuspend(false);
    }

    // -- wrappers for get/set of state, parking permit, and carrier thread --

    /**
     * Returns the current state.
     */
    private int state() {
        return state;  // volatile read
    }

    /**
     * Sets the state unconditionally.
     */
    private void setState(int newValue) {
        state = newValue;  // volatile write
    }

    /**
     * Atomically sets the state to {@code newValue} if it is {@code expectedValue}.
     * @return true if the state was changed
     */
    private boolean compareAndSetState(int expectedValue, int newValue) {
        return U.compareAndSetInt(this, STATE, expectedValue, newValue);
    }

    /**
     * Atomically sets the on-waiting-list flag to {@code newValue} if it is
     * {@code expectedValue}.
     * @return true if the flag was changed
     */
    private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) {
        return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue);
    }

    /**
     * Sets the parking permit, the field is only written when the value changes.
     */
    private void setParkPermit(boolean newValue) {
        if (parkPermit != newValue) {
            parkPermit = newValue;
        }
    }

    /**
     * Sets the parking permit and returns its previous value. The atomic update
     * is skipped when the permit is observed to already have the new value.
     */
    private boolean getAndSetParkPermit(boolean newValue) {
        if (parkPermit != newValue) {
            return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
        } else {
            // unchanged, the previous value is the same as the new value
            return newValue;
        }
    }

    /**
     * Sets the carrier thread, {@code null} to clear it.
     */
    private void setCarrierThread(Thread carrier) {
        // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
        this.carrierThread = carrier;
    }

    // The following four methods notify the VM when a "transition" starts and ends.
    // A "mount transition" embodies the steps to transfer control from a platform
    // thread to a virtual thread, changing the thread identity, and starting or
    // resuming the virtual thread's continuation on the carrier.
// An "unmount transition" embodies the steps to transfer control from a virtual
    // thread to its carrier, suspending the virtual thread's continuation, and
    // restoring the thread identity to the platform thread.
    // The notifications to the VM are necessary in order to coordinate with functions
    // (JVMTI mostly) that disable transitions for one or all virtual threads. Starting
    // a transition may block if transitions are disabled. Ending a transition may
    // notify a thread that is waiting to disable transitions. The notifications are
    // also used to post JVMTI events for virtual thread start and end.

    // NOTE(review): presumably ends the transition that mounts the thread for the
    // first time - confirm against the VM implementation
    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void endFirstTransition();

    // NOTE(review): presumably starts the transition that unmounts the thread for
    // the last time - confirm against the VM implementation
    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void startFinalTransition();

    // starts a transition, mount is true for a mount transition, false for unmount
    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void startTransition(boolean mount);

    // ends a transition, mount is true for a mount transition, false for unmount
    @IntrinsicCandidate
    @JvmtiMountTransition
    private native void endTransition(boolean mount);

    // invoked with true by disableSuspendAndPreempt and false by enableSuspendAndPreempt
    @IntrinsicCandidate
    private static native void notifyJvmtiDisableSuspend(boolean enter);

    private static native void registerNatives();
    static {
        registerNatives();

        // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
        // (the returned group is intentionally not used)
        var group = Thread.virtualThreadGroup();
    }

    /**
     * Creates the default ForkJoinPool scheduler.
1368 */ 1369 private static ForkJoinPool createDefaultScheduler() { 1370 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool); 1371 int parallelism, maxPoolSize, minRunnable; 1372 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1373 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1374 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1375 if (parallelismValue != null) { 1376 parallelism = Integer.parseInt(parallelismValue); 1377 } else { 1378 parallelism = Runtime.getRuntime().availableProcessors(); 1379 } 1380 if (maxPoolSizeValue != null) { 1381 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1382 parallelism = Integer.min(parallelism, maxPoolSize); 1383 } else { 1384 maxPoolSize = Integer.max(parallelism, 256); 1385 } 1386 if (minRunnableValue != null) { 1387 minRunnable = Integer.parseInt(minRunnableValue); 1388 } else { 1389 minRunnable = Integer.max(parallelism / 2, 1); 1390 } 1391 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1392 boolean asyncMode = true; // FIFO 1393 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1394 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1395 } 1396 1397 /** 1398 * Schedule a runnable task to run after a delay. 1399 */ 1400 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1401 if (scheduler instanceof ForkJoinPool pool) { 1402 return pool.schedule(command, delay, unit); 1403 } else { 1404 return DelayedTaskSchedulers.schedule(command, delay, unit); 1405 } 1406 } 1407 1408 /** 1409 * Supports scheduling a runnable task to run after a delay. It uses a number 1410 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1411 * work queue used. This class is used when using a custom scheduler. 
1412 */ 1413 private static class DelayedTaskSchedulers { 1414 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1415 1416 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1417 long tid = Thread.currentThread().threadId(); 1418 int index = (int) tid & (INSTANCE.length - 1); 1419 return INSTANCE[index].schedule(command, delay, unit); 1420 } 1421 1422 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1423 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1424 String propValue = System.getProperty(propName); 1425 int queueCount; 1426 if (propValue != null) { 1427 queueCount = Integer.parseInt(propValue); 1428 if (queueCount != Integer.highestOneBit(queueCount)) { 1429 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1430 } 1431 } else { 1432 int ncpus = Runtime.getRuntime().availableProcessors(); 1433 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1434 } 1435 var schedulers = new ScheduledExecutorService[queueCount]; 1436 for (int i = 0; i < queueCount; i++) { 1437 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1438 Executors.newScheduledThreadPool(1, task -> { 1439 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1440 t.setDaemon(true); 1441 return t; 1442 }); 1443 stpe.setRemoveOnCancelPolicy(true); 1444 schedulers[i] = stpe; 1445 } 1446 return schedulers; 1447 } 1448 } 1449 1450 /** 1451 * Schedule virtual threads that are ready to be scheduled after they blocked on 1452 * monitor enter. 
1453 */ 1454 private static void unblockVirtualThreads() { 1455 while (true) { 1456 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1457 while (vthread != null) { 1458 assert vthread.onWaitingList; 1459 VirtualThread nextThread = vthread.next; 1460 1461 // remove from list and unblock 1462 vthread.next = null; 1463 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1464 assert changed; 1465 vthread.unblock(); 1466 1467 vthread = nextThread; 1468 } 1469 } 1470 } 1471 1472 /** 1473 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1474 * if necessary until a list of one or more threads becomes available. 1475 */ 1476 private static native VirtualThread takeVirtualThreadListToUnblock(); 1477 1478 static { 1479 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1480 VirtualThread::unblockVirtualThreads); 1481 unblocker.setDaemon(true); 1482 unblocker.start(); 1483 } 1484 } --- EOF ---