7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.security.AccessController;
28 import java.security.PrivilegedAction;
29 import java.util.Locale;
30 import java.util.Objects;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ForkJoinPool;
36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
37 import java.util.concurrent.ForkJoinTask;
38 import java.util.concurrent.ForkJoinWorkerThread;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.RejectedExecutionException;
41 import java.util.concurrent.ScheduledExecutorService;
42 import java.util.concurrent.ScheduledThreadPoolExecutor;
43 import java.util.concurrent.TimeUnit;
44 import jdk.internal.event.VirtualThreadEndEvent;
45 import jdk.internal.event.VirtualThreadPinnedEvent;
46 import jdk.internal.event.VirtualThreadStartEvent;
47 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
48 import jdk.internal.misc.CarrierThread;
49 import jdk.internal.misc.InnocuousThread;
50 import jdk.internal.misc.Unsafe;
51 import jdk.internal.vm.Continuation;
52 import jdk.internal.vm.ContinuationScope;
53 import jdk.internal.vm.StackableScope;
54 import jdk.internal.vm.ThreadContainer;
55 import jdk.internal.vm.ThreadContainers;
56 import jdk.internal.vm.annotation.ChangesCurrentThread;
57 import jdk.internal.vm.annotation.Hidden;
58 import jdk.internal.vm.annotation.IntrinsicCandidate;
59 import jdk.internal.vm.annotation.JvmtiMountTransition;
60 import jdk.internal.vm.annotation.ReservedStackAccess;
61 import sun.nio.ch.Interruptible;
62 import sun.security.action.GetPropertyAction;
63 import static java.util.concurrent.TimeUnit.*;
64
65 /**
66 * A thread that is scheduled by the Java virtual machine rather than the operating system.
67 */
68 final class VirtualThread extends BaseVirtualThread {
69 private static final Unsafe U = Unsafe.getUnsafe();
70 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
71 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
72 private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
73 private static final int TRACE_PINNING_MODE = tracePinningMode();
74
75 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
76 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
77 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
78 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
79
80 // scheduler and continuation
81 private final Executor scheduler;
82 private final Continuation cont;
83 private final Runnable runContinuation;
84
85 // virtual thread state, accessed by VM
86 private volatile int state;
87
88 /*
89 * Virtual thread state transitions:
90 *
91 * NEW -> STARTED // Thread.start, schedule to run
92 * STARTED -> TERMINATED // failed to start
93 * STARTED -> RUNNING // first run
94 * RUNNING -> TERMINATED // done
95 *
96 * RUNNING -> PARKING // Thread parking with LockSupport.park
97 * PARKING -> PARKED // cont.yield successful, parked indefinitely
98 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier
99 * PARKED -> UNPARKED // unparked, may be scheduled to continue
100 * PINNED -> RUNNING // unparked, continue execution on same carrier
101 * UNPARKED -> RUNNING // continue execution after park
102 *
103 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos
104 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked
105 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier
106 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue
107 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
108 *
109 * RUNNING -> YIELDING // Thread.yield
110 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue
111 * YIELDING -> RUNNING // cont.yield failed
112 * YIELDED -> RUNNING // continue execution after Thread.yield
113 */
114 private static final int NEW = 0;
115 private static final int STARTED = 1;
116 private static final int RUNNING = 2; // runnable-mounted
117
118 // untimed and timed parking
119 private static final int PARKING = 3;
120 private static final int PARKED = 4; // unmounted
121 private static final int PINNED = 5; // mounted
122 private static final int TIMED_PARKING = 6;
123 private static final int TIMED_PARKED = 7; // unmounted
124 private static final int TIMED_PINNED = 8; // mounted
125 private static final int UNPARKED = 9; // unmounted but runnable
126
127 // Thread.yield
128 private static final int YIELDING = 10;
129 private static final int YIELDED = 11; // unmounted but runnable
130
131 private static final int TERMINATED = 99; // final state
132
133 // can be suspended from scheduling when unmounted
134 private static final int SUSPENDED = 1 << 8;
135
136 // parking permit
137 private volatile boolean parkPermit;
138
139 // carrier thread when mounted, accessed by VM
140 private volatile Thread carrierThread;
141
142 // termination object when joining, created lazily if needed
143 private volatile CountDownLatch termination;
144
145 /**
146 * Returns the continuation scope used for virtual threads.
147 */
148 static ContinuationScope continuationScope() {
149 return VTHREAD_SCOPE;
150 }
151
152 /**
153 * Creates a new {@code VirtualThread} to run the given task with the given
154 * scheduler. If the given scheduler is {@code null} and the current thread
155 * is a platform thread then the newly created virtual thread will use the
156 * default scheduler. If the given scheduler is {@code null} and the current
157 * thread is a virtual thread then the current thread's scheduler is used.
158 *
159 * @param scheduler the scheduler or null
160 * @param name thread name
161 * @param characteristics characteristics
162 * @param task the task to execute
163 */
164 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
172 scheduler = vparent.scheduler;
173 } else {
174 scheduler = DEFAULT_SCHEDULER;
175 }
176 }
177
178 this.scheduler = scheduler;
179 this.cont = new VThreadContinuation(this, task);
180 this.runContinuation = this::runContinuation;
181 }
182
183 /**
184 * The continuation that a virtual thread executes.
185 */
186 private static class VThreadContinuation extends Continuation {
187 VThreadContinuation(VirtualThread vthread, Runnable task) {
188 super(VTHREAD_SCOPE, wrap(vthread, task));
189 }
190 @Override
191 protected void onPinned(Continuation.Pinned reason) {
192 if (TRACE_PINNING_MODE > 0) {
193 boolean printAll = (TRACE_PINNING_MODE == 1);
194 VirtualThread vthread = (VirtualThread) Thread.currentThread();
195 int oldState = vthread.state();
196 try {
197 // avoid printing when in transition states
198 vthread.setState(RUNNING);
199 PinnedThreadPrinter.printStackTrace(System.out, reason, printAll);
200 } finally {
201 vthread.setState(oldState);
202 }
203 }
204 }
205 private static Runnable wrap(VirtualThread vthread, Runnable task) {
206 return new Runnable() {
207 @Hidden
208 public void run() {
209 vthread.run(task);
210 }
211 };
212 }
213 }
214
215 /**
216 * Runs or continues execution on the current thread. The virtual thread is mounted
217 * on the current thread before the task runs or continues. It unmounts when the
218 * task completes or yields.
219 */
220 @ChangesCurrentThread // allow mount/unmount to be inlined
221 private void runContinuation() {
222 // the carrier must be a platform thread
223 if (Thread.currentThread().isVirtual()) {
224 throw new WrongThreadException();
225 }
226
227 // set state to RUNNING
228 int initialState = state();
229 if (initialState == STARTED || initialState == UNPARKED || initialState == YIELDED) {
230 // newly started or continue after parking/blocking/Thread.yield
231 if (!compareAndSetState(initialState, RUNNING)) {
232 return;
233 }
234 // consume parking permit when continuing after parking
235 if (initialState == UNPARKED) {
236 setParkPermit(false);
237 }
238 } else {
239 // not runnable
240 return;
241 }
242
243 mount();
244 try {
245 cont.run();
246 } finally {
247 unmount();
248 if (cont.isDone()) {
249 afterDone();
462 synchronized (interruptLock) {
463 setCarrierThread(null);
464 }
465 carrier.clearInterrupt();
466
467 // notify JVMTI after unmount
468 notifyJvmtiUnmount(/*hide*/false);
469 }
470
471 /**
472 * Sets the current thread to the current carrier thread.
473 */
474 @ChangesCurrentThread
475 @JvmtiMountTransition
476 private void switchToCarrierThread() {
477 notifyJvmtiHideFrames(true);
478 Thread carrier = this.carrierThread;
479 assert Thread.currentThread() == this
480 && carrier == Thread.currentCarrierThread();
481 carrier.setCurrentThread(carrier);
482 }
483
484 /**
485 * Sets the current thread to the given virtual thread.
486 */
487 @ChangesCurrentThread
488 @JvmtiMountTransition
489 private static void switchToVirtualThread(VirtualThread vthread) {
490 Thread carrier = vthread.carrierThread;
491 assert carrier == Thread.currentCarrierThread();
492 carrier.setCurrentThread(vthread);
493 notifyJvmtiHideFrames(false);
494 }
495
496 /**
497 * Executes the given value returning task on the current carrier thread.
498 */
499 @ChangesCurrentThread
500 <V> V executeOnCarrierThread(Callable<V> task) throws Exception {
501 assert Thread.currentThread() == this;
549 } else {
550 submitRunContinuation();
551 }
552 }
553 return;
554 }
555
556 // Thread.yield
557 if (s == YIELDING) {
558 setState(YIELDED);
559
560 // external submit if there are no tasks in the local task queue
561 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
562 externalSubmitRunContinuation(ct.getPool());
563 } else {
564 submitRunContinuation();
565 }
566 return;
567 }
568
569 assert false;
570 }
571
572 /**
573 * Invoked after the continuation completes.
574 */
575 private void afterDone() {
576 afterDone(true);
577 }
578
579 /**
580 * Invoked after the continuation completes (or start failed). Sets the thread
581 * state to TERMINATED and notifies anyone waiting for the thread to terminate.
582 *
583 * @param notifyContainer true if its container should be notified
584 */
585 private void afterDone(boolean notifyContainer) {
586 assert carrierThread == null;
587 setState(TERMINATED);
588
649 // do nothing
650 }
651
652 /**
653 * Parks until unparked or interrupted. If already unparked then the parking
654 * permit is consumed and this method completes immediately (meaning it doesn't
655 * yield). It also completes immediately if the interrupt status is set.
656 */
657 @Override
658 void park() {
659 assert Thread.currentThread() == this;
660
661 // complete immediately if parking permit available or interrupted
662 if (getAndSetParkPermit(false) || interrupted)
663 return;
664
665 // park the thread
666 boolean yielded = false;
667 setState(PARKING);
668 try {
669 yielded = yieldContinuation(); // may throw
670 } finally {
671 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
672 if (!yielded) {
673 assert state() == PARKING;
674 setState(RUNNING);
675 }
676 }
677
678 // park on the carrier thread when pinned
679 if (!yielded) {
680 parkOnCarrierThread(false, 0);
681 }
682 }
683
684 /**
685 * Parks up to the given waiting time or until unparked or interrupted.
686 * If already unparked then the parking permit is consumed and this method
687 * completes immediately (meaning it doesn't yield). It also completes immediately
688 * if the interrupt status is set or the waiting time is {@code <= 0}.
689 *
690 * @param nanos the maximum number of nanoseconds to wait.
691 */
692 @Override
693 void parkNanos(long nanos) {
694 assert Thread.currentThread() == this;
695
696 // complete immediately if parking permit available or interrupted
697 if (getAndSetParkPermit(false) || interrupted)
698 return;
699
700 // park the thread for the waiting time
701 if (nanos > 0) {
702 long startTime = System.nanoTime();
703
704 boolean yielded = false;
705 Future<?> unparker = scheduleUnpark(nanos); // may throw OOME
706 setState(TIMED_PARKING);
707 try {
708 yielded = yieldContinuation(); // may throw
709 } finally {
710 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
711 if (!yielded) {
712 assert state() == TIMED_PARKING;
713 setState(RUNNING);
714 }
715 cancel(unparker);
716 }
717
718 // park on carrier thread for remaining time when pinned
719 if (!yielded) {
720 long remainingNanos = nanos - (System.nanoTime() - startTime);
721 parkOnCarrierThread(true, remainingNanos);
722 }
723 }
724 }
725
726 /**
727 * Parks the current carrier thread up to the given waiting time or until
728 * unparked or interrupted. If the virtual thread is interrupted then the
729 * interrupt status will be propagated to the carrier thread.
730 * @param timed true for a timed park, false for untimed
731 * @param nanos the waiting time in nanoseconds
732 */
733 private void parkOnCarrierThread(boolean timed, long nanos) {
734 assert state() == RUNNING;
735
736 VirtualThreadPinnedEvent event;
737 try {
738 event = new VirtualThreadPinnedEvent();
739 event.begin();
740 } catch (OutOfMemoryError e) {
741 event = null;
742 }
743
744 setState(timed ? TIMED_PINNED : PINNED);
745 try {
746 if (!parkPermit) {
747 if (!timed) {
748 U.park(false, 0);
749 } else if (nanos > 0) {
750 U.park(false, nanos);
751 }
752 }
753 } finally {
754 setState(RUNNING);
755 }
756
757 // consume parking permit
758 setParkPermit(false);
759
760 if (event != null) {
761 try {
762 event.commit();
763 } catch (OutOfMemoryError e) {
764 // ignore
765 }
766 }
767 }
768
769 /**
770 * Schedule this virtual thread to be unparked after a given delay.
771 */
772 @ChangesCurrentThread
773 private Future<?> scheduleUnpark(long nanos) {
774 assert Thread.currentThread() == this;
775 // need to switch to current carrier thread to avoid nested parking
776 switchToCarrierThread();
777 try {
778 return schedule(this::unpark, nanos, NANOSECONDS);
779 } finally {
780 switchToVirtualThread(this);
781 }
782 }
783
784 /**
785 * Cancels a task if it has not completed.
786 */
787 @ChangesCurrentThread
788 private void cancel(Future<?> future) {
789 assert Thread.currentThread() == this;
790 if (!future.isDone()) {
791 // need to switch to current carrier thread to avoid nested parking
792 switchToCarrierThread();
793 try {
794 future.cancel(false);
795 } finally {
796 switchToVirtualThread(this);
797 }
798 }
799 }
800
801 /**
802 * Re-enables this virtual thread for scheduling. If this virtual thread is parked
803 * then its task is scheduled to continue, otherwise its next call to {@code park} or
804 * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
805 * @throws RejectedExecutionException if the scheduler cannot accept a task
817
818 // unparked while parked when pinned
819 if (s == PINNED || s == TIMED_PINNED) {
820 // unpark carrier thread when pinned
821 disableSuspendAndPreempt();
822 try {
823 synchronized (carrierThreadAccessLock()) {
824 Thread carrier = carrierThread;
825 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
826 U.unpark(carrier);
827 }
828 }
829 } finally {
830 enableSuspendAndPreempt();
831 }
832 return;
833 }
834 }
835 }
836
837 /**
838 * Attempts to yield the current virtual thread (Thread.yield).
839 */
840 void tryYield() {
841 assert Thread.currentThread() == this;
842 setState(YIELDING);
843 boolean yielded = false;
844 try {
845 yielded = yieldContinuation(); // may throw
846 } finally {
847 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
848 if (!yielded) {
849 assert state() == YIELDING;
850 setState(RUNNING);
851 }
852 }
853 }
854
855 /**
856 * Sleep the current thread for the given sleep time (in nanoseconds). If
947 if (blocker != null) {
948 blocker.interrupt(this);
949 }
950
951 // interrupt carrier thread if mounted
952 Thread carrier = carrierThread;
953 if (carrier != null) carrier.setInterrupt();
954 }
955 } finally {
956 enableSuspendAndPreempt();
957 }
958
959 // notify blocker after releasing interruptLock
960 if (blocker != null) {
961 blocker.postInterrupt();
962 }
963
964 // make available parking permit, unpark thread if parked
965 unpark();
966
967 } else {
968 interrupted = true;
969 carrierThread.setInterrupt();
970 setParkPermit(true);
971 }
972 }
973
974 @Override
975 public boolean isInterrupted() {
976 return interrupted;
977 }
978
979 @Override
980 boolean getAndClearInterrupt() {
981 assert Thread.currentThread() == this;
982 boolean oldValue = interrupted;
983 if (oldValue) {
984 disableSuspendAndPreempt();
985 try {
986 synchronized (interruptLock) {
994 return oldValue;
995 }
996
997 @Override
998 Thread.State threadState() {
999 int s = state();
1000 switch (s & ~SUSPENDED) {
1001 case NEW:
1002 return Thread.State.NEW;
1003 case STARTED:
1004 // return NEW if thread container not yet set
1005 if (threadContainer() == null) {
1006 return Thread.State.NEW;
1007 } else {
1008 return Thread.State.RUNNABLE;
1009 }
1010 case UNPARKED:
1011 case YIELDED:
1012 // runnable, not mounted
1013 return Thread.State.RUNNABLE;
1014 case RUNNING:
1015 // if mounted then return state of carrier thread
1016 if (Thread.currentThread() != this) {
1017 disableSuspendAndPreempt();
1018 try {
1019 synchronized (carrierThreadAccessLock()) {
1020 Thread carrierThread = this.carrierThread;
1021 if (carrierThread != null) {
1022 return carrierThread.threadState();
1023 }
1024 }
1025 } finally {
1026 enableSuspendAndPreempt();
1027 }
1028 }
1029 // runnable, mounted
1030 return Thread.State.RUNNABLE;
1031 case PARKING:
1032 case TIMED_PARKING:
1033 case YIELDING:
1034 // runnable, in transition
1035 return Thread.State.RUNNABLE;
1036 case PARKED:
1037 case PINNED:
1038 return State.WAITING;
1039 case TIMED_PARKED:
1040 case TIMED_PINNED:
1041 return State.TIMED_WAITING;
1042 case TERMINATED:
1043 return Thread.State.TERMINATED;
1044 default:
1045 throw new InternalError();
1046 }
1047 }
1048
1049 @Override
1050 boolean alive() {
1051 int s = state;
1052 return (s != NEW && s != TERMINATED);
1053 }
1054
1055 @Override
1056 boolean isTerminated() {
1057 return (state == TERMINATED);
1058 }
1059
1060 @Override
1061 StackTraceElement[] asyncGetStackTrace() {
1062 StackTraceElement[] stackTrace;
1063 do {
1064 stackTrace = (carrierThread != null)
1065 ? super.asyncGetStackTrace() // mounted
1066 : tryGetStackTrace(); // unmounted
1067 if (stackTrace == null) {
1068 Thread.yield();
1069 }
1070 } while (stackTrace == null);
1071 return stackTrace;
1072 }
1073
1074 /**
1075 * Returns the stack trace for this virtual thread if it is unmounted.
1076 * Returns null if the thread is mounted or in transition.
1077 */
1078 private StackTraceElement[] tryGetStackTrace() {
// Classify using the state with the SUSPENDED bit masked off. If the bit is
// actually set, the compare-and-set further down fails (the real state differs
// from initialState) and null is returned.
1079 int initialState = state() & ~SUSPENDED;
1080 switch (initialState) {
1081 case NEW, STARTED, TERMINATED -> {
1082 return new StackTraceElement[0]; // unmounted, empty stack
1083 }
1084 case RUNNING, PINNED, TIMED_PINNED -> {
1085 return null; // mounted
1086 }
1087 case PARKED, TIMED_PARKED -> {
1088 // unmounted, not runnable
1089 }
1090 case UNPARKED, YIELDED -> {
1091 // unmounted, runnable
1092 }
1093 case PARKING, TIMED_PARKING, YIELDING -> {
1094 return null; // in transition
1095 }
1096 default -> throw new InternalError("" + initialState);
1097 }
1098
// Only the four unmounted states (PARKED, TIMED_PARKED, UNPARKED, YIELDED)
// reach this point.
1099 // thread is unmounted, prevent it from continuing
1100 int suspendedState = initialState | SUSPENDED;
// A failed CAS means the state changed since it was read; report "no trace"
// and let the caller (asyncGetStackTrace) retry.
1101 if (!compareAndSetState(initialState, suspendedState)) {
1102 return null;
1103 }
1104
1105 // get stack trace and restore state
1106 StackTraceElement[] stack;
1107 try {
1108 stack = cont.getStackTrace();
1109 } finally {
// The state is restored even if getStackTrace throws.
1110 assert state == suspendedState;
1111 setState(initialState);
1112 }
// Decide whether the continuation has to be submitted again now that the
// SUSPENDED bit has been cleared.
1113 boolean resubmit = switch (initialState) {
1114 case UNPARKED, YIELDED -> {
1115 // resubmit as task may have run while suspended
1116 yield true;
1117 }
1118 case PARKED, TIMED_PARKED -> {
1119 // resubmit if unparked while suspended
// (a permit is available and this thread wins the transition to UNPARKED)
1120 yield parkPermit && compareAndSetState(initialState, UNPARKED);
1121 }
1122 default -> throw new InternalError();
1123 };
1124 if (resubmit) {
1125 submitRunContinuation();
1126 }
1127 return stack;
1128 }
1129
1130 @Override
1131 public String toString() {
1132 StringBuilder sb = new StringBuilder("VirtualThread[#");
1133 sb.append(threadId());
1134 String name = getName();
1135 if (!name.isEmpty()) {
1136 sb.append(",");
1137 sb.append(name);
1138 }
1139 sb.append("]/");
1140
1141 // add the carrier state and thread name when mounted
1196 private CountDownLatch getTermination() {
1197 CountDownLatch termination = this.termination;
1198 if (termination == null) {
1199 termination = new CountDownLatch(1);
1200 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
1201 termination = this.termination;
1202 }
1203 }
1204 return termination;
1205 }
1206
1207 /**
1208 * Returns the lock object to synchronize on when accessing carrierThread.
1209 * The lock prevents carrierThread from being reset to null during unmount.
1210 */
1211 private Object carrierThreadAccessLock() {
1212 // return interruptLock as unmount has to coordinate with interrupt
1213 return interruptLock;
1214 }
1215
1216 /**
1217 * Disallow the current thread be suspended or preempted.
1218 */
1219 private void disableSuspendAndPreempt() {
1220 notifyJvmtiDisableSuspend(true);
1221 Continuation.pin();
1222 }
1223
1224 /**
1225 * Allow the current thread be suspended or preempted.
1226 */
1227 private void enableSuspendAndPreempt() {
1228 Continuation.unpin();
1229 notifyJvmtiDisableSuspend(false);
1230 }
1231
1232 // -- wrappers for get/set of state, parking permit, and carrier thread --
1233
1234 private int state() {
1235 return state; // volatile read
1236 }
1237
1238 private void setState(int newValue) {
1239 state = newValue; // volatile write
1240 }
1241
1242 private boolean compareAndSetState(int expectedValue, int newValue) {
1243 return U.compareAndSetInt(this, STATE, expectedValue, newValue);
1244 }
1245
1246 private void setParkPermit(boolean newValue) {
1247 if (parkPermit != newValue) {
1248 parkPermit = newValue;
1249 }
1250 }
1251
1252 private boolean getAndSetParkPermit(boolean newValue) {
1253 if (parkPermit != newValue) {
1254 return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
1255 } else {
1256 return newValue;
1257 }
1258 }
1259
1260 private void setCarrierThread(Thread carrier) {
1261 // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
1262 this.carrierThread = carrier;
1263 }
1264
1265 // -- JVM TI support --
1276 @JvmtiMountTransition
1277 private native void notifyJvmtiMount(boolean hide);
1278
// Native JVMTI notification for unmounting. In this file it is invoked with
// hide=false at the end of unmount, after the carrier thread has been cleared.
1279 @IntrinsicCandidate
1280 @JvmtiMountTransition
1281 private native void notifyJvmtiUnmount(boolean hide);
1282
// Native JVMTI notification used when the current thread is switched: called
// with true before switching to the carrier thread (switchToCarrierThread) and
// with false after switching back (switchToVirtualThread).
1283 @IntrinsicCandidate
1284 @JvmtiMountTransition
1285 private static native void notifyJvmtiHideFrames(boolean hide);
1286
// Native JVMTI notification bracketing regions where suspension is not allowed:
// called with true by disableSuspendAndPreempt and with false by
// enableSuspendAndPreempt.
1287 @IntrinsicCandidate
1288 private static native void notifyJvmtiDisableSuspend(boolean enter);
1289
1290 private static native void registerNatives();
1291 static {
1292 registerNatives();
1293
1294 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1295 var group = Thread.virtualThreadGroup();
1296
1297 // ensure VirtualThreadPinnedEvent is loaded/initialized
1298 U.ensureClassInitialized(VirtualThreadPinnedEvent.class);
1299 }
1300
1301 /**
1302 * Creates the default ForkJoinPool scheduler.
1303 */
1304 @SuppressWarnings("removal")
1305 private static ForkJoinPool createDefaultScheduler() {
1306 ForkJoinWorkerThreadFactory factory = pool -> {
1307 PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
1308 return AccessController.doPrivileged(pa);
1309 };
1310 PrivilegedAction<ForkJoinPool> pa = () -> {
1311 int parallelism, maxPoolSize, minRunnable;
1312 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1313 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1314 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1315 if (parallelismValue != null) {
1316 parallelism = Integer.parseInt(parallelismValue);
1317 } else {
1318 parallelism = Runtime.getRuntime().availableProcessors();
1319 }
1320 if (maxPoolSizeValue != null) {
1321 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1322 parallelism = Integer.min(parallelism, maxPoolSize);
1323 } else {
1324 maxPoolSize = Integer.max(parallelism, 256);
1325 }
1359 }
1360 } else {
1361 int ncpus = Runtime.getRuntime().availableProcessors();
1362 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
1363 }
1364 var schedulers = new ScheduledExecutorService[queueCount];
1365 for (int i = 0; i < queueCount; i++) {
1366 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1367 Executors.newScheduledThreadPool(1, task -> {
1368 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task);
1369 t.setDaemon(true);
1370 return t;
1371 });
1372 stpe.setRemoveOnCancelPolicy(true);
1373 schedulers[i] = stpe;
1374 }
1375 return schedulers;
1376 }
1377
1378 /**
1379 * Reads the value of the jdk.tracePinnedThreads property to determine if stack
1380 * traces should be printed when a carrier thread is pinned when a virtual thread
1381 * attempts to park.
1382 */
1383 private static int tracePinningMode() {
1384 String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads");
1385 if (propValue != null) {
1386 if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue))
1387 return 1;
1388 if ("short".equalsIgnoreCase(propValue))
1389 return 2;
1390 }
1391 return 0;
1392 }
1393 }
|
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.lang.reflect.Constructor;
28 import java.security.AccessController;
29 import java.security.PrivilegedAction;
30 import java.util.Arrays;
31 import java.util.Locale;
32 import java.util.Objects;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.ForkJoinPool;
38 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
39 import java.util.concurrent.ForkJoinTask;
40 import java.util.concurrent.ForkJoinWorkerThread;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.RejectedExecutionException;
43 import java.util.concurrent.ScheduledExecutorService;
44 import java.util.concurrent.ScheduledThreadPoolExecutor;
45 import java.util.concurrent.TimeUnit;
46 import java.util.stream.Stream;
47 import jdk.internal.event.VirtualThreadEndEvent;
48 import jdk.internal.event.VirtualThreadStartEvent;
49 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
50 import jdk.internal.misc.CarrierThread;
51 import jdk.internal.misc.InnocuousThread;
52 import jdk.internal.misc.Unsafe;
53 import jdk.internal.vm.Continuation;
54 import jdk.internal.vm.ContinuationScope;
55 import jdk.internal.vm.StackableScope;
56 import jdk.internal.vm.ThreadContainer;
57 import jdk.internal.vm.ThreadContainers;
58 import jdk.internal.vm.annotation.ChangesCurrentThread;
59 import jdk.internal.vm.annotation.Hidden;
60 import jdk.internal.vm.annotation.IntrinsicCandidate;
61 import jdk.internal.vm.annotation.JvmtiMountTransition;
62 import jdk.internal.vm.annotation.ReservedStackAccess;
63 import sun.nio.ch.Interruptible;
64 import sun.security.action.GetPropertyAction;
65 import static java.util.concurrent.TimeUnit.*;
66
67 /**
68 * A thread that is scheduled by the Java virtual machine rather than the operating system.
69 */
70 final class VirtualThread extends BaseVirtualThread {
71 private static final Unsafe U = Unsafe.getUnsafe();
72 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
73 private static final Executor DEFAULT_SCHEDULER = createDefaultScheduler();
74 private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
75
76 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
77 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
78 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
79 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
80 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
81
82 // scheduler and continuation
83 private final Executor scheduler;
84 private final Continuation cont;
85 private final Runnable runContinuation;
86
87 // virtual thread state, accessed by VM
88 private volatile int state;
89
90 /*
91 * Virtual thread state transitions:
92 *
93 * NEW -> STARTED // Thread.start, schedule to run
94 * STARTED -> TERMINATED // failed to start
95 * STARTED -> RUNNING // first run
96 * RUNNING -> TERMINATED // done
97 *
98 * RUNNING -> PARKING // Thread parking with LockSupport.park
99 * PARKING -> PARKED // cont.yield successful, parked indefinitely
100 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier
101 * PARKED -> UNPARKED // unparked, may be scheduled to continue
102 * PINNED -> RUNNING // unparked, continue execution on same carrier
103 * UNPARKED -> RUNNING // continue execution after park
104 *
105 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos
106 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked
107 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier
108 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue
109 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
110 *
111 * RUNNING -> BLOCKING // blocking on monitor enter
112 * BLOCKING -> BLOCKED // blocked on monitor enter
113 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue
114 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter
115 *
 * RUNNING -> WAITING // transitional state during wait on monitor
 * WAITING -> WAIT // waiting on monitor
 * WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner
 * WAIT -> UNBLOCKED // timed-out/interrupted
 *
 * RUNNING -> TIMED_WAITING // transitional state during timed-wait on monitor
 * TIMED_WAITING -> TIMED_WAIT // timed-waiting on monitor
 * TIMED_WAIT -> BLOCKED // notified, waiting to be unblocked by monitor owner
 * TIMED_WAIT -> UNBLOCKED // timed-out/interrupted
125 *
126 * RUNNING -> YIELDING // Thread.yield
127 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue
128 * YIELDING -> RUNNING // cont.yield failed
129 * YIELDED -> RUNNING // continue execution after Thread.yield
130 */
// initial states and the running state
private static final int NEW = 0;
private static final int STARTED = 1;
private static final int RUNNING = 2; // runnable-mounted

// untimed and timed parking
private static final int PARKING = 3;
private static final int PARKED = 4; // unmounted
private static final int PINNED = 5; // mounted
private static final int TIMED_PARKING = 6;
private static final int TIMED_PARKED = 7; // unmounted
private static final int TIMED_PINNED = 8; // mounted
private static final int UNPARKED = 9; // unmounted but runnable

// Thread.yield
private static final int YIELDING = 10;
private static final int YIELDED = 11; // unmounted but runnable

// monitor enter
private static final int BLOCKING = 12;
private static final int BLOCKED = 13; // unmounted
private static final int UNBLOCKED = 14; // unmounted but runnable

// monitor wait/timed-wait
private static final int WAITING = 15; // in transition to WAIT
private static final int WAIT = 16; // waiting in Object.wait
private static final int TIMED_WAITING = 17; // in transition to TIMED_WAIT
private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait

private static final int TERMINATED = 99; // final state

// can be suspended from scheduling when unmounted
// This is a bit flag that is OR'ed into the state value (see tryGetStackTrace);
// readers mask it off with "& ~SUSPENDED" before comparing with the constants above.
private static final int SUSPENDED = 1 << 8;
163
// parking permit
// Consumed by park/parkNanos; made available when the thread is unparked or interrupted.
private volatile boolean parkPermit;

// used to mark thread as ready to be unblocked
// Set by unblock(); cleared by whoever succeeds in changing the state from
// BLOCKED to UNBLOCKED.
private volatile boolean unblocked;

// notified by Object.notify/notifyAll while waiting in Object.wait
private volatile boolean notified;

// timed-wait support
// waitTimeout: delay used when scheduling the timeout task (in milliseconds)
private long waitTimeout;
// timedWaitNonce: incremented while holding timedWaitLock() for each timed-wait so
// that a timeout task belonging to an earlier timed-wait can be recognized as stale
private byte timedWaitNonce;
// waitTimeoutTask: the scheduled timeout task, cancelled by cancelWaitTimeout()
private volatile Future<?> waitTimeoutTask;

// a positive value if "responsible thread" blocked on monitor enter, accessed by VM
// (asserted to be in the range 1..6 when positive)
private volatile byte recheckInterval;

// carrier thread when mounted, accessed by VM
private volatile Thread carrierThread;

// termination object when joining, created lazily if needed
private volatile CountDownLatch termination;

// has the value 1 when on the list of virtual threads waiting to be unblocked
private volatile byte onWaitingList;

// next virtual thread on the list of virtual threads waiting to be unblocked
private volatile VirtualThread next;
192
/**
 * Returns the default scheduler.
 *
 * @return the scheduler held in {@code DEFAULT_SCHEDULER}
 */
static Executor defaultScheduler() {
    return DEFAULT_SCHEDULER;
}
199
/**
 * Returns a stream of the delayed task schedulers used to support timed operations.
 *
 * @return a stream over the elements of {@code DELAYED_TASK_SCHEDULERS}
 */
static Stream<ScheduledExecutorService> delayedTaskSchedulers() {
    return Arrays.stream(DELAYED_TASK_SCHEDULERS);
}
206
/**
 * Returns the continuation scope used for virtual threads.
 *
 * @return {@code VTHREAD_SCOPE}, the scope that every {@code VThreadContinuation}
 *         is created with
 */
static ContinuationScope continuationScope() {
    return VTHREAD_SCOPE;
}
213
214 /**
215 * Creates a new {@code VirtualThread} to run the given task with the given
216 * scheduler. If the given scheduler is {@code null} and the current thread
217 * is a platform thread then the newly created virtual thread will use the
218 * default scheduler. If given scheduler is {@code null} and the current
219 * thread is a virtual thread then the current thread's scheduler is used.
220 *
221 * @param scheduler the scheduler or null
222 * @param name thread name
223 * @param characteristics characteristics
224 * @param task the task to execute
225 */
226 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
234 scheduler = vparent.scheduler;
235 } else {
236 scheduler = DEFAULT_SCHEDULER;
237 }
238 }
239
240 this.scheduler = scheduler;
241 this.cont = new VThreadContinuation(this, task);
242 this.runContinuation = this::runContinuation;
243 }
244
/**
 * The continuation that a virtual thread executes. It is created in the
 * {@code VTHREAD_SCOPE} scope and its body invokes {@code vthread.run(task)}.
 */
private static class VThreadContinuation extends Continuation {
    /**
     * @param vthread the virtual thread whose {@code run(Runnable)} method is invoked
     * @param task the task passed to {@code vthread.run}
     */
    VThreadContinuation(VirtualThread vthread, Runnable task) {
        super(VTHREAD_SCOPE, wrap(vthread, task));
    }
    /**
     * Overrides the {@code Continuation} callback that receives the pinned reason
     * and forwards its code and text to the VM.
     */
    @Override
    protected void onPinned(Continuation.Pinned reason) {
        // emit JFR event
        virtualThreadPinnedEvent(reason.reasonCode(), reason.reasonString());
    }
    /**
     * Returns the continuation body: a Runnable that runs the task on the given
     * virtual thread. An anonymous class is used (rather than a lambda) so that
     * the {@code run} method can carry the {@code @Hidden} annotation.
     */
    private static Runnable wrap(VirtualThread vthread, Runnable task) {
        return new Runnable() {
            @Hidden
            public void run() {
                vthread.run(task);
            }
        };
    }
}
266
/**
 * jdk.VirtualThreadPinned is emitted by HotSpot VM when pinned. Call into VM to
 * emit event to avoid having a JFR event in Java with the same name (but different ID)
 * to events emitted by the VM.
 *
 * @param reason the reason code, as returned by {@code Continuation.Pinned.reasonCode()}
 * @param reasonString the reason text, as returned by {@code Continuation.Pinned.reasonString()}
 */
private static native void virtualThreadPinnedEvent(int reason, String reasonString);
273
274 /**
275 * Runs or continues execution on the current thread. The virtual thread is mounted
276 * on the current thread before the task runs or continues. It unmounts when the
277 * task completes or yields.
278 */
279 @ChangesCurrentThread // allow mount/unmount to be inlined
280 private void runContinuation() {
281 // the carrier must be a platform thread
282 if (Thread.currentThread().isVirtual()) {
283 throw new WrongThreadException();
284 }
285
286 // set state to RUNNING
287 int initialState = state();
288 if (initialState == STARTED || initialState == UNPARKED
289 || initialState == UNBLOCKED || initialState == YIELDED) {
290 // newly started or continue after parking/blocking/Thread.yield
291 if (!compareAndSetState(initialState, RUNNING)) {
292 return;
293 }
294 // consume parking permit when continuing after parking
295 if (initialState == UNPARKED) {
296 setParkPermit(false);
297 }
298 } else {
299 // not runnable
300 return;
301 }
302
303 mount();
304 try {
305 cont.run();
306 } finally {
307 unmount();
308 if (cont.isDone()) {
309 afterDone();
522 synchronized (interruptLock) {
523 setCarrierThread(null);
524 }
525 carrier.clearInterrupt();
526
527 // notify JVMTI after unmount
528 notifyJvmtiUnmount(/*hide*/false);
529 }
530
/**
 * Sets the current thread to the current carrier thread.
 * Must be invoked by this virtual thread while it is mounted on its carrier; the
 * switch is undone with {@link #switchToVirtualThread(VirtualThread)}.
 */
@ChangesCurrentThread
@JvmtiMountTransition
private void switchToCarrierThread() {
    // tell JVMTI to hide frames before the current thread identity changes
    notifyJvmtiHideFrames(true);
    Thread carrier = this.carrierThread;
    assert Thread.currentThread() == this
            && carrier == Thread.currentCarrierThread();
    // from here on Thread.currentThread() is the carrier
    carrier.setCurrentThread(carrier);
    Thread.setCurrentLockId(this.threadId()); // keep lock ID of virtual thread
}
544
/**
 * Sets the current thread to the given virtual thread.
 * This is the inverse of {@code switchToCarrierThread}: the caller must be running
 * on the carrier thread that the given virtual thread is mounted on.
 *
 * @param vthread the virtual thread to make the current thread
 */
@ChangesCurrentThread
@JvmtiMountTransition
private static void switchToVirtualThread(VirtualThread vthread) {
    Thread carrier = vthread.carrierThread;
    assert carrier == Thread.currentCarrierThread();
    carrier.setCurrentThread(vthread);
    // stop hiding frames only after the current thread has been switched back
    notifyJvmtiHideFrames(false);
}
556
557 /**
558 * Executes the given value returning task on the current carrier thread.
559 */
560 @ChangesCurrentThread
561 <V> V executeOnCarrierThread(Callable<V> task) throws Exception {
562 assert Thread.currentThread() == this;
610 } else {
611 submitRunContinuation();
612 }
613 }
614 return;
615 }
616
617 // Thread.yield
618 if (s == YIELDING) {
619 setState(YIELDED);
620
621 // external submit if there are no tasks in the local task queue
622 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
623 externalSubmitRunContinuation(ct.getPool());
624 } else {
625 submitRunContinuation();
626 }
627 return;
628 }
629
630 // blocking on monitorenter
631 if (s == BLOCKING) {
632 setState(BLOCKED);
633
634 // may have been unblocked while blocking
635 if (unblocked && compareAndSetState(BLOCKED, UNBLOCKED)) {
636 unblocked = false;
637 submitRunContinuation();
638 return;
639 }
640
641 // if thread is the designated responsible thread for a monitor then schedule
642 // it to wakeup so that it can check and recover. See objectMonitor.cpp.
643 int recheckInterval = this.recheckInterval;
644 if (recheckInterval > 0) {
645 assert recheckInterval >= 1 && recheckInterval <= 6;
646 // 4 ^ (recheckInterval - 1) = 1, 4, 16, ... 1024
647 long delay = 1 << (recheckInterval - 1) << (recheckInterval - 1);
648 schedule(this::unblock, delay, MILLISECONDS);
649 }
650 return;
651 }
652
653 // Object.wait
654 if (s == WAITING || s == TIMED_WAITING) {
655 byte nonce;
656 int newState;
657 if (s == WAITING) {
658 nonce = 0; // not used
659 setState(newState = WAIT);
660 } else {
661 // synchronize with timeout task (previous timed-wait may be running)
662 synchronized (timedWaitLock()) {
663 nonce = ++timedWaitNonce;
664 setState(newState = TIMED_WAIT);
665 }
666 }
667
668 // may have been notified while in transition to wait state
669 if (notified && compareAndSetState(newState, BLOCKED)) {
670 // may have even been unblocked already
671 if (unblocked && compareAndSetState(BLOCKED, UNBLOCKED)) {
672 unblocked = false;
673 submitRunContinuation();
674 }
675 return;
676 }
677
678 // may have been interrupted while in transition to wait state
679 if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
680 submitRunContinuation();
681 return;
682 }
683
684 // schedule wakeup
685 if (newState == TIMED_WAIT) {
686 assert waitTimeout > 0;
687 waitTimeoutTask = schedule(() -> waitTimeoutExpired(nonce), waitTimeout, MILLISECONDS);
688 }
689 return;
690 }
691
692 assert false;
693 }
694
695 /**
696 * Invoked after the continuation completes.
697 */
698 private void afterDone() {
699 afterDone(true);
700 }
701
702 /**
703 * Invoked after the continuation completes (or start failed). Sets the thread
704 * state to TERMINATED and notifies anyone waiting for the thread to terminate.
705 *
706 * @param notifyContainer true if its container should be notified
707 */
708 private void afterDone(boolean notifyContainer) {
709 assert carrierThread == null;
710 setState(TERMINATED);
711
772 // do nothing
773 }
774
775 /**
776 * Parks until unparked or interrupted. If already unparked then the parking
777 * permit is consumed and this method completes immediately (meaning it doesn't
778 * yield). It also completes immediately if the interrupt status is set.
779 */
780 @Override
781 void park() {
782 assert Thread.currentThread() == this;
783
784 // complete immediately if parking permit available or interrupted
785 if (getAndSetParkPermit(false) || interrupted)
786 return;
787
788 // park the thread
789 boolean yielded = false;
790 setState(PARKING);
791 try {
792 yielded = yieldContinuation();
793 } catch (OutOfMemoryError e) {
794 // park on carrier
795 } finally {
796 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
797 if (!yielded) {
798 assert state() == PARKING;
799 setState(RUNNING);
800 }
801 }
802
803 // park on the carrier thread when pinned
804 if (!yielded) {
805 parkOnCarrierThread(false, 0);
806 }
807 }
808
809 /**
810 * Parks up to the given waiting time or until unparked or interrupted.
811 * If already unparked then the parking permit is consumed and this method
812 * completes immediately (meaning it doesn't yield). It also completes immediately
813 * if the interrupt status is set or the waiting time is {@code <= 0}.
814 *
815 * @param nanos the maximum number of nanoseconds to wait.
816 */
817 @Override
818 void parkNanos(long nanos) {
819 assert Thread.currentThread() == this;
820
821 // complete immediately if parking permit available or interrupted
822 if (getAndSetParkPermit(false) || interrupted)
823 return;
824
825 // park the thread for the waiting time
826 if (nanos > 0) {
827 long startTime = System.nanoTime();
828
829 boolean yielded = false;
830 Future<?> unparker = null;
831 try {
832 unparker = scheduleUnpark(nanos);
833 } catch (OutOfMemoryError e) {
834 // park on carrier
835 }
836 if (unparker != null) {
837 setState(TIMED_PARKING);
838 try {
839 yielded = yieldContinuation();
840 } catch (OutOfMemoryError e) {
841 // park on carrier
842 } finally {
843 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
844 if (!yielded) {
845 assert state() == TIMED_PARKING;
846 setState(RUNNING);
847 }
848 cancel(unparker);
849 }
850 }
851
852 // park on carrier thread for remaining time when pinned (or OOME)
853 if (!yielded) {
854 long remainingNanos = nanos - (System.nanoTime() - startTime);
855 parkOnCarrierThread(true, remainingNanos);
856 }
857 }
858 }
859
860 /**
861 * Parks the current carrier thread up to the given waiting time or until
862 * unparked or interrupted. If the virtual thread is interrupted then the
863 * interrupt status will be propagated to the carrier thread.
864 * @param timed true for a timed park, false for untimed
865 * @param nanos the waiting time in nanoseconds
866 */
867 private void parkOnCarrierThread(boolean timed, long nanos) {
868 assert state() == RUNNING;
869
870 setState(timed ? TIMED_PINNED : PINNED);
871 try {
872 if (!parkPermit) {
873 if (!timed) {
874 U.park(false, 0);
875 } else if (nanos > 0) {
876 U.park(false, nanos);
877 }
878 }
879 } finally {
880 setState(RUNNING);
881 }
882
883 // consume parking permit
884 setParkPermit(false);
885 }
886
/**
 * Invoked by parkNanos to schedule this virtual thread to be unparked after
 * a given delay.
 *
 * @param nanos the delay in nanoseconds
 * @return the Future for the scheduled unpark, to be passed to {@code cancel}
 */
@ChangesCurrentThread
private Future<?> scheduleUnpark(long nanos) {
    assert Thread.currentThread() == this;
    // need to switch to current carrier thread to avoid nested parking
    switchToCarrierThread();
    try {
        return schedule(this::unpark, nanos, NANOSECONDS);
    } finally {
        // always switch back, even if scheduling fails
        switchToVirtualThread(this);
    }
}
902
903 /**
904 * Invoked by parkNanos to cancel the unpark timer.
905 */
906 @ChangesCurrentThread
907 private void cancel(Future<?> future) {
908 assert Thread.currentThread() == this;
909 if (!future.isDone()) {
910 // need to switch to current carrier thread to avoid nested parking
911 switchToCarrierThread();
912 try {
913 future.cancel(false);
914 } finally {
915 switchToVirtualThread(this);
916 }
917 }
918 }
919
920 /**
921 * Re-enables this virtual thread for scheduling. If this virtual thread is parked
922 * then its task is scheduled to continue, otherwise its next call to {@code park} or
923 * {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
924 * @throws RejectedExecutionException if the scheduler cannot accept a task
936
937 // unparked while parked when pinned
938 if (s == PINNED || s == TIMED_PINNED) {
939 // unpark carrier thread when pinned
940 disableSuspendAndPreempt();
941 try {
942 synchronized (carrierThreadAccessLock()) {
943 Thread carrier = carrierThread;
944 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
945 U.unpark(carrier);
946 }
947 }
948 } finally {
949 enableSuspendAndPreempt();
950 }
951 return;
952 }
953 }
954 }
955
/**
 * Invoked by unblocker thread to unblock this virtual thread.
 * The {@code unblocked} flag is set before the state is examined so that a thread
 * still in transition to BLOCKED can observe it later; the flag is cleared by
 * whichever party succeeds in changing the state to UNBLOCKED.
 */
private void unblock() {
    assert !Thread.currentThread().isVirtual();
    unblocked = true;
    if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
        unblocked = false;
        submitRunContinuation();
    }
}
967
968 /**
969 * Invoked by timer thread when wait timeout for virtual thread has expired.
970 * If the virtual thread is in timed-wait then this method will unblock the thread
971 * and submit its task so that it continues and attempts to reenter the monitor.
972 * This method does nothing if the thread has been woken by notify or interrupt.
973 */
974 private void waitTimeoutExpired(byte nounce) {
975 assert !Thread.currentThread().isVirtual();
976 for (;;) {
977 boolean unblocked = false;
978 synchronized (timedWaitLock()) {
979 if (nounce != timedWaitNonce) {
980 // this timeout task is for a past timed-wait
981 return;
982 }
983 int s = state();
984 if (s == TIMED_WAIT) {
985 unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED);
986 } else if (s != (TIMED_WAIT | SUSPENDED)) {
987 // notified or interrupted, no longer waiting
988 return;
989 }
990 }
991 if (unblocked) {
992 submitRunContinuation();
993 return;
994 }
995 // need to retry when thread is suspended in time-wait
996 Thread.yield();
997 }
998 }
999
1000 /**
1001 * Invoked by Object.wait to cancel the wait timer.
1002 */
1003 void cancelWaitTimeout() {
1004 assert Thread.currentThread() == this;
1005 Future<?> timeoutTask = this.waitTimeoutTask;
1006 if (timeoutTask != null) {
1007 // Pin the continuation to prevent the virtual thread from unmounting
1008 // when there is contention removing the task. This avoids deadlock that
1009 // could arise due to carriers and virtual threads contending for a
1010 // lock on the delay queue.
1011 Continuation.pin();
1012 try {
1013 timeoutTask.cancel(false);
1014 } finally {
1015 Continuation.unpin();
1016 }
1017 }
1018 }
1019
/**
 * Attempts to yield the current virtual thread (Thread.yield).
 * If the continuation cannot be yielded then the state is restored to RUNNING
 * and the thread simply continues.
 */
void tryYield() {
    assert Thread.currentThread() == this;
    setState(YIELDING);
    boolean yielded = false;
    try {
        yielded = yieldContinuation(); // may throw
    } finally {
        // the state is RUNNING here only if the thread yielded and was continued
        assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
        if (!yielded) {
            // did not unmount, so undo the transition to YIELDING
            assert state() == YIELDING;
            setState(RUNNING);
        }
    }
}
1037
1038 /**
1039 * Sleep the current thread for the given sleep time (in nanoseconds). If
1130 if (blocker != null) {
1131 blocker.interrupt(this);
1132 }
1133
1134 // interrupt carrier thread if mounted
1135 Thread carrier = carrierThread;
1136 if (carrier != null) carrier.setInterrupt();
1137 }
1138 } finally {
1139 enableSuspendAndPreempt();
1140 }
1141
1142 // notify blocker after releasing interruptLock
1143 if (blocker != null) {
1144 blocker.postInterrupt();
1145 }
1146
1147 // make available parking permit, unpark thread if parked
1148 unpark();
1149
1150 // if thread is waiting in Object.wait then schedule to try to reenter
1151 int s = state();
1152 if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) {
1153 submitRunContinuation();
1154 }
1155
1156 } else {
1157 interrupted = true;
1158 carrierThread.setInterrupt();
1159 setParkPermit(true);
1160 }
1161 }
1162
// Returns the value of the interrupted field; the status is not cleared.
@Override
public boolean isInterrupted() {
    return interrupted;
}
1167
1168 @Override
1169 boolean getAndClearInterrupt() {
1170 assert Thread.currentThread() == this;
1171 boolean oldValue = interrupted;
1172 if (oldValue) {
1173 disableSuspendAndPreempt();
1174 try {
1175 synchronized (interruptLock) {
1183 return oldValue;
1184 }
1185
1186 @Override
1187 Thread.State threadState() {
1188 int s = state();
1189 switch (s & ~SUSPENDED) {
1190 case NEW:
1191 return Thread.State.NEW;
1192 case STARTED:
1193 // return NEW if thread container not yet set
1194 if (threadContainer() == null) {
1195 return Thread.State.NEW;
1196 } else {
1197 return Thread.State.RUNNABLE;
1198 }
1199 case UNPARKED:
1200 case YIELDED:
1201 // runnable, not mounted
1202 return Thread.State.RUNNABLE;
1203 case UNBLOCKED:
1204 // if designated responsible thread for monitor then thread is blocked
1205 if (isResponsibleForMonitor()) {
1206 return Thread.State.BLOCKED;
1207 } else {
1208 return Thread.State.RUNNABLE;
1209 }
1210 case RUNNING:
1211 // if designated responsible thread for monitor then thread is blocked
1212 if (isResponsibleForMonitor()) {
1213 return Thread.State.BLOCKED;
1214 }
1215 // if mounted then return state of carrier thread
1216 if (Thread.currentThread() != this) {
1217 disableSuspendAndPreempt();
1218 try {
1219 synchronized (carrierThreadAccessLock()) {
1220 Thread carrierThread = this.carrierThread;
1221 if (carrierThread != null) {
1222 return carrierThread.threadState();
1223 }
1224 }
1225 } finally {
1226 enableSuspendAndPreempt();
1227 }
1228 }
1229 // runnable, mounted
1230 return Thread.State.RUNNABLE;
1231 case PARKING:
1232 case TIMED_PARKING:
1233 case YIELDING:
1234 case WAITING:
1235 case TIMED_WAITING:
1236 // runnable, in transition
1237 return Thread.State.RUNNABLE;
1238 case PARKED:
1239 case PINNED:
1240 case WAIT:
1241 return State.WAITING;
1242 case TIMED_PARKED:
1243 case TIMED_PINNED:
1244 case TIMED_WAIT:
1245 return State.TIMED_WAITING;
1246 case BLOCKING:
1247 case BLOCKED:
1248 return State.BLOCKED;
1249 case TERMINATED:
1250 return Thread.State.TERMINATED;
1251 default:
1252 throw new InternalError();
1253 }
1254 }
1255
/**
 * Returns true if thread is the designated responsible thread for a monitor.
 * See objectMonitor.cpp for details.
 *
 * @return true if {@code recheckInterval} is positive
 */
private boolean isResponsibleForMonitor() {
    return (recheckInterval > 0);
}
1263
1264 @Override
1265 boolean alive() {
1266 int s = state;
1267 return (s != NEW && s != TERMINATED);
1268 }
1269
// Returns true once the state has been set to TERMINATED.
@Override
boolean isTerminated() {
    return (state == TERMINATED);
}
1274
1275 @Override
1276 StackTraceElement[] asyncGetStackTrace() {
1277 StackTraceElement[] stackTrace;
1278 do {
1279 stackTrace = (carrierThread != null)
1280 ? super.asyncGetStackTrace() // mounted
1281 : tryGetStackTrace(); // unmounted
1282 if (stackTrace == null) {
1283 Thread.yield();
1284 }
1285 } while (stackTrace == null);
1286 return stackTrace;
1287 }
1288
/**
 * Returns the stack trace for this virtual thread if it is unmounted.
 * Returns null if the thread is mounted or in transition.
 *
 * <p> While the stack is being walked the SUSPENDED bit is set in the state so
 * that the thread cannot continue. Afterwards the state is restored and the
 * continuation is resubmitted if the thread became (or already was) runnable.
 *
 * @return the stack trace, an empty array if the thread has not started running
 *         or has terminated, or null if the caller should retry
 */
private StackTraceElement[] tryGetStackTrace() {
    int initialState = state() & ~SUSPENDED;
    switch (initialState) {
        case NEW, STARTED, TERMINATED -> {
            return new StackTraceElement[0]; // unmounted, empty stack
        }
        case RUNNING, PINNED, TIMED_PINNED -> {
            return null; // mounted
        }
        case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
            // unmounted, not runnable
        }
        case UNPARKED, UNBLOCKED, YIELDED -> {
            // unmounted, runnable
        }
        case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
            return null; // in transition
        }
        default -> throw new InternalError("" + initialState);
    }

    // thread is unmounted, prevent it from continuing
    // (the CAS fails if the state changed or is already suspended; caller retries)
    int suspendedState = initialState | SUSPENDED;
    if (!compareAndSetState(initialState, suspendedState)) {
        return null;
    }

    // get stack trace and restore state
    StackTraceElement[] stack;
    try {
        stack = cont.getStackTrace();
    } finally {
        assert state == suspendedState;
        setState(initialState);
    }
    // decide whether the continuation must be (re)submitted now that the thread
    // is no longer suspended
    boolean resubmit = switch (initialState) {
        case UNPARKED, UNBLOCKED, YIELDED -> {
            // resubmit as task may have run while suspended
            yield true;
        }
        case PARKED, TIMED_PARKED -> {
            // resubmit if unparked while suspended
            yield parkPermit && compareAndSetState(initialState, UNPARKED);
        }
        case BLOCKED -> {
            // resubmit if unblocked while suspended or the responsible thread for a monitor
            yield (unblocked || isResponsibleForMonitor())
                    && compareAndSetState(BLOCKED, UNBLOCKED);
        }
        case WAIT, TIMED_WAIT -> {
            // resubmit if notified or interrupted while waiting (Object.wait)
            // waitTimeoutExpired will retry if the timeout expired when suspended
            yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
        }
        default -> throw new InternalError();
    };
    if (resubmit) {
        submitRunContinuation();
    }
    return stack;
}
1354
1355 @Override
1356 public String toString() {
1357 StringBuilder sb = new StringBuilder("VirtualThread[#");
1358 sb.append(threadId());
1359 String name = getName();
1360 if (!name.isEmpty()) {
1361 sb.append(",");
1362 sb.append(name);
1363 }
1364 sb.append("]/");
1365
1366 // add the carrier state and thread name when mounted
1421 private CountDownLatch getTermination() {
1422 CountDownLatch termination = this.termination;
1423 if (termination == null) {
1424 termination = new CountDownLatch(1);
1425 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
1426 termination = this.termination;
1427 }
1428 }
1429 return termination;
1430 }
1431
/**
 * Returns the lock object to synchronize on when accessing carrierThread.
 * The lock prevents carrierThread from being reset to null during unmount.
 *
 * @return the {@code interruptLock} object
 */
private Object carrierThreadAccessLock() {
    // return interruptLock as unmount has to coordinate with interrupt
    return interruptLock;
}
1440
/**
 * Returns the lock object used to coordinate timed-wait setup with timeout handling.
 *
 * @return the {@code runContinuation} object
 */
private Object timedWaitLock() {
    // use this object for now to avoid the overhead of introducing another lock
    return runContinuation;
}
1448
/**
 * Prevents the current thread from being suspended or preempted.
 * Must be paired with a later call to {@code enableSuspendAndPreempt}.
 */
private void disableSuspendAndPreempt() {
    notifyJvmtiDisableSuspend(true);
    Continuation.pin();
}
1456
/**
 * Allows the current thread to be suspended or preempted again.
 * Undoes {@code disableSuspendAndPreempt}, performing its two steps in reverse order.
 */
private void enableSuspendAndPreempt() {
    Continuation.unpin();
    notifyJvmtiDisableSuspend(false);
}
1464
1465 // -- wrappers for get/set of state, parking permit, and carrier thread --
1466
// Returns the current state, including the SUSPENDED bit if set.
private int state() {
    return state; // volatile read
}
1470
// Unconditionally sets the state to the given value.
private void setState(int newValue) {
    state = newValue; // volatile write
}
1474
// Atomically sets the state to newValue if it currently equals expectedValue;
// returns the result of U.compareAndSetInt.
private boolean compareAndSetState(int expectedValue, int newValue) {
    return U.compareAndSetInt(this, STATE, expectedValue, newValue);
}
1478
// Atomically sets onWaitingList to newValue if it currently equals expectedValue;
// returns the result of U.compareAndSetByte.
private boolean compareAndSetOnWaitingList(byte expectedValue, byte newValue) {
    return U.compareAndSetByte(this, ON_WAITING_LIST, expectedValue, newValue);
}
1482
// Sets the parking permit; the volatile write is skipped when the value is unchanged.
private void setParkPermit(boolean newValue) {
    if (parkPermit != newValue) {
        parkPermit = newValue;
    }
}
1488
1489 private boolean getAndSetParkPermit(boolean newValue) {
1490 if (parkPermit != newValue) {
1491 return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
1492 } else {
1493 return newValue;
1494 }
1495 }
1496
// Sets the carrier thread (null when unmounting). This is a plain volatile write;
// NOTE(review): a release-only store via U with the CARRIER_THREAD offset was
// apparently considered here and left disabled. Confirm before changing.
private void setCarrierThread(Thread carrier) {
    this.carrierThread = carrier;
}
1501
1502 // -- JVM TI support --
1513 @JvmtiMountTransition
1514 private native void notifyJvmtiMount(boolean hide);
1515
// Native JVMTI notification; invoked with hide=false after the carrier thread
// has been cleared during unmount.
@IntrinsicCandidate
@JvmtiMountTransition
private native void notifyJvmtiUnmount(boolean hide);

// Native JVMTI notification; invoked with true when temporarily switching to the
// carrier thread and with false after switching back to the virtual thread.
@IntrinsicCandidate
@JvmtiMountTransition
private static native void notifyJvmtiHideFrames(boolean hide);

// Native JVMTI notification; invoked with true by disableSuspendAndPreempt and
// with false by enableSuspendAndPreempt.
@IntrinsicCandidate
private static native void notifyJvmtiDisableSuspend(boolean enter);
1526
// Implemented natively; presumably binds this class's native methods (TODO confirm).
private static native void registerNatives();
static {
    // must run before any other native method of this class is used
    registerNatives();

    // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
    // (the call is made for its side effect; the returned group is not used)
    var group = Thread.virtualThreadGroup();
}
1534
1535 /**
1536 * Creates the default scheduler.
1537 * If the system property {@code jdk.virtualThreadScheduler.implClass} is set then
1538 * its value is the name of a class that implements java.util.concurrent.Executor.
1539 * The class is public in an exported package, has a public no-arg constructor,
1540 * and is visible to the system class loader.
1541 * If the system property is not set then the default scheduler will be a
1542 * ForkJoinPool instance.
1543 */
1544 private static Executor createDefaultScheduler() {
1545 String propName = "jdk.virtualThreadScheduler.implClass";
1546 String propValue = GetPropertyAction.privilegedGetProperty(propName);
1547 if (propValue != null) {
1548 try {
1549 Class<?> clazz = Class.forName(propValue, true,
1550 ClassLoader.getSystemClassLoader());
1551 Constructor<?> ctor = clazz.getConstructor();
1552 var scheduler = (Executor) ctor.newInstance();
1553 System.err.println("""
1554 WARNING: Using custom scheduler, this is an experimental feature.""");
1555 return scheduler;
1556 } catch (Exception ex) {
1557 throw new Error(ex);
1558 }
1559 } else {
1560 return createDefaultForkJoinPoolScheduler();
1561 }
1562 }
1563
1564 /**
1565 * Creates the default ForkJoinPool scheduler.
1566 */
1567 @SuppressWarnings("removal")
1568 private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
1569 ForkJoinWorkerThreadFactory factory = pool -> {
1570 PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
1571 return AccessController.doPrivileged(pa);
1572 };
1573 PrivilegedAction<ForkJoinPool> pa = () -> {
1574 int parallelism, maxPoolSize, minRunnable;
1575 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1576 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1577 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1578 if (parallelismValue != null) {
1579 parallelism = Integer.parseInt(parallelismValue);
1580 } else {
1581 parallelism = Runtime.getRuntime().availableProcessors();
1582 }
1583 if (maxPoolSizeValue != null) {
1584 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1585 parallelism = Integer.min(parallelism, maxPoolSize);
1586 } else {
1587 maxPoolSize = Integer.max(parallelism, 256);
1588 }
1622 }
1623 } else {
1624 int ncpus = Runtime.getRuntime().availableProcessors();
1625 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
1626 }
1627 var schedulers = new ScheduledExecutorService[queueCount];
1628 for (int i = 0; i < queueCount; i++) {
1629 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1630 Executors.newScheduledThreadPool(1, task -> {
1631 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task);
1632 t.setDaemon(true);
1633 return t;
1634 });
1635 stpe.setRemoveOnCancelPolicy(true);
1636 schedulers[i] = stpe;
1637 }
1638 return schedulers;
1639 }
1640
1641 /**
1642 * Schedule virtual threads that are ready to be scheduled after they blocked on
1643 * monitor enter.
1644 */
1645 private static void unblockVirtualThreads() {
1646 while (true) {
1647 VirtualThread vthread = takeVirtualThreadListToUnblock();
1648 while (vthread != null) {
1649 assert vthread.onWaitingList == 1;
1650 VirtualThread nextThread = vthread.next;
1651
1652 // remove from list and unblock
1653 vthread.next = null;
1654 boolean changed = vthread.compareAndSetOnWaitingList((byte) 1, (byte) 0);
1655 assert changed;
1656 vthread.unblock();
1657
1658 vthread = nextThread;
1659 }
1660 }
1661 }
1662
/**
 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
 * if necessary until a list of one or more threads becomes available.
 *
 * @return the first thread of the list; the remaining threads are reached
 *         through each thread's {@code next} field
 */
private static native VirtualThread takeVirtualThreadListToUnblock();
1668
// Starts the daemon thread that runs unblockVirtualThreads() for the lifetime
// of the VM.
static {
    var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
            VirtualThread::unblockVirtualThreads);
    unblocker.setDaemon(true);
    unblocker.start();
}
1675 }
|