1 /* 2 * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.lang; 26 27 import java.lang.reflect.Constructor; 28 import java.util.Arrays; 29 import java.util.Locale; 30 import java.util.Objects; 31 import java.util.concurrent.CountDownLatch; 32 import java.util.concurrent.Executor; 33 import java.util.concurrent.Executors; 34 import java.util.concurrent.ForkJoinPool; 35 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 36 import java.util.concurrent.ForkJoinTask; 37 import java.util.concurrent.Future; 38 import java.util.concurrent.RejectedExecutionException; 39 import java.util.concurrent.ScheduledExecutorService; 40 import java.util.concurrent.ScheduledThreadPoolExecutor; 41 import java.util.concurrent.TimeUnit; 42 import java.util.stream.Stream; 43 import jdk.internal.event.VirtualThreadEndEvent; 44 import jdk.internal.event.VirtualThreadStartEvent; 45 import jdk.internal.event.VirtualThreadSubmitFailedEvent; 46 import jdk.internal.misc.CarrierThread; 47 import jdk.internal.misc.InnocuousThread; 48 import jdk.internal.misc.Unsafe; 49 import jdk.internal.vm.Continuation; 50 import jdk.internal.vm.ContinuationScope; 51 import jdk.internal.vm.StackableScope; 52 import jdk.internal.vm.ThreadContainer; 53 import jdk.internal.vm.ThreadContainers; 54 import jdk.internal.vm.annotation.ChangesCurrentThread; 55 import jdk.internal.vm.annotation.Hidden; 56 import jdk.internal.vm.annotation.IntrinsicCandidate; 57 import jdk.internal.vm.annotation.JvmtiHideEvents; 58 import jdk.internal.vm.annotation.JvmtiMountTransition; 59 import jdk.internal.vm.annotation.ReservedStackAccess; 60 import sun.nio.ch.Interruptible; 61 import static java.util.concurrent.TimeUnit.*; 62 63 /** 64 * A thread that is scheduled by the Java virtual machine rather than the operating system. 
*/
final class VirtualThread extends BaseVirtualThread {
    private static final Unsafe U = Unsafe.getUnsafe();
    private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
    private static final Executor DEFAULT_SCHEDULER = createDefaultScheduler();
    private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();

    // offsets of the named instance fields, looked up once with Unsafe.objectFieldOffset
    private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
    private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
    private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
    private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
    private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");

    // scheduler and continuation
    private final Executor scheduler;
    private final Continuation cont;
    // task handed to the scheduler; bound to this::runContinuation in the constructor
    private final Runnable runContinuation;

    // virtual thread state, accessed by VM
    private volatile int state;

    /*
     * Virtual thread state transitions:
     *
     *      NEW -> STARTED         // Thread.start, schedule to run
     *  STARTED -> TERMINATED      // failed to start
     *  STARTED -> RUNNING         // first run
     *  RUNNING -> TERMINATED      // done
     *
     *  RUNNING -> PARKING         // Thread parking with LockSupport.park
     *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
     *  PARKING -> PINNED          // cont.yield failed, parked indefinitely on carrier
     *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *   PINNED -> RUNNING         // unparked, continue execution on same carrier
     * UNPARKED -> RUNNING         // continue execution after park
     *
     *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos
     * TIMED_PARKING -> TIMED_PARKED    // cont.yield successful, timed-parked
     * TIMED_PARKING -> TIMED_PINNED    // cont.yield failed, timed-parked on carrier
     *  TIMED_PARKED -> UNPARKED        // unparked, may be scheduled to continue
     *  TIMED_PINNED -> RUNNING         // unparked, continue execution on same carrier
     *
     *   RUNNING -> BLOCKING       // blocking on monitor enter
     *  BLOCKING -> BLOCKED        // blocked on monitor enter
     *   BLOCKED -> UNBLOCKED      // unblocked, may be scheduled to continue
     * UNBLOCKED -> RUNNING        // continue execution after blocked on monitor enter
     *
     *   RUNNING -> WAITING        // transitional state during wait on monitor
     *   WAITING -> WAIT           // waiting on monitor
     *      WAIT -> BLOCKED        // notified, waiting to be unblocked by monitor owner
     *      WAIT -> UNBLOCKED      // timed-out/interrupted
     *
     *       RUNNING -> TIMED_WAITING   // transitional state during timed-waiting on monitor
     * TIMED_WAITING -> TIMED_WAIT      // timed-waiting on monitor
     *    TIMED_WAIT -> BLOCKED         // notified, waiting to be unblocked by monitor owner
     *    TIMED_WAIT -> UNBLOCKED       // timed-out/interrupted
     *
     *  RUNNING -> YIELDING        // Thread.yield
     * YIELDING -> YIELDED         // cont.yield successful, may be scheduled to continue
     * YIELDING -> RUNNING         // cont.yield failed
     *  YIELDED -> RUNNING         // continue execution after Thread.yield
     */
    private static final int NEW      = 0;
    private static final int STARTED  = 1;
    private static final int RUNNING  = 2;     // runnable-mounted

    // untimed and timed parking
    private static final int PARKING       = 3;
    private static final int PARKED        = 4;     // unmounted
    private static final int PINNED        = 5;     // mounted
    private static final int TIMED_PARKING = 6;
    private static final int TIMED_PARKED  = 7;     // unmounted
    private static final int TIMED_PINNED  = 8;     // mounted
    private static final int UNPARKED      = 9;     // unmounted but runnable

    // Thread.yield
    private static final int YIELDING = 10;
    private static final int YIELDED  = 11;         // unmounted but runnable

    // monitor enter
    private static final int BLOCKING  = 12;
    private static final int BLOCKED   = 13;        // unmounted
    private static final int UNBLOCKED = 14;        // unmounted but runnable

    // monitor wait/timed-wait
    private static final int WAITING       = 15;
    private static final int WAIT          = 16;    // waiting in Object.wait
    private static final int TIMED_WAITING = 17;
    private static final int TIMED_WAIT    = 18;    // waiting in timed-Object.wait

    private static final int TERMINATED = 99;  // final state

    // can be suspended from scheduling when unmounted; this is a flag bit that is
    // combined with one of the state values above (readers mask it off or test for it)
    private static final int SUSPENDED = 1 << 8;

    // parking permit made available by LockSupport.unpark
    private volatile boolean parkPermit;

    // blocking permit made available by unblocker thread when another thread exits monitor
    private volatile boolean blockPermit;

    // true when on the list of virtual threads waiting to be unblocked
    private volatile boolean onWaitingList;

    // next virtual thread on the list of virtual threads waiting to be unblocked
    private volatile VirtualThread next;

    // notified by Object.notify/notifyAll while waiting in Object.wait
    private volatile boolean notified;

    // timed-wait support: sequence number incremented for each timed-wait so that a
    // stale timeout task can be recognised and ignored
    private byte timedWaitSeqNo;

    // timeout for timed-park and timed-wait, only accessed on current/carrier thread.
    // The unit depends on the operation: afterYield schedules it in nanoseconds for a
    // timed-park and in milliseconds for a timed-wait.
    private long timeout;

    // timer task for timed-park and timed-wait, only accessed on current/carrier thread
    private Future<?> timeoutTask;

    // carrier thread when mounted, accessed by VM
    private volatile Thread carrierThread;

    // termination object when joining, created lazily if needed
    private volatile CountDownLatch termination;

    /**
     * Returns the default scheduler.
     */
    static Executor defaultScheduler() {
        return DEFAULT_SCHEDULER;
    }

    /**
     * Returns a stream of the delayed task schedulers used to support timed operations.
199 */ 200 static Stream<ScheduledExecutorService> delayedTaskSchedulers() { 201 return Arrays.stream(DELAYED_TASK_SCHEDULERS); 202 } 203 204 /** 205 * Returns the continuation scope used for virtual threads. 206 */ 207 static ContinuationScope continuationScope() { 208 return VTHREAD_SCOPE; 209 } 210 211 /** 212 * Creates a new {@code VirtualThread} to run the given task with the given 213 * scheduler. If the given scheduler is {@code null} and the current thread 214 * is a platform thread then the newly created virtual thread will use the 215 * default scheduler. If given scheduler is {@code null} and the current 216 * thread is a virtual thread then the current thread's scheduler is used. 217 * 218 * @param scheduler the scheduler or null 219 * @param name thread name 220 * @param characteristics characteristics 221 * @param task the task to execute 222 */ 223 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { 224 super(name, characteristics, /*bound*/ false); 225 Objects.requireNonNull(task); 226 227 // choose scheduler if not specified 228 if (scheduler == null) { 229 Thread parent = Thread.currentThread(); 230 if (parent instanceof VirtualThread vparent) { 231 scheduler = vparent.scheduler; 232 } else { 233 scheduler = DEFAULT_SCHEDULER; 234 } 235 } 236 237 this.scheduler = scheduler; 238 this.cont = new VThreadContinuation(this, task); 239 this.runContinuation = this::runContinuation; 240 } 241 242 /** 243 * The continuation that a virtual thread executes. 
244 */ 245 private static class VThreadContinuation extends Continuation { 246 VThreadContinuation(VirtualThread vthread, Runnable task) { 247 super(VTHREAD_SCOPE, wrap(vthread, task)); 248 } 249 @Override 250 protected void onPinned(Continuation.Pinned reason) { 251 } 252 private static Runnable wrap(VirtualThread vthread, Runnable task) { 253 return new Runnable() { 254 @Hidden 255 @JvmtiHideEvents 256 public void run() { 257 vthread.notifyJvmtiStart(); // notify JVMTI 258 try { 259 vthread.run(task); 260 } finally { 261 vthread.notifyJvmtiEnd(); // notify JVMTI 262 } 263 } 264 }; 265 } 266 } 267 268 /** 269 * Runs or continues execution on the current thread. The virtual thread is mounted 270 * on the current thread before the task runs or continues. It unmounts when the 271 * task completes or yields. 272 */ 273 @ChangesCurrentThread // allow mount/unmount to be inlined 274 private void runContinuation() { 275 // the carrier must be a platform thread 276 if (Thread.currentThread().isVirtual()) { 277 throw new WrongThreadException(); 278 } 279 280 // set state to RUNNING 281 int initialState = state(); 282 if (initialState == STARTED || initialState == UNPARKED 283 || initialState == UNBLOCKED || initialState == YIELDED) { 284 // newly started or continue after parking/blocking/Thread.yield 285 if (!compareAndSetState(initialState, RUNNING)) { 286 return; 287 } 288 // consume permit when continuing after parking or blocking. If continue 289 // after a timed-park or timed-wait then the timeout task is cancelled. 
290 if (initialState == UNPARKED) { 291 cancelTimeoutTask(); 292 setParkPermit(false); 293 } else if (initialState == UNBLOCKED) { 294 cancelTimeoutTask(); 295 blockPermit = false; 296 } 297 } else { 298 // not runnable 299 return; 300 } 301 302 mount(); 303 try { 304 cont.run(); 305 } finally { 306 unmount(); 307 if (cont.isDone()) { 308 afterDone(); 309 } else { 310 afterYield(); 311 } 312 } 313 } 314 315 /** 316 * Cancel timeout task when continuing after timed-park or timed-wait. 317 * The timeout task may be executing, or may have already completed. 318 */ 319 private void cancelTimeoutTask() { 320 if (timeoutTask != null) { 321 timeoutTask.cancel(false); 322 timeoutTask = null; 323 } 324 } 325 326 /** 327 * Submits the runContinuation task to the scheduler. For the default scheduler, 328 * and calling it on a worker thread, the task will be pushed to the local queue, 329 * otherwise it will be pushed to an external submission queue. 330 * @param scheduler the scheduler 331 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown 332 * @throws RejectedExecutionException 333 */ 334 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) { 335 boolean done = false; 336 while (!done) { 337 try { 338 // Pin the continuation to prevent the virtual thread from unmounting 339 // when submitting a task. For the default scheduler this ensures that 340 // the carrier doesn't change when pushing a task. For other schedulers 341 // it avoids deadlock that could arise due to carriers and virtual 342 // threads contending for a lock. 
343 if (currentThread().isVirtual()) { 344 Continuation.pin(); 345 try { 346 scheduler.execute(runContinuation); 347 } finally { 348 Continuation.unpin(); 349 } 350 } else { 351 scheduler.execute(runContinuation); 352 } 353 done = true; 354 } catch (RejectedExecutionException ree) { 355 submitFailed(ree); 356 throw ree; 357 } catch (OutOfMemoryError e) { 358 if (retryOnOOME) { 359 U.park(false, 100_000_000); // 100ms 360 } else { 361 throw e; 362 } 363 } 364 } 365 } 366 367 /** 368 * Submits the runContinuation task to the given scheduler as an external submit. 369 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 370 * @throws RejectedExecutionException 371 * @see ForkJoinPool#externalSubmit(ForkJoinTask) 372 */ 373 private void externalSubmitRunContinuation(ForkJoinPool pool) { 374 assert Thread.currentThread() instanceof CarrierThread; 375 try { 376 pool.externalSubmit(ForkJoinTask.adapt(runContinuation)); 377 } catch (RejectedExecutionException ree) { 378 submitFailed(ree); 379 throw ree; 380 } catch (OutOfMemoryError e) { 381 submitRunContinuation(pool, true); 382 } 383 } 384 385 /** 386 * Submits the runContinuation task to the scheduler. For the default scheduler, 387 * and calling it on a worker thread, the task will be pushed to the local queue, 388 * otherwise it will be pushed to an external submission queue. 389 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 390 * @throws RejectedExecutionException 391 */ 392 private void submitRunContinuation() { 393 submitRunContinuation(scheduler, true); 394 } 395 396 /** 397 * Lazy submit the runContinuation task if invoked on a carrier thread and its local 398 * queue is empty. If not empty, or invoked by another thread, then this method works 399 * like submitRunContinuation and just submits the task to the scheduler. 400 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds. 
401 * @throws RejectedExecutionException 402 * @see ForkJoinPool#lazySubmit(ForkJoinTask) 403 */ 404 private void lazySubmitRunContinuation() { 405 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) { 406 ForkJoinPool pool = ct.getPool(); 407 try { 408 pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); 409 } catch (RejectedExecutionException ree) { 410 submitFailed(ree); 411 throw ree; 412 } catch (OutOfMemoryError e) { 413 submitRunContinuation(); 414 } 415 } else { 416 submitRunContinuation(); 417 } 418 } 419 420 /** 421 * Submits the runContinuation task to the scheduler. For the default scheduler, and 422 * calling it a virtual thread that uses the default scheduler, the task will be 423 * pushed to an external submission queue. This method may throw OutOfMemoryError. 424 * @throws RejectedExecutionException 425 * @throws OutOfMemoryError 426 */ 427 private void externalSubmitRunContinuationOrThrow() { 428 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) { 429 try { 430 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation)); 431 } catch (RejectedExecutionException ree) { 432 submitFailed(ree); 433 throw ree; 434 } 435 } else { 436 submitRunContinuation(scheduler, false); 437 } 438 } 439 440 /** 441 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent. 442 */ 443 private void submitFailed(RejectedExecutionException ree) { 444 var event = new VirtualThreadSubmitFailedEvent(); 445 if (event.isEnabled()) { 446 event.javaThreadId = threadId(); 447 event.exceptionMessage = ree.getMessage(); 448 event.commit(); 449 } 450 } 451 452 /** 453 * Runs a task in the context of this virtual thread. 
*/
    private void run(Runnable task) {
        // must be invoked on this virtual thread while it is in the RUNNING state
        assert Thread.currentThread() == this && state == RUNNING;

        // emit JFR event if enabled
        if (VirtualThreadStartEvent.isTurnedOn()) {
            var event = new VirtualThreadStartEvent();
            event.javaThreadId = threadId();
            event.commit();
        }

        Object bindings = Thread.scopedValueBindings();
        try {
            runWith(bindings, task);
        } catch (Throwable exc) {
            // anything the task throws is handed to the uncaught exception dispatch
            dispatchUncaughtException(exc);
        } finally {
            // pop any remaining scopes from the stack, this may block
            StackableScope.popAll();

            // emit JFR event if enabled
            if (VirtualThreadEndEvent.isTurnedOn()) {
                var event = new VirtualThreadEndEvent();
                event.javaThreadId = threadId();
                event.commit();
            }
        }
    }

    /**
     * Mounts this virtual thread onto the current platform thread. On
     * return, the current thread is the virtual thread.
     *
     * <p> The steps are, in order: notify JVMTI, record the carrier, bring the
     * carrier's interrupt status in line with this thread's interrupt status, and
     * finally switch {@code Thread.currentThread()} to this virtual thread.
     */
    @ChangesCurrentThread
    @ReservedStackAccess
    private void mount() {
        // notify JVMTI before mount
        notifyJvmtiMount(/*hide*/true);

        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupt status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            // the carrier's status is only cleared under interruptLock, the same lock
            // that interrupt() holds while setting both statuses
            synchronized (interruptLock) {
                // need to recheck interrupt status
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread.
*/
    @ChangesCurrentThread
    @ReservedStackAccess
    private void unmount() {
        // interruptLock is acquired below, so it must not already be held
        assert !Thread.holdsLock(interruptLock);

        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        // the carrier no longer runs this thread, so drop any interrupt status that
        // was propagated to it while mounted
        carrier.clearInterrupt();

        // notify JVMTI after unmount
        notifyJvmtiUnmount(/*hide*/false);
    }

    /**
     * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
     * the continuation continues.
     *
     * @return the value returned by {@code Continuation.yield}; callers in this class
     *         treat {@code true} as "yielded and since continued" and {@code false}
     *         as "could not yield"
     */
    @Hidden
    private boolean yieldContinuation() {
        notifyJvmtiUnmount(/*hide*/true);
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            // runs when the continuation continues, or straight away if the yield failed
            notifyJvmtiMount(/*hide*/false);
        }
    }

    /**
     * Invoked in the context of the carrier thread after the Continuation yields when
     * parking, blocking on monitor enter, Object.wait, or Thread.yield.
*/
    private void afterYield() {
        // the thread has already been unmounted by runContinuation
        assert carrierThread == null;

        // re-adjust parallelism if the virtual thread yielded when compensating
        if (currentThread() instanceof CarrierThread ct) {
            ct.endBlocking();
        }

        // the transitional state set by the virtual thread before it yielded
        int s = state();

        // LockSupport.park/parkNanos
        if (s == PARKING || s == TIMED_PARKING) {
            int newState;
            if (s == PARKING) {
                setState(newState = PARKED);
            } else {
                // schedule unpark
                assert timeout > 0;
                timeoutTask = schedule(this::unpark, timeout, NANOSECONDS);
                setState(newState = TIMED_PARKED);
            }

            // may have been unparked while parking
            if (parkPermit && compareAndSetState(newState, UNPARKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Thread.yield
        if (s == YIELDING) {
            setState(YIELDED);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation(ct.getPool());
            } else {
                submitRunContinuation();
            }
            return;
        }

        // blocking on monitorenter
        if (s == BLOCKING) {
            setState(BLOCKED);

            // may have been unblocked while blocking
            if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                // lazy submit if local queue is empty
                lazySubmitRunContinuation();
            }
            return;
        }

        // Object.wait
        if (s == WAITING || s == TIMED_WAITING) {
            int newState;
            if (s == WAITING) {
                setState(newState = WAIT);
            } else {
                // For timed-wait, a timeout task is scheduled to execute. The timeout
                // task will change the thread state to UNBLOCKED and submit the thread
                // to the scheduler. A sequence number is used to ensure that the timeout
                // task only unblocks the thread for this timed-wait. We synchronize with
                // the timeout task to coordinate access to the sequence number and to
                // ensure the timeout task doesn't execute until the thread has got to
                // the TIMED_WAIT state.
                assert timeout > 0;
                synchronized (timedWaitLock()) {
                    byte seqNo = ++timedWaitSeqNo;
                    // note: the timeout is scheduled in milliseconds here, unlike timed-park
                    timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
                    setState(newState = TIMED_WAIT);
                }
            }

            // may have been notified while in transition to wait state
            if (notified && compareAndSetState(newState, BLOCKED)) {
                // may have even been unblocked already
                if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
                    submitRunContinuation();
                }
                return;
            }

            // may have been interrupted while in transition to wait state
            if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
                submitRunContinuation();
                return;
            }
            return;
        }

        // every state in which a continuation can yield is handled above
        assert false;
    }

    /**
     * Invoked after the continuation completes. Equivalent to
     * {@code afterDone(true)}, so the thread container is notified.
     */
    private void afterDone() {
        afterDone(true);
    }

    /**
     * Invoked after the continuation completes (or start failed). Sets the thread
     * state to TERMINATED and notifies anyone waiting for the thread to terminate.
     *
     * @param notifyContainer true if its container should be notified
     */
    private void afterDone(boolean notifyContainer) {
        assert carrierThread == null;
        setState(TERMINATED);

        // notify anyone waiting for this virtual thread to terminate
        // (the latch is only present if a joiner created it, see joinNanos)
        CountDownLatch termination = this.termination;
        if (termination != null) {
            assert termination.getCount() == 1;
            termination.countDown();
        }

        // notify container
        if (notifyContainer) {
            threadContainer().onExit(this);
        }

        // clear references to thread locals
        clearReferences();
    }

    /**
     * Schedules this {@code VirtualThread} to execute.
*
     * @throws IllegalStateException if the container is shutdown or closed
     * @throws IllegalThreadStateException if the thread has already been started
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void start(ThreadContainer container) {
        // the NEW -> STARTED transition can succeed only once
        if (!compareAndSetState(NEW, STARTED)) {
            throw new IllegalThreadStateException("Already started");
        }

        // bind thread to container
        assert threadContainer() == null;
        setThreadContainer(container);

        // start thread
        boolean addedToContainer = false;
        boolean started = false;
        try {
            container.onStart(this);  // may throw
            addedToContainer = true;

            // scoped values may be inherited
            inheritScopedValueBindings(container);

            // submit task to run thread, using externalSubmit if possible
            externalSubmitRunContinuationOrThrow();
            started = true;
        } finally {
            // start failed: terminate the thread, notifying the container only if
            // onStart completed for it
            if (!started) {
                afterDone(addedToContainer);
            }
        }
    }

    /**
     * Schedules this virtual thread to execute in the root thread container.
     */
    @Override
    public void start() {
        start(ThreadContainers.root());
    }

    /**
     * Does nothing. The task is run through the private {@code run(Runnable)}
     * method, invoked from the continuation.
     */
    @Override
    public void run() {
        // do nothing
    }

    /**
     * Parks until unparked or interrupted. If already unparked then the parking
     * permit is consumed and this method completes immediately (meaning it doesn't
     * yield). It also completes immediately if the interrupt status is set.
*/
    @Override
    void park() {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        boolean yielded = false;
        setState(PARKING);
        try {
            yielded = yieldContinuation();
        } catch (OutOfMemoryError e) {
            // park on carrier
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (!yielded) {
                // the yield failed (pinned or OOME) so the state is still PARKING;
                // restore RUNNING before parking on the carrier below
                assert state() == PARKING;
                setState(RUNNING);
            }
        }

        // park on the carrier thread when pinned
        if (!yielded) {
            parkOnCarrierThread(false, 0);
        }
    }

    /**
     * Parks up to the given waiting time or until unparked or interrupted.
     * If already unparked then the parking permit is consumed and this method
     * completes immediately (meaning it doesn't yield). It also completes immediately
     * if the interrupt status is set or the waiting time is {@code <= 0}.
     *
     * @param nanos the maximum number of nanoseconds to wait.
*/
    @Override
    void parkNanos(long nanos) {
        assert Thread.currentThread() == this;

        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread for the waiting time
        if (nanos > 0) {
            long startTime = System.nanoTime();

            // park the thread, afterYield will schedule the thread to unpark
            boolean yielded = false;
            timeout = nanos;
            setState(TIMED_PARKING);
            try {
                yielded = yieldContinuation();
            } catch (OutOfMemoryError e) {
                // park on carrier
            } finally {
                assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
                if (!yielded) {
                    // the yield failed so the state is still TIMED_PARKING; restore
                    // RUNNING before parking on the carrier below
                    assert state() == TIMED_PARKING;
                    setState(RUNNING);
                }
            }

            // park on carrier thread for remaining time when pinned (or OOME)
            if (!yielded) {
                long remainingNanos = nanos - (System.nanoTime() - startTime);
                parkOnCarrierThread(true, remainingNanos);
            }
        }
    }

    /**
     * Parks the current carrier thread up to the given waiting time or until
     * unparked or interrupted. If the virtual thread is interrupted then the
     * interrupt status will be propagated to the carrier thread.
     *
     * <p> While parked the state is PINNED (untimed) or TIMED_PINNED (timed); it is
     * restored to RUNNING before returning. The parking permit is consumed and a
     * pinned event is posted on every call, whether or not the carrier actually parked.
     *
     * @param timed true for a timed park, false for untimed
     * @param nanos the waiting time in nanoseconds
     */
    private void parkOnCarrierThread(boolean timed, long nanos) {
        assert state() == RUNNING;

        setState(timed ? TIMED_PINNED : PINNED);
        try {
            // skip the park if the permit became available in the meantime; a timed
            // park with no time remaining (nanos <= 0) does not park either
            if (!parkPermit) {
                if (!timed) {
                    U.park(false, 0);
                } else if (nanos > 0) {
                    U.park(false, nanos);
                }
            }
        } finally {
            setState(RUNNING);
        }

        // consume parking permit
        setParkPermit(false);

        // JFR jdk.VirtualThreadPinned event
        postPinnedEvent("LockSupport.park");
    }

    /**
     * Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
* Recording the event in the VM avoids having JFR event recorded in Java
     * with the same name, but different ID, to events recorded by the VM.
     */
    @Hidden
    private static native void postPinnedEvent(String op);

    /**
     * Re-enables this virtual thread for scheduling. If this virtual thread is parked
     * then its task is scheduled to continue, otherwise its next call to {@code park} or
     * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
     * @throws RejectedExecutionException if the scheduler cannot accept a task
     */
    @Override
    void unpark() {
        // Nothing more to do if the permit was already available or if the thread is
        // unparking itself; in both cases the permit is left set.
        if (!getAndSetParkPermit(true) && currentThread() != this) {
            int s = state();

            // unparked while parked
            if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
                submitRunContinuation();
                return;
            }

            // unparked while parked when pinned
            if (s == PINNED || s == TIMED_PINNED) {
                // unpark carrier thread when pinned
                disableSuspendAndPreempt();
                try {
                    synchronized (carrierThreadAccessLock()) {
                        // re-read the carrier and the state under the lock before
                        // unparking the carrier
                        Thread carrier = carrierThread;
                        if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                            U.unpark(carrier);
                        }
                    }
                } finally {
                    enableSuspendAndPreempt();
                }
                return;
            }
        }
    }

    /**
     * Invoked by unblocker thread to unblock this virtual thread.
     *
     * <p> The blocking permit is set first, so that a thread still on its way to
     * BLOCKED sees the permit in {@code afterYield}; a thread already in BLOCKED
     * is moved to UNBLOCKED here and submitted to the scheduler.
     */
    private void unblock() {
        assert !Thread.currentThread().isVirtual();
        blockPermit = true;
        if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
            submitRunContinuation();
        }
    }

    /**
     * Invoked by timer thread when wait timeout for virtual thread has expired.
     * If the virtual thread is in timed-wait then this method will unblock the thread
     * and submit its task so that it continues and attempts to reenter the monitor.
     * This method does nothing if the thread has been woken by notify or interrupt.
*/
    private void waitTimeoutExpired(byte seqNo) {
        assert !Thread.currentThread().isVirtual();
        for (;;) {
            boolean unblocked = false;
            synchronized (timedWaitLock()) {
                if (seqNo != timedWaitSeqNo) {
                    // this timeout task is for a past timed-wait
                    return;
                }
                int s = state();
                if (s == TIMED_WAIT) {
                    unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED);
                } else if (s != (TIMED_WAIT | SUSPENDED)) {
                    // notified or interrupted, no longer waiting
                    return;
                }
            }
            // submit outside the lock
            if (unblocked) {
                submitRunContinuation();
                return;
            }
            // need to retry when thread is suspended in timed-wait
            Thread.yield();
        }
    }

    /**
     * Attempts to yield the current virtual thread (Thread.yield).
     *
     * <p> If the continuation cannot yield then the state is restored from YIELDING
     * to RUNNING and the thread simply continues on its carrier.
     */
    void tryYield() {
        assert Thread.currentThread() == this;
        setState(YIELDING);
        boolean yielded = false;
        try {
            yielded = yieldContinuation();  // may throw
        } finally {
            assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
            if (!yielded) {
                assert state() == YIELDING;
                setState(RUNNING);
            }
        }
    }

    /**
     * Sleep the current thread for the given sleep time (in nanoseconds). If
     * nanos is 0 then the thread will attempt to yield.
     *
     * @implNote This implementation parks the thread for the given sleeping time
     * and will therefore be observed in PARKED state during the sleep. Parking
     * will consume the parking permit so this method makes available the parking
     * permit after the sleep. This may be observed as a spurious, but benign,
     * wakeup when the thread subsequently attempts to park.
952 * 953 * @param nanos the maximum number of nanoseconds to sleep 954 * @throws InterruptedException if interrupted while sleeping 955 */ 956 void sleepNanos(long nanos) throws InterruptedException { 957 assert Thread.currentThread() == this && nanos >= 0; 958 if (getAndClearInterrupt()) 959 throw new InterruptedException(); 960 if (nanos == 0) { 961 tryYield(); 962 } else { 963 // park for the sleep time 964 try { 965 long remainingNanos = nanos; 966 long startNanos = System.nanoTime(); 967 while (remainingNanos > 0) { 968 parkNanos(remainingNanos); 969 if (getAndClearInterrupt()) { 970 throw new InterruptedException(); 971 } 972 remainingNanos = nanos - (System.nanoTime() - startNanos); 973 } 974 } finally { 975 // may have been unparked while sleeping 976 setParkPermit(true); 977 } 978 } 979 } 980 981 /** 982 * Waits up to {@code nanos} nanoseconds for this virtual thread to terminate. 983 * A timeout of {@code 0} means to wait forever. 984 * 985 * @throws InterruptedException if interrupted while waiting 986 * @return true if the thread has terminated 987 */ 988 boolean joinNanos(long nanos) throws InterruptedException { 989 if (state() == TERMINATED) 990 return true; 991 992 // ensure termination object exists, then re-check state 993 CountDownLatch termination = getTermination(); 994 if (state() == TERMINATED) 995 return true; 996 997 // wait for virtual thread to terminate 998 if (nanos == 0) { 999 termination.await(); 1000 } else { 1001 boolean terminated = termination.await(nanos, NANOSECONDS); 1002 if (!terminated) { 1003 // waiting time elapsed 1004 return false; 1005 } 1006 } 1007 assert state() == TERMINATED; 1008 return true; 1009 } 1010 1011 @Override 1012 void blockedOn(Interruptible b) { 1013 disableSuspendAndPreempt(); 1014 try { 1015 super.blockedOn(b); 1016 } finally { 1017 enableSuspendAndPreempt(); 1018 } 1019 } 1020 1021 @Override 1022 public void interrupt() { 1023 if (Thread.currentThread() != this) { 1024 // if current thread is a 
virtual thread then prevent it from being 1025 // suspended or unmounted when entering or holding interruptLock 1026 Interruptible blocker; 1027 disableSuspendAndPreempt(); 1028 try { 1029 synchronized (interruptLock) { 1030 interrupted = true; 1031 blocker = nioBlocker(); 1032 if (blocker != null) { 1033 blocker.interrupt(this); 1034 } 1035 1036 // interrupt carrier thread if mounted 1037 Thread carrier = carrierThread; 1038 if (carrier != null) carrier.setInterrupt(); 1039 } 1040 } finally { 1041 enableSuspendAndPreempt(); 1042 } 1043 1044 // notify blocker after releasing interruptLock 1045 if (blocker != null) { 1046 blocker.postInterrupt(); 1047 } 1048 1049 // make available parking permit, unpark thread if parked 1050 unpark(); 1051 1052 // if thread is waiting in Object.wait then schedule to try to reenter 1053 int s = state(); 1054 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) { 1055 submitRunContinuation(); 1056 } 1057 1058 } else { 1059 interrupted = true; 1060 carrierThread.setInterrupt(); 1061 setParkPermit(true); 1062 } 1063 } 1064 1065 @Override 1066 public boolean isInterrupted() { 1067 return interrupted; 1068 } 1069 1070 @Override 1071 boolean getAndClearInterrupt() { 1072 assert Thread.currentThread() == this; 1073 boolean oldValue = interrupted; 1074 if (oldValue) { 1075 disableSuspendAndPreempt(); 1076 try { 1077 synchronized (interruptLock) { 1078 interrupted = false; 1079 carrierThread.clearInterrupt(); 1080 } 1081 } finally { 1082 enableSuspendAndPreempt(); 1083 } 1084 } 1085 return oldValue; 1086 } 1087 1088 @Override 1089 Thread.State threadState() { 1090 int s = state(); 1091 switch (s & ~SUSPENDED) { 1092 case NEW: 1093 return Thread.State.NEW; 1094 case STARTED: 1095 // return NEW if thread container not yet set 1096 if (threadContainer() == null) { 1097 return Thread.State.NEW; 1098 } else { 1099 return Thread.State.RUNNABLE; 1100 } 1101 case UNPARKED: 1102 case UNBLOCKED: 1103 case YIELDED: 1104 // runnable, 
not mounted 1105 return Thread.State.RUNNABLE; 1106 case RUNNING: 1107 // if mounted then return state of carrier thread 1108 if (Thread.currentThread() != this) { 1109 disableSuspendAndPreempt(); 1110 try { 1111 synchronized (carrierThreadAccessLock()) { 1112 Thread carrierThread = this.carrierThread; 1113 if (carrierThread != null) { 1114 return carrierThread.threadState(); 1115 } 1116 } 1117 } finally { 1118 enableSuspendAndPreempt(); 1119 } 1120 } 1121 // runnable, mounted 1122 return Thread.State.RUNNABLE; 1123 case PARKING: 1124 case TIMED_PARKING: 1125 case WAITING: 1126 case TIMED_WAITING: 1127 case YIELDING: 1128 // runnable, in transition 1129 return Thread.State.RUNNABLE; 1130 case PARKED: 1131 case PINNED: 1132 case WAIT: 1133 return Thread.State.WAITING; 1134 case TIMED_PARKED: 1135 case TIMED_PINNED: 1136 case TIMED_WAIT: 1137 return Thread.State.TIMED_WAITING; 1138 case BLOCKING: 1139 case BLOCKED: 1140 return Thread.State.BLOCKED; 1141 case TERMINATED: 1142 return Thread.State.TERMINATED; 1143 default: 1144 throw new InternalError(); 1145 } 1146 } 1147 1148 @Override 1149 boolean alive() { 1150 int s = state; 1151 return (s != NEW && s != TERMINATED); 1152 } 1153 1154 @Override 1155 boolean isTerminated() { 1156 return (state == TERMINATED); 1157 } 1158 1159 @Override 1160 StackTraceElement[] asyncGetStackTrace() { 1161 StackTraceElement[] stackTrace; 1162 do { 1163 stackTrace = (carrierThread != null) 1164 ? super.asyncGetStackTrace() // mounted 1165 : tryGetStackTrace(); // unmounted 1166 if (stackTrace == null) { 1167 Thread.yield(); 1168 } 1169 } while (stackTrace == null); 1170 return stackTrace; 1171 } 1172 1173 /** 1174 * Returns the stack trace for this virtual thread if it is unmounted. 1175 * Returns null if the thread is mounted or in transition. 
*/
    private StackTraceElement[] tryGetStackTrace() {
        // classify the state with the SUSPENDED bit masked off
        int initialState = state() & ~SUSPENDED;
        switch (initialState) {
            case NEW, STARTED, TERMINATED -> {
                return new StackTraceElement[0];  // unmounted, empty stack
            }
            case RUNNING, PINNED, TIMED_PINNED -> {
                return null;   // mounted
            }
            case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
                // unmounted, not runnable
            }
            case UNPARKED, UNBLOCKED, YIELDED -> {
                // unmounted, runnable
            }
            case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
                return null;  // in transition
            }
            default -> throw new InternalError("" + initialState);
        }

        // thread is unmounted, prevent it from continuing
        int suspendedState = initialState | SUSPENDED;
        if (!compareAndSetState(initialState, suspendedState)) {
            // the state is no longer initialState (it changed, or the SUSPENDED
            // bit is already set), give up for now
            return null;
        }

        // get stack trace and restore state
        StackTraceElement[] stack;
        try {
            stack = cont.getStackTrace();
        } finally {
            // the state is restored even if getStackTrace throws
            assert state == suspendedState;
            setState(initialState);
        }

        // decide if the continuation needs to be submitted again now that the
        // SUSPENDED bit has been cleared
        boolean resubmit = switch (initialState) {
            case UNPARKED, UNBLOCKED, YIELDED -> {
                // resubmit as task may have run while suspended
                yield true;
            }
            case PARKED, TIMED_PARKED -> {
                // resubmit if unparked while suspended
                yield parkPermit && compareAndSetState(initialState, UNPARKED);
            }
            case BLOCKED -> {
                // resubmit if unblocked while suspended
                yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
            }
            case WAIT, TIMED_WAIT -> {
                // resubmit if notified or interrupted while waiting (Object.wait)
                // waitTimeoutExpired will retry if the timeout expired when suspended
                yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
            }
            // unreachable: every other state returned from the first switch
            default -> throw new InternalError();
        };
        if (resubmit) {
            submitRunContinuation();
        }
        return stack;
    }

@Override 1239 public String toString() { 1240 StringBuilder sb = new StringBuilder("VirtualThread[#"); 1241 sb.append(threadId()); 1242 String name = getName(); 1243 if (!name.isEmpty()) { 1244 sb.append(","); 1245 sb.append(name); 1246 } 1247 sb.append("]/"); 1248 1249 // add the carrier state and thread name when mounted 1250 boolean mounted; 1251 if (Thread.currentThread() == this) { 1252 mounted = appendCarrierInfo(sb); 1253 } else { 1254 disableSuspendAndPreempt(); 1255 try { 1256 synchronized (carrierThreadAccessLock()) { 1257 mounted = appendCarrierInfo(sb); 1258 } 1259 } finally { 1260 enableSuspendAndPreempt(); 1261 } 1262 } 1263 1264 // add virtual thread state when not mounted 1265 if (!mounted) { 1266 String stateAsString = threadState().toString(); 1267 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1268 } 1269 1270 return sb.toString(); 1271 } 1272 1273 /** 1274 * Appends the carrier state and thread name to the string buffer if mounted. 1275 * @return true if mounted, false if not mounted 1276 */ 1277 private boolean appendCarrierInfo(StringBuilder sb) { 1278 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock()); 1279 Thread carrier = carrierThread; 1280 if (carrier != null) { 1281 String stateAsString = carrier.threadState().toString(); 1282 sb.append(stateAsString.toLowerCase(Locale.ROOT)); 1283 sb.append('@'); 1284 sb.append(carrier.getName()); 1285 return true; 1286 } else { 1287 return false; 1288 } 1289 } 1290 1291 @Override 1292 public int hashCode() { 1293 return (int) threadId(); 1294 } 1295 1296 @Override 1297 public boolean equals(Object obj) { 1298 return obj == this; 1299 } 1300 1301 /** 1302 * Returns the termination object, creating it if needed. 
1303 */ 1304 private CountDownLatch getTermination() { 1305 CountDownLatch termination = this.termination; 1306 if (termination == null) { 1307 termination = new CountDownLatch(1); 1308 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { 1309 termination = this.termination; 1310 } 1311 } 1312 return termination; 1313 } 1314 1315 /** 1316 * Returns the lock object to synchronize on when accessing carrierThread. 1317 * The lock prevents carrierThread from being reset to null during unmount. 1318 */ 1319 private Object carrierThreadAccessLock() { 1320 // return interruptLock as unmount has to coordinate with interrupt 1321 return interruptLock; 1322 } 1323 1324 /** 1325 * Returns a lock object for coordinating timed-wait setup and timeout handling. 1326 */ 1327 private Object timedWaitLock() { 1328 // use this object for now to avoid the overhead of introducing another lock 1329 return runContinuation; 1330 } 1331 1332 /** 1333 * Disallow the current thread be suspended or preempted. 1334 */ 1335 private void disableSuspendAndPreempt() { 1336 notifyJvmtiDisableSuspend(true); 1337 Continuation.pin(); 1338 } 1339 1340 /** 1341 * Allow the current thread be suspended or preempted. 
1342 */ 1343 private void enableSuspendAndPreempt() { 1344 Continuation.unpin(); 1345 notifyJvmtiDisableSuspend(false); 1346 } 1347 1348 // -- wrappers for get/set of state, parking permit, and carrier thread -- 1349 1350 private int state() { 1351 return state; // volatile read 1352 } 1353 1354 private void setState(int newValue) { 1355 state = newValue; // volatile write 1356 } 1357 1358 private boolean compareAndSetState(int expectedValue, int newValue) { 1359 return U.compareAndSetInt(this, STATE, expectedValue, newValue); 1360 } 1361 1362 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) { 1363 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue); 1364 } 1365 1366 private void setParkPermit(boolean newValue) { 1367 if (parkPermit != newValue) { 1368 parkPermit = newValue; 1369 } 1370 } 1371 1372 private boolean getAndSetParkPermit(boolean newValue) { 1373 if (parkPermit != newValue) { 1374 return U.getAndSetBoolean(this, PARK_PERMIT, newValue); 1375 } else { 1376 return newValue; 1377 } 1378 } 1379 1380 private void setCarrierThread(Thread carrier) { 1381 // U.putReferenceRelease(this, CARRIER_THREAD, carrier); 1382 this.carrierThread = carrier; 1383 } 1384 1385 // -- JVM TI support -- 1386 1387 @IntrinsicCandidate 1388 @JvmtiMountTransition 1389 private native void notifyJvmtiStart(); 1390 1391 @IntrinsicCandidate 1392 @JvmtiMountTransition 1393 private native void notifyJvmtiEnd(); 1394 1395 @IntrinsicCandidate 1396 @JvmtiMountTransition 1397 private native void notifyJvmtiMount(boolean hide); 1398 1399 @IntrinsicCandidate 1400 @JvmtiMountTransition 1401 private native void notifyJvmtiUnmount(boolean hide); 1402 1403 @IntrinsicCandidate 1404 private static native void notifyJvmtiDisableSuspend(boolean enter); 1405 1406 private static native void registerNatives(); 1407 static { 1408 registerNatives(); 1409 1410 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI 1411 var group = 
Thread.virtualThreadGroup(); 1412 } 1413 1414 /** 1415 * Creates the default scheduler. 1416 * If the system property {@code jdk.virtualThreadScheduler.implClass} is set then 1417 * its value is the name of a class that implements java.util.concurrent.Executor. 1418 * The class is public in an exported package, has a public no-arg constructor, 1419 * and is visible to the system class loader. 1420 * If the system property is not set then the default scheduler will be a 1421 * ForkJoinPool instance. 1422 */ 1423 private static Executor createDefaultScheduler() { 1424 String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass"); 1425 if (propValue != null) { 1426 try { 1427 Class<?> clazz = Class.forName(propValue, true, 1428 ClassLoader.getSystemClassLoader()); 1429 Constructor<?> ctor = clazz.getConstructor(); 1430 var scheduler = (Executor) ctor.newInstance(); 1431 System.err.println(""" 1432 WARNING: Using custom scheduler, this is an experimental feature."""); 1433 return scheduler; 1434 } catch (Exception ex) { 1435 throw new Error(ex); 1436 } 1437 } else { 1438 return createDefaultForkJoinPoolScheduler(); 1439 } 1440 } 1441 1442 /** 1443 * Creates the default ForkJoinPool scheduler. 
1444 */ 1445 private static ForkJoinPool createDefaultForkJoinPoolScheduler() { 1446 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool); 1447 int parallelism, maxPoolSize, minRunnable; 1448 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); 1449 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); 1450 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); 1451 if (parallelismValue != null) { 1452 parallelism = Integer.parseInt(parallelismValue); 1453 } else { 1454 parallelism = Runtime.getRuntime().availableProcessors(); 1455 } 1456 if (maxPoolSizeValue != null) { 1457 maxPoolSize = Integer.parseInt(maxPoolSizeValue); 1458 parallelism = Integer.min(parallelism, maxPoolSize); 1459 } else { 1460 maxPoolSize = Integer.max(parallelism, 256); 1461 } 1462 if (minRunnableValue != null) { 1463 minRunnable = Integer.parseInt(minRunnableValue); 1464 } else { 1465 minRunnable = Integer.max(parallelism / 2, 1); 1466 } 1467 Thread.UncaughtExceptionHandler handler = (t, e) -> { }; 1468 boolean asyncMode = true; // FIFO 1469 return new ForkJoinPool(parallelism, factory, handler, asyncMode, 1470 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); 1471 } 1472 1473 /** 1474 * Schedule a runnable task to run after a delay. 1475 */ 1476 private static Future<?> schedule(Runnable command, long delay, TimeUnit unit) { 1477 long tid = Thread.currentThread().threadId(); 1478 int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1); 1479 return DELAYED_TASK_SCHEDULERS[index].schedule(command, delay, unit); 1480 } 1481 1482 /** 1483 * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks. 
1484 */ 1485 private static ScheduledExecutorService[] createDelayedTaskSchedulers() { 1486 String propName = "jdk.virtualThreadScheduler.timerQueues"; 1487 String propValue = System.getProperty(propName); 1488 int queueCount; 1489 if (propValue != null) { 1490 queueCount = Integer.parseInt(propValue); 1491 if (queueCount != Integer.highestOneBit(queueCount)) { 1492 throw new RuntimeException("Value of " + propName + " must be power of 2"); 1493 } 1494 } else { 1495 int ncpus = Runtime.getRuntime().availableProcessors(); 1496 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1); 1497 } 1498 var schedulers = new ScheduledExecutorService[queueCount]; 1499 for (int i = 0; i < queueCount; i++) { 1500 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) 1501 Executors.newScheduledThreadPool(1, task -> { 1502 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task); 1503 t.setDaemon(true); 1504 return t; 1505 }); 1506 stpe.setRemoveOnCancelPolicy(true); 1507 schedulers[i] = stpe; 1508 } 1509 return schedulers; 1510 } 1511 1512 /** 1513 * Schedule virtual threads that are ready to be scheduled after they blocked on 1514 * monitor enter. 1515 */ 1516 private static void unblockVirtualThreads() { 1517 while (true) { 1518 VirtualThread vthread = takeVirtualThreadListToUnblock(); 1519 while (vthread != null) { 1520 assert vthread.onWaitingList; 1521 VirtualThread nextThread = vthread.next; 1522 1523 // remove from list and unblock 1524 vthread.next = null; 1525 boolean changed = vthread.compareAndSetOnWaitingList(true, false); 1526 assert changed; 1527 vthread.unblock(); 1528 1529 vthread = nextThread; 1530 } 1531 } 1532 } 1533 1534 /** 1535 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting 1536 * if necessary until a list of one or more threads becomes available. 
1537 */ 1538 private static native VirtualThread takeVirtualThreadListToUnblock(); 1539 1540 static { 1541 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", 1542 VirtualThread::unblockVirtualThreads); 1543 unblocker.setDaemon(true); 1544 unblocker.start(); 1545 } 1546 }