7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.util.Locale;
28 import java.util.Objects;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ForkJoinPool;
33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
34 import java.util.concurrent.ForkJoinTask;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.RejectedExecutionException;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ScheduledThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40 import jdk.internal.event.VirtualThreadEndEvent;
41 import jdk.internal.event.VirtualThreadStartEvent;
42 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
43 import jdk.internal.misc.CarrierThread;
44 import jdk.internal.misc.InnocuousThread;
45 import jdk.internal.misc.Unsafe;
46 import jdk.internal.vm.Continuation;
47 import jdk.internal.vm.ContinuationScope;
48 import jdk.internal.vm.StackableScope;
49 import jdk.internal.vm.ThreadContainer;
50 import jdk.internal.vm.ThreadContainers;
51 import jdk.internal.vm.annotation.ChangesCurrentThread;
52 import jdk.internal.vm.annotation.Hidden;
53 import jdk.internal.vm.annotation.IntrinsicCandidate;
54 import jdk.internal.vm.annotation.JvmtiHideEvents;
55 import jdk.internal.vm.annotation.JvmtiMountTransition;
56 import jdk.internal.vm.annotation.ReservedStackAccess;
57 import sun.nio.ch.Interruptible;
58 import static java.util.concurrent.TimeUnit.*;
59
60 /**
61 * A thread that is scheduled by the Java virtual machine rather than the operating system.
62 */
63 final class VirtualThread extends BaseVirtualThread {
64 private static final Unsafe U = Unsafe.getUnsafe();
65 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
66 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
67
68 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
69 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
70 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
71 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
72 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
73
74 // scheduler and continuation
75 private final Executor scheduler;
76 private final Continuation cont;
77 private final Runnable runContinuation;
78
79 // virtual thread state, accessed by VM
80 private volatile int state;
81
82 /*
83 * Virtual thread state transitions:
84 *
85 * NEW -> STARTED // Thread.start, schedule to run
86 * STARTED -> TERMINATED // failed to start
87 * STARTED -> RUNNING // first run
88 * RUNNING -> TERMINATED // done
89 *
90 * RUNNING -> PARKING // Thread parking with LockSupport.park
91 * PARKING -> PARKED // cont.yield successful, parked indefinitely
92 * PARKED -> UNPARKED // unparked, may be scheduled to continue
93 * UNPARKED -> RUNNING // continue execution after park
94 *
95 * PARKING -> RUNNING // cont.yield failed, need to park on carrier
96 * RUNNING -> PINNED // park on carrier
97 * PINNED -> RUNNING // unparked, continue execution on same carrier
137 private static final int TIMED_PINNED = 8; // mounted
138 private static final int UNPARKED = 9; // unmounted but runnable
139
140 // Thread.yield
141 private static final int YIELDING = 10;
142 private static final int YIELDED = 11; // unmounted but runnable
143
144 // monitor enter
145 private static final int BLOCKING = 12;
146 private static final int BLOCKED = 13; // unmounted
147 private static final int UNBLOCKED = 14; // unmounted but runnable
148
149 // monitor wait/timed-wait
150 private static final int WAITING = 15;
151 private static final int WAIT = 16; // waiting in Object.wait
152 private static final int TIMED_WAITING = 17;
153 private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait
154
155 private static final int TERMINATED = 99; // final state
156
157 // can be suspended from scheduling when unmounted
158 private static final int SUSPENDED = 1 << 8;
159
160 // parking permit made available by LockSupport.unpark
161 private volatile boolean parkPermit;
162
163 // blocking permit made available by unblocker thread when another thread exits monitor
164 private volatile boolean blockPermit;
165
166 // true when on the list of virtual threads waiting to be unblocked
167 private volatile boolean onWaitingList;
168
169 // next virtual thread on the list of virtual threads waiting to be unblocked
170 private volatile VirtualThread next;
171
172 // notified by Object.notify/notifyAll while waiting in Object.wait
173 private volatile boolean notified;
174
175 // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
176 private volatile boolean interruptibleWait;
177
178 // timed-wait support
179 private byte timedWaitSeqNo;
180
181 // timeout for timed-park and timed-wait, only accessed on current/carrier thread
182 private long timeout;
183
184 // timer task for timed-park and timed-wait, only accessed on current/carrier thread
185 private Future<?> timeoutTask;
186
187 // carrier thread when mounted, accessed by VM
188 private volatile Thread carrierThread;
189
190 // termination object when joining, created lazily if needed
191 private volatile CountDownLatch termination;
192
193 /**
194 * Returns the default scheduler.
195 */
196 static Executor defaultScheduler() {
197 return DEFAULT_SCHEDULER;
198 }
199
200 /**
201 * Returns the continuation scope used for virtual threads.
202 */
203 static ContinuationScope continuationScope() {
204 return VTHREAD_SCOPE;
205 }
206
207 /**
208 * Creates a new {@code VirtualThread} to run the given task with the given
209 * scheduler. If the given scheduler is {@code null} and the current thread
210 * is a platform thread then the newly created virtual thread will use the
211 * default scheduler. If given scheduler is {@code null} and the current
212 * thread is a virtual thread then the current thread's scheduler is used.
213 *
214 * @param scheduler the scheduler or null
215 * @param name thread name
216 * @param characteristics characteristics
217 * @param task the task to execute
218 */
219 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
220 super(name, characteristics, /*bound*/ false);
221 Objects.requireNonNull(task);
222
223 // choose scheduler if not specified
224 if (scheduler == null) {
225 Thread parent = Thread.currentThread();
226 if (parent instanceof VirtualThread vparent) {
227 scheduler = vparent.scheduler;
228 } else {
229 scheduler = DEFAULT_SCHEDULER;
230 }
231 }
232
233 this.scheduler = scheduler;
234 this.cont = new VThreadContinuation(this, task);
235 this.runContinuation = this::runContinuation;
236 }
237
238 /**
239 * The continuation that a virtual thread executes.
240 */
241 private static class VThreadContinuation extends Continuation {
242 VThreadContinuation(VirtualThread vthread, Runnable task) {
243 super(VTHREAD_SCOPE, wrap(vthread, task));
244 }
245 @Override
246 protected void onPinned(Continuation.Pinned reason) {
247 }
248 private static Runnable wrap(VirtualThread vthread, Runnable task) {
249 return new Runnable() {
250 @Hidden
251 @JvmtiHideEvents
252 public void run() {
253 vthread.endFirstTransition();
254 try {
255 vthread.run(task);
303 if (cont.isDone()) {
304 afterDone();
305 } else {
306 afterYield();
307 }
308 }
309 }
310
311 /**
312 * Cancel timeout task when continuing after timed-park or timed-wait.
313 * The timeout task may be executing, or may have already completed.
314 */
315 private void cancelTimeoutTask() {
316 if (timeoutTask != null) {
317 timeoutTask.cancel(false);
318 timeoutTask = null;
319 }
320 }
321
322 /**
323 * Submits the given task to the given executor. If the scheduler is a
324 * ForkJoinPool then the task is first adapted to a ForkJoinTask.
325 */
326 private void submit(Executor executor, Runnable task) {
327 if (executor instanceof ForkJoinPool pool) {
328 pool.submit(ForkJoinTask.adapt(task));
329 } else {
330 executor.execute(task);
331 }
332 }
333
334 /**
335 * Submits the runContinuation task to the scheduler. For the default scheduler,
336 * and calling it on a worker thread, the task will be pushed to the local queue,
337 * otherwise it will be pushed to an external submission queue.
338 * @param scheduler the scheduler
339 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
340 * @throws RejectedExecutionException
341 */
342 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
343 boolean done = false;
344 while (!done) {
345 try {
346 // Pin the continuation to prevent the virtual thread from unmounting
347 // when submitting a task. For the default scheduler this ensures that
348 // the carrier doesn't change when pushing a task. For other schedulers
349 // it avoids deadlock that could arise due to carriers and virtual
350 // threads contending for a lock.
351 if (currentThread().isVirtual()) {
352 Continuation.pin();
353 try {
354 submit(scheduler, runContinuation);
355 } finally {
356 Continuation.unpin();
357 }
358 } else {
359 submit(scheduler, runContinuation);
360 }
361 done = true;
362 } catch (RejectedExecutionException ree) {
363 submitFailed(ree);
364 throw ree;
365 } catch (OutOfMemoryError e) {
366 if (retryOnOOME) {
367 U.park(false, 100_000_000); // 100ms
368 } else {
369 throw e;
370 }
371 }
372 }
373 }
374
375 /**
376 * Submits the runContinuation task to the given scheduler as an external submit.
377 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
378 * @throws RejectedExecutionException
379 * @see ForkJoinPool#externalSubmit(ForkJoinTask)
380 */
381 private void externalSubmitRunContinuation(ForkJoinPool pool) {
382 assert Thread.currentThread() instanceof CarrierThread;
383 try {
384 pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
385 } catch (RejectedExecutionException ree) {
386 submitFailed(ree);
387 throw ree;
388 } catch (OutOfMemoryError e) {
389 submitRunContinuation(pool, true);
390 }
391 }
392
393 /**
394 * Submits the runContinuation task to the scheduler. For the default scheduler,
395 * and calling it on a worker thread, the task will be pushed to the local queue,
396 * otherwise it will be pushed to an external submission queue.
397 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
398 * @throws RejectedExecutionException
399 */
400 private void submitRunContinuation() {
401 submitRunContinuation(scheduler, true);
402 }
403
404 /**
405 * Lazy submit the runContinuation task if invoked on a carrier thread and its local
406 * queue is empty. If not empty, or invoked by another thread, then this method works
407 * like submitRunContinuation and just submits the task to the scheduler.
408 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
409 * @throws RejectedExecutionException
410 * @see ForkJoinPool#lazySubmit(ForkJoinTask)
411 */
412 private void lazySubmitRunContinuation() {
413 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
414 ForkJoinPool pool = ct.getPool();
415 try {
416 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
417 } catch (RejectedExecutionException ree) {
418 submitFailed(ree);
419 throw ree;
420 } catch (OutOfMemoryError e) {
421 submitRunContinuation();
422 }
423 } else {
424 submitRunContinuation();
425 }
426 }
427
428 /**
429 * Submits the runContinuation task to the scheduler. For the default scheduler, and
430 * calling it a virtual thread that uses the default scheduler, the task will be
431 * pushed to an external submission queue. This method may throw OutOfMemoryError.
432 * @throws RejectedExecutionException
433 * @throws OutOfMemoryError
434 */
435 private void externalSubmitRunContinuationOrThrow() {
436 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {
437 try {
438 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
439 } catch (RejectedExecutionException ree) {
440 submitFailed(ree);
441 throw ree;
442 }
443 } else {
444 submitRunContinuation(scheduler, false);
445 }
446 }
447
448 /**
449 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
450 */
451 private void submitFailed(RejectedExecutionException ree) {
452 var event = new VirtualThreadSubmitFailedEvent();
453 if (event.isEnabled()) {
454 event.javaThreadId = threadId();
455 event.exceptionMessage = ree.getMessage();
456 event.commit();
457 }
458 }
459
460 /**
461 * Runs a task in the context of this virtual thread.
462 */
463 private void run(Runnable task) {
464 assert Thread.currentThread() == this && state == RUNNING;
478 } finally {
479 // pop any remaining scopes from the stack, this may block
480 StackableScope.popAll();
481
482 // emit JFR event if enabled
483 if (VirtualThreadEndEvent.isTurnedOn()) {
484 var event = new VirtualThreadEndEvent();
485 event.javaThreadId = threadId();
486 event.commit();
487 }
488 }
489 }
490
491 /**
492 * Mounts this virtual thread onto the current platform thread. On
493 * return, the current thread is the virtual thread.
494 */
495 @ChangesCurrentThread
496 @ReservedStackAccess
497 private void mount() {
498 startTransition(/*is_mount*/true);
499 // We assume following volatile accesses provide equivalent
500 // of acquire ordering, otherwise we need U.loadFence() here.
501
502 // sets the carrier thread
503 Thread carrier = Thread.currentCarrierThread();
504 setCarrierThread(carrier);
505
506 // sync up carrier thread interrupted status if needed
507 if (interrupted) {
508 carrier.setInterrupt();
509 } else if (carrier.isInterrupted()) {
510 synchronized (interruptLock) {
511 // need to recheck interrupted status
512 if (!interrupted) {
513 carrier.clearInterrupt();
514 }
515 }
516 }
517
518 // set Thread.currentThread() to return this virtual thread
523 * Unmounts this virtual thread from the carrier. On return, the
524 * current thread is the current platform thread.
525 */
526 @ChangesCurrentThread
527 @ReservedStackAccess
528 private void unmount() {
529 assert !Thread.holdsLock(interruptLock);
530
531 // set Thread.currentThread() to return the platform thread
532 Thread carrier = this.carrierThread;
533 carrier.setCurrentThread(carrier);
534
535 // break connection to carrier thread, synchronized with interrupt
536 synchronized (interruptLock) {
537 setCarrierThread(null);
538 }
539 carrier.clearInterrupt();
540
541 // We assume previous volatile accesses provide equivalent
542 // of release ordering, otherwise we need U.storeFence() here.
543 endTransition(/*is_mount*/false);
544 }
545
546 /**
547 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
548 * the continuation continues.
549 */
550 @Hidden
551 private boolean yieldContinuation() {
552 startTransition(/*is_mount*/false);
553 try {
554 return Continuation.yield(VTHREAD_SCOPE);
555 } finally {
556 endTransition(/*is_mount*/true);
557 }
558 }
559
560 /**
561 * Invoked in the context of the carrier thread after the Continuation yields when
562 * parking, blocking on monitor enter, Object.wait, or Thread.yield.
563 */
564 private void afterYield() {
565 assert carrierThread == null;
566
567 // re-adjust parallelism if the virtual thread yielded when compensating
568 if (currentThread() instanceof CarrierThread ct) {
569 ct.endBlocking();
570 }
571
572 int s = state();
573
574 // LockSupport.park/parkNanos
575 if (s == PARKING || s == TIMED_PARKING) {
576 int newState;
581 long timeout = this.timeout;
582 assert timeout > 0;
583 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
584 setState(newState = TIMED_PARKED);
585 }
586
587 // may have been unparked while parking
588 if (parkPermit && compareAndSetState(newState, UNPARKED)) {
589 // lazy submit if local queue is empty
590 lazySubmitRunContinuation();
591 }
592 return;
593 }
594
595 // Thread.yield
596 if (s == YIELDING) {
597 setState(YIELDED);
598
599 // external submit if there are no tasks in the local task queue
600 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
601 externalSubmitRunContinuation(ct.getPool());
602 } else {
603 submitRunContinuation();
604 }
605 return;
606 }
607
608 // blocking on monitorenter
609 if (s == BLOCKING) {
610 setState(BLOCKED);
611
612 // may have been unblocked while blocking
613 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
614 // lazy submit if local queue is empty
615 lazySubmitRunContinuation();
616 }
617 return;
618 }
619
620 // Object.wait
621 if (s == WAITING || s == TIMED_WAITING) {
741 @Override
742 public void run() {
743 // do nothing
744 }
745
746 /**
747 * Parks until unparked or interrupted. If already unparked then the parking
748 * permit is consumed and this method completes immediately (meaning it doesn't
749 * yield). It also completes immediately if the interrupted status is set.
750 */
751 @Override
752 void park() {
753 assert Thread.currentThread() == this;
754
755 // complete immediately if parking permit available or interrupted
756 if (getAndSetParkPermit(false) || interrupted)
757 return;
758
759 // park the thread
760 boolean yielded = false;
761 setState(PARKING);
762 try {
763 yielded = yieldContinuation();
764 } catch (OutOfMemoryError e) {
765 // park on carrier
766 } finally {
767 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
768 if (!yielded) {
769 assert state() == PARKING;
770 setState(RUNNING);
771 }
772 }
773
774 // park on the carrier thread when pinned
775 if (!yielded) {
776 parkOnCarrierThread(false, 0);
777 }
778 }
779
780 /**
781 * Parks up to the given waiting time or until unparked or interrupted.
782 * If already unparked then the parking permit is consumed and this method
783 * completes immediately (meaning it doesn't yield). It also completes immediately
784 * if the interrupted status is set or the waiting time is {@code <= 0}.
785 *
786 * @param nanos the maximum number of nanoseconds to wait.
787 */
788 @Override
789 void parkNanos(long nanos) {
790 assert Thread.currentThread() == this;
791
792 // complete immediately if parking permit available or interrupted
793 if (getAndSetParkPermit(false) || interrupted)
794 return;
795
796 // park the thread for the waiting time
797 if (nanos > 0) {
798 long startTime = System.nanoTime();
799
800 // park the thread, afterYield will schedule the thread to unpark
801 boolean yielded = false;
802 timeout = nanos;
803 setState(TIMED_PARKING);
804 try {
805 yielded = yieldContinuation();
806 } catch (OutOfMemoryError e) {
807 // park on carrier
808 } finally {
809 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
810 if (!yielded) {
811 assert state() == TIMED_PARKING;
812 setState(RUNNING);
813 }
814 }
815
816 // park on carrier thread for remaining time when pinned (or OOME)
817 if (!yielded) {
818 long remainingNanos = nanos - (System.nanoTime() - startTime);
819 parkOnCarrierThread(true, remainingNanos);
820 }
821 }
822 }
823
824 /**
825 * Parks the current carrier thread up to the given waiting time or until
826 * unparked or interrupted. If the virtual thread is interrupted then the
827 * interrupted status will be propagated to the carrier thread.
828 * @param timed true for a timed park, false for untimed
829 * @param nanos the waiting time in nanoseconds
830 */
914 submitRunContinuation();
915 }
916 }
917
918 /**
919 * Invoked by FJP worker thread or STPE thread when park timeout expires.
920 */
921 private void parkTimeoutExpired() {
922 assert !VirtualThread.currentThread().isVirtual();
923 unpark(true);
924 }
925
926 /**
927 * Invoked by FJP worker thread or STPE thread when wait timeout expires.
928 * If the virtual thread is in timed-wait then this method will unblock the thread
929 * and submit its task so that it continues and attempts to reenter the monitor.
930 * This method does nothing if the thread has been woken by notify or interrupt.
931 */
932 private void waitTimeoutExpired(byte seqNo) {
933 assert !Thread.currentThread().isVirtual();
934 for (;;) {
935 boolean unblocked = false;
936 synchronized (timedWaitLock()) {
937 if (seqNo != timedWaitSeqNo) {
938 // this timeout task is for a past timed-wait
939 return;
940 }
941 int s = state();
942 if (s == TIMED_WAIT) {
943 unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED);
944 } else if (s != (TIMED_WAIT | SUSPENDED)) {
945 // notified or interrupted, no longer waiting
946 return;
947 }
948 }
949 if (unblocked) {
950 lazySubmitRunContinuation();
951 return;
952 }
953 // need to retry when thread is suspended in time-wait
954 Thread.yield();
955 }
956 }
957
958 /**
959 * Attempts to yield the current virtual thread (Thread.yield).
960 */
961 void tryYield() {
962 assert Thread.currentThread() == this;
963 setState(YIELDING);
964 boolean yielded = false;
965 try {
966 yielded = yieldContinuation(); // may throw
967 } finally {
968 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
969 if (!yielded) {
970 assert state() == YIELDING;
971 setState(RUNNING);
972 }
973 }
974 }
975
1103 @Override
1104 boolean getAndClearInterrupt() {
1105 assert Thread.currentThread() == this;
1106 boolean oldValue = interrupted;
1107 if (oldValue) {
1108 disableSuspendAndPreempt();
1109 try {
1110 synchronized (interruptLock) {
1111 interrupted = false;
1112 carrierThread.clearInterrupt();
1113 }
1114 } finally {
1115 enableSuspendAndPreempt();
1116 }
1117 }
1118 return oldValue;
1119 }
1120
1121 @Override
1122 Thread.State threadState() {
1123 int s = state();
1124 switch (s & ~SUSPENDED) {
1125 case NEW:
1126 return Thread.State.NEW;
1127 case STARTED:
1128 // return NEW if thread container not yet set
1129 if (threadContainer() == null) {
1130 return Thread.State.NEW;
1131 } else {
1132 return Thread.State.RUNNABLE;
1133 }
1134 case UNPARKED:
1135 case UNBLOCKED:
1136 case YIELDED:
1137 // runnable, not mounted
1138 return Thread.State.RUNNABLE;
1139 case RUNNING:
1140 // if mounted then return state of carrier thread
1141 if (Thread.currentThread() != this) {
1142 disableSuspendAndPreempt();
1143 try {
1144 synchronized (carrierThreadAccessLock()) {
1172 case BLOCKED:
1173 return Thread.State.BLOCKED;
1174 case TERMINATED:
1175 return Thread.State.TERMINATED;
1176 default:
1177 throw new InternalError();
1178 }
1179 }
1180
1181 @Override
1182 boolean alive() {
1183 int s = state;
1184 return (s != NEW && s != TERMINATED);
1185 }
1186
1187 @Override
1188 boolean isTerminated() {
1189 return (state == TERMINATED);
1190 }
1191
1192 @Override
1193 StackTraceElement[] asyncGetStackTrace() {
1194 StackTraceElement[] stackTrace;
1195 do {
1196 stackTrace = (carrierThread != null)
1197 ? super.asyncGetStackTrace() // mounted
1198 : tryGetStackTrace(); // unmounted
1199 if (stackTrace == null) {
1200 Thread.yield();
1201 }
1202 } while (stackTrace == null);
1203 return stackTrace;
1204 }
1205
1206 /**
1207 * Returns the stack trace for this virtual thread if it is unmounted.
1208 * Returns null if the thread is mounted or in transition.
1209 */
1210 private StackTraceElement[] tryGetStackTrace() {
1211 int initialState = state() & ~SUSPENDED;
1212 switch (initialState) {
1213 case NEW, STARTED, TERMINATED -> {
1214 return new StackTraceElement[0]; // unmounted, empty stack
1215 }
1216 case RUNNING, PINNED, TIMED_PINNED -> {
1217 return null; // mounted
1218 }
1219 case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
1220 // unmounted, not runnable
1221 }
1222 case UNPARKED, UNBLOCKED, YIELDED -> {
1223 // unmounted, runnable
1224 }
1225 case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
1226 return null; // in transition
1227 }
1228 default -> throw new InternalError("" + initialState);
1229 }
1230
1231 // thread is unmounted, prevent it from continuing
1232 int suspendedState = initialState | SUSPENDED;
1233 if (!compareAndSetState(initialState, suspendedState)) {
1234 return null;
1235 }
1236
1237 // get stack trace and restore state
1238 StackTraceElement[] stack;
1239 try {
1240 stack = cont.getStackTrace();
1241 } finally {
1242 assert state == suspendedState;
1243 setState(initialState);
1244 }
1245 boolean resubmit = switch (initialState) {
1246 case UNPARKED, UNBLOCKED, YIELDED -> {
1247 // resubmit as task may have run while suspended
1248 yield true;
1249 }
1250 case PARKED, TIMED_PARKED -> {
1251 // resubmit if unparked while suspended
1252 yield parkPermit && compareAndSetState(initialState, UNPARKED);
1253 }
1254 case BLOCKED -> {
1255 // resubmit if unblocked while suspended
1256 yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
1257 }
1258 case WAIT, TIMED_WAIT -> {
1259 // resubmit if notified or interrupted while waiting (Object.wait)
1260 // waitTimeoutExpired will retry if the timed expired when suspended
1261 yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
1262 }
1263 default -> throw new InternalError();
1264 };
1265 if (resubmit) {
1266 submitRunContinuation();
1267 }
1268 return stack;
1269 }
1270
1271 @Override
1272 public String toString() {
1273 StringBuilder sb = new StringBuilder("VirtualThread[#");
1274 sb.append(threadId());
1275 String name = getName();
1276 if (!name.isEmpty()) {
1277 sb.append(",");
1278 sb.append(name);
1279 }
1280 sb.append("]/");
1281
1282 // add the carrier state and thread name when mounted
1283 boolean mounted;
1284 if (Thread.currentThread() == this) {
1285 mounted = appendCarrierInfo(sb);
1286 } else {
1287 disableSuspendAndPreempt();
1288 try {
1289 synchronized (carrierThreadAccessLock()) {
1290 mounted = appendCarrierInfo(sb);
1421 // resuming the virtual thread's continuation on the carrier.
1422 // An "unmount transition" embodies the steps to transfer control from a virtual
1423 // thread to its carrier, suspending the virtual thread's continuation, and
1424 // restoring the thread identity to the platform thread.
1425 // The notifications to the VM are necessary in order to coordinate with functions
1426 // (JVMTI mostly) that disable transitions for one or all virtual threads. Starting
1427 // a transition may block if transitions are disabled. Ending a transition may
1428 // notify a thread that is waiting to disable transitions. The notifications are
1429 // also used to post JVMTI events for virtual thread start and end.
1430
1431 @IntrinsicCandidate
1432 @JvmtiMountTransition
1433 private native void endFirstTransition();
1434
1435 @IntrinsicCandidate
1436 @JvmtiMountTransition
1437 private native void startFinalTransition();
1438
1439 @IntrinsicCandidate
1440 @JvmtiMountTransition
1441 private native void startTransition(boolean is_mount);
1442
1443 @IntrinsicCandidate
1444 @JvmtiMountTransition
1445 private native void endTransition(boolean is_mount);
1446
1447 @IntrinsicCandidate
1448 private static native void notifyJvmtiDisableSuspend(boolean enter);
1449
1450 private static native void registerNatives();
1451 static {
1452 registerNatives();
1453
1454 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1455 var group = Thread.virtualThreadGroup();
1456 }
1457
1458 /**
1459 * Creates the default ForkJoinPool scheduler.
1460 */
1461 private static ForkJoinPool createDefaultScheduler() {
1462 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1463 int parallelism, maxPoolSize, minRunnable;
1464 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1465 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1466 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1467 if (parallelismValue != null) {
1468 parallelism = Integer.parseInt(parallelismValue);
1469 } else {
1470 parallelism = Runtime.getRuntime().availableProcessors();
1471 }
1472 if (maxPoolSizeValue != null) {
1473 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1474 parallelism = Integer.min(parallelism, maxPoolSize);
1475 } else {
1476 maxPoolSize = Integer.max(parallelism, 256);
1477 }
1478 if (minRunnableValue != null) {
1479 minRunnable = Integer.parseInt(minRunnableValue);
1480 } else {
1481 minRunnable = Integer.max(parallelism / 2, 1);
1482 }
1483 Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1484 boolean asyncMode = true; // FIFO
1485 return new ForkJoinPool(parallelism, factory, handler, asyncMode,
1486 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
1487 }
1488
1489 /**
1490 * Schedule a runnable task to run after a delay.
1491 */
1492 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1493 if (scheduler instanceof ForkJoinPool pool) {
1494 return pool.schedule(command, delay, unit);
1495 } else {
1496 return DelayedTaskSchedulers.schedule(command, delay, unit);
1497 }
1498 }
1499
1500 /**
1501 * Supports scheduling a runnable task to run after a delay. It uses a number
1502 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1503 * work queue used. This class is used when using a custom scheduler.
1504 */
1505 private static class DelayedTaskSchedulers {
1506 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();
1507
1508 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1509 long tid = Thread.currentThread().threadId();
1510 int index = (int) tid & (INSTANCE.length - 1);
1511 return INSTANCE[index].schedule(command, delay, unit);
1512 }
1513
1514 private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
1515 String propName = "jdk.virtualThreadScheduler.timerQueues";
1516 String propValue = System.getProperty(propName);
1517 int queueCount;
1518 if (propValue != null) {
1519 queueCount = Integer.parseInt(propValue);
1520 if (queueCount != Integer.highestOneBit(queueCount)) {
1521 throw new RuntimeException("Value of " + propName + " must be power of 2");
1522 }
1523 } else {
1524 int ncpus = Runtime.getRuntime().availableProcessors();
1525 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
|
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.lang.reflect.Constructor;
30 import java.util.Locale;
31 import java.util.Objects;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.ForkJoinPool;
35 import java.util.concurrent.ForkJoinTask;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.RejectedExecutionException;
38 import java.util.concurrent.ScheduledExecutorService;
39 import java.util.concurrent.ScheduledFuture;
40 import java.util.concurrent.ScheduledThreadPoolExecutor;
41 import java.util.concurrent.TimeUnit;
42 import jdk.internal.event.VirtualThreadEndEvent;
43 import jdk.internal.event.VirtualThreadParkEvent;
44 import jdk.internal.event.VirtualThreadStartEvent;
45 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
46 import jdk.internal.invoke.MhUtil;
47 import jdk.internal.misc.CarrierThread;
48 import jdk.internal.misc.InnocuousThread;
49 import jdk.internal.misc.Unsafe;
50 import jdk.internal.vm.Continuation;
51 import jdk.internal.vm.ContinuationScope;
52 import jdk.internal.vm.StackableScope;
53 import jdk.internal.vm.ThreadContainer;
54 import jdk.internal.vm.ThreadContainers;
55 import jdk.internal.vm.annotation.ChangesCurrentThread;
56 import jdk.internal.vm.annotation.Hidden;
57 import jdk.internal.vm.annotation.IntrinsicCandidate;
58 import jdk.internal.vm.annotation.JvmtiHideEvents;
59 import jdk.internal.vm.annotation.JvmtiMountTransition;
60 import jdk.internal.vm.annotation.ReservedStackAccess;
61 import sun.nio.ch.Interruptible;
62 import static java.util.concurrent.TimeUnit.*;
63
64 /**
65 * A thread that is scheduled by the Java virtual machine rather than the operating system.
66 */
67 final class VirtualThread extends BaseVirtualThread {
68 private static final Unsafe U = Unsafe.getUnsafe();
69 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
70
71 private static final BuiltinScheduler BUILTIN_SCHEDULER;
72 private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
73 private static final VirtualThreadScheduler EXTERNAL_VIEW;
74 static {
75 // experimental
76 String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
77 if (propValue != null) {
78 BuiltinScheduler builtinScheduler = createBuiltinScheduler(true);
79 VirtualThreadScheduler externalView = builtinScheduler.createExternalView();
80 VirtualThreadScheduler defaultScheduler = loadCustomScheduler(externalView, propValue);
81 BUILTIN_SCHEDULER = builtinScheduler;
82 DEFAULT_SCHEDULER = defaultScheduler;
83 EXTERNAL_VIEW = externalView;
84 } else {
85 var builtinScheduler = createBuiltinScheduler(false);
86 BUILTIN_SCHEDULER = builtinScheduler;
87 DEFAULT_SCHEDULER = builtinScheduler;
88 EXTERNAL_VIEW = builtinScheduler.createExternalView();
89 }
90 }
91
92 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
93 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
94 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
95 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
96 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
97
98 // scheduler and continuation
99 private final VirtualThreadScheduler scheduler;
100 private final Continuation cont;
101 private final VirtualThreadTask runContinuation;
102
103 // virtual thread state, accessed by VM
104 private volatile int state;
105
106 /*
107 * Virtual thread state transitions:
108 *
109 * NEW -> STARTED // Thread.start, schedule to run
110 * STARTED -> TERMINATED // failed to start
111 * STARTED -> RUNNING // first run
112 * RUNNING -> TERMINATED // done
113 *
114 * RUNNING -> PARKING // Thread parking with LockSupport.park
115 * PARKING -> PARKED // cont.yield successful, parked indefinitely
116 * PARKED -> UNPARKED // unparked, may be scheduled to continue
117 * UNPARKED -> RUNNING // continue execution after park
118 *
119 * PARKING -> RUNNING // cont.yield failed, need to park on carrier
120 * RUNNING -> PINNED // park on carrier
121 * PINNED -> RUNNING // unparked, continue execution on same carrier
161 private static final int TIMED_PINNED = 8; // mounted
162 private static final int UNPARKED = 9; // unmounted but runnable
163
164 // Thread.yield
165 private static final int YIELDING = 10;
166 private static final int YIELDED = 11; // unmounted but runnable
167
168 // monitor enter
169 private static final int BLOCKING = 12;
170 private static final int BLOCKED = 13; // unmounted
171 private static final int UNBLOCKED = 14; // unmounted but runnable
172
173 // monitor wait/timed-wait
174 private static final int WAITING = 15;
175 private static final int WAIT = 16; // waiting in Object.wait
176 private static final int TIMED_WAITING = 17;
177 private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait
178
179 private static final int TERMINATED = 99; // final state
180
181 // parking permit made available by LockSupport.unpark
182 private volatile boolean parkPermit;
183
184 // blocking permit made available by unblocker thread when another thread exits monitor
185 private volatile boolean blockPermit;
186
187 // true when on the list of virtual threads waiting to be unblocked
188 private volatile boolean onWaitingList;
189
190 // next virtual thread on the list of virtual threads waiting to be unblocked
191 private volatile VirtualThread next;
192
193 // notified by Object.notify/notifyAll while waiting in Object.wait
194 private volatile boolean notified;
195
196 // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
197 private volatile boolean interruptibleWait;
198
199 // timed-wait support
200 private byte timedWaitSeqNo;
201
202 // timeout for timed-park and timed-wait, only accessed on current/carrier thread
203 private long timeout;
204
205 // timer task for timed-park and timed-wait, only accessed on current/carrier thread
206 private Future<?> timeoutTask;
207
208 // carrier thread when mounted, accessed by VM
209 private volatile Thread carrierThread;
210
211 // termination object when joining, created lazily if needed
212 private volatile CountDownLatch termination;
213
214 /**
215 * Return the built-in scheduler.
216 * @param trusted true if caller is trusted, false if not trusted
217 */
218 static VirtualThreadScheduler builtinScheduler(boolean trusted) {
219 return trusted ? BUILTIN_SCHEDULER : EXTERNAL_VIEW;
220 }
221
222 /**
223 * Returns the default scheduler, usually the same as the built-in scheduler.
224 */
225 static VirtualThreadScheduler defaultScheduler() {
226 return DEFAULT_SCHEDULER;
227 }
228
229 /**
230 * Returns the continuation scope used for virtual threads.
231 */
232 static ContinuationScope continuationScope() {
233 return VTHREAD_SCOPE;
234 }
235
236 /**
237 * Return the scheduler for this thread.
238 * @param trusted true if caller is trusted, false if not trusted
239 */
240 VirtualThreadScheduler scheduler(boolean trusted) {
241 if (scheduler == BUILTIN_SCHEDULER && !trusted) {
242 return EXTERNAL_VIEW;
243 } else {
244 return scheduler;
245 }
246 }
247
248 /**
249 * Returns the task to start/continue this virtual thread.
250 */
251 VirtualThreadTask virtualThreadTask() {
252 return runContinuation;
253 }
254
255 /**
256 * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
257 *
258 * @param scheduler the scheduler or null for default scheduler
259 * @param preferredCarrier the preferred carrier or null
260 * @param name thread name
261 * @param characteristics characteristics
262 * @param task the task to execute
263 */
264 VirtualThread(VirtualThreadScheduler scheduler,
265 Thread preferredCarrier,
266 String name,
267 int characteristics,
268 Runnable task) {
269 super(name, characteristics, /*bound*/ false);
270 Objects.requireNonNull(task);
271
272 // use default scheduler if not provided
273 if (scheduler == null) {
274 scheduler = DEFAULT_SCHEDULER;
275 } else if (scheduler == EXTERNAL_VIEW) {
276 throw new UnsupportedOperationException();
277 }
278 this.scheduler = scheduler;
279 this.cont = new VThreadContinuation(this, task);
280
281 if (scheduler == BUILTIN_SCHEDULER) {
282 this.runContinuation = new BuiltinSchedulerTask(this);
283 } else {
284 this.runContinuation = new CustomSchedulerTask(this, preferredCarrier);
285 }
286 }
287
288 /**
289 * The task to execute when using the built-in scheduler.
290 */
291 static final class BuiltinSchedulerTask implements VirtualThreadTask {
292 private final VirtualThread vthread;
293 BuiltinSchedulerTask(VirtualThread vthread) {
294 this.vthread = vthread;
295 }
296 @Override
297 public Thread thread() {
298 return vthread;
299 }
300 @Override
301 public void run() {
302 vthread.runContinuation();;
303 }
304 @Override
305 public Thread preferredCarrier() {
306 throw new UnsupportedOperationException();
307 }
308 @Override
309 public Object attach(Object att) {
310 throw new UnsupportedOperationException();
311 }
312 @Override
313 public Object attachment() {
314 throw new UnsupportedOperationException();
315 }
316 }
317
318 /**
319 * The task to execute when using a custom scheduler.
320 */
321 static final class CustomSchedulerTask implements VirtualThreadTask {
322 private static final VarHandle ATT =
323 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class);
324 private final VirtualThread vthread;
325 private final Thread preferredCarrier;
326 private volatile Object att;
327 CustomSchedulerTask(VirtualThread vthread, Thread preferredCarrier) {
328 this.vthread = vthread;
329 this.preferredCarrier = preferredCarrier;
330 }
331 @Override
332 public Thread thread() {
333 return vthread;
334 }
335 @Override
336 public void run() {
337 vthread.runContinuation();;
338 }
339 @Override
340 public Thread preferredCarrier() {
341 return preferredCarrier;
342 }
343 @Override
344 public Object attach(Object att) {
345 return ATT.getAndSet(this, att);
346 }
347 @Override
348 public Object attachment() {
349 return att;
350 }
351 }
352
353 /**
354 * The continuation that a virtual thread executes.
355 */
356 private static class VThreadContinuation extends Continuation {
357 VThreadContinuation(VirtualThread vthread, Runnable task) {
358 super(VTHREAD_SCOPE, wrap(vthread, task));
359 }
360 @Override
361 protected void onPinned(Continuation.Pinned reason) {
362 }
363 private static Runnable wrap(VirtualThread vthread, Runnable task) {
364 return new Runnable() {
365 @Hidden
366 @JvmtiHideEvents
367 public void run() {
368 vthread.endFirstTransition();
369 try {
370 vthread.run(task);
418 if (cont.isDone()) {
419 afterDone();
420 } else {
421 afterYield();
422 }
423 }
424 }
425
426 /**
427 * Cancel timeout task when continuing after timed-park or timed-wait.
428 * The timeout task may be executing, or may have already completed.
429 */
430 private void cancelTimeoutTask() {
431 if (timeoutTask != null) {
432 timeoutTask.cancel(false);
433 timeoutTask = null;
434 }
435 }
436
437 /**
438 * Submits the runContinuation task to the scheduler. For the built-in scheduler,
439 * the task will be pushed to the local queue if possible, otherwise it will be
440 * pushed to an external submission queue.
441 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
442 * @throws RejectedExecutionException
443 */
444 private void submitRunContinuation(boolean retryOnOOME) {
445 boolean done = false;
446 while (!done) {
447 try {
448 // Pin the continuation to prevent the virtual thread from unmounting
449 // when submitting a task. For the default scheduler this ensures that
450 // the carrier doesn't change when pushing a task. For other schedulers
451 // it avoids deadlock that could arise due to carriers and virtual
452 // threads contending for a lock.
453 if (currentThread().isVirtual()) {
454 Continuation.pin();
455 try {
456 scheduler.onContinue(runContinuation);
457 } finally {
458 Continuation.unpin();
459 }
460 } else {
461 scheduler.onContinue(runContinuation);
462 }
463 done = true;
464 } catch (RejectedExecutionException ree) {
465 submitFailed(ree);
466 throw ree;
467 } catch (OutOfMemoryError e) {
468 if (retryOnOOME) {
469 U.park(false, 100_000_000); // 100ms
470 } else {
471 throw e;
472 }
473 }
474 }
475 }
476
477 /**
478 * Submits the runContinuation task to the scheduler. For the default scheduler,
479 * and calling it on a worker thread, the task will be pushed to the local queue,
480 * otherwise it will be pushed to an external submission queue.
481 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
482 * @throws RejectedExecutionException
483 */
484 private void submitRunContinuation() {
485 submitRunContinuation(true);
486 }
487
488 /**
489 * Invoked from a carrier thread to lazy submit the runContinuation task to the
490 * carrier's local queue if the queue is empty. If not empty, or invoked by a thread
491 * for a custom scheduler, then it just submits the task to the scheduler.
492 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
493 * @throws RejectedExecutionException
494 * @see ForkJoinPool#lazySubmit(ForkJoinTask)
495 */
496 private void lazySubmitRunContinuation() {
497 assert !currentThread().isVirtual();
498 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
499 try {
500 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
501 } catch (RejectedExecutionException ree) {
502 submitFailed(ree);
503 throw ree;
504 } catch (OutOfMemoryError e) {
505 submitRunContinuation();
506 }
507 } else {
508 submitRunContinuation();
509 }
510 }
511
512 /**
513 * Invoked from a carrier thread to externally submit the runContinuation task to the
514 * scheduler. If invoked by a thread for a custom scheduler, then it just submits the
515 * task to the scheduler.
516 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
517 * @throws RejectedExecutionException
518 * @see ForkJoinPool#externalSubmit(ForkJoinTask)
519 */
520 private void externalSubmitRunContinuation() {
521 assert !currentThread().isVirtual();
522 if (currentThread() instanceof CarrierThread ct) {
523 try {
524 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
525 } catch (RejectedExecutionException ree) {
526 submitFailed(ree);
527 throw ree;
528 } catch (OutOfMemoryError e) {
529 submitRunContinuation();
530 }
531 } else {
532 submitRunContinuation();
533 }
534 }
535
536 /**
537 * Invoked from Thread.start to externally submit the runContinuation task to the
538 * scheduler. If this virtual thread is scheduled by the built-in scheduler,
539 * and this method is called from a virtual thread scheduled by the built-in
540 * scheduler, then it uses externalSubmit to ensure that the task is pushed to an
541 * external submission queue rather than the local queue.
542 * @throws RejectedExecutionException
543 * @throws OutOfMemoryError
544 * @see ForkJoinPool#externalSubmit(ForkJoinTask)
545 */
546 private void externalSubmitRunContinuationOrThrow() {
547 try {
548 if (currentThread().isVirtual()) {
549 // Pin the continuation to prevent the virtual thread from unmounting
550 // when submitting a task. This avoids deadlock that could arise due to
551 // carriers and virtual threads contending for a lock.
552 Continuation.pin();
553 try {
554 if (scheduler == BUILTIN_SCHEDULER
555 && currentCarrierThread() instanceof CarrierThread ct) {
556 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
557 } else {
558 scheduler.onStart(runContinuation);
559 }
560 } finally {
561 Continuation.unpin();
562 }
563 } else {
564 scheduler.onStart(runContinuation);
565 }
566 } catch (RejectedExecutionException ree) {
567 submitFailed(ree);
568 throw ree;
569 }
570 }
571
572 /**
573 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
574 */
575 private void submitFailed(RejectedExecutionException ree) {
576 var event = new VirtualThreadSubmitFailedEvent();
577 if (event.isEnabled()) {
578 event.javaThreadId = threadId();
579 event.exceptionMessage = ree.getMessage();
580 event.commit();
581 }
582 }
583
584 /**
585 * Runs a task in the context of this virtual thread.
586 */
587 private void run(Runnable task) {
588 assert Thread.currentThread() == this && state == RUNNING;
602 } finally {
603 // pop any remaining scopes from the stack, this may block
604 StackableScope.popAll();
605
606 // emit JFR event if enabled
607 if (VirtualThreadEndEvent.isTurnedOn()) {
608 var event = new VirtualThreadEndEvent();
609 event.javaThreadId = threadId();
610 event.commit();
611 }
612 }
613 }
614
615 /**
616 * Mounts this virtual thread onto the current platform thread. On
617 * return, the current thread is the virtual thread.
618 */
619 @ChangesCurrentThread
620 @ReservedStackAccess
621 private void mount() {
622 startTransition(/*mount*/true);
623 // We assume following volatile accesses provide equivalent
624 // of acquire ordering, otherwise we need U.loadFence() here.
625
626 // sets the carrier thread
627 Thread carrier = Thread.currentCarrierThread();
628 setCarrierThread(carrier);
629
630 // sync up carrier thread interrupted status if needed
631 if (interrupted) {
632 carrier.setInterrupt();
633 } else if (carrier.isInterrupted()) {
634 synchronized (interruptLock) {
635 // need to recheck interrupted status
636 if (!interrupted) {
637 carrier.clearInterrupt();
638 }
639 }
640 }
641
642 // set Thread.currentThread() to return this virtual thread
647 * Unmounts this virtual thread from the carrier. On return, the
648 * current thread is the current platform thread.
649 */
650 @ChangesCurrentThread
651 @ReservedStackAccess
652 private void unmount() {
653 assert !Thread.holdsLock(interruptLock);
654
655 // set Thread.currentThread() to return the platform thread
656 Thread carrier = this.carrierThread;
657 carrier.setCurrentThread(carrier);
658
659 // break connection to carrier thread, synchronized with interrupt
660 synchronized (interruptLock) {
661 setCarrierThread(null);
662 }
663 carrier.clearInterrupt();
664
665 // We assume previous volatile accesses provide equivalent
666 // of release ordering, otherwise we need U.storeFence() here.
667 endTransition(/*mount*/false);
668 }
669
670 /**
671 * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
672 * the continuation continues.
673 */
674 @Hidden
675 private boolean yieldContinuation() {
676 startTransition(/*mount*/false);
677 try {
678 return Continuation.yield(VTHREAD_SCOPE);
679 } finally {
680 endTransition(/*mount*/true);
681 }
682 }
683
684 /**
685 * Invoked in the context of the carrier thread after the Continuation yields when
686 * parking, blocking on monitor enter, Object.wait, or Thread.yield.
687 */
688 private void afterYield() {
689 assert carrierThread == null;
690
691 // re-adjust parallelism if the virtual thread yielded when compensating
692 if (currentThread() instanceof CarrierThread ct) {
693 ct.endBlocking();
694 }
695
696 int s = state();
697
698 // LockSupport.park/parkNanos
699 if (s == PARKING || s == TIMED_PARKING) {
700 int newState;
705 long timeout = this.timeout;
706 assert timeout > 0;
707 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
708 setState(newState = TIMED_PARKED);
709 }
710
711 // may have been unparked while parking
712 if (parkPermit && compareAndSetState(newState, UNPARKED)) {
713 // lazy submit if local queue is empty
714 lazySubmitRunContinuation();
715 }
716 return;
717 }
718
719 // Thread.yield
720 if (s == YIELDING) {
721 setState(YIELDED);
722
723 // external submit if there are no tasks in the local task queue
724 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
725 externalSubmitRunContinuation();
726 } else {
727 submitRunContinuation();
728 }
729 return;
730 }
731
732 // blocking on monitorenter
733 if (s == BLOCKING) {
734 setState(BLOCKED);
735
736 // may have been unblocked while blocking
737 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
738 // lazy submit if local queue is empty
739 lazySubmitRunContinuation();
740 }
741 return;
742 }
743
744 // Object.wait
745 if (s == WAITING || s == TIMED_WAITING) {
865 @Override
866 public void run() {
867 // do nothing
868 }
869
870 /**
871 * Parks until unparked or interrupted. If already unparked then the parking
872 * permit is consumed and this method completes immediately (meaning it doesn't
873 * yield). It also completes immediately if the interrupted status is set.
874 */
875 @Override
876 void park() {
877 assert Thread.currentThread() == this;
878
879 // complete immediately if parking permit available or interrupted
880 if (getAndSetParkPermit(false) || interrupted)
881 return;
882
883 // park the thread
884 boolean yielded = false;
885 long eventStartTime = VirtualThreadParkEvent.eventStartTime();
886 setState(PARKING);
887 try {
888 yielded = yieldContinuation();
889 } catch (OutOfMemoryError e) {
890 // park on carrier
891 } finally {
892 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
893 if (yielded) {
894 VirtualThreadParkEvent.offer(eventStartTime, Long.MIN_VALUE);
895 } else {
896 assert state() == PARKING;
897 setState(RUNNING);
898 }
899 }
900
901 // park on the carrier thread when pinned
902 if (!yielded) {
903 parkOnCarrierThread(false, 0);
904 }
905 }
906
907 /**
908 * Parks up to the given waiting time or until unparked or interrupted.
909 * If already unparked then the parking permit is consumed and this method
910 * completes immediately (meaning it doesn't yield). It also completes immediately
911 * if the interrupted status is set or the waiting time is {@code <= 0}.
912 *
913 * @param nanos the maximum number of nanoseconds to wait.
914 */
915 @Override
916 void parkNanos(long nanos) {
917 assert Thread.currentThread() == this;
918
919 // complete immediately if parking permit available or interrupted
920 if (getAndSetParkPermit(false) || interrupted)
921 return;
922
923 // park the thread for the waiting time
924 if (nanos > 0) {
925 long startTime = System.nanoTime();
926
927 // park the thread, afterYield will schedule the thread to unpark
928 boolean yielded = false;
929 long eventStartTime = VirtualThreadParkEvent.eventStartTime();
930 timeout = nanos;
931 setState(TIMED_PARKING);
932 try {
933 yielded = yieldContinuation();
934 } catch (OutOfMemoryError e) {
935 // park on carrier
936 } finally {
937 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
938 if (yielded) {
939 VirtualThreadParkEvent.offer(eventStartTime, nanos);
940 } else {
941 assert state() == TIMED_PARKING;
942 setState(RUNNING);
943 }
944 }
945
946 // park on carrier thread for remaining time when pinned (or OOME)
947 if (!yielded) {
948 long remainingNanos = nanos - (System.nanoTime() - startTime);
949 parkOnCarrierThread(true, remainingNanos);
950 }
951 }
952 }
953
954 /**
955 * Parks the current carrier thread up to the given waiting time or until
956 * unparked or interrupted. If the virtual thread is interrupted then the
957 * interrupted status will be propagated to the carrier thread.
958 * @param timed true for a timed park, false for untimed
959 * @param nanos the waiting time in nanoseconds
960 */
1044 submitRunContinuation();
1045 }
1046 }
1047
1048 /**
1049 * Invoked by FJP worker thread or STPE thread when park timeout expires.
1050 */
1051 private void parkTimeoutExpired() {
1052 assert !VirtualThread.currentThread().isVirtual();
1053 unpark(true);
1054 }
1055
1056 /**
1057 * Invoked by FJP worker thread or STPE thread when wait timeout expires.
1058 * If the virtual thread is in timed-wait then this method will unblock the thread
1059 * and submit its task so that it continues and attempts to reenter the monitor.
1060 * This method does nothing if the thread has been woken by notify or interrupt.
1061 */
1062 private void waitTimeoutExpired(byte seqNo) {
1063 assert !Thread.currentThread().isVirtual();
1064
1065 synchronized (timedWaitLock()) {
1066 if (seqNo != timedWaitSeqNo) {
1067 // this timeout task is for a past timed-wait
1068 return;
1069 }
1070 if (!compareAndSetState(TIMED_WAIT, UNBLOCKED)) {
1071 // already unblocked
1072 return;
1073 }
1074 }
1075
1076 lazySubmitRunContinuation();
1077 }
1078
1079 /**
1080 * Attempts to yield the current virtual thread (Thread.yield).
1081 */
1082 void tryYield() {
1083 assert Thread.currentThread() == this;
1084 setState(YIELDING);
1085 boolean yielded = false;
1086 try {
1087 yielded = yieldContinuation(); // may throw
1088 } finally {
1089 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
1090 if (!yielded) {
1091 assert state() == YIELDING;
1092 setState(RUNNING);
1093 }
1094 }
1095 }
1096
1224 @Override
1225 boolean getAndClearInterrupt() {
1226 assert Thread.currentThread() == this;
1227 boolean oldValue = interrupted;
1228 if (oldValue) {
1229 disableSuspendAndPreempt();
1230 try {
1231 synchronized (interruptLock) {
1232 interrupted = false;
1233 carrierThread.clearInterrupt();
1234 }
1235 } finally {
1236 enableSuspendAndPreempt();
1237 }
1238 }
1239 return oldValue;
1240 }
1241
1242 @Override
1243 Thread.State threadState() {
1244 switch (state()) {
1245 case NEW:
1246 return Thread.State.NEW;
1247 case STARTED:
1248 // return NEW if thread container not yet set
1249 if (threadContainer() == null) {
1250 return Thread.State.NEW;
1251 } else {
1252 return Thread.State.RUNNABLE;
1253 }
1254 case UNPARKED:
1255 case UNBLOCKED:
1256 case YIELDED:
1257 // runnable, not mounted
1258 return Thread.State.RUNNABLE;
1259 case RUNNING:
1260 // if mounted then return state of carrier thread
1261 if (Thread.currentThread() != this) {
1262 disableSuspendAndPreempt();
1263 try {
1264 synchronized (carrierThreadAccessLock()) {
1292 case BLOCKED:
1293 return Thread.State.BLOCKED;
1294 case TERMINATED:
1295 return Thread.State.TERMINATED;
1296 default:
1297 throw new InternalError();
1298 }
1299 }
1300
1301 @Override
1302 boolean alive() {
1303 int s = state;
1304 return (s != NEW && s != TERMINATED);
1305 }
1306
1307 @Override
1308 boolean isTerminated() {
1309 return (state == TERMINATED);
1310 }
1311
1312 @Override
1313 public String toString() {
1314 StringBuilder sb = new StringBuilder("VirtualThread[#");
1315 sb.append(threadId());
1316 String name = getName();
1317 if (!name.isEmpty()) {
1318 sb.append(",");
1319 sb.append(name);
1320 }
1321 sb.append("]/");
1322
1323 // add the carrier state and thread name when mounted
1324 boolean mounted;
1325 if (Thread.currentThread() == this) {
1326 mounted = appendCarrierInfo(sb);
1327 } else {
1328 disableSuspendAndPreempt();
1329 try {
1330 synchronized (carrierThreadAccessLock()) {
1331 mounted = appendCarrierInfo(sb);
1462 // resuming the virtual thread's continuation on the carrier.
1463 // An "unmount transition" embodies the steps to transfer control from a virtual
1464 // thread to its carrier, suspending the virtual thread's continuation, and
1465 // restoring the thread identity to the platform thread.
1466 // The notifications to the VM are necessary in order to coordinate with functions
1467 // (JVMTI mostly) that disable transitions for one or all virtual threads. Starting
1468 // a transition may block if transitions are disabled. Ending a transition may
1469 // notify a thread that is waiting to disable transitions. The notifications are
1470 // also used to post JVMTI events for virtual thread start and end.
1471
1472 @IntrinsicCandidate
1473 @JvmtiMountTransition
1474 private native void endFirstTransition();
1475
1476 @IntrinsicCandidate
1477 @JvmtiMountTransition
1478 private native void startFinalTransition();
1479
1480 @IntrinsicCandidate
1481 @JvmtiMountTransition
1482 private native void startTransition(boolean mount);
1483
1484 @IntrinsicCandidate
1485 @JvmtiMountTransition
1486 private native void endTransition(boolean mount);
1487
1488 @IntrinsicCandidate
1489 private static native void notifyJvmtiDisableSuspend(boolean enter);
1490
1491 private static native void registerNatives();
1492 static {
1493 registerNatives();
1494
1495 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1496 var group = Thread.virtualThreadGroup();
1497
1498 // ensure event class is initialized
1499 try {
1500 MethodHandles.lookup().ensureInitialized(VirtualThreadParkEvent.class);
1501 } catch (IllegalAccessException e) {
1502 throw new ExceptionInInitializerError(e);
1503 }
1504 }
1505
1506 /**
1507 * Loads a VirtualThreadScheduler with the given class name. The class must be public
1508 * in an exported package, with public one-arg or no-arg constructor, and be visible
1509 * to the system class loader.
1510 * @param delegate the scheduler that the custom scheduler may delegate to
1511 * @param cn the class name of the custom scheduler
1512 */
1513 private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) {
1514 VirtualThreadScheduler scheduler;
1515 try {
1516 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
1517 // 1-arg constructor
1518 try {
1519 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class);
1520 return (VirtualThreadScheduler) ctor.newInstance(delegate);
1521 } catch (NoSuchMethodException e) {
1522 // 0-arg constructor
1523 Constructor<?> ctor = clazz.getConstructor();
1524 scheduler = (VirtualThreadScheduler) ctor.newInstance();
1525 }
1526 } catch (Exception ex) {
1527 throw new Error(ex);
1528 }
1529 System.err.println("WARNING: Using custom default scheduler, this is an experimental feature!");
1530 return scheduler;
1531 }
1532
1533 /**
1534 * Creates the built-in ForkJoinPool scheduler.
1535 * @param wrapped true if wrapped by a custom default scheduler
1536 */
1537 private static BuiltinScheduler createBuiltinScheduler(boolean wrapped) {
1538 int parallelism, maxPoolSize, minRunnable;
1539 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1540 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1541 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1542 if (parallelismValue != null) {
1543 parallelism = Integer.parseInt(parallelismValue);
1544 } else {
1545 parallelism = Runtime.getRuntime().availableProcessors();
1546 }
1547 if (maxPoolSizeValue != null) {
1548 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1549 parallelism = Integer.min(parallelism, maxPoolSize);
1550 } else {
1551 maxPoolSize = Integer.max(parallelism, 256);
1552 }
1553 if (minRunnableValue != null) {
1554 minRunnable = Integer.parseInt(minRunnableValue);
1555 } else {
1556 minRunnable = Integer.max(parallelism / 2, 1);
1557 }
1558 return new BuiltinScheduler(parallelism, maxPoolSize, minRunnable, wrapped);
1559 }
1560
1561 /**
1562 * The built-in ForkJoinPool scheduler.
1563 */
1564 private static class BuiltinScheduler
1565 extends ForkJoinPool implements VirtualThreadScheduler {
1566
1567 BuiltinScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) {
1568 ForkJoinWorkerThreadFactory factory = wrapped
1569 ? ForkJoinPool.defaultForkJoinWorkerThreadFactory
1570 : CarrierThread::new;
1571 Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1572 boolean asyncMode = true; // FIFO
1573 super(parallelism, factory, handler, asyncMode,
1574 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
1575 }
1576
1577 private void adaptAndExecute(Runnable task) {
1578 execute(ForkJoinTask.adapt(task));
1579 }
1580
1581 @Override
1582 public void onStart(VirtualThreadTask task) {
1583 adaptAndExecute(task);
1584 }
1585
1586 @Override
1587 public void onContinue(VirtualThreadTask task) {
1588 adaptAndExecute(task);
1589 }
1590
1591 @Override
1592 public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
1593 return super.schedule(task, delay, unit);
1594 }
1595
1596 /**
1597 * Wraps the scheduler to avoid leaking a direct reference with
1598 * {@link VirtualThreadScheduler#current()}.
1599 */
1600 VirtualThreadScheduler createExternalView() {
1601 BuiltinScheduler builtin = this;
1602 return new VirtualThreadScheduler() {
1603 private void execute(VirtualThreadTask task) {
1604 var vthread = (VirtualThread) task.thread();
1605 VirtualThreadScheduler scheduler = vthread.scheduler;
1606 if (scheduler == this || scheduler == DEFAULT_SCHEDULER) {
1607 builtin.adaptAndExecute(task);
1608 } else {
1609 throw new IllegalArgumentException();
1610 }
1611 }
1612 @Override
1613 public void onStart(VirtualThreadTask task) {
1614 execute(task);
1615 }
1616 @Override
1617 public void onContinue(VirtualThreadTask task) {
1618 execute(task);
1619 }
1620 @Override
1621 public String toString() {
1622 return builtin.toString();
1623 }
1624 };
1625 }
1626 }
1627
1628 /**
1629 * Schedule a runnable task to run after a delay.
1630 */
1631 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1632 return scheduler.schedule(command, delay, unit);
1633 }
1634
1635 /**
1636 * Supports scheduling a runnable task to run after a delay. It uses a number
1637 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1638 * work queue used. This class is used when using a custom scheduler.
1639 */
1640 static class DelayedTaskSchedulers {
1641 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();
1642
1643 static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1644 long tid = Thread.currentThread().threadId();
1645 int index = (int) tid & (INSTANCE.length - 1);
1646 return INSTANCE[index].schedule(command, delay, unit);
1647 }
1648
1649 private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
1650 String propName = "jdk.virtualThreadScheduler.timerQueues";
1651 String propValue = System.getProperty(propName);
1652 int queueCount;
1653 if (propValue != null) {
1654 queueCount = Integer.parseInt(propValue);
1655 if (queueCount != Integer.highestOneBit(queueCount)) {
1656 throw new RuntimeException("Value of " + propName + " must be power of 2");
1657 }
1658 } else {
1659 int ncpus = Runtime.getRuntime().availableProcessors();
1660 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
|