7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.security.AccessController;
28 import java.security.PrivilegedAction;
29 import java.util.Locale;
30 import java.util.Objects;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ForkJoinPool;
36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
37 import java.util.concurrent.ForkJoinTask;
38 import java.util.concurrent.ForkJoinWorkerThread;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.RejectedExecutionException;
41 import java.util.concurrent.ScheduledExecutorService;
42 import java.util.concurrent.ScheduledThreadPoolExecutor;
43 import jdk.internal.event.VirtualThreadEndEvent;
44 import jdk.internal.event.VirtualThreadPinnedEvent;
45 import jdk.internal.event.VirtualThreadStartEvent;
46 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
47 import jdk.internal.misc.CarrierThread;
48 import jdk.internal.misc.InnocuousThread;
49 import jdk.internal.misc.Unsafe;
50 import jdk.internal.vm.Continuation;
51 import jdk.internal.vm.ContinuationScope;
52 import jdk.internal.vm.StackableScope;
53 import jdk.internal.vm.ThreadContainer;
54 import jdk.internal.vm.ThreadContainers;
55 import jdk.internal.vm.annotation.ChangesCurrentThread;
56 import jdk.internal.vm.annotation.Hidden;
57 import jdk.internal.vm.annotation.IntrinsicCandidate;
58 import jdk.internal.vm.annotation.JvmtiMountTransition;
59 import jdk.internal.vm.annotation.ReservedStackAccess;
60 import sun.nio.ch.Interruptible;
61 import sun.security.action.GetPropertyAction;
62 import static java.util.concurrent.TimeUnit.*;
63
64 /**
65 * A thread that is scheduled by the Java virtual machine rather than the operating
66 * system.
67 */
68 final class VirtualThread extends BaseVirtualThread {
69 private static final Unsafe U = Unsafe.getUnsafe();
70 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
71 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
72 private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
73 private static final int TRACE_PINNING_MODE = tracePinningMode();
74
75 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
76 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
77 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
78 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
79
80 // scheduler and continuation
81 private final Executor scheduler;
82 private final Continuation cont;
83 private final Runnable runContinuation;
84
85 // virtual thread state, accessed by VM
86 private volatile int state;
87
88 /*
89 * Virtual thread state transitions:
90 *
91 * NEW -> STARTED // Thread.start, schedule to run
92 * STARTED -> TERMINATED // failed to start
93 * STARTED -> RUNNING // first run
94 * RUNNING -> TERMINATED // done
95 *
96 * RUNNING -> PARKING // Thread parking with LockSupport.park
97 * PARKING -> PARKED // cont.yield successful, parked indefinitely
98 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier
99 * PARKED -> UNPARKED // unparked, may be scheduled to continue
100 * PINNED -> RUNNING // unparked, continue execution on same carrier
101 * UNPARKED -> RUNNING // continue execution after park
102 *
103 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos
104 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked
105 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier
106 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue
107 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
108 *
109 * RUNNING -> YIELDING // Thread.yield
110 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue
111 * YIELDING -> RUNNING // cont.yield failed
112 * YIELDED -> RUNNING // continue execution after Thread.yield
113 */
114 private static final int NEW = 0;
115 private static final int STARTED = 1;
116 private static final int RUNNING = 2; // runnable-mounted
117
118 // untimed and timed parking
119 private static final int PARKING = 3;
120 private static final int PARKED = 4; // unmounted
121 private static final int PINNED = 5; // mounted
122 private static final int TIMED_PARKING = 6;
123 private static final int TIMED_PARKED = 7; // unmounted
124 private static final int TIMED_PINNED = 8; // mounted
125 private static final int UNPARKED = 9; // unmounted but runnable
126
127 // Thread.yield
128 private static final int YIELDING = 10;
129 private static final int YIELDED = 11; // unmounted but runnable
130
131 private static final int TERMINATED = 99; // final state
132
133 // can be suspended from scheduling when unmounted
134 private static final int SUSPENDED = 1 << 8;
135
136 // parking permit
137 private volatile boolean parkPermit;
138
139 // carrier thread when mounted, accessed by VM
140 private volatile Thread carrierThread;
141
142 // termination object when joining, created lazily if needed
143 private volatile CountDownLatch termination;
144
145 /**
146 * Returns the continuation scope used for virtual threads.
147 */
148 static ContinuationScope continuationScope() {
149 return VTHREAD_SCOPE;
150 }
151
152 /**
153 * Creates a new {@code VirtualThread} to run the given task with the given
154 * scheduler. If the given scheduler is {@code null} and the current thread
155 * is a platform thread then the newly created virtual thread will use the
156 * default scheduler. If given scheduler is {@code null} and the current
157 * thread is a virtual thread then the current thread's scheduler is used.
158 *
159 * @param scheduler the scheduler or null
160 * @param name thread name
161 * @param characteristics characteristics
162 * @param task the task to execute
163 */
164 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
172 scheduler = vparent.scheduler;
173 } else {
174 scheduler = DEFAULT_SCHEDULER;
175 }
176 }
177
178 this.scheduler = scheduler;
179 this.cont = new VThreadContinuation(this, task);
180 this.runContinuation = this::runContinuation;
181 }
182
183 /**
184 * The continuation that a virtual thread executes.
185 */
186 private static class VThreadContinuation extends Continuation {
187 VThreadContinuation(VirtualThread vthread, Runnable task) {
188 super(VTHREAD_SCOPE, wrap(vthread, task));
189 }
190 @Override
191 protected void onPinned(Continuation.Pinned reason) {
192 if (TRACE_PINNING_MODE > 0) {
193 boolean printAll = (TRACE_PINNING_MODE == 1);
194 VirtualThread vthread = (VirtualThread) Thread.currentThread();
195 int oldState = vthread.state();
196 try {
197 // avoid printing when in transition states
198 vthread.setState(RUNNING);
199 PinnedThreadPrinter.printStackTrace(System.out, reason, printAll);
200 } finally {
201 vthread.setState(oldState);
202 }
203 }
204 }
205 private static Runnable wrap(VirtualThread vthread, Runnable task) {
206 return new Runnable() {
207 @Hidden
208 public void run() {
209 vthread.run(task);
210 }
211 };
212 }
213 }
214
215 /**
216 * Runs or continues execution on the current thread. The virtual thread is mounted
217 * on the current thread before the task runs or continues. It unmounts when the
218 * task completes or yields.
219 */
220 @ChangesCurrentThread
221 private void runContinuation() {
222 // the carrier must be a platform thread
223 if (Thread.currentThread().isVirtual()) {
224 throw new WrongThreadException();
225 }
226
227 // set state to RUNNING
228 int initialState = state();
229 if (initialState == STARTED || initialState == UNPARKED || initialState == YIELDED) {
230 // newly started or continue after parking/blocking/Thread.yield
231 if (!compareAndSetState(initialState, RUNNING)) {
232 return;
233 }
234 // consume parking permit when continuing after parking
235 if (initialState == UNPARKED) {
236 setParkPermit(false);
237 }
238 } else {
239 // not runnable
240 return;
241 }
242
243 mount();
244 try {
245 cont.run();
246 } finally {
247 unmount();
248 if (cont.isDone()) {
249 afterDone();
251 afterYield();
252 }
253 }
254 }
255
256 /**
257 * Submits the runContinuation task to the scheduler. For the default scheduler,
258 * and calling it on a worker thread, the task will be pushed to the local queue,
259 * otherwise it will be pushed to an external submission queue.
260 * @throws RejectedExecutionException
261 */
262 private void submitRunContinuation() {
263 try {
264 scheduler.execute(runContinuation);
265 } catch (RejectedExecutionException ree) {
266 submitFailed(ree);
267 throw ree;
268 }
269 }
270
271 /**
272 * Submits the runContinuation task to given scheduler with a lazy submit.
273 * @throws RejectedExecutionException
274 * @see ForkJoinPool#lazySubmit(ForkJoinTask)
275 */
276 private void lazySubmitRunContinuation(ForkJoinPool pool) {
277 try {
278 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
279 } catch (RejectedExecutionException ree) {
280 submitFailed(ree);
281 throw ree;
282 }
283 }
284
285 /**
286 * Submits the runContinuation task to the given scheduler as an external submit.
287 * @throws RejectedExecutionException
288 * @see ForkJoinPool#externalSubmit(ForkJoinTask)
289 */
290 private void externalSubmitRunContinuation(ForkJoinPool pool) {
368 } else if (carrier.isInterrupted()) {
369 synchronized (interruptLock) {
370 // need to recheck interrupt status
371 if (!interrupted) {
372 carrier.clearInterrupt();
373 }
374 }
375 }
376
377 // set Thread.currentThread() to return this virtual thread
378 carrier.setCurrentThread(this);
379 }
380
381 /**
382 * Unmounts this virtual thread from the carrier. On return, the
383 * current thread is the current platform thread.
384 */
385 @ChangesCurrentThread
386 @ReservedStackAccess
387 private void unmount() {
388 // set Thread.currentThread() to return the platform thread
389 Thread carrier = this.carrierThread;
390 carrier.setCurrentThread(carrier);
391
392 // break connection to carrier thread, synchronized with interrupt
393 synchronized (interruptLock) {
394 setCarrierThread(null);
395 }
396 carrier.clearInterrupt();
397
398 // notify JVMTI after unmount
399 notifyJvmtiUnmount(/*hide*/false);
400 }
401
402 /**
403 * Sets the current thread to the current carrier thread.
404 */
405 @ChangesCurrentThread
406 @JvmtiMountTransition
407 private void switchToCarrierThread() {
408 notifyJvmtiHideFrames(true);
409 Thread carrier = this.carrierThread;
410 assert Thread.currentThread() == this
411 && carrier == Thread.currentCarrierThread();
412 carrier.setCurrentThread(carrier);
413 }
414
415 /**
416 * Sets the current thread to the given virtual thread.
417 */
418 @ChangesCurrentThread
419 @JvmtiMountTransition
420 private void switchToVirtualThread(VirtualThread vthread) {
421 Thread carrier = vthread.carrierThread;
422 assert carrier == Thread.currentCarrierThread();
423 carrier.setCurrentThread(vthread);
424 notifyJvmtiHideFrames(false);
425 }
426
427 /**
428 * Executes the given value returning task on the current carrier thread.
429 */
430 @ChangesCurrentThread
431 <V> V executeOnCarrierThread(Callable<V> task) throws Exception {
432 assert Thread.currentThread() == this;
443 * the continuation continues.
444 */
445 @Hidden
446 private boolean yieldContinuation() {
447 notifyJvmtiUnmount(/*hide*/true);
448 try {
449 return Continuation.yield(VTHREAD_SCOPE);
450 } finally {
451 notifyJvmtiMount(/*hide*/false);
452 }
453 }
454
455 /**
456 * Invoked after the continuation yields. If parking then it sets the state
457 * and also re-submits the task to continue if unparked while parking.
458 * If yielding due to Thread.yield then it just submits the task to continue.
459 */
460 private void afterYield() {
461 assert carrierThread == null;
462
463 int s = state();
464
465 // LockSupport.park/parkNanos
466 if (s == PARKING || s == TIMED_PARKING) {
467 int newState = (s == PARKING) ? PARKED : TIMED_PARKED;
468 setState(newState);
469
470 // may have been unparked while parking
471 if (parkPermit && compareAndSetState(newState, UNPARKED)) {
472 // lazy submit to continue on the current thread as carrier if possible
473 if (currentThread() instanceof CarrierThread ct) {
474 lazySubmitRunContinuation(ct.getPool());
475 } else {
476 submitRunContinuation();
477 }
478
479 }
480 return;
481 }
482
483 // Thread.yield
484 if (s == YIELDING) {
485 setState(YIELDED);
486
487 // external submit if there are no tasks in the local task queue
488 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
489 externalSubmitRunContinuation(ct.getPool());
490 } else {
491 submitRunContinuation();
492 }
493 return;
494 }
495
496 assert false;
497 }
498
499 /**
500 * Invoked after the continuation completes.
501 */
502 private void afterDone() {
503 afterDone(true);
504 }
505
506 /**
507 * Invoked after the continuation completes (or start failed). Sets the thread
508 * state to TERMINATED and notifies anyone waiting for the thread to terminate.
509 *
510 * @param notifyContainer true if its container should be notified
511 */
512 private void afterDone(boolean notifyContainer) {
513 assert carrierThread == null;
514 setState(TERMINATED);
515
539 @Override
540 void start(ThreadContainer container) {
541 if (!compareAndSetState(NEW, STARTED)) {
542 throw new IllegalThreadStateException("Already started");
543 }
544
545 // bind thread to container
546 assert threadContainer() == null;
547 setThreadContainer(container);
548
549 // start thread
550 boolean addedToContainer = false;
551 boolean started = false;
552 try {
553 container.onStart(this); // may throw
554 addedToContainer = true;
555
556 // scoped values may be inherited
557 inheritScopedValueBindings(container);
558
559 // submit task to run thread
560 submitRunContinuation();
561 started = true;
562 } finally {
563 if (!started) {
564 afterDone(addedToContainer);
565 }
566 }
567 }
568
569 @Override
570 public void start() {
571 start(ThreadContainers.root());
572 }
573
574 @Override
575 public void run() {
576 // do nothing
577 }
578
579 /**
580 * Parks until unparked or interrupted. If already unparked then the parking
643 }
644
645 // park on carrier thread for remaining time when pinned
646 if (!yielded) {
647 long remainingNanos = nanos - (System.nanoTime() - startTime);
648 parkOnCarrierThread(true, remainingNanos);
649 }
650 }
651 }
652
653 /**
654 * Parks the current carrier thread up to the given waiting time or until
655 * unparked or interrupted. If the virtual thread is interrupted then the
656 * interrupt status will be propagated to the carrier thread.
657 * @param timed true for a timed park, false for untimed
658 * @param nanos the waiting time in nanoseconds
659 */
660 private void parkOnCarrierThread(boolean timed, long nanos) {
661 assert state() == RUNNING;
662
663 VirtualThreadPinnedEvent event;
664 try {
665 event = new VirtualThreadPinnedEvent();
666 event.begin();
667 } catch (OutOfMemoryError e) {
668 event = null;
669 }
670
671 setState(timed ? TIMED_PINNED : PINNED);
672 try {
673 if (!parkPermit) {
674 if (!timed) {
675 U.park(false, 0);
676 } else if (nanos > 0) {
677 U.park(false, nanos);
678 }
679 }
680 } finally {
681 setState(RUNNING);
682 }
683
684 // consume parking permit
685 setParkPermit(false);
686
687 if (event != null) {
688 try {
689 event.commit();
690 } catch (OutOfMemoryError e) {
691 // ignore
692 }
693 }
694 }
695
696 /**
697 * Schedule this virtual thread to be unparked after a given delay.
698 */
699 @ChangesCurrentThread
700 private Future<?> scheduleUnpark(long nanos) {
701 assert Thread.currentThread() == this;
702 // need to switch to current carrier thread to avoid nested parking
703 switchToCarrierThread();
704 try {
705 return UNPARKER.schedule(this::unpark, nanos, NANOSECONDS);
706 } finally {
707 switchToVirtualThread(this);
708 }
709 }
710
711 /**
712 * Cancels a task if it has not completed.
713 */
714 @ChangesCurrentThread
715 private void cancel(Future<?> future) {
716 if (!future.isDone()) {
717 // need to switch to current carrier thread to avoid nested parking
718 switchToCarrierThread();
719 try {
720 future.cancel(false);
721 } finally {
722 switchToVirtualThread(this);
723 }
724 }
725 }
726
727 /**
728 * Re-enables this virtual thread for scheduling. If the virtual thread was
729 * {@link #park() parked} then it will be unblocked, otherwise its next call
730 * to {@code park} or {@linkplain #parkNanos(long) parkNanos} is guaranteed
731 * not to block.
732 * @throws RejectedExecutionException if the scheduler cannot accept a task
733 */
734 @Override
735 @ChangesCurrentThread
736 void unpark() {
737 Thread currentThread = Thread.currentThread();
738 if (!getAndSetParkPermit(true) && currentThread != this) {
739 int s = state();
740 boolean parked = (s == PARKED) || (s == TIMED_PARKED);
741 if (parked && compareAndSetState(s, UNPARKED)) {
742 if (currentThread instanceof VirtualThread vthread) {
743 vthread.switchToCarrierThread();
744 try {
745 submitRunContinuation();
746 } finally {
747 switchToVirtualThread(vthread);
748 }
749 } else {
750 submitRunContinuation();
751 }
752 } else if ((s == PINNED) || (s == TIMED_PINNED)) {
753 // unpark carrier thread when pinned
754 notifyJvmtiDisableSuspend(true);
755 try {
756 synchronized (carrierThreadAccessLock()) {
757 Thread carrier = carrierThread;
758 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
759 U.unpark(carrier);
760 }
761 }
762 } finally {
763 notifyJvmtiDisableSuspend(false);
764 }
765 }
766 }
767 }
768
769 /**
770 * Attempts to yield the current virtual thread (Thread.yield).
771 */
772 void tryYield() {
773 assert Thread.currentThread() == this;
774 setState(YIELDING);
775 boolean yielded = false;
776 try {
777 yielded = yieldContinuation(); // may throw
778 } finally {
779 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
780 if (!yielded) {
781 assert state() == YIELDING;
782 setState(RUNNING);
783 }
784 }
785 }
786
787 /**
788 * Sleep the current thread for the given sleep time (in nanoseconds). If
837 CountDownLatch termination = getTermination();
838 if (state() == TERMINATED)
839 return true;
840
841 // wait for virtual thread to terminate
842 if (nanos == 0) {
843 termination.await();
844 } else {
845 boolean terminated = termination.await(nanos, NANOSECONDS);
846 if (!terminated) {
847 // waiting time elapsed
848 return false;
849 }
850 }
851 assert state() == TERMINATED;
852 return true;
853 }
854
855 @Override
856 void blockedOn(Interruptible b) {
857 notifyJvmtiDisableSuspend(true);
858 try {
859 super.blockedOn(b);
860 } finally {
861 notifyJvmtiDisableSuspend(false);
862 }
863 }
864
865 @Override
866 @SuppressWarnings("removal")
867 public void interrupt() {
868 if (Thread.currentThread() != this) {
869 checkAccess();
870
871 // if current thread is a virtual thread then prevent it from being
872 // suspended when entering or holding interruptLock
873 Interruptible blocker;
874 notifyJvmtiDisableSuspend(true);
875 try {
876 synchronized (interruptLock) {
877 interrupted = true;
878 blocker = nioBlocker();
879 if (blocker != null) {
880 blocker.interrupt(this);
881 }
882
883 // interrupt carrier thread if mounted
884 Thread carrier = carrierThread;
885 if (carrier != null) carrier.setInterrupt();
886 }
887 } finally {
888 notifyJvmtiDisableSuspend(false);
889 }
890
891 // notify blocker after releasing interruptLock
892 if (blocker != null) {
893 blocker.postInterrupt();
894 }
895 } else {
896 interrupted = true;
897 carrierThread.setInterrupt();
898 }
899 unpark();
900 }
901
902 @Override
903 public boolean isInterrupted() {
904 return interrupted;
905 }
906
907 @Override
908 boolean getAndClearInterrupt() {
909 assert Thread.currentThread() == this;
910 boolean oldValue = interrupted;
911 if (oldValue) {
912 notifyJvmtiDisableSuspend(true);
913 try {
914 synchronized (interruptLock) {
915 interrupted = false;
916 carrierThread.clearInterrupt();
917 }
918 } finally {
919 notifyJvmtiDisableSuspend(false);
920 }
921 }
922 return oldValue;
923 }
924
925 @Override
926 Thread.State threadState() {
927 int s = state();
928 switch (s & ~SUSPENDED) {
929 case NEW:
930 return Thread.State.NEW;
931 case STARTED:
932 // return NEW if thread container not yet set
933 if (threadContainer() == null) {
934 return Thread.State.NEW;
935 } else {
936 return Thread.State.RUNNABLE;
937 }
938 case UNPARKED:
939 case YIELDED:
940 // runnable, not mounted
941 return Thread.State.RUNNABLE;
942 case RUNNING:
943 // if mounted then return state of carrier thread
944 notifyJvmtiDisableSuspend(true);
945 try {
946 synchronized (carrierThreadAccessLock()) {
947 Thread carrierThread = this.carrierThread;
948 if (carrierThread != null) {
949 return carrierThread.threadState();
950 }
951 }
952 } finally {
953 notifyJvmtiDisableSuspend(false);
954 }
955 // runnable, mounted
956 return Thread.State.RUNNABLE;
957 case PARKING:
958 case TIMED_PARKING:
959 case YIELDING:
960 // runnable, in transition
961 return Thread.State.RUNNABLE;
962 case PARKED:
963 case PINNED:
964 return State.WAITING;
965 case TIMED_PARKED:
966 case TIMED_PINNED:
967 return State.TIMED_WAITING;
968 case TERMINATED:
969 return Thread.State.TERMINATED;
970 default:
971 throw new InternalError();
972 }
973 }
974
975 @Override
976 boolean alive() {
977 int s = state;
978 return (s != NEW && s != TERMINATED);
979 }
980
981 @Override
982 boolean isTerminated() {
983 return (state == TERMINATED);
984 }
985
986 @Override
987 StackTraceElement[] asyncGetStackTrace() {
988 StackTraceElement[] stackTrace;
989 do {
990 stackTrace = (carrierThread != null)
991 ? super.asyncGetStackTrace() // mounted
992 : tryGetStackTrace(); // unmounted
993 if (stackTrace == null) {
994 Thread.yield();
995 }
996 } while (stackTrace == null);
997 return stackTrace;
998 }
999
1000 /**
1001 * Returns the stack trace for this virtual thread if it is unmounted.
1002 * Returns null if the thread is mounted or in transition.
1003 */
1004 private StackTraceElement[] tryGetStackTrace() {
1005 int initialState = state() & ~SUSPENDED;
1006 switch (initialState) {
1007 case NEW, STARTED, TERMINATED -> {
1008 return new StackTraceElement[0]; // unmounted, empty stack
1009 }
1010 case RUNNING, PINNED, TIMED_PINNED -> {
1011 return null; // mounted
1012 }
1013 case PARKED, TIMED_PARKED -> {
1014 // unmounted, not runnable
1015 }
1016 case UNPARKED, YIELDED -> {
1017 // unmounted, runnable
1018 }
1019 case PARKING, TIMED_PARKING, YIELDING -> {
1020 return null; // in transition
1021 }
1022 default -> throw new InternalError("" + initialState);
1023 }
1024
1025 // thread is unmounted, prevent it from continuing
1026 int suspendedState = initialState | SUSPENDED;
1027 if (!compareAndSetState(initialState, suspendedState)) {
1028 return null;
1029 }
1030
1031 // get stack trace and restore state
1032 StackTraceElement[] stack;
1033 try {
1034 stack = cont.getStackTrace();
1035 } finally {
1036 assert state == suspendedState;
1037 setState(initialState);
1038 }
1039 boolean resubmit = switch (initialState) {
1040 case UNPARKED, YIELDED -> {
1041 // resubmit as task may have run while suspended
1042 yield true;
1043 }
1044 case PARKED, TIMED_PARKED -> {
1045 // resubmit if unparked while suspended
1046 yield parkPermit && compareAndSetState(initialState, UNPARKED);
1047 }
1048 default -> throw new InternalError();
1049 };
1050 if (resubmit) {
1051 submitRunContinuation();
1052 }
1053 return stack;
1054 }
1055
1056 @Override
1057 public String toString() {
1058 StringBuilder sb = new StringBuilder("VirtualThread[#");
1059 sb.append(threadId());
1060 String name = getName();
1061 if (!name.isEmpty()) {
1062 sb.append(",");
1063 sb.append(name);
1064 }
1065 sb.append("]/");
1066 Thread carrier = carrierThread;
1067 if (carrier != null) {
1068 // include the carrier thread state and name when mounted
1069 notifyJvmtiDisableSuspend(true);
1070 try {
1071 synchronized (carrierThreadAccessLock()) {
1072 carrier = carrierThread;
1073 if (carrier != null) {
1074 String stateAsString = carrier.threadState().toString();
1075 sb.append(stateAsString.toLowerCase(Locale.ROOT));
1076 sb.append('@');
1077 sb.append(carrier.getName());
1078 }
1079 }
1080 } finally {
1081 notifyJvmtiDisableSuspend(false);
1082 }
1083 }
1084 // include virtual thread state when not mounted
1085 if (carrier == null) {
1086 String stateAsString = threadState().toString();
1087 sb.append(stateAsString.toLowerCase(Locale.ROOT));
1088 }
1089 return sb.toString();
1090 }
1091
1092 @Override
1093 public int hashCode() {
1094 return (int) threadId();
1095 }
1096
1097 @Override
1098 public boolean equals(Object obj) {
1099 return obj == this;
1100 }
1101
1102 /**
1103 * Returns the termination object, creating it if needed.
1104 */
1105 private CountDownLatch getTermination() {
1106 CountDownLatch termination = this.termination;
1107 if (termination == null) {
1108 termination = new CountDownLatch(1);
1109 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
1110 termination = this.termination;
1111 }
1112 }
1113 return termination;
1114 }
1115
1116 /**
1117 * Returns the lock object to synchronize on when accessing carrierThread.
1118 * The lock prevents carrierThread from being reset to null during unmount.
1119 */
1120 private Object carrierThreadAccessLock() {
1121 // return interruptLock as unmount has to coordinate with interrupt
1122 return interruptLock;
1123 }
1124
1125 // -- wrappers for get/set of state, parking permit, and carrier thread --
1126
1127 private int state() {
1128 return state; // volatile read
1129 }
1130
1131 private void setState(int newValue) {
1132 state = newValue; // volatile write
1133 }
1134
1135 private boolean compareAndSetState(int expectedValue, int newValue) {
1136 return U.compareAndSetInt(this, STATE, expectedValue, newValue);
1137 }
1138
1139 private void setParkPermit(boolean newValue) {
1140 if (parkPermit != newValue) {
1141 parkPermit = newValue;
1142 }
1143 }
1144
1145 private boolean getAndSetParkPermit(boolean newValue) {
1146 if (parkPermit != newValue) {
1147 return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
1148 } else {
1149 return newValue;
1150 }
1151 }
1152
1153 private void setCarrierThread(Thread carrier) {
1154 // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
1155 this.carrierThread = carrier;
1156 }
1157
1158 // -- JVM TI support --
1159
1160 @IntrinsicCandidate
1161 @JvmtiMountTransition
1162 private native void notifyJvmtiStart();
1163
1164 @IntrinsicCandidate
1165 @JvmtiMountTransition
1166 private native void notifyJvmtiEnd();
1167
1168 @IntrinsicCandidate
1169 @JvmtiMountTransition
1170 private native void notifyJvmtiMount(boolean hide);
1171
1172 @IntrinsicCandidate
1173 @JvmtiMountTransition
1174 private native void notifyJvmtiUnmount(boolean hide);
1175
1176 @IntrinsicCandidate
1177 @JvmtiMountTransition
1207 if (maxPoolSizeValue != null) {
1208 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1209 parallelism = Integer.min(parallelism, maxPoolSize);
1210 } else {
1211 maxPoolSize = Integer.max(parallelism, 256);
1212 }
1213 if (minRunnableValue != null) {
1214 minRunnable = Integer.parseInt(minRunnableValue);
1215 } else {
1216 minRunnable = Integer.max(parallelism / 2, 1);
1217 }
1218 Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1219 boolean asyncMode = true; // FIFO
1220 return new ForkJoinPool(parallelism, factory, handler, asyncMode,
1221 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
1222 };
1223 return AccessController.doPrivileged(pa);
1224 }
1225
1226 /**
1227 * Creates the ScheduledThreadPoolExecutor used for timed unpark.
1228 */
1229 private static ScheduledExecutorService createDelayedTaskScheduler() {
1230 String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize");
1231 int poolSize;
1232 if (propValue != null) {
1233 poolSize = Integer.parseInt(propValue);
1234 } else {
1235 poolSize = 1;
1236 }
1237 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1238 Executors.newScheduledThreadPool(poolSize, task -> {
1239 return InnocuousThread.newThread("VirtualThread-unparker", task);
1240 });
1241 stpe.setRemoveOnCancelPolicy(true);
1242 return stpe;
1243 }
1244
1245 /**
1246 * Reads the value of the jdk.tracePinnedThreads property to determine if stack
1247 * traces should be printed when a carrier thread is pinned when a virtual thread
1248 * attempts to park.
1249 */
1250 private static int tracePinningMode() {
1251 String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads");
1252 if (propValue != null) {
1253 if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue))
1254 return 1;
1255 if ("short".equalsIgnoreCase(propValue))
1256 return 2;
1257 }
1258 return 0;
1259 }
1260 }
|
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.nio.charset.StandardCharsets;
28 import java.security.AccessController;
29 import java.security.PrivilegedAction;
30 import java.util.Locale;
31 import java.util.Objects;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ForkJoinPool;
37 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
38 import java.util.concurrent.ForkJoinTask;
39 import java.util.concurrent.ForkJoinWorkerThread;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.RejectedExecutionException;
42 import java.util.concurrent.ScheduledExecutorService;
43 import java.util.concurrent.ScheduledThreadPoolExecutor;
44 import jdk.internal.event.VirtualThreadEndEvent;
45 import jdk.internal.event.VirtualThreadStartEvent;
46 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
47 import jdk.internal.misc.CarrierThread;
48 import jdk.internal.misc.InnocuousThread;
49 import jdk.internal.misc.Unsafe;
50 import jdk.internal.vm.Continuation;
51 import jdk.internal.vm.ContinuationScope;
52 import jdk.internal.vm.StackableScope;
53 import jdk.internal.vm.ThreadContainer;
54 import jdk.internal.vm.ThreadContainers;
55 import jdk.internal.vm.annotation.ChangesCurrentThread;
56 import jdk.internal.vm.annotation.Hidden;
57 import jdk.internal.vm.annotation.IntrinsicCandidate;
58 import jdk.internal.vm.annotation.JvmtiMountTransition;
59 import jdk.internal.vm.annotation.ReservedStackAccess;
60 import sun.nio.ch.Interruptible;
61 import sun.security.action.GetPropertyAction;
62 import static java.util.concurrent.TimeUnit.*;
63
64 /**
65 * A thread that is scheduled by the Java virtual machine rather than the operating
66 * system.
67 */
68 final class VirtualThread extends BaseVirtualThread {
69 private static final Unsafe U = Unsafe.getUnsafe();
70 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
71 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
72 private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
73
74 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
75 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
76 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
77 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
78 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
79
80 // scheduler and continuation
81 private final Executor scheduler;
82 private final Continuation cont;
83 private final Runnable runContinuation;
84
85 // virtual thread state, accessed by VM
86 private volatile int state;
87
88 /*
89 * Virtual thread state transitions:
90 *
91 * NEW -> STARTED // Thread.start, schedule to run
92 * STARTED -> TERMINATED // failed to start
93 * STARTED -> RUNNING // first run
94 * RUNNING -> TERMINATED // done
95 *
96 * RUNNING -> PARKING // Thread parking with LockSupport.park
97 * PARKING -> PARKED // cont.yield successful, parked indefinitely
98 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier
99 * PARKED -> UNPARKED // unparked, may be scheduled to continue
100 * PINNED -> RUNNING // unparked, continue execution on same carrier
101 * UNPARKED -> RUNNING // continue execution after park
102 *
103 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos
104 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked
105 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier
106 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue
107 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
108 *
109 * RUNNABLE -> BLOCKING // blocking on monitor enter
110 * BLOCKING -> BLOCKED // blocked on monitor enter
111 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue
112 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter
113 *
114 * RUNNING -> YIELDING // Thread.yield
115 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue
116 * YIELDING -> RUNNING // cont.yield failed
117 * YIELDED -> RUNNING // continue execution after Thread.yield
118 */
119 private static final int NEW = 0;
120 private static final int STARTED = 1;
121 private static final int RUNNING = 2; // runnable-mounted
122
123 // untimed and timed parking
124 private static final int PARKING = 3;
125 private static final int PARKED = 4; // unmounted
126 private static final int PINNED = 5; // mounted
127 private static final int TIMED_PARKING = 6;
128 private static final int TIMED_PARKED = 7; // unmounted
129 private static final int TIMED_PINNED = 8; // mounted
130 private static final int UNPARKED = 9; // unmounted but runnable
131
132 // Thread.yield
133 private static final int YIELDING = 10;
134 private static final int YIELDED = 11; // unmounted but runnable
135
136 // monitor enter
137 private static final int BLOCKING = 12;
138 private static final int BLOCKED = 13; // unmounted
139 private static final int UNBLOCKED = 14; // unmounted but runnable
140
141 private static final int TERMINATED = 99; // final state
142
143 // can be suspended from scheduling when unmounted
144 private static final int SUSPENDED = 1 << 8;
145
146 // parking permit
147 private volatile boolean parkPermit;
148
149 // used to mark thread as ready to be unblocked while it is concurrently blocking
150 private volatile boolean unblocked;
151
152 // a positive value if "responsible thread" blocked on monitor enter, accessed by VM
153 private volatile byte recheckInterval;
154
155 // carrier thread when mounted, accessed by VM
156 private volatile Thread carrierThread;
157
158 // termination object when joining, created lazily if needed
159 private volatile CountDownLatch termination;
160
161 // has the value 1 when on the list of virtual threads waiting to be unblocked
162 private volatile byte onWaitingList;
163
164 // next virtual thread on the list of virtual threads waiting to be unblocked
165 private volatile VirtualThread next;
166
167 /**
168 * Returns the continuation scope used for virtual threads.
169 */
170 static ContinuationScope continuationScope() {
171 return VTHREAD_SCOPE;
172 }
173
174 /**
175 * Creates a new {@code VirtualThread} to run the given task with the given
176 * scheduler. If the given scheduler is {@code null} and the current thread
177 * is a platform thread then the newly created virtual thread will use the
178 * default scheduler. If given scheduler is {@code null} and the current
179 * thread is a virtual thread then the current thread's scheduler is used.
180 *
181 * @param scheduler the scheduler or null
182 * @param name thread name
183 * @param characteristics characteristics
184 * @param task the task to execute
185 */
186 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
194 scheduler = vparent.scheduler;
195 } else {
196 scheduler = DEFAULT_SCHEDULER;
197 }
198 }
199
200 this.scheduler = scheduler;
201 this.cont = new VThreadContinuation(this, task);
202 this.runContinuation = this::runContinuation;
203 }
204
205 /**
206 * The continuation that a virtual thread executes.
207 */
208 private static class VThreadContinuation extends Continuation {
209 VThreadContinuation(VirtualThread vthread, Runnable task) {
210 super(VTHREAD_SCOPE, wrap(vthread, task));
211 }
212 @Override
213 protected void onPinned(Continuation.Pinned reason) {
214 // emit JFR event
215 virtualThreadPinnedEvent(reason.value());
216 }
217 private static Runnable wrap(VirtualThread vthread, Runnable task) {
218 return new Runnable() {
219 @Hidden
220 public void run() {
221 vthread.run(task);
222 }
223 };
224 }
225 }
226
227 /**
228 * Runs or continues execution on the current thread. The virtual thread is mounted
229 * on the current thread before the task runs or continues. It unmounts when the
230 * task completes or yields.
231 */
232 @ChangesCurrentThread
233 private void runContinuation() {
234 // the carrier must be a platform thread
235 if (Thread.currentThread().isVirtual()) {
236 throw new WrongThreadException();
237 }
238
239 // set state to RUNNING
240 int initialState = state();
241 if (initialState == STARTED || initialState == UNPARKED
242 || initialState == UNBLOCKED || initialState == YIELDED) {
243 // newly started or continue after parking/blocking/Thread.yield
244 if (!compareAndSetState(initialState, RUNNING)) {
245 return;
246 }
247 // consume parking permit when continuing after parking
248 if (initialState == UNPARKED) {
249 setParkPermit(false);
250 }
251 } else {
252 // not runnable
253 return;
254 }
255
256 mount();
257 try {
258 cont.run();
259 } finally {
260 unmount();
261 if (cont.isDone()) {
262 afterDone();
264 afterYield();
265 }
266 }
267 }
268
269 /**
270 * Submits the runContinuation task to the scheduler. For the default scheduler,
271 * and calling it on a worker thread, the task will be pushed to the local queue,
272 * otherwise it will be pushed to an external submission queue.
273 * @throws RejectedExecutionException
274 */
275 private void submitRunContinuation() {
276 try {
277 scheduler.execute(runContinuation);
278 } catch (RejectedExecutionException ree) {
279 submitFailed(ree);
280 throw ree;
281 }
282 }
283
284 /**
285 * Submits the runContinuation task the scheduler. For the default scheduler, the task
286 * will be pushed to an external submission queue.
287 * @throws RejectedExecutionException
288 */
289 private void externalSubmitRunContinuation() {
290 if (scheduler == DEFAULT_SCHEDULER
291 && currentCarrierThread() instanceof CarrierThread ct) {
292 externalSubmitRunContinuation(ct.getPool());
293 } else {
294 submitRunContinuation();
295 }
296 }
297
298 /**
299 * Submits the runContinuation task to given scheduler with a lazy submit.
300 * @throws RejectedExecutionException
301 * @see ForkJoinPool#lazySubmit(ForkJoinTask)
302 */
303 private void lazySubmitRunContinuation(ForkJoinPool pool) {
304 try {
305 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
306 } catch (RejectedExecutionException ree) {
307 submitFailed(ree);
308 throw ree;
309 }
310 }
311
312 /**
313 * Submits the runContinuation task to the given scheduler as an external submit.
314 * @throws RejectedExecutionException
315 * @see ForkJoinPool#externalSubmit(ForkJoinTask)
316 */
317 private void externalSubmitRunContinuation(ForkJoinPool pool) {
395 } else if (carrier.isInterrupted()) {
396 synchronized (interruptLock) {
397 // need to recheck interrupt status
398 if (!interrupted) {
399 carrier.clearInterrupt();
400 }
401 }
402 }
403
404 // set Thread.currentThread() to return this virtual thread
405 carrier.setCurrentThread(this);
406 }
407
408 /**
409 * Unmounts this virtual thread from the carrier. On return, the
410 * current thread is the current platform thread.
411 */
412 @ChangesCurrentThread
413 @ReservedStackAccess
414 private void unmount() {
415 assert !Thread.holdsLock(interruptLock);
416
417 // set Thread.currentThread() to return the platform thread
418 Thread carrier = this.carrierThread;
419 carrier.setCurrentThread(carrier);
420
421 // break connection to carrier thread, synchronized with interrupt
422 synchronized (interruptLock) {
423 setCarrierThread(null);
424 }
425 carrier.clearInterrupt();
426
427 // notify JVMTI after unmount
428 notifyJvmtiUnmount(/*hide*/false);
429 }
430
431 /**
432 * Sets the current thread to the current carrier thread.
433 */
434 @ChangesCurrentThread
435 @JvmtiMountTransition
436 private void switchToCarrierThread() {
437 notifyJvmtiHideFrames(true);
438 Thread carrier = this.carrierThread;
439 assert Thread.currentThread() == this
440 && carrier == Thread.currentCarrierThread();
441 carrier.setCurrentThread(carrier);
442 setLockId(this.threadId()); // keep lockid of vthread
443 }
444
445 /**
446 * Sets the current thread to the given virtual thread.
447 */
448 @ChangesCurrentThread
449 @JvmtiMountTransition
450 private void switchToVirtualThread(VirtualThread vthread) {
451 Thread carrier = vthread.carrierThread;
452 assert carrier == Thread.currentCarrierThread();
453 carrier.setCurrentThread(vthread);
454 notifyJvmtiHideFrames(false);
455 }
456
457 /**
458 * Executes the given value returning task on the current carrier thread.
459 */
460 @ChangesCurrentThread
461 <V> V executeOnCarrierThread(Callable<V> task) throws Exception {
462 assert Thread.currentThread() == this;
473 * the continuation continues.
474 */
475 @Hidden
476 private boolean yieldContinuation() {
477 notifyJvmtiUnmount(/*hide*/true);
478 try {
479 return Continuation.yield(VTHREAD_SCOPE);
480 } finally {
481 notifyJvmtiMount(/*hide*/false);
482 }
483 }
484
485 /**
486 * Invoked after the continuation yields. If parking then it sets the state
487 * and also re-submits the task to continue if unparked while parking.
488 * If yielding due to Thread.yield then it just submits the task to continue.
489 */
490 private void afterYield() {
491 assert carrierThread == null;
492
493 // re-adjust parallelism if the virtual thread yielded when compensating
494 if (currentThread() instanceof CarrierThread ct) {
495 ct.endBlocking();
496 }
497
498 int s = state();
499
500 // LockSupport.park/parkNanos
501 if (s == PARKING || s == TIMED_PARKING) {
502 int newState = (s == PARKING) ? PARKED : TIMED_PARKED;
503 setState(newState);
504
505 // may have been unparked while parking
506 if (parkPermit && compareAndSetState(newState, UNPARKED)) {
507 // lazy submit to continue on the current thread as carrier if possible
508 if (currentThread() instanceof CarrierThread ct) {
509 lazySubmitRunContinuation(ct.getPool());
510 } else {
511 submitRunContinuation();
512 }
513
514 }
515 return;
516 }
517
518 // Thread.yield
519 if (s == YIELDING) {
520 setState(YIELDED);
521
522 // external submit if there are no tasks in the local task queue
523 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
524 externalSubmitRunContinuation(ct.getPool());
525 } else {
526 submitRunContinuation();
527 }
528 return;
529 }
530
531 // blocking on monitorenter
532 if (s == BLOCKING) {
533 setState(BLOCKED);
534
535 // may have been unblocked while blocking
536 if (unblocked && compareAndSetState(BLOCKED, UNBLOCKED)) {
537 unblocked = false;
538 submitRunContinuation();
539 return;
540 }
541
542 // if thread is the designated responsible thread for a monitor then schedule
543 // it to wakeup so that it can check and recover. See objectMonitor.cpp.
544 int recheckInterval = this.recheckInterval;
545 if (recheckInterval > 0 && state() == BLOCKED) {
546 assert recheckInterval >= 1 && recheckInterval <= 6;
547 // 4 ^ (recheckInterval - 1) = 1, 4, 16, ... 1024
548 long delay = 1 << (recheckInterval - 1) << (recheckInterval - 1);
549 Future<?> unblocker = delayedTaskScheduler().schedule(this::unblock, delay, MILLISECONDS);
550 // cancel if unblocked while scheduling the unblock
551 if (state() != BLOCKED) {
552 unblocker.cancel(false);
553 }
554 }
555 return;
556 }
557
558 assert false;
559 }
560
561 /**
562 * Invoked after the continuation completes.
563 */
564 private void afterDone() {
565 afterDone(true);
566 }
567
568 /**
569 * Invoked after the continuation completes (or start failed). Sets the thread
570 * state to TERMINATED and notifies anyone waiting for the thread to terminate.
571 *
572 * @param notifyContainer true if its container should be notified
573 */
574 private void afterDone(boolean notifyContainer) {
575 assert carrierThread == null;
576 setState(TERMINATED);
577
601 @Override
602 void start(ThreadContainer container) {
603 if (!compareAndSetState(NEW, STARTED)) {
604 throw new IllegalThreadStateException("Already started");
605 }
606
607 // bind thread to container
608 assert threadContainer() == null;
609 setThreadContainer(container);
610
611 // start thread
612 boolean addedToContainer = false;
613 boolean started = false;
614 try {
615 container.onStart(this); // may throw
616 addedToContainer = true;
617
618 // scoped values may be inherited
619 inheritScopedValueBindings(container);
620
621 // submit task to run thread, using externalSubmit if possible
622 externalSubmitRunContinuation();
623 started = true;
624 } finally {
625 if (!started) {
626 afterDone(addedToContainer);
627 }
628 }
629 }
630
631 @Override
632 public void start() {
633 start(ThreadContainers.root());
634 }
635
636 @Override
637 public void run() {
638 // do nothing
639 }
640
641 /**
642 * Parks until unparked or interrupted. If already unparked then the parking
705 }
706
707 // park on carrier thread for remaining time when pinned
708 if (!yielded) {
709 long remainingNanos = nanos - (System.nanoTime() - startTime);
710 parkOnCarrierThread(true, remainingNanos);
711 }
712 }
713 }
714
715 /**
716 * Parks the current carrier thread up to the given waiting time or until
717 * unparked or interrupted. If the virtual thread is interrupted then the
718 * interrupt status will be propagated to the carrier thread.
719 * @param timed true for a timed park, false for untimed
720 * @param nanos the waiting time in nanoseconds
721 */
722 private void parkOnCarrierThread(boolean timed, long nanos) {
723 assert state() == RUNNING;
724
725 setState(timed ? TIMED_PINNED : PINNED);
726 try {
727 if (!parkPermit) {
728 if (!timed) {
729 U.park(false, 0);
730 } else if (nanos > 0) {
731 U.park(false, nanos);
732 }
733 }
734 } finally {
735 setState(RUNNING);
736 }
737
738 // consume parking permit
739 setParkPermit(false);
740 }
741
742 /**
743 * jdk.VirtualThreadPinned is emitted by HotSpot VM when pinned. Call into VM to
744 * emit event to avoid having a JFR in Java with the same name (but different ID)
745 * to events emitted by the VM.
746 */
747 private static native void virtualThreadPinnedEvent(int reason);
748
749 /**
750 * Schedule this virtual thread to be unparked after a given delay.
751 */
752 @ChangesCurrentThread
753 private Future<?> scheduleUnpark(long nanos) {
754 assert Thread.currentThread() == this;
755 // need to switch to current carrier thread to avoid nested parking
756 switchToCarrierThread();
757 try {
758 return delayedTaskScheduler().schedule(this::unpark, nanos, NANOSECONDS);
759 } finally {
760 switchToVirtualThread(this);
761 }
762 }
763
764 /**
765 * Cancels a task if it has not completed.
766 */
767 @ChangesCurrentThread
768 private void cancel(Future<?> future) {
769 assert Thread.currentThread() == this;
770 if (!future.isDone()) {
771 // need to switch to current carrier thread to avoid nested parking
772 switchToCarrierThread();
773 try {
774 future.cancel(false);
775 } finally {
776 switchToVirtualThread(this);
777 }
778 }
779 }
780
781 /**
782 * Re-enables this virtual thread for scheduling. If the virtual thread was
783 * {@link #park() parked} then it will be unblocked, otherwise its next call
784 * to {@code park} or {@linkplain #parkNanos(long) parkNanos} is guaranteed
785 * not to block.
786 * @throws RejectedExecutionException if the scheduler cannot accept a task
787 */
788 @Override
789 @ChangesCurrentThread
790 void unpark() {
791 Thread currentThread = Thread.currentThread();
792 if (!getAndSetParkPermit(true) && currentThread != this) {
793 int s = state();
794 boolean parked = (s == PARKED) || (s == TIMED_PARKED);
795 if (parked && compareAndSetState(s, UNPARKED)) {
796 if (currentThread instanceof VirtualThread vthread) {
797 vthread.switchToCarrierThread();
798 try {
799 submitRunContinuation();
800 } finally {
801 switchToVirtualThread(vthread);
802 }
803 } else {
804 submitRunContinuation();
805 }
806 } else if ((s == PINNED) || (s == TIMED_PINNED)) {
807 // unpark carrier thread when pinned
808 disableSuspendAndPreempt();
809 try {
810 synchronized (carrierThreadAccessLock()) {
811 Thread carrier = carrierThread;
812 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
813 U.unpark(carrier);
814 }
815 }
816 } finally {
817 enableSuspendAndPreempt();
818 }
819 }
820 }
821 }
822
823 /**
824 * Re-enables this virtual thread for scheduling after blocking on monitor enter.
825 * @throws RejectedExecutionException if the scheduler cannot accept a task
826 */
827 private void unblock() {
828 assert !Thread.currentThread().isVirtual();
829 unblocked = true;
830 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
831 unblocked = false;
832 submitRunContinuation();
833 }
834 }
835
836 /**
837 * Attempts to yield the current virtual thread (Thread.yield).
838 */
839 void tryYield() {
840 assert Thread.currentThread() == this;
841 setState(YIELDING);
842 boolean yielded = false;
843 try {
844 yielded = yieldContinuation(); // may throw
845 } finally {
846 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
847 if (!yielded) {
848 assert state() == YIELDING;
849 setState(RUNNING);
850 }
851 }
852 }
853
854 /**
855 * Sleep the current thread for the given sleep time (in nanoseconds). If
904 CountDownLatch termination = getTermination();
905 if (state() == TERMINATED)
906 return true;
907
908 // wait for virtual thread to terminate
909 if (nanos == 0) {
910 termination.await();
911 } else {
912 boolean terminated = termination.await(nanos, NANOSECONDS);
913 if (!terminated) {
914 // waiting time elapsed
915 return false;
916 }
917 }
918 assert state() == TERMINATED;
919 return true;
920 }
921
922 @Override
923 void blockedOn(Interruptible b) {
924 disableSuspendAndPreempt();
925 try {
926 super.blockedOn(b);
927 } finally {
928 enableSuspendAndPreempt();
929 }
930 }
931
932 @Override
933 @SuppressWarnings("removal")
934 public void interrupt() {
935 if (Thread.currentThread() != this) {
936 checkAccess();
937
938 // if current thread is a virtual thread then prevent it from being
939 // suspended or unmounted when entering or holding interruptLock
940 Interruptible blocker;
941 disableSuspendAndPreempt();
942 try {
943 synchronized (interruptLock) {
944 interrupted = true;
945 blocker = nioBlocker();
946 if (blocker != null) {
947 blocker.interrupt(this);
948 }
949
950 // interrupt carrier thread if mounted
951 Thread carrier = carrierThread;
952 if (carrier != null) carrier.setInterrupt();
953 }
954 } finally {
955 enableSuspendAndPreempt();
956 }
957
958 // notify blocker after releasing interruptLock
959 if (blocker != null) {
960 blocker.postInterrupt();
961 }
962
963 } else {
964 interrupted = true;
965 carrierThread.setInterrupt();
966 }
967 unpark();
968 }
969
970 @Override
971 public boolean isInterrupted() {
972 return interrupted;
973 }
974
975 @Override
976 boolean getAndClearInterrupt() {
977 assert Thread.currentThread() == this;
978 boolean oldValue = interrupted;
979 if (oldValue) {
980 disableSuspendAndPreempt();
981 try {
982 synchronized (interruptLock) {
983 interrupted = false;
984 carrierThread.clearInterrupt();
985 }
986 } finally {
987 enableSuspendAndPreempt();
988 }
989 }
990 return oldValue;
991 }
992
993 @Override
994 Thread.State threadState() {
995 int s = state();
996 switch (s & ~SUSPENDED) {
997 case NEW:
998 return Thread.State.NEW;
999 case STARTED:
1000 // return NEW if thread container not yet set
1001 if (threadContainer() == null) {
1002 return Thread.State.NEW;
1003 } else {
1004 return Thread.State.RUNNABLE;
1005 }
1006 case UNPARKED:
1007 case YIELDED:
1008 // runnable, not mounted
1009 return Thread.State.RUNNABLE;
1010 case UNBLOCKED:
1011 // if designated responsible thread for monitor then thread is blocked
1012 if (isResponsibleForMonitor()) {
1013 return Thread.State.BLOCKED;
1014 } else {
1015 return Thread.State.RUNNABLE;
1016 }
1017 case RUNNING:
1018 // if designated responsible thread for monitor then thread is blocked
1019 if (isResponsibleForMonitor()) {
1020 return Thread.State.BLOCKED;
1021 }
1022 // if mounted then return state of carrier thread
1023 if (Thread.currentThread() != this) {
1024 disableSuspendAndPreempt();
1025 try {
1026 synchronized (carrierThreadAccessLock()) {
1027 Thread carrierThread = this.carrierThread;
1028 if (carrierThread != null) {
1029 return carrierThread.threadState();
1030 }
1031 }
1032 } finally {
1033 enableSuspendAndPreempt();
1034 }
1035 }
1036 // runnable, mounted
1037 return Thread.State.RUNNABLE;
1038 case PARKING:
1039 case TIMED_PARKING:
1040 case YIELDING:
1041 // runnable, in transition
1042 return Thread.State.RUNNABLE;
1043 case PARKED:
1044 case PINNED:
1045 return State.WAITING;
1046 case TIMED_PARKED:
1047 case TIMED_PINNED:
1048 return State.TIMED_WAITING;
1049 case BLOCKING:
1050 case BLOCKED:
1051 return State.BLOCKED;
1052 case TERMINATED:
1053 return Thread.State.TERMINATED;
1054 default:
1055 throw new InternalError();
1056 }
1057 }
1058
1059 /**
1060 * Returns true if thread is the designated responsible thread for a monitor.
1061 * See objectMonitor.cpp for details.
1062 */
1063 private boolean isResponsibleForMonitor() {
1064 return (recheckInterval > 0);
1065 }
1066
1067 @Override
1068 boolean alive() {
1069 int s = state;
1070 return (s != NEW && s != TERMINATED);
1071 }
1072
1073 @Override
1074 boolean isTerminated() {
1075 return (state == TERMINATED);
1076 }
1077
1078 @Override
1079 StackTraceElement[] asyncGetStackTrace() {
1080 StackTraceElement[] stackTrace;
1081 do {
1082 stackTrace = (carrierThread != null)
1083 ? super.asyncGetStackTrace() // mounted
1084 : tryGetStackTrace(); // unmounted
1085 if (stackTrace == null) {
1086 Thread.yield();
1087 }
1088 } while (stackTrace == null);
1089 return stackTrace;
1090 }
1091
1092 /**
1093 * Returns the stack trace for this virtual thread if it is unmounted.
1094 * Returns null if the thread is mounted or in transition.
1095 */
1096 private StackTraceElement[] tryGetStackTrace() {
1097 int initialState = state() & ~SUSPENDED;
1098 switch (initialState) {
1099 case NEW, STARTED, TERMINATED -> {
1100 return new StackTraceElement[0]; // unmounted, empty stack
1101 }
1102 case RUNNING, PINNED, TIMED_PINNED -> {
1103 return null; // mounted
1104 }
1105 case PARKED, TIMED_PARKED, BLOCKED -> {
1106 // unmounted, not runnable
1107 }
1108 case UNPARKED, UNBLOCKED, YIELDED -> {
1109 // unmounted, runnable
1110 }
1111 case PARKING, TIMED_PARKING, BLOCKING, YIELDING -> {
1112 return null; // in transition
1113 }
1114 default -> throw new InternalError("" + initialState);
1115 }
1116
1117 // thread is unmounted, prevent it from continuing
1118 int suspendedState = initialState | SUSPENDED;
1119 if (!compareAndSetState(initialState, suspendedState)) {
1120 return null;
1121 }
1122
1123 // get stack trace and restore state
1124 StackTraceElement[] stack;
1125 try {
1126 stack = cont.getStackTrace();
1127 } finally {
1128 assert state == suspendedState;
1129 setState(initialState);
1130 }
1131 boolean resubmit = switch (initialState) {
1132 case UNPARKED, UNBLOCKED, YIELDED -> {
1133 // resubmit as task may have run while suspended
1134 yield true;
1135 }
1136 case PARKED, TIMED_PARKED -> {
1137 // resubmit if unparked while suspended
1138 yield parkPermit && compareAndSetState(initialState, UNPARKED);
1139 }
1140 case BLOCKED -> {
1141 // resubmit if unblocked while suspended
1142 yield unblocked && compareAndSetState(BLOCKED, UNBLOCKED);
1143 }
1144 default -> throw new InternalError();
1145 };
1146 if (resubmit) {
1147 submitRunContinuation();
1148 }
1149 return stack;
1150 }
1151
1152 @Override
1153 public String toString() {
1154 StringBuilder sb = new StringBuilder("VirtualThread[#");
1155 sb.append(threadId());
1156 String name = getName();
1157 if (!name.isEmpty()) {
1158 sb.append(",");
1159 sb.append(name);
1160 }
1161 sb.append("]/");
1162
1163 // add the carrier state and thread name when mounted
1164 boolean mounted;
1165 if (Thread.currentThread() == this) {
1166 mounted = appendCarrierInfo(sb);
1167 } else {
1168 disableSuspendAndPreempt();
1169 try {
1170 synchronized (carrierThreadAccessLock()) {
1171 mounted = appendCarrierInfo(sb);
1172 }
1173 } finally {
1174 enableSuspendAndPreempt();
1175 }
1176 }
1177
1178 // add virtual thread state when not mounted
1179 if (!mounted) {
1180 String stateAsString = threadState().toString();
1181 sb.append(stateAsString.toLowerCase(Locale.ROOT));
1182 }
1183
1184 return sb.toString();
1185 }
1186
1187 /**
1188 * Appends the carrier state and thread name to the string buffer if mounted.
1189 * @return true if mounted, false if not mounted
1190 */
1191 private boolean appendCarrierInfo(StringBuilder sb) {
1192 assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock());
1193 Thread carrier = carrierThread;
1194 if (carrier != null) {
1195 String stateAsString = carrier.threadState().toString();
1196 sb.append(stateAsString.toLowerCase(Locale.ROOT));
1197 sb.append('@');
1198 sb.append(carrier.getName());
1199 return true;
1200 } else {
1201 return false;
1202 }
1203 }
1204
1205 @Override
1206 public int hashCode() {
1207 return (int) threadId();
1208 }
1209
1210 @Override
1211 public boolean equals(Object obj) {
1212 return obj == this;
1213 }
1214
1215 /**
1216 * Returns a ScheduledExecutorService to execute a delayed task.
1217 */
1218 private ScheduledExecutorService delayedTaskScheduler() {
1219 long tid = Thread.currentThread().threadId();
1220 int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1);
1221 return DELAYED_TASK_SCHEDULERS[index];
1222 }
1223
1224 /**
1225 * Returns the termination object, creating it if needed.
1226 */
1227 private CountDownLatch getTermination() {
1228 CountDownLatch termination = this.termination;
1229 if (termination == null) {
1230 termination = new CountDownLatch(1);
1231 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
1232 termination = this.termination;
1233 }
1234 }
1235 return termination;
1236 }
1237
1238 /**
1239 * Returns the lock object to synchronize on when accessing carrierThread.
1240 * The lock prevents carrierThread from being reset to null during unmount.
1241 */
1242 private Object carrierThreadAccessLock() {
1243 // return interruptLock as unmount has to coordinate with interrupt
1244 return interruptLock;
1245 }
1246
1247 /**
1248 * Disallow the current thread be suspended or preempted.
1249 */
1250 private void disableSuspendAndPreempt() {
1251 notifyJvmtiDisableSuspend(true);
1252 Continuation.pin();
1253 }
1254
1255 /**
1256 * Allow the current thread be suspended or preempted.
1257 */
1258 private void enableSuspendAndPreempt() {
1259 Continuation.unpin();
1260 notifyJvmtiDisableSuspend(false);
1261 }
1262
1263 // -- wrappers for get/set of state, parking permit, and carrier thread --
1264
1265 private int state() {
1266 return state; // volatile read
1267 }
1268
1269 private void setState(int newValue) {
1270 state = newValue; // volatile write
1271 }
1272
1273 private boolean compareAndSetState(int expectedValue, int newValue) {
1274 return U.compareAndSetInt(this, STATE, expectedValue, newValue);
1275 }
1276
1277 private boolean compareAndSetOnWaitingList(byte expectedValue, byte newValue) {
1278 return U.compareAndSetByte(this, ON_WAITING_LIST, expectedValue, newValue);
1279 }
1280
1281 private void setParkPermit(boolean newValue) {
1282 if (parkPermit != newValue) {
1283 parkPermit = newValue;
1284 }
1285 }
1286
1287 private boolean getAndSetParkPermit(boolean newValue) {
1288 if (parkPermit != newValue) {
1289 return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
1290 } else {
1291 return newValue;
1292 }
1293 }
1294
1295 private void setCarrierThread(Thread carrier) {
1296 // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
1297 this.carrierThread = carrier;
1298 }
1299
1300 @IntrinsicCandidate
1301 private static native void setLockId(long tid);
1302
1303 // -- JVM TI support --
1304
1305 @IntrinsicCandidate
1306 @JvmtiMountTransition
1307 private native void notifyJvmtiStart();
1308
1309 @IntrinsicCandidate
1310 @JvmtiMountTransition
1311 private native void notifyJvmtiEnd();
1312
1313 @IntrinsicCandidate
1314 @JvmtiMountTransition
1315 private native void notifyJvmtiMount(boolean hide);
1316
1317 @IntrinsicCandidate
1318 @JvmtiMountTransition
1319 private native void notifyJvmtiUnmount(boolean hide);
1320
1321 @IntrinsicCandidate
1322 @JvmtiMountTransition
1352 if (maxPoolSizeValue != null) {
1353 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1354 parallelism = Integer.min(parallelism, maxPoolSize);
1355 } else {
1356 maxPoolSize = Integer.max(parallelism, 256);
1357 }
1358 if (minRunnableValue != null) {
1359 minRunnable = Integer.parseInt(minRunnableValue);
1360 } else {
1361 minRunnable = Integer.max(parallelism / 2, 1);
1362 }
1363 Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1364 boolean asyncMode = true; // FIFO
1365 return new ForkJoinPool(parallelism, factory, handler, asyncMode,
1366 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
1367 };
1368 return AccessController.doPrivileged(pa);
1369 }
1370
1371 /**
1372 * Invoked by the VM for the Thread.vthread_scheduler diagnostic command.
1373 */
1374 private static byte[] printDefaultScheduler() {
1375 return String.format("%s%n", DEFAULT_SCHEDULER.toString())
1376 .getBytes(StandardCharsets.UTF_8);
1377 }
1378
1379 /**
1380 * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks.
1381 */
1382 private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
1383 String propName = "jdk.virtualThreadScheduler.timerQueues";
1384 String propValue = GetPropertyAction.privilegedGetProperty(propName);
1385 int queueCount;
1386 if (propValue != null) {
1387 queueCount = Integer.parseInt(propValue);
1388 if (queueCount != Integer.highestOneBit(queueCount)) {
1389 throw new RuntimeException("Value of " + propName + " must be power of 2");
1390 }
1391 } else {
1392 int ncpus = Runtime.getRuntime().availableProcessors();
1393 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
1394 }
1395 var schedulers = new ScheduledExecutorService[queueCount];
1396 for (int i = 0; i < queueCount; i++) {
1397 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1398 Executors.newScheduledThreadPool(1, task -> {
1399 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task);
1400 t.setDaemon(true);
1401 return t;
1402 });
1403 stpe.setRemoveOnCancelPolicy(true);
1404 schedulers[i] = stpe;
1405 }
1406 return schedulers;
1407 }
1408
1409 /**
1410 * Schedule virtual threads that are ready to be scheduled after they blocked on
1411 * monitor enter.
1412 */
1413 private static void unblockVirtualThreads() {
1414 while (true) {
1415 VirtualThread vthread = takeVirtualThreadListToUnblock();
1416 while (vthread != null) {
1417 assert vthread.onWaitingList == 1;
1418 VirtualThread nextThread = vthread.next;
1419
1420 // remove from list and unblock
1421 vthread.next = null;
1422 boolean changed = vthread.compareAndSetOnWaitingList((byte) 1, (byte) 0);
1423 assert changed;
1424 vthread.unblock();
1425
1426 vthread = nextThread;
1427 }
1428 }
1429 }
1430
1431 /**
1432 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1433 * if necessary until a list of one or more threads becomes available.
1434 */
1435 private static native VirtualThread takeVirtualThreadListToUnblock();
1436
1437 static {
1438 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1439 VirtualThread::unblockVirtualThreads);
1440 unblocker.setDaemon(true);
1441 unblocker.start();
1442 }
1443 }
|