1 /* 2 * Copyright (c) 2018, 2026, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.lang; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.lang.reflect.Constructor; 30 import java.util.Locale; 31 import java.util.Objects; 32 import java.util.concurrent.Executors; 33 import java.util.concurrent.ForkJoinPool; 34 import java.util.concurrent.ForkJoinTask; 35 import java.util.concurrent.Future; 36 import java.util.concurrent.LinkedTransferQueue; 37 import java.util.concurrent.RejectedExecutionException; 38 import java.util.concurrent.ScheduledExecutorService; 39 import java.util.concurrent.ScheduledFuture; 40 import java.util.concurrent.ScheduledThreadPoolExecutor; 41 import java.util.concurrent.ThreadFactory; 42 import java.util.concurrent.ThreadPoolExecutor; 43 import java.util.concurrent.TimeUnit; 44 import jdk.internal.event.VirtualThreadEndEvent; 45 import jdk.internal.event.VirtualThreadStartEvent; 46 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 47 import jdk.internal.invoke.MhUtil; 48 import jdk.internal.misc.CarrierThread; 49 import jdk.internal.misc.InnocuousThread; 50 import jdk.internal.misc.Unsafe; 51 import jdk.internal.vm.Continuation; 52 import jdk.internal.vm.ContinuationScope; 53 import jdk.internal.vm.StackableScope; 54 import jdk.internal.vm.ThreadContainer; 55 import jdk.internal.vm.ThreadContainers; 56 import jdk.internal.vm.annotation.ChangesCurrentThread; 57 import jdk.internal.vm.annotation.Hidden; 58 import jdk.internal.vm.annotation.IntrinsicCandidate; 59 import jdk.internal.vm.annotation.JvmtiHideEvents; 60 import jdk.internal.vm.annotation.JvmtiMountTransition; 61 import jdk.internal.vm.annotation.ReservedStackAccess; 62 import sun.nio.ch.Interruptible; 63 import static java.util.concurrent.TimeUnit.*; 64 65 /** 66 * A thread that is scheduled by the Java virtual machine rather than the operating system. 67 */ 68 final class VirtualThread extends BaseVirtualThread { 69 private static final Unsafe U = Unsafe.getUnsafe(); 70 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); 71 72 private static final VirtualThreadScheduler BUILTIN_SCHEDULER; 73 private static final VirtualThreadScheduler DEFAULT_SCHEDULER; 74 private static final VirtualThreadScheduler EXTERNAL_VIEW; 75 private static final boolean USE_STPE; 76 static { 77 // experimental 78 String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass"); 79 if (propValue != null) { 80 VirtualThreadScheduler builtinScheduler = createBuiltinScheduler(true); 81 VirtualThreadScheduler externalView = createExternalView(builtinScheduler); 82 VirtualThreadScheduler defaultScheduler = loadCustomScheduler(externalView, propValue); 83 BUILTIN_SCHEDULER = builtinScheduler; 84 DEFAULT_SCHEDULER = defaultScheduler; 85 EXTERNAL_VIEW = externalView; 86 } else { 87 var builtinScheduler = createBuiltinScheduler(false); 88 BUILTIN_SCHEDULER = builtinScheduler; 89 DEFAULT_SCHEDULER = builtinScheduler; 90 EXTERNAL_VIEW = createExternalView(builtinScheduler); 91 } 92 USE_STPE = Boolean.getBoolean("jdk.virtualThreadScheduler.useSTPE"); 93 } 94 95 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); 96 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); 97 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); 98 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList"); 99 100 // scheduler and continuation 101 private final VirtualThreadScheduler scheduler; 102 private final Continuation cont; 103 private final VThreadTask runContinuation; 104 105 // virtual thread state, accessed by VM 106 private volatile int state; 107 108 /* 109 * Virtual thread state transitions: 110 * 111 * NEW -> STARTED // Thread.start, schedule to run 112 * STARTED -> TERMINATED // failed to start 113 * STARTED -> RUNNING // first run 114 * RUNNING -> TERMINATED // done 115 * 116 * RUNNING -> PARKING // Thread parking with LockSupport.park 117 * PARKING -> PARKED // cont.yield successful, parked indefinitely 118 * PARKED -> UNPARKED // unparked, may be scheduled to continue 119 * UNPARKED -> RUNNING // continue execution after park 120 * 121 * PARKING -> RUNNING // cont.yield failed, need to park on carrier 122 * RUNNING -> PINNED // park on carrier 123 * PINNED -> RUNNING // unparked, continue execution on same carrier 124 * 125 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos 126 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked 127 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue 128 * 129 * TIMED_PARKING -> RUNNING // cont.yield failed, need to park on carrier 130 * RUNNING -> TIMED_PINNED // park on carrier 131 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier 132 * 133 * RUNNING -> BLOCKING // blocking on monitor enter 134 * BLOCKING -> BLOCKED // blocked on monitor enter 135 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue 136 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter 137 * 138 * RUNNING -> WAITING // transitional state during wait on monitor 139 * WAITING -> WAIT // waiting on monitor 140 * WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 141 * WAIT -> UNBLOCKED // interrupted 142 * 143 * RUNNING -> TIMED_WAITING // transition state during timed-waiting on monitor 144 * TIMED_WAITING -> TIMED_WAIT // timed-waiting on monitor 145 * TIMED_WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner 146 * TIMED_WAIT -> UNBLOCKED // timed-out/interrupted 147 * 148 * RUNNING -> YIELDING // Thread.yield 149 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue 150 * YIELDING -> RUNNING // cont.yield failed 151 * YIELDED -> RUNNING // continue execution after Thread.yield 152 */ 153 private static final int NEW = 0; 154 private static final int STARTED = 1; 155 private static final int RUNNING = 2; // runnable-mounted 156 157 // untimed and timed parking 158 private static final int PARKING = 3; 159 private static final int PARKED = 4; // unmounted 160 private static final int PINNED = 5; // mounted 161 private static final int TIMED_PARKING = 6; 162 private static final int TIMED_PARKED = 7; // unmounted 163 private static final int TIMED_PINNED = 8; // mounted 164 private static final int UNPARKED = 9; // unmounted but runnable 165 166 // Thread.yield 167 private static final int YIELDING = 10; 168 private static final int YIELDED = 11; // unmounted but runnable 169 170 // monitor enter 171 private static final int BLOCKING = 12; 172 private static final int BLOCKED = 13; // unmounted 173 private static final int UNBLOCKED = 14; // unmounted but runnable 174 175 // monitor wait/timed-wait 176 private static final int WAITING = 15; 177 private static final int WAIT = 16; // waiting in Object.wait 178 private static final int TIMED_WAITING = 17; 179 private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait 180 181 private static final int TERMINATED = 99; // final state 182 183 // parking permit made available by LockSupport.unpark 184 private volatile boolean parkPermit; 185 186 // blocking permit made available by unblocker thread when another thread exits monitor 187 private volatile boolean blockPermit; 188 189 // true when on the list of virtual threads waiting to be unblocked 190 private volatile boolean onWaitingList; 191 192 // next virtual thread on the list of virtual threads waiting to be unblocked 193 private volatile VirtualThread next; 194 195 // notified by Object.notify/notifyAll while waiting in Object.wait 196 private volatile boolean notified; 197 198 // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait 199 private volatile boolean interruptibleWait; 200 201 // timed-wait support 202 private byte timedWaitSeqNo; 203 204 // timeout for timed-park and timed-wait, only accessed on current/carrier thread 205 private long timeout; 206 207 // timer task for timed-park and timed-wait, only accessed on current/carrier thread 208 private Future<?> timeoutTask; 209 210 // carrier thread when mounted, accessed by VM 211 private volatile Thread carrierThread; 212 213 // true to notifyAll after this virtual thread terminates 214 private volatile boolean notifyAllAfterTerminate; 215 216 /** 217 * Return the built-in scheduler. 218 * @param trusted true if caller is trusted, false if not trusted 219 */ 220 static VirtualThreadScheduler builtinScheduler(boolean trusted) { 221 return trusted ? BUILTIN_SCHEDULER : EXTERNAL_VIEW; 222 } 223 224 /** 225 * Returns the default scheduler, usually the same as the built-in scheduler. 226 */ 227 static VirtualThreadScheduler defaultScheduler() { 228 return DEFAULT_SCHEDULER; 229 } 230 231 /** 232 * Returns the continuation scope used for virtual threads. 233 */ 234 static ContinuationScope continuationScope() { 235 return VTHREAD_SCOPE; 236 } 237 238 /** 239 * Returns the task to start/continue this virtual thread. 240 */ 241 VirtualThreadTask virtualThreadTask() { 242 return runContinuation; 243 } 244 245 /** 246 * Creates a new {@code VirtualThread} to run the given task with the given scheduler. 247 * 248 * @param scheduler the scheduler or null for default scheduler 249 * @param preferredCarrier the preferred carrier or null 250 * @param name thread name 251 * @param characteristics characteristics 252 * @param task the task to execute 253 */ 254 VirtualThread(VirtualThreadScheduler scheduler, 255 Thread preferredCarrier, 256 String name, 257 int characteristics, 258 Runnable task) { 259 super(name, characteristics, /*bound*/ false); 260 Objects.requireNonNull(task); 261 262 // use default scheduler if not provided 263 if (scheduler == null) { 264 scheduler = DEFAULT_SCHEDULER; 265 } else if (scheduler == EXTERNAL_VIEW) { 266 throw new UnsupportedOperationException(); 267 } 268 this.scheduler = scheduler; 269 this.cont = new VThreadContinuation(this, task); 270 271 if (scheduler == BUILTIN_SCHEDULER) { 272 this.runContinuation = new VThreadTask(this); 273 } else { 274 this.runContinuation = new CustomVThreadTask(this, preferredCarrier); 275 } 276 } 277 278 /** 279 * The task to start/continue a virtual thread. 280 */ 281 static non-sealed class VThreadTask implements VirtualThreadTask { 282 private final VirtualThread vthread; 283 VThreadTask(VirtualThread vthread) { 284 this.vthread = vthread; 285 } 286 @Override 287 public final Thread thread() { 288 return vthread; 289 } 290 @Override 291 public final void run() { 292 vthread.runContinuation();; 293 } 294 @Override 295 public Thread preferredCarrier() { 296 throw new UnsupportedOperationException(); 297 } 298 @Override 299 public Object attach(Object att) { 300 throw new UnsupportedOperationException(); 301 } 302 @Override 303 public Object attachment() { 304 throw new UnsupportedOperationException(); 305 } 306 } 307 308 /** 309 * The task to start/continue a virtual thread when using a custom scheduler. 310 */ 311 static final class CustomVThreadTask extends VThreadTask { 312 private static final VarHandle ATT = 313 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class); 314 private final Thread preferredCarrier; 315 private volatile Object att; 316 CustomVThreadTask(VirtualThread vthread, Thread preferredCarrier) { 317 super(vthread); 318 this.preferredCarrier = preferredCarrier; 319 } 320 @Override 321 public Thread preferredCarrier() { 322 return preferredCarrier; 323 } 324 @Override 325 public Object attach(Object att) { 326 return ATT.getAndSet(this, att); 327 } 328 @Override 329 public Object attachment() { 330 return att; 331 } 332 } 333 334 /** 335 * The continuation that a virtual thread executes. 336 */ 337 private static class VThreadContinuation extends Continuation { 338 VThreadContinuation(VirtualThread vthread, Runnable task) { 339 super(VTHREAD_SCOPE, wrap(vthread, task)); 340 } 341 @Override 342 protected void onPinned(Continuation.Pinned reason) { 343 } 344 private static Runnable wrap(VirtualThread vthread, Runnable task) { 345 return new Runnable() { 346 @Hidden 347 @JvmtiHideEvents 348 public void run() { 349 vthread.endFirstTransition(); 350 try { 351 vthread.run(task); 352 } finally { 353 vthread.startFinalTransition(); 354 } 355 } 356 }; 357 } 358 } 359 360 /** 361 * Runs or continues execution on the current thread. The virtual thread is mounted 362 * on the current thread before the task runs or continues. It unmounts when the 363 * task completes or yields. 364 */ 365 @ChangesCurrentThread // allow mount/unmount to be inlined 366 private void runContinuation() { 367 // the carrier must be a platform thread 368 if (Thread.currentThread().isVirtual()) { 369 throw new WrongThreadException(); 370 } 371 372 // set state to RUNNING 373 int initialState = state(); 374 if (initialState == STARTED || initialState == UNPARKED 375 || initialState == UNBLOCKED || initialState == YIELDED) { 376 // newly started or continue after parking/blocking/Thread.yield 377 if (!compareAndSetState(initialState, RUNNING)) { 378 return; 379 } 380 // consume permit when continuing after parking or blocking. If continue 381 // after a timed-park or timed-wait then the timeout task is cancelled. 382 if (initialState == UNPARKED) { 383 cancelTimeoutTask(); 384 setParkPermit(false); 385 } else if (initialState == UNBLOCKED) { 386 cancelTimeoutTask(); 387 blockPermit = false; 388 } 389 } else { 390 // not runnable 391 return; 392 } 393 394 mount(); 395 try { 396 cont.run(); 397 } finally { 398 unmount(); 399 if (cont.isDone()) { 400 afterDone(); 401 } else { 402 afterYield(); 403 } 404 } 405 } 406 407 /** 408 * Cancel timeout task when continuing after timed-park or timed-wait. 409 * The timeout task may be executing, or may have already completed. 410 */ 411 private void cancelTimeoutTask() { 412 if (timeoutTask != null) { 413 timeoutTask.cancel(false); 414 timeoutTask = null; 415 } 416 } 417 418 /** 419 * Submits the runContinuation task to the scheduler. For the built-in scheduler, 420 * the task will be pushed to the local queue if possible, otherwise it will be 421 * pushed to an external submission queue. 422 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 423 * @throws RejectedExecutionException 424 */ 425 private void submitRunContinuation(boolean retryOnOOME) { 426 boolean done = false; 427 while (!done) { 428 try { 429 // Pin the continuation to prevent the virtual thread from unmounting 430 // when submitting a task. For the default scheduler this ensures that 431 // the carrier doesn't change when pushing a task. For other schedulers 432 // it avoids deadlock that could arise due to carriers and virtual 433 // threads contending for a lock. 434 if (currentThread().isVirtual()) { 435 Continuation.pin(); 436 try { 437 scheduler.onContinue(runContinuation); 438 } finally { 439 Continuation.unpin(); 440 } 441 } else { 442 scheduler.onContinue(runContinuation); 443 } 444 done = true; 445 } catch (RejectedExecutionException ree) { 446 submitFailed(ree); 447 throw ree; 448 } catch (OutOfMemoryError e) { 449 if (retryOnOOME) { 450 U.park(false, 100_000_000); // 100ms 451 } else { 452 throw e; 453 } 454 } 455 } 456 } 457 458 /** 459 * Submits the runContinuation task to the scheduler. For the default scheduler, 460 * and calling it on a worker thread, the task will be pushed to the local queue, 461 * otherwise it will be pushed to an external submission queue. 462 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 463 * @throws RejectedExecutionException 464 */ 465 private void submitRunContinuation() { 466 submitRunContinuation(true); 467 } 468 469 /** 470 * Invoked from a carrier thread to lazy submit the runContinuation task to the 471 * carrier's local queue if the queue is empty. If not empty, or invoked by a thread 472 * for a custom scheduler, then it just submits the task to the scheduler. 473 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 474 * @throws RejectedExecutionException 475 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 476 */ 477 private void lazySubmitRunContinuation() { 478 assert !currentThread().isVirtual(); 479 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 480 try { 481 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation)); 482 } catch (RejectedExecutionException ree) { 483 submitFailed(ree); 484 throw ree; 485 } catch (OutOfMemoryError e) { 486 submitRunContinuation(); 487 } 488 } else { 489 submitRunContinuation(); 490 } 491 } 492 493 /** 494 * Invoked from a carrier thread to externally submit the runContinuation task to the 495 * scheduler. If invoked by a thread for a custom scheduler, then it just submits the 496 * task to the scheduler. 497 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 498 * @throws RejectedExecutionException 499 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 500 */ 501 private void externalSubmitRunContinuation() { 502 assert !currentThread().isVirtual(); 503 if (currentThread() instanceof CarrierThread ct) { 504 try { 505 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 506 } catch (RejectedExecutionException ree) { 507 submitFailed(ree); 508 throw ree; 509 } catch (OutOfMemoryError e) { 510 submitRunContinuation(); 511 } 512 } else { 513 submitRunContinuation(); 514 } 515 } 516 517 /** 518 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 519 */ 520 private void submitFailed(RejectedExecutionException ree) { 521 var event = new VirtualThreadSubmitFailedEvent(); 522 if (event.isEnabled()) { 523 event.javaThreadId = threadId(); 524 event.exceptionMessage = ree.getMessage(); 525 event.commit(); 526 } 527 } 528 529 /** 530 * Runs a task in the context of this virtual thread. 531 */ 532 private void run(Runnable task) { 533 assert Thread.currentThread() == this && state == RUNNING; 534 535 // emit JFR event if enabled 536 if (VirtualThreadStartEvent.isTurnedOn()) { 537 var event = new VirtualThreadStartEvent(); 538 event.javaThreadId = threadId(); 539 event.commit(); 540 } 541 542 Object bindings = Thread.scopedValueBindings(); 543 try { 544 runWith(bindings, task); 545 } catch (Throwable exc) { 546 dispatchUncaughtException(exc); 547 } finally { 548 // pop any remaining scopes from the stack, this may block 549 StackableScope.popAll(); 550 551 // emit JFR event if enabled 552 if (VirtualThreadEndEvent.isTurnedOn()) { 553 var event = new VirtualThreadEndEvent(); 554 event.javaThreadId = threadId(); 555 event.commit(); 556 } 557 } 558 } 559 560 /** 561 * Mounts this virtual thread onto the current platform thread. On 562 * return, the current thread is the virtual thread. 563 */ 564 @ChangesCurrentThread 565 @ReservedStackAccess 566 private void mount() { 567 startTransition(/*mount*/true); 568 // We assume following volatile accesses provide equivalent 569 // of acquire ordering, otherwise we need U.loadFence() here. 570 571 // sets the carrier thread 572 Thread carrier = Thread.currentCarrierThread(); 573 setCarrierThread(carrier); 574 575 // sync up carrier thread interrupted status if needed 576 if (interrupted) { 577 carrier.setInterrupt(); 578 } else if (carrier.isInterrupted()) { 579 synchronized (interruptLock) { 580 // need to recheck interrupted status 581 if (!interrupted) { 582 carrier.clearInterrupt(); 583 } 584 } 585 } 586 587 // set Thread.currentThread() to return this virtual thread 588 carrier.setCurrentThread(this); 589 } 590 591 /** 592 * Unmounts this virtual thread from the carrier. On return, the 593 * current thread is the current platform thread. 594 */ 595 @ChangesCurrentThread 596 @ReservedStackAccess 597 private void unmount() { 598 assert !Thread.holdsLock(interruptLock); 599 600 // set Thread.currentThread() to return the platform thread 601 Thread carrier = this.carrierThread; 602 carrier.setCurrentThread(carrier); 603 604 // break connection to carrier thread, synchronized with interrupt 605 synchronized (interruptLock) { 606 setCarrierThread(null); 607 } 608 carrier.clearInterrupt(); 609 610 // We assume previous volatile accesses provide equivalent 611 // of release ordering, otherwise we need U.storeFence() here. 612 endTransition(/*mount*/false); 613 } 614 615 /** 616 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until 617 * the continuation continues. 618 */ 619 @Hidden 620 private boolean yieldContinuation() { 621 startTransition(/*mount*/false); 622 try { 623 return Continuation.yield(VTHREAD_SCOPE); 624 } finally { 625 endTransition(/*mount*/true); 626 } 627 } 628 629 /** 630 * Invoked in the context of the carrier thread after the Continuation yields when 631 * parking, blocking on monitor enter, Object.wait, or Thread.yield. 632 */ 633 private void afterYield() { 634 assert carrierThread == null; 635 636 // re-adjust parallelism if the virtual thread yielded when compensating 637 if (currentThread() instanceof CarrierThread ct) { 638 ct.endBlocking(); 639 } 640 641 int s = state(); 642 643 // LockSupport.park/parkNanos 644 if (s == PARKING || s == TIMED_PARKING) { 645 int newState; 646 if (s == PARKING) { 647 setState(newState = PARKED); 648 } else { 649 // schedule unpark 650 long timeout = this.timeout; 651 assert timeout > 0; 652 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS); 653 setState(newState = TIMED_PARKED); 654 } 655 656 // may have been unparked while parking 657 if (parkPermit && compareAndSetState(newState, UNPARKED)) { 658 // lazy submit if local queue is empty 659 lazySubmitRunContinuation(); 660 } 661 return; 662 } 663 664 // Thread.yield 665 if (s == YIELDING) { 666 setState(YIELDED); 667 668 // external submit if there are no tasks in the local task queue 669 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 670 externalSubmitRunContinuation(); 671 } else { 672 submitRunContinuation(); 673 } 674 return; 675 } 676 677 // blocking on monitorenter 678 if (s == BLOCKING) { 679 setState(BLOCKED); 680 681 // may have been unblocked while blocking 682 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 683 // lazy submit if local queue is empty 684 lazySubmitRunContinuation(); 685 } 686 return; 687 } 688 689 // Object.wait 690 if (s == WAITING || s == TIMED_WAITING) { 691 int newState; 692 boolean blocked; 693 boolean interruptible = interruptibleWait; 694 if (s == WAITING) { 695 setState(newState = WAIT); 696 // may have been notified while in transition 697 blocked = notified && compareAndSetState(WAIT, BLOCKED); 698 } else { 699 // For timed-wait, a timeout task is scheduled to execute. The timeout 700 // task will change the thread state to UNBLOCKED and submit the thread 701 // to the scheduler. A sequence number is used to ensure that the timeout 702 // task only unblocks the thread for this timed-wait. We synchronize with 703 // the timeout task to coordinate access to the sequence number and to 704 // ensure the timeout task doesn't execute until the thread has got to 705 // the TIMED_WAIT state. 706 long timeout = this.timeout; 707 assert timeout > 0; 708 synchronized (timedWaitLock()) { 709 byte seqNo = ++timedWaitSeqNo; 710 timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS); 711 setState(newState = TIMED_WAIT); 712 // May have been notified while in transition. This must be done while 713 // holding the monitor to avoid changing the state of a new timed wait call. 714 blocked = notified && compareAndSetState(TIMED_WAIT, BLOCKED); 715 } 716 } 717 718 if (blocked) { 719 // may have been unblocked already 720 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) { 721 lazySubmitRunContinuation(); 722 } 723 } else { 724 // may have been interrupted while in transition to wait state 725 if (interruptible && interrupted && compareAndSetState(newState, UNBLOCKED)) { 726 lazySubmitRunContinuation(); 727 } 728 } 729 return; 730 } 731 732 assert false; 733 } 734 735 /** 736 * Invoked after the continuation completes. 737 */ 738 private void afterDone() { 739 afterDone(true); 740 } 741 742 /** 743 * Invoked after the continuation completes (or start failed). Sets the thread 744 * state to TERMINATED and notifies anyone waiting for the thread to terminate. 745 * 746 * @param notifyContainer true if its container should be notified 747 */ 748 private void afterDone(boolean notifyContainer) { 749 assert carrierThread == null; 750 setState(TERMINATED); 751 752 // notifyAll to wakeup any threads waiting for this thread to terminate 753 if (notifyAllAfterTerminate) { 754 synchronized (this) { 755 notifyAll(); 756 } 757 } 758 759 // notify container 760 if (notifyContainer) { 761 threadContainer().remove(this); 762 } 763 764 // clear references to thread locals 765 clearReferences(); 766 } 767 768 /** 769 * Schedules this {@code VirtualThread} to execute. 770 * 771 * @throws IllegalStateException if the container is shutdown or closed 772 * @throws IllegalThreadStateException if the thread has already been started 773 * @throws RejectedExecutionException if the scheduler cannot accept a task 774 */ 775 private void start(ThreadContainer container, boolean lazy) { 776 if (!compareAndSetState(NEW, STARTED)) { 777 throw new IllegalThreadStateException("Already started"); 778 } 779 780 // bind thread to container 781 assert threadContainer() == null; 782 setThreadContainer(container); 783 784 // start thread 785 boolean addedToContainer = false; 786 boolean started = false; 787 try { 788 container.add(this); // may throw 789 addedToContainer = true; 790 791 // scoped values may be inherited 792 inheritScopedValueBindings(container); 793 794 // submit task to schedule 795 try { 796 if (currentThread().isVirtual()) { 797 Continuation.pin(); 798 try { 799 if (scheduler == BUILTIN_SCHEDULER 800 && currentCarrierThread() instanceof CarrierThread ct) { 801 ForkJoinPool pool = ct.getPool(); 802 ForkJoinTask<?> task = ForkJoinTask.adapt(runContinuation); 803 if (lazy) { 804 pool.lazySubmit(task); 805 } else { 806 pool.externalSubmit(task); 807 } 808 } else { 809 scheduler.onStart(runContinuation); 810 } 811 } finally { 812 Continuation.unpin(); 813 } 814 } else { 815 scheduler.onStart(runContinuation); 816 } 817 } catch (RejectedExecutionException ree) { 818 submitFailed(ree); 819 throw ree; 820 } 821 822 started = true; 823 } finally { 824 if (!started) { 825 afterDone(addedToContainer); 826 } 827 } 828 } 829 830 @Override 831 void start(ThreadContainer container) { 832 start(container, false); 833 } 834 835 @Override 836 public void start() { 837 start(ThreadContainers.root(), false); 838 } 839 840 /** 841 * Schedules this thread to begin execution without guarantee that it will execute. 842 */ 843 void lazyStart() { 844 start(ThreadContainers.root(), true); 845 } 846 847 @Override 848 public void run() { 849 // do nothing 850 } 851 852 /** 853 * Invoked by Thread.join before a thread waits for this virtual thread to terminate. 854 */ 855 void beforeJoin() { 856 notifyAllAfterTerminate = true; 857 } 858 859 /** 860 * Parks until unparked or interrupted. If already unparked then the parking 861 * permit is consumed and this method completes immediately (meaning it doesn't 862 * yield). It also completes immediately if the interrupted status is set. 863 */ 864 @Override 865 void park() { 866 assert Thread.currentThread() == this; 867 868 // complete immediately if parking permit available or interrupted 869 if (getAndSetParkPermit(false) || interrupted) 870 return; 871 872 // park the thread 873 boolean yielded = false; 874 setState(PARKING); 875 try { 876 yielded = yieldContinuation(); 877 } catch (OutOfMemoryError e) { 878 // park on carrier 879 } finally { 880 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 881 if (!yielded) { 882 assert state() == PARKING; 883 setState(RUNNING); 884 } 885 } 886 887 // park on the carrier thread when pinned 888 if (!yielded) { 889 parkOnCarrierThread(false, 0); 890 } 891 } 892 893 /** 894 * Parks up to the given waiting time or until unparked or interrupted. 895 * If already unparked then the parking permit is consumed and this method 896 * completes immediately (meaning it doesn't yield). It also completes immediately 897 * if the interrupted status is set or the waiting time is {@code <= 0}. 898 * 899 * @param nanos the maximum number of nanoseconds to wait. 900 */ 901 @Override 902 void parkNanos(long nanos) { 903 assert Thread.currentThread() == this; 904 905 // complete immediately if parking permit available or interrupted 906 if (getAndSetParkPermit(false) || interrupted) 907 return; 908 909 // park the thread for the waiting time 910 if (nanos > 0) { 911 long startTime = System.nanoTime(); 912 913 // park the thread, afterYield will schedule the thread to unpark 914 boolean yielded = false; 915 timeout = nanos; 916 setState(TIMED_PARKING); 917 try { 918 yielded = yieldContinuation(); 919 } catch (OutOfMemoryError e) { 920 // park on carrier 921 } finally { 922 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 923 if (!yielded) { 924 assert state() == TIMED_PARKING; 925 setState(RUNNING); 926 } 927 } 928 929 // park on carrier thread for remaining time when pinned (or OOME) 930 if (!yielded) { 931 long remainingNanos = nanos - (System.nanoTime() - startTime); 932 parkOnCarrierThread(true, remainingNanos); 933 } 934 } 935 } 936 937 /** 938 * Parks the current carrier thread up to the given waiting time or until 939 * unparked or interrupted. If the virtual thread is interrupted then the 940 * interrupted status will be propagated to the carrier thread. 941 * @param timed true for a timed park, false for untimed 942 * @param nanos the waiting time in nanoseconds 943 */ 944 private void parkOnCarrierThread(boolean timed, long nanos) { 945 assert state() == RUNNING; 946 947 setState(timed ? TIMED_PINNED : PINNED); 948 try { 949 if (!parkPermit) { 950 if (!timed) { 951 U.park(false, 0); 952 } else if (nanos > 0) { 953 U.park(false, nanos); 954 } 955 } 956 } finally { 957 setState(RUNNING); 958 } 959 960 // consume parking permit 961 setParkPermit(false); 962 963 // JFR jdk.VirtualThreadPinned event 964 postPinnedEvent("LockSupport.park"); 965 } 966 967 /** 968 * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event. 969 * Recording the event in the VM avoids having JFR event recorded in Java 970 * with the same name, but different ID, to events recorded by the VM. 971 */ 972 @Hidden 973 private static native void postPinnedEvent(String op); 974 975 /** 976 * Re-enables this virtual thread for scheduling. If this virtual thread is parked 977 * then its task is scheduled to continue, otherwise its next call to {@code park} or 978 * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block. 979 * @param lazySubmit to use lazySubmit if possible 980 * @throws RejectedExecutionException if the scheduler cannot accept a task 981 */ 982 private void unpark(boolean lazySubmit) { 983 if (!getAndSetParkPermit(true) && currentThread() != this) { 984 int s = state(); 985 986 // unparked while parked 987 if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) { 988 989 if (lazySubmit && currentThread().isVirtual()) { 990 Continuation.pin(); 991 try { 992 if (scheduler == BUILTIN_SCHEDULER 993 && currentCarrierThread() instanceof CarrierThread ct) { 994 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation)); 995 return; 996 } 997 } catch (RejectedExecutionException ree) { 998 submitFailed(ree); 999 throw ree; 1000 } catch (OutOfMemoryError e) { 1001 // fall-though 1002 } finally { 1003 Continuation.unpin(); 1004 } 1005 } 1006 1007 submitRunContinuation(); 1008 return; 1009 } 1010 1011 // unparked while parked when pinned 1012 if (s == PINNED || s == TIMED_PINNED) { 1013 // unpark carrier thread when pinned 1014 disableSuspendAndPreempt(); 1015 try { 1016 synchronized (carrierThreadAccessLock()) { 1017 Thread carrier = carrierThread; 1018 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) { 1019 U.unpark(carrier); 1020 } 1021 } 1022 } finally { 1023 enableSuspendAndPreempt(); 1024 } 1025 return; 1026 } 1027 } 1028 } 1029 1030 @Override 1031 void unpark() { 1032 unpark(false); 1033 } 1034 1035 @Override 1036 void lazyUnpark() { 1037 unpark(true); 1038 } 1039 1040 /** 1041 * Invoked by unblocker thread to unblock this virtual thread. 1042 */ 1043 private void unblock() { 1044 assert !Thread.currentThread().isVirtual(); 1045 blockPermit = true; 1046 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) { 1047 submitRunContinuation(); 1048 } 1049 } 1050 1051 /** 1052 * Invoked by FJP worker thread or STPE thread when park timeout expires. 1053 */ 1054 private void parkTimeoutExpired() { 1055 assert !VirtualThread.currentThread().isVirtual(); 1056 unpark(true); 1057 } 1058 1059 /** 1060 * Invoked by FJP worker thread or STPE thread when wait timeout expires. 1061 * If the virtual thread is in timed-wait then this method will unblock the thread 1062 * and submit its task so that it continues and attempts to reenter the monitor. 1063 * This method does nothing if the thread has been woken by notify or interrupt. 1064 */ 1065 private void waitTimeoutExpired(byte seqNo) { 1066 assert !Thread.currentThread().isVirtual(); 1067 1068 synchronized (timedWaitLock()) { 1069 if (seqNo != timedWaitSeqNo) { 1070 // this timeout task is for a past timed-wait 1071 return; 1072 } 1073 if (!compareAndSetState(TIMED_WAIT, UNBLOCKED)) { 1074 // already notified (or interrupted) 1075 return; 1076 } 1077 } 1078 1079 lazySubmitRunContinuation(); 1080 } 1081 1082 /** 1083 * Attempts to yield the current virtual thread (Thread.yield). 1084 */ 1085 void tryYield() { 1086 assert Thread.currentThread() == this; 1087 setState(YIELDING); 1088 boolean yielded = false; 1089 try { 1090 yielded = yieldContinuation(); // may throw 1091 } finally { 1092 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING)); 1093 if (!yielded) { 1094 assert state() == YIELDING; 1095 setState(RUNNING); 1096 } 1097 } 1098 } 1099 1100 /** 1101 * Sleep the current thread for the given sleep time (in nanoseconds). If 1102 * nanos is 0 then the thread will attempt to yield. 1103 * 1104 * @implNote This implementation parks the thread for the given sleeping time 1105 * and will therefore be observed in PARKED state during the sleep. Parking 1106 * will consume the parking permit so this method makes available the parking 1107 * permit after the sleep. This may be observed as a spurious, but benign, 1108 * wakeup when the thread subsequently attempts to park. 1109 * 1110 * @param nanos the maximum number of nanoseconds to sleep 1111 * @throws InterruptedException if interrupted while sleeping 1112 */ 1113 void sleepNanos(long nanos) throws InterruptedException { 1114 assert Thread.currentThread() == this && nanos >= 0; 1115 if (getAndClearInterrupt()) 1116 throw new InterruptedException(); 1117 if (nanos == 0) { 1118 tryYield(); 1119 } else { 1120 // park for the sleep time 1121 try { 1122 long remainingNanos = nanos; 1123 long startNanos = System.nanoTime(); 1124 while (remainingNanos > 0) { 1125 parkNanos(remainingNanos); 1126 if (getAndClearInterrupt()) { 1127 throw new InterruptedException(); 1128 } 1129 remainingNanos = nanos - (System.nanoTime() - startNanos); 1130 } 1131 } finally { 1132 // may have been unparked while sleeping 1133 setParkPermit(true); 1134 } 1135 } 1136 } 1137 1138 @Override 1139 void blockedOn(Interruptible b) { 1140 disableSuspendAndPreempt(); 1141 try { 1142 super.blockedOn(b); 1143 } finally { 1144 enableSuspendAndPreempt(); 1145 } 1146 } 1147 1148 @Override 1149 public void interrupt() { 1150 if (Thread.currentThread() != this) { 1151 // if current thread is a virtual thread then prevent it from being 1152 // suspended or unmounted when entering or holding interruptLock 1153 Interruptible blocker; 1154 disableSuspendAndPreempt(); 1155 try { 1156 synchronized (interruptLock) { 1157 interrupted = true; 1158 blocker = nioBlocker(); 1159 if (blocker != null) { 1160 blocker.interrupt(this); 1161 } 1162 1163 // interrupt carrier thread if mounted 1164 Thread carrier = carrierThread; 1165 if (carrier != null) carrier.setInterrupt(); 1166 } 1167 } finally { 1168 enableSuspendAndPreempt(); 1169 } 1170 1171 // notify blocker after releasing interruptLock 1172 if (blocker != null) { 1173 blocker.postInterrupt(); 1174 } 1175 1176 // make available parking permit, unpark thread if parked 1177 unpark(); 1178 1179 // if thread is waiting in Object.wait then schedule to try to reenter 1180 int s = state(); 1181 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1182 submitRunContinuation(); 1183 } 1184 1185 } else { 1186 interrupted = true; 1187 carrierThread.setInterrupt(); 1188 setParkPermit(true); 1189 } 1190 } 1191 1192 @Override 1193 public boolean isInterrupted() { 1194 return interrupted; 1195 } 1196 1197 @Override 1198 boolean getAndClearInterrupt() { 1199 assert Thread.currentThread() == this; 1200 boolean oldValue = interrupted; 1201 if (oldValue) { 1202 disableSuspendAndPreempt(); 1203 try { 1204 synchronized (interruptLock) { 1205 interrupted = false; 1206 carrierThread.clearInterrupt(); 1207 } 1208 } finally { 1209 enableSuspendAndPreempt(); 1210 } 1211 } 1212 return oldValue; 1213 } 1214 1215 @Override 1216 Thread.State threadState() { 1217 switch (state()) { 1218 case NEW: 1219 return Thread.State.NEW; 1220 case STARTED: 1221 // return NEW if thread container not yet set 1222 if (threadContainer() == null) { 1223 return Thread.State.NEW; 1224 } else { 1225 return Thread.State.RUNNABLE; 1226 } 1227 case UNPARKED: 1228 case UNBLOCKED: 1229 case YIELDED: 1230 // runnable, not mounted 1231 return Thread.State.RUNNABLE; 1232 case RUNNING: 1233 // if mounted then return state of carrier thread 1234 if (Thread.currentThread() != this) { 1235 disableSuspendAndPreempt(); 1236 try { 1237 synchronized (carrierThreadAccessLock()) { 1238 Thread carrierThread = this.carrierThread; 1239 if (carrierThread != null) { 1240 return carrierThread.threadState(); 1241 } 1242 } 1243 } finally { 1244 enableSuspendAndPreempt(); 1245 } 1246 } 1247 // runnable, mounted 1248 return Thread.State.RUNNABLE; 1249 case PARKING: 1250 case TIMED_PARKING: 1251 case WAITING: 1252 case TIMED_WAITING: 1253 case YIELDING: 1254 // runnable, in transition 1255 return Thread.State.RUNNABLE; 1256 case PARKED: 1257 case PINNED: 1258 case WAIT: 1259 return Thread.State.WAITING; 1260 case TIMED_PARKED: 1261 case TIMED_PINNED: 1262 case TIMED_WAIT: 1263 return Thread.State.TIMED_WAITING; 1264 case BLOCKING: 1265 case BLOCKED: 1266 return Thread.State.BLOCKED; 1267 case TERMINATED: 1268 return Thread.State.TERMINATED; 1269 default: 1270 throw new InternalError(); 1271 } 1272 } 1273 1274 @Override 1275 boolean alive() { 1276 int s = state; 1277 return (s != NEW && s != TERMINATED); 1278 } 1279 1280 @Override 1281 boolean isTerminated() { 1282 return (state == TERMINATED); 1283 } 1284 1285 @Override 1286 public String toString() { 1287 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1288 sb.append(threadId()); 1289 String name = getName(); 1290 if (!name.isEmpty()) { 1291 sb.append(","); 1292 sb.append(name); 1293 } 1294 sb.append("]/"); 1295 1296 // add the carrier state and thread name when mounted 1297 boolean mounted; 1298 if (Thread.currentThread() == this) { 1299 mounted = appendCarrierInfo(sb); 1300 } else { 1301 disableSuspendAndPreempt(); 1302 try { 1303 synchronized (carrierThreadAccessLock()) { 1304 mounted = appendCarrierInfo(sb); 1305 } 1306 } finally { 1307 enableSuspendAndPreempt(); 1308 } 1309 } 1310 1311 // add virtual thread state when not mounted 1312 if (!mounted) { 1313 String stateAsString = threadState().toString(); 1314 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1315 } 1316 1317 return sb.toString(); 1318 } 1319 1320 /** 1321 * Appends the carrier state and thread name to the string buffer if mounted. 1322 * @return true if mounted, false if not mounted 1323 */ 1324 private boolean appendCarrierInfo(StringBuilder sb) { 1325 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1326 Thread carrier = carrierThread; 1327 if (carrier != null) { 1328 String stateAsString = carrier.threadState().toString(); 1329 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1330 sb.append('@'); 1331 sb.append(carrier.getName()); 1332 return true; 1333 } else { 1334 return false; 1335 } 1336 } 1337 1338 @Override 1339 public int hashCode() { 1340 return (int) threadId(); 1341 } 1342 1343 @Override 1344 public boolean equals(Object obj) { 1345 return obj == this; 1346 } 1347 1348 /** 1349 * Returns the lock object to synchronize on when accessing carrierThread. 1350 * The lock prevents carrierThread from being reset to null during unmount. 1351 */ 1352 private Object carrierThreadAccessLock() { 1353 // return interruptLock as unmount has to coordinate with interrupt 1354 return interruptLock; 1355 } 1356 1357 /** 1358 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1359 */ 1360 private Object timedWaitLock() { 1361 // use this object for now to avoid the overhead of introducing another lock 1362 return runContinuation; 1363 } 1364 1365 /** 1366 * Disallow the current thread be suspended or preempted. 1367 */ 1368 private void disableSuspendAndPreempt() { 1369 notifyJvmtiDisableSuspend(true); 1370 Continuation.pin(); 1371 } 1372 1373 /** 1374 * Allow the current thread be suspended or preempted. 1375 */ 1376 private void enableSuspendAndPreempt() { 1377 Continuation.unpin(); 1378 notifyJvmtiDisableSuspend(false); 1379 } 1380 1381 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1382 1383 private int state() { 1384 return state; // volatile read 1385 } 1386 1387 private void setState(int newValue) { 1388 state = newValue; // volatile write 1389 } 1390 1391 private boolean compareAndSetState(int expectedValue, int newValue) { 1392 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1393 } 1394 1395 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1396 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1397 } 1398 1399 private void setParkPermit(boolean newValue) { 1400 if (parkPermit != newValue) { 1401 parkPermit = newValue; 1402 } 1403 } 1404 1405 private boolean getAndSetParkPermit(boolean newValue) { 1406 if (parkPermit != newValue) { 1407 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1408 } else { 1409 return newValue; 1410 } 1411 } 1412 1413 private void setCarrierThread(Thread carrier) { 1414 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1415 this.carrierThread = carrier; 1416 } 1417 1418 // The following four methods notify the VM when a "transition" starts and ends. 1419 // A "mount transition" embodies the steps to transfer control from a platform 1420 // thread to a virtual thread, changing the thread identity, and starting or 1421 // resuming the virtual thread's continuation on the carrier. 1422 // An "unmount transition" embodies the steps to transfer control from a virtual 1423 // thread to its carrier, suspending the virtual thread's continuation, and 1424 // restoring the thread identity to the platform thread. 1425 // The notifications to the VM are necessary in order to coordinate with functions 1426 // (JVMTI mostly) that disable transitions for one or all virtual threads. Starting 1427 // a transition may block if transitions are disabled. Ending a transition may 1428 // notify a thread that is waiting to disable transitions. The notifications are 1429 // also used to post JVMTI events for virtual thread start and end. 1430 1431 @IntrinsicCandidate 1432 @JvmtiMountTransition 1433 private native void endFirstTransition(); 1434 1435 @IntrinsicCandidate 1436 @JvmtiMountTransition 1437 private native void startFinalTransition(); 1438 1439 @IntrinsicCandidate 1440 @JvmtiMountTransition 1441 private native void startTransition(boolean mount); 1442 1443 @IntrinsicCandidate 1444 @JvmtiMountTransition 1445 private native void endTransition(boolean mount); 1446 1447 @IntrinsicCandidate 1448 private static native void notifyJvmtiDisableSuspend(boolean enter); 1449 1450 private static native void registerNatives(); 1451 static { 1452 registerNatives(); 1453 1454 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1455 var group = Thread.virtualThreadGroup(); 1456 } 1457 1458 /** 1459 * Loads a VirtualThreadScheduler with the given class name. The class must be public 1460 * in an exported package, with public one-arg or no-arg constructor, and be visible 1461 * to the system class loader. 1462 * @param delegate the scheduler that the custom scheduler may delegate to 1463 * @param cn the class name of the custom scheduler 1464 */ 1465 private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) { 1466 VirtualThreadScheduler scheduler; 1467 try { 1468 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); 1469 // 1-arg constructor 1470 try { 1471 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class); 1472 return (VirtualThreadScheduler) ctor.newInstance(delegate); 1473 } catch (NoSuchMethodException e) { 1474 // 0-arg constructor 1475 Constructor<?> ctor = clazz.getConstructor(); 1476 scheduler = (VirtualThreadScheduler) ctor.newInstance(); 1477 } 1478 } catch (Exception ex) { 1479 throw new Error(ex); 1480 } 1481 System.err.println("WARNING: Using custom default scheduler, this is an experimental feature!"); 1482 return scheduler; 1483 } 1484 1485 /** 1486 * Creates the built-in ForkJoinPool scheduler. 1487 * @param wrapped true if wrapped by a custom default scheduler 1488 */ 1489 private static VirtualThreadScheduler createBuiltinScheduler(boolean wrapped) { 1490 int parallelism, maxPoolSize, minRunnable; 1491 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1492 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1493 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1494 if (parallelismValue != null) { 1495 parallelism = Integer.parseInt(parallelismValue); 1496 } else { 1497 parallelism = Runtime.getRuntime().availableProcessors(); 1498 } 1499 if (maxPoolSizeValue != null) { 1500 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1501 parallelism = Integer.min(parallelism, maxPoolSize); 1502 } else { 1503 maxPoolSize = Integer.max(parallelism, 256); 1504 } 1505 if (minRunnableValue != null) { 1506 minRunnable = Integer.parseInt(minRunnableValue); 1507 } else { 1508 minRunnable = Integer.max(parallelism / 2, 1); 1509 } 1510 if (Boolean.getBoolean("jdk.virtualThreadScheduler.useTPE")) { 1511 return new BuiltinThreadPoolExecutorScheduler(parallelism); 1512 } else { 1513 return new BuiltinForkJoinPoolScheduler(parallelism, maxPoolSize, minRunnable, wrapped); 1514 } 1515 } 1516 1517 /** 1518 * The built-in ForkJoinPool scheduler. 1519 */ 1520 private static class BuiltinForkJoinPoolScheduler 1521 extends ForkJoinPool implements VirtualThreadScheduler { 1522 1523 BuiltinForkJoinPoolScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) { 1524 ForkJoinWorkerThreadFactory factory = wrapped 1525 ? ForkJoinPool.defaultForkJoinWorkerThreadFactory 1526 : CarrierThread::new; 1527 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1528 boolean asyncMode = true; // FIFO 1529 super(parallelism, factory, handler, asyncMode, 1530 0, maxPoolSize, minRunnable, pool -> true, 30L, SECONDS); 1531 } 1532 1533 @Override 1534 public void onStart(VirtualThreadTask task) { 1535 execute(ForkJoinTask.adapt(task)); 1536 } 1537 1538 @Override 1539 public void onContinue(VirtualThreadTask task) { 1540 execute(ForkJoinTask.adapt(task)); 1541 } 1542 1543 @Override 1544 public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) { 1545 return super.schedule(task, delay, unit); 1546 } 1547 } 1548 1549 /** 1550 * Built-in ThreadPoolExecutor scheduler. 1551 */ 1552 private static class BuiltinThreadPoolExecutorScheduler 1553 extends ThreadPoolExecutor implements VirtualThreadScheduler { 1554 1555 BuiltinThreadPoolExecutorScheduler(int maxPoolSize) { 1556 ThreadFactory factory = task -> { 1557 Thread t = InnocuousThread.newThread(task); 1558 t.setDaemon(true); 1559 return t; 1560 }; 1561 super(maxPoolSize, maxPoolSize, 1562 0L, SECONDS, 1563 new LinkedTransferQueue<>(), 1564 factory); 1565 } 1566 1567 @Override 1568 public void onStart(VirtualThreadTask task) { 1569 execute(task); 1570 } 1571 1572 @Override 1573 public void onContinue(VirtualThreadTask task) { 1574 execute(task); 1575 } 1576 } 1577 1578 /** 1579 * Wraps the scheduler to avoid leaking a direct reference to built-in scheduler. 1580 */ 1581 static VirtualThreadScheduler createExternalView(VirtualThreadScheduler delegate) { 1582 return new VirtualThreadScheduler() { 1583 private void check(VirtualThreadTask task) { 1584 var vthread = (VirtualThread) task.thread(); 1585 VirtualThreadScheduler scheduler = vthread.scheduler; 1586 if (scheduler != this && scheduler != DEFAULT_SCHEDULER) { 1587 throw new IllegalArgumentException(); 1588 } 1589 } 1590 @Override 1591 public void onStart(VirtualThreadTask task) { 1592 check(task); 1593 delegate.onStart(task); 1594 } 1595 @Override 1596 public void onContinue(VirtualThreadTask task) { 1597 check(task); 1598 delegate.onContinue(task); 1599 } 1600 @Override 1601 public String toString() { 1602 return delegate.toString(); 1603 } 1604 }; 1605 } 1606 1607 /** 1608 * Schedule a runnable task to run after a delay. 1609 */ 1610 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1611 if (USE_STPE) { 1612 return DelayedTaskSchedulers.schedule(command, delay, unit); 1613 } else { 1614 return scheduler.schedule(command, delay, unit); 1615 } 1616 } 1617 1618 /** 1619 * Supports scheduling a runnable task to run after a delay. It uses a number 1620 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed 1621 * work queue used. This class is used when using a custom scheduler. 1622 */ 1623 static class DelayedTaskSchedulers { 1624 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers(); 1625 1626 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1627 long tid = Thread.currentThread().threadId(); 1628 int index = (int) tid & (INSTANCE.length - 1); 1629 return INSTANCE[index].schedule(command, delay, unit); 1630 } 1631 1632 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1633 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1634 String propValue = System.getProperty(propName); 1635 int queueCount; 1636 if (propValue != null) { 1637 queueCount = Integer.parseInt(propValue); 1638 if (queueCount != Integer.highestOneBit(queueCount)) { 1639 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1640 } 1641 } else { 1642 int ncpus = Runtime.getRuntime().availableProcessors(); 1643 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1644 } 1645 var schedulers = new ScheduledExecutorService[queueCount]; 1646 for (int i = 0; i < queueCount; i++) { 1647 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1648 Executors.newScheduledThreadPool(1, task -> { 1649 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1650 t.setDaemon(true); 1651 return t; 1652 }); 1653 stpe.setRemoveOnCancelPolicy(true); 1654 schedulers[i] = stpe; 1655 } 1656 return schedulers; 1657 } 1658 } 1659 1660 /** 1661 * Schedule virtual threads that are ready to be scheduled after they blocked on 1662 * monitor enter. 1663 */ 1664 private static void unblockVirtualThreads() { 1665 while (true) { 1666 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1667 while (vthread != null) { 1668 assert vthread.onWaitingList; 1669 VirtualThread nextThread = vthread.next; 1670 1671 // remove from list and unblock 1672 vthread.next = null; 1673 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1674 assert changed; 1675 vthread.unblock(); 1676 1677 vthread = nextThread; 1678 } 1679 } 1680 } 1681 1682 /** 1683 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1684 * if necessary until a list of one or more threads becomes available. 1685 */ 1686 private static native VirtualThread takeVirtualThreadListToUnblock(); 1687 1688 static { 1689 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1690 VirtualThread::unblockVirtualThreads); 1691 unblocker.setDaemon(true); 1692 unblocker.start(); 1693 } 1694 } --- EOF ---