7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.security.AccessController;
28 import java.security.PrivilegedAction;
29 import java.util.Locale;
30 import java.util.Objects;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ForkJoinPool;
36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
37 import java.util.concurrent.ForkJoinTask;
38 import java.util.concurrent.ForkJoinWorkerThread;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.RejectedExecutionException;
41 import java.util.concurrent.ScheduledExecutorService;
42 import java.util.concurrent.ScheduledThreadPoolExecutor;
43 import jdk.internal.event.VirtualThreadEndEvent;
44 import jdk.internal.event.VirtualThreadPinnedEvent;
45 import jdk.internal.event.VirtualThreadStartEvent;
46 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
52 import jdk.internal.vm.StackableScope;
53 import jdk.internal.vm.ThreadContainer;
54 import jdk.internal.vm.ThreadContainers;
55 import jdk.internal.vm.annotation.ChangesCurrentThread;
56 import jdk.internal.vm.annotation.Hidden;
57 import jdk.internal.vm.annotation.IntrinsicCandidate;
58 import jdk.internal.vm.annotation.JvmtiMountTransition;
59 import jdk.internal.vm.annotation.ReservedStackAccess;
60 import sun.nio.ch.Interruptible;
61 import sun.security.action.GetPropertyAction;
62 import static java.util.concurrent.TimeUnit.*;
63
64 /**
65 * A thread that is scheduled by the Java virtual machine rather than the operating
66 * system.
67 */
68 final class VirtualThread extends BaseVirtualThread {
69 private static final Unsafe U = Unsafe.getUnsafe();
70 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
71 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
72 private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
73 private static final int TRACE_PINNING_MODE = tracePinningMode();
74
75 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
76 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
77 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
78 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
79
80 // scheduler and continuation
81 private final Executor scheduler;
82 private final Continuation cont;
83 private final Runnable runContinuation;
84
85 // virtual thread state, accessed by VM
86 private volatile int state;
87
88 /*
89 * Virtual thread state transitions:
90 *
91 * NEW -> STARTED // Thread.start, schedule to run
92 * STARTED -> TERMINATED // failed to start
93 * STARTED -> RUNNING // first run
94 * RUNNING -> TERMINATED // done
95 *
96 * RUNNING -> PARKING // Thread parking with LockSupport.park
97 * PARKING -> PARKED // cont.yield successful, parked indefinitely
98 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier
99 * PARKED -> RUNNABLE // unparked, schedule to continue
100 * PINNED -> RUNNING // unparked, continue execution on same carrier
101 *
102 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos
103 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked
104 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier
105 * TIMED_PARKED -> RUNNABLE // unparked, schedule to continue
106 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
107 *
108 * RUNNABLE -> RUNNING // continue execution
109 *
110 * RUNNING -> YIELDING // Thread.yield
111 * YIELDING -> RUNNABLE // yield successful
112 * YIELDING -> RUNNING // yield failed
113 */
114 private static final int NEW = 0;
115 private static final int STARTED = 1;
116 private static final int RUNNABLE = 2; // runnable-unmounted
117 private static final int RUNNING = 3; // runnable-mounted
118
119 // untimed parking
120 private static final int PARKING = 4;
121 private static final int PARKED = 5; // unmounted
122 private static final int PINNED = 6; // mounted
123
124 // timed parking
125 private static final int TIMED_PARKING = 7;
126 private static final int TIMED_PARKED = 8;
127 private static final int TIMED_PINNED = 9;
128
129 private static final int YIELDING = 10; // Thread.yield
130
131 private static final int TERMINATED = 99; // final state
132
133 // can be suspended from scheduling when unmounted
134 private static final int SUSPENDED = 1 << 8;
135
136 // parking permit
137 private volatile boolean parkPermit;
138
139 // carrier thread when mounted, accessed by VM
140 private volatile Thread carrierThread;
141
142 // termination object when joining, created lazily if needed
143 private volatile CountDownLatch termination;
144
145 /**
146 * Returns the continuation scope used for virtual threads.
147 */
148 static ContinuationScope continuationScope() {
149 return VTHREAD_SCOPE;
150 }
151
152 /**
153 * Creates a new {@code VirtualThread} to run the given task with the given
154 * scheduler. If the given scheduler is {@code null} and the current thread
155 * is a platform thread then the newly created virtual thread will use the
156 * default scheduler. If given scheduler is {@code null} and the current
157 * thread is a virtual thread then the current thread's scheduler is used.
158 *
174 scheduler = DEFAULT_SCHEDULER;
175 }
176 }
177
178 this.scheduler = scheduler;
179 this.cont = new VThreadContinuation(this, task);
180 this.runContinuation = this::runContinuation;
181 }
182
183 /**
184 * The continuation that a virtual thread executes.
185 */
186 private static class VThreadContinuation extends Continuation {
187 VThreadContinuation(VirtualThread vthread, Runnable task) {
188 super(VTHREAD_SCOPE, wrap(vthread, task));
189 }
190 @Override
191 protected void onPinned(Continuation.Pinned reason) {
192 if (TRACE_PINNING_MODE > 0) {
193 boolean printAll = (TRACE_PINNING_MODE == 1);
194 PinnedThreadPrinter.printStackTrace(System.out, printAll);
195 }
196 }
197 private static Runnable wrap(VirtualThread vthread, Runnable task) {
198 return new Runnable() {
199 @Hidden
200 public void run() {
201 vthread.run(task);
202 }
203 };
204 }
205 }
206
207 /**
208 * Runs or continues execution on the current thread. The virtual thread is mounted
209 * on the current thread before the task runs or continues. It unmounts when the
210 * task completes or yields.
211 */
212 @ChangesCurrentThread
213 private void runContinuation() {
214 // the carrier must be a platform thread
215 if (Thread.currentThread().isVirtual()) {
216 throw new WrongThreadException();
217 }
218
219 // set state to RUNNING
220 int initialState = state();
221 if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
222 // first run
223 } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
224 // consume parking permit
225 setParkPermit(false);
226 } else {
227 // not runnable
228 return;
229 }
230
231 mount();
232 try {
233 cont.run();
234 } finally {
235 unmount();
236 if (cont.isDone()) {
237 afterDone();
238 } else {
239 afterYield();
240 }
241 }
242 }
243
244 /**
245 * Submits the runContinuation task to the scheduler. For the default scheduler,
246 * and calling it on a worker thread, the task will be pushed to the local queue,
247 * otherwise it will be pushed to a submission queue.
248 *
249 * @throws RejectedExecutionException
250 */
251 private void submitRunContinuation() {
252 try {
253 scheduler.execute(runContinuation);
254 } catch (RejectedExecutionException ree) {
255 submitFailed(ree);
256 throw ree;
257 }
258 }
259
260 /**
261 * Submits the runContinuation task to the scheduler with a lazy submit.
262 * @throws RejectedExecutionException
263 * @see ForkJoinPool#lazySubmit(ForkJoinTask)
264 */
265 private void lazySubmitRunContinuation(ForkJoinPool pool) {
266 try {
267 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
268 } catch (RejectedExecutionException ree) {
269 submitFailed(ree);
270 throw ree;
271 }
272 }
273
274 /**
275 * Submits the runContinuation task to the scheduler as an external submit.
276 * @throws RejectedExecutionException
277 * @see ForkJoinPool#externalSubmit(ForkJoinTask)
278 */
279 private void externalSubmitRunContinuation(ForkJoinPool pool) {
280 try {
281 pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
282 } catch (RejectedExecutionException ree) {
283 submitFailed(ree);
284 throw ree;
285 }
286 }
287
288 /**
289 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
290 */
291 private void submitFailed(RejectedExecutionException ree) {
292 var event = new VirtualThreadSubmitFailedEvent();
293 if (event.isEnabled()) {
294 event.javaThreadId = threadId();
295 event.exceptionMessage = ree.getMessage();
357 } else if (carrier.isInterrupted()) {
358 synchronized (interruptLock) {
359 // need to recheck interrupt status
360 if (!interrupted) {
361 carrier.clearInterrupt();
362 }
363 }
364 }
365
366 // set Thread.currentThread() to return this virtual thread
367 carrier.setCurrentThread(this);
368 }
369
370 /**
371 * Unmounts this virtual thread from the carrier. On return, the
372 * current thread is the current platform thread.
373 */
374 @ChangesCurrentThread
375 @ReservedStackAccess
376 private void unmount() {
377 // set Thread.currentThread() to return the platform thread
378 Thread carrier = this.carrierThread;
379 carrier.setCurrentThread(carrier);
380
381 // break connection to carrier thread, synchronized with interrupt
382 synchronized (interruptLock) {
383 setCarrierThread(null);
384 }
385 carrier.clearInterrupt();
386
387 // notify JVMTI after unmount
388 notifyJvmtiUnmount(/*hide*/false);
389 }
390
391 /**
392 * Sets the current thread to the current carrier thread.
393 */
394 @ChangesCurrentThread
395 @JvmtiMountTransition
396 private void switchToCarrierThread() {
440 notifyJvmtiMount(/*hide*/false);
441 }
442 }
443
444 /**
445 * Invoked after the continuation yields. If parking then it sets the state
446 * and also re-submits the task to continue if unparked while parking.
447 * If yielding due to Thread.yield then it just submits the task to continue.
448 */
449 private void afterYield() {
450 assert carrierThread == null;
451
452 int s = state();
453
454 // LockSupport.park/parkNanos
455 if (s == PARKING || s == TIMED_PARKING) {
456 int newState = (s == PARKING) ? PARKED : TIMED_PARKED;
457 setState(newState);
458
459 // may have been unparked while parking
460 if (parkPermit && compareAndSetState(newState, RUNNABLE)) {
461 // lazy submit to continue on the current thread as carrier if possible
462 if (currentThread() instanceof CarrierThread ct) {
463 lazySubmitRunContinuation(ct.getPool());
464 } else {
465 submitRunContinuation();
466 }
467
468 }
469 return;
470 }
471
472 // Thread.yield
473 if (s == YIELDING) {
474 setState(RUNNABLE);
475
476 // external submit if there are no tasks in the local task queue
477 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
478 externalSubmitRunContinuation(ct.getPool());
479 } else {
480 submitRunContinuation();
481 }
482 return;
483 }
484
485 assert false;
486 }
487
488 /**
489 * Invoked after the continuation completes.
490 */
491 private void afterDone() {
492 afterDone(true);
493 }
494
495 /**
496 * Invoked after the continuation completes (or start failed). Sets the thread
497 * state to TERMINATED and notifies anyone waiting for the thread to terminate.
498 *
499 * @param notifyContainer true if its container should be notified
500 */
501 private void afterDone(boolean notifyContainer) {
502 assert carrierThread == null;
503 setState(TERMINATED);
504
528 @Override
529 void start(ThreadContainer container) {
530 if (!compareAndSetState(NEW, STARTED)) {
531 throw new IllegalThreadStateException("Already started");
532 }
533
534 // bind thread to container
535 assert threadContainer() == null;
536 setThreadContainer(container);
537
538 // start thread
539 boolean addedToContainer = false;
540 boolean started = false;
541 try {
542 container.onStart(this); // may throw
543 addedToContainer = true;
544
545 // scoped values may be inherited
546 inheritScopedValueBindings(container);
547
548 // submit task to run thread
549 submitRunContinuation();
550 started = true;
551 } finally {
552 if (!started) {
553 afterDone(addedToContainer);
554 }
555 }
556 }
557
558 @Override
559 public void start() {
560 start(ThreadContainers.root());
561 }
562
563 @Override
564 public void run() {
565 // do nothing
566 }
567
568 /**
569 * Parks until unparked or interrupted. If already unparked then the parking
601 * Parks up to the given waiting time or until unparked or interrupted.
602 * If already unparked then the parking permit is consumed and this method
603 * completes immediately (meaning it doesn't yield). It also completes immediately
604 * if the interrupt status is set or the waiting time is {@code <= 0}.
605 *
606 * @param nanos the maximum number of nanoseconds to wait.
607 */
608 @Override
609 void parkNanos(long nanos) {
610 assert Thread.currentThread() == this;
611
612 // complete immediately if parking permit available or interrupted
613 if (getAndSetParkPermit(false) || interrupted)
614 return;
615
616 // park the thread for the waiting time
617 if (nanos > 0) {
618 long startTime = System.nanoTime();
619
620 boolean yielded = false;
621 Future<?> unparker = scheduleUnpark(this::unpark, nanos);
622 setState(TIMED_PARKING);
623 try {
624 yielded = yieldContinuation(); // may throw
625 } finally {
626 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
627 if (!yielded) {
628 assert state() == TIMED_PARKING;
629 setState(RUNNING);
630 }
631 cancel(unparker);
632 }
633
634 // park on carrier thread for remaining time when pinned
635 if (!yielded) {
636 long remainingNanos = nanos - (System.nanoTime() - startTime);
637 parkOnCarrierThread(true, remainingNanos);
638 }
639 }
640 }
641
666 U.park(false, nanos);
667 }
668 }
669 } finally {
670 setState(RUNNING);
671 }
672
673 // consume parking permit
674 setParkPermit(false);
675
676 if (event != null) {
677 try {
678 event.commit();
679 } catch (OutOfMemoryError e) {
680 // ignore
681 }
682 }
683 }
684
685 /**
686 * Schedule an unpark task to run after a given delay.
687 */
688 @ChangesCurrentThread
689 private Future<?> scheduleUnpark(Runnable unparker, long nanos) {
690 // need to switch to current carrier thread to avoid nested parking
691 switchToCarrierThread();
692 try {
693 return UNPARKER.schedule(unparker, nanos, NANOSECONDS);
694 } finally {
695 switchToVirtualThread(this);
696 }
697 }
698
699 /**
700 * Cancels a task if it has not completed.
701 */
702 @ChangesCurrentThread
703 private void cancel(Future<?> future) {
704 if (!future.isDone()) {
705 // need to switch to current carrier thread to avoid nested parking
706 switchToCarrierThread();
707 try {
708 future.cancel(false);
709 } finally {
710 switchToVirtualThread(this);
711 }
712 }
713 }
714
715 /**
716 * Re-enables this virtual thread for scheduling. If the virtual thread was
717 * {@link #park() parked} then it will be unblocked, otherwise its next call
718 * to {@code park} or {@linkplain #parkNanos(long) parkNanos} is guaranteed
719 * not to block.
720 * @throws RejectedExecutionException if the scheduler cannot accept a task
721 */
722 @Override
723 @ChangesCurrentThread
724 void unpark() {
725 Thread currentThread = Thread.currentThread();
726 if (!getAndSetParkPermit(true) && currentThread != this) {
727 int s = state();
728 boolean parked = (s == PARKED) || (s == TIMED_PARKED);
729 if (parked && compareAndSetState(s, RUNNABLE)) {
730 if (currentThread instanceof VirtualThread vthread) {
731 vthread.switchToCarrierThread();
732 try {
733 submitRunContinuation();
734 } finally {
735 switchToVirtualThread(vthread);
736 }
737 } else {
738 submitRunContinuation();
739 }
740 } else if ((s == PINNED) || (s == TIMED_PINNED)) {
741 // unpark carrier thread when pinned.
742 synchronized (carrierThreadAccessLock()) {
743 Thread carrier = carrierThread;
744 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
745 U.unpark(carrier);
746 }
747 }
748 }
749 }
750 }
751
752 /**
753 * Attempts to yield the current virtual thread (Thread.yield).
754 */
755 void tryYield() {
756 assert Thread.currentThread() == this;
757 setState(YIELDING);
758 boolean yielded = false;
759 try {
760 yielded = yieldContinuation(); // may throw
761 } finally {
762 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
763 if (!yielded) {
764 assert state() == YIELDING;
765 setState(RUNNING);
766 }
767 }
768 }
769
770 /**
771 * Sleep the current thread for the given sleep time (in nanoseconds). If
827 } else {
828 boolean terminated = termination.await(nanos, NANOSECONDS);
829 if (!terminated) {
830 // waiting time elapsed
831 return false;
832 }
833 }
834 assert state() == TERMINATED;
835 return true;
836 }
837
838 @Override
839 @SuppressWarnings("removal")
840 public void interrupt() {
841 if (Thread.currentThread() != this) {
842 checkAccess();
843 synchronized (interruptLock) {
844 interrupted = true;
845 Interruptible b = nioBlocker;
846 if (b != null) {
847 b.interrupt(this);
848 }
849
850 // interrupt carrier thread if mounted
851 Thread carrier = carrierThread;
852 if (carrier != null) carrier.setInterrupt();
853 }
854 } else {
855 interrupted = true;
856 carrierThread.setInterrupt();
857 }
858 unpark();
859 }
860
861 @Override
862 public boolean isInterrupted() {
863 return interrupted;
864 }
865
866 @Override
867 boolean getAndClearInterrupt() {
868 assert Thread.currentThread() == this;
869 boolean oldValue = interrupted;
870 if (oldValue) {
871 synchronized (interruptLock) {
872 interrupted = false;
873 carrierThread.clearInterrupt();
874 }
875 }
876 return oldValue;
877 }
878
879 @Override
880 Thread.State threadState() {
881 int s = state();
882 switch (s & ~SUSPENDED) {
883 case NEW:
884 return Thread.State.NEW;
885 case STARTED:
886 // return NEW if thread container not yet set
887 if (threadContainer() == null) {
888 return Thread.State.NEW;
889 } else {
890 return Thread.State.RUNNABLE;
891 }
892 case RUNNABLE:
893 // runnable, not mounted
894 return Thread.State.RUNNABLE;
895 case RUNNING:
896 // if mounted then return state of carrier thread
897 synchronized (carrierThreadAccessLock()) {
898 Thread carrierThread = this.carrierThread;
899 if (carrierThread != null) {
900 return carrierThread.threadState();
901 }
902 }
903 // runnable, mounted
904 return Thread.State.RUNNABLE;
905 case PARKING:
906 case TIMED_PARKING:
907 case YIELDING:
908 // runnable, mounted, not yet waiting
909 return Thread.State.RUNNABLE;
910 case PARKED:
911 case PINNED:
912 return State.WAITING;
913 case TIMED_PARKED:
914 case TIMED_PINNED:
915 return State.TIMED_WAITING;
916 case TERMINATED:
917 return Thread.State.TERMINATED;
918 default:
919 throw new InternalError();
920 }
921 }
922
923 @Override
924 boolean alive() {
925 int s = state;
926 return (s != NEW && s != TERMINATED);
927 }
928
929 @Override
930 boolean isTerminated() {
931 return (state == TERMINATED);
932 }
933
934 @Override
935 StackTraceElement[] asyncGetStackTrace() {
936 StackTraceElement[] stackTrace;
937 do {
938 stackTrace = (carrierThread != null)
939 ? super.asyncGetStackTrace() // mounted
940 : tryGetStackTrace(); // unmounted
941 if (stackTrace == null) {
942 Thread.yield();
943 }
944 } while (stackTrace == null);
945 return stackTrace;
946 }
947
948 /**
949 * Returns the stack trace for this virtual thread if it is unmounted.
950 * Returns null if the thread is in another state.
951 */
952 private StackTraceElement[] tryGetStackTrace() {
953 int initialState = state();
954 return switch (initialState) {
955 case RUNNABLE, PARKED, TIMED_PARKED -> {
956 int suspendedState = initialState | SUSPENDED;
957 if (compareAndSetState(initialState, suspendedState)) {
958 try {
959 yield cont.getStackTrace();
960 } finally {
961 assert state == suspendedState;
962 setState(initialState);
963
964 // re-submit if runnable
965 // re-submit if unparked while suspended
966 if (initialState == RUNNABLE
967 || (parkPermit && compareAndSetState(initialState, RUNNABLE))) {
968 try {
969 submitRunContinuation();
970 } catch (RejectedExecutionException ignore) { }
971 }
972 }
973 }
974 yield null;
975 }
976 case NEW, STARTED, TERMINATED -> new StackTraceElement[0]; // empty stack
977 default -> null;
978 };
979 }
980
981 @Override
982 public String toString() {
983 StringBuilder sb = new StringBuilder("VirtualThread[#");
984 sb.append(threadId());
985 String name = getName();
986 if (!name.isEmpty()) {
987 sb.append(",");
988 sb.append(name);
989 }
990 sb.append("]/");
991 Thread carrier = carrierThread;
992 if (carrier != null) {
993 // include the carrier thread state and name when mounted
994 synchronized (carrierThreadAccessLock()) {
995 carrier = carrierThread;
996 if (carrier != null) {
997 String stateAsString = carrier.threadState().toString();
998 sb.append(stateAsString.toLowerCase(Locale.ROOT));
999 sb.append('@');
1000 sb.append(carrier.getName());
1001 }
1002 }
1003 }
1004 // include virtual thread state when not mounted
1005 if (carrier == null) {
1006 String stateAsString = threadState().toString();
1007 sb.append(stateAsString.toLowerCase(Locale.ROOT));
1008 }
1009 return sb.toString();
1010 }
1011
1012 @Override
1013 public int hashCode() {
1014 return (int) threadId();
1015 }
1016
1017 @Override
1018 public boolean equals(Object obj) {
1019 return obj == this;
1020 }
1021
1022 /**
1023 * Returns the termination object, creating it if needed.
1024 */
1025 private CountDownLatch getTermination() {
1026 CountDownLatch termination = this.termination;
1027 if (termination == null) {
1028 termination = new CountDownLatch(1);
1029 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
1030 termination = this.termination;
1031 }
1032 }
1033 return termination;
1034 }
1035
1036 /**
1037 * Returns the lock object to synchronize on when accessing carrierThread.
1038 * The lock prevents carrierThread from being reset to null during unmount.
1039 */
1040 private Object carrierThreadAccessLock() {
1041 // return interruptLock as unmount has to coordinate with interrupt
1100 private static native void registerNatives();
1101 static {
1102 registerNatives();
1103 }
1104
1105 /**
1106 * Creates the default scheduler.
1107 */
1108 @SuppressWarnings("removal")
1109 private static ForkJoinPool createDefaultScheduler() {
1110 ForkJoinWorkerThreadFactory factory = pool -> {
1111 PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
1112 return AccessController.doPrivileged(pa);
1113 };
1114 PrivilegedAction<ForkJoinPool> pa = () -> {
1115 int parallelism, maxPoolSize, minRunnable;
1116 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1117 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1118 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1119 if (parallelismValue != null) {
1120 parallelism = Integer.parseInt(parallelismValue);
1121 } else {
1122 parallelism = Runtime.getRuntime().availableProcessors();
1123 }
1124 if (maxPoolSizeValue != null) {
1125 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1126 parallelism = Integer.min(parallelism, maxPoolSize);
1127 } else {
1128 maxPoolSize = Integer.max(parallelism, 256);
1129 }
1130 if (minRunnableValue != null) {
1131 minRunnable = Integer.parseInt(minRunnableValue);
1132 } else {
1133 minRunnable = Integer.max(parallelism / 2, 1);
1134 }
1135 Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1136 boolean asyncMode = true; // FIFO
1137 return new ForkJoinPool(parallelism, factory, handler, asyncMode,
1138 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
1139 };
1140 return AccessController.doPrivileged(pa);
1141 }
1142
1143 /**
1144 * Creates the ScheduledThreadPoolExecutor used for timed unpark.
1145 */
1146 private static ScheduledExecutorService createDelayedTaskScheduler() {
1147 String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize");
1148 int poolSize;
1149 if (propValue != null) {
1150 poolSize = Integer.parseInt(propValue);
1151 } else {
1152 poolSize = 1;
1153 }
1154 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1155 Executors.newScheduledThreadPool(poolSize, task -> {
1156 return InnocuousThread.newThread("VirtualThread-unparker", task);
1157 });
1158 stpe.setRemoveOnCancelPolicy(true);
1159 return stpe;
1160 }
1161
1162 /**
1163 * Reads the value of the jdk.tracePinnedThreads property to determine if stack
1164 * traces should be printed when a carrier thread is pinned when a virtual thread
1165 * attempts to park.
1166 */
1167 private static int tracePinningMode() {
1168 String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads");
1169 if (propValue != null) {
1170 if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue))
1171 return 1;
1172 if ("short".equalsIgnoreCase(propValue))
1173 return 2;
1174 }
1175 return 0;
1176 }
1177 }
|
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.nio.charset.StandardCharsets;
28 import java.security.AccessController;
29 import java.security.PrivilegedAction;
30 import java.util.Locale;
31 import java.util.Objects;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ForkJoinPool;
37 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
38 import java.util.concurrent.ForkJoinTask;
39 import java.util.concurrent.ForkJoinWorkerThread;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.RejectedExecutionException;
42 import java.util.concurrent.ScheduledExecutorService;
43 import java.util.concurrent.ScheduledThreadPoolExecutor;
44 import jdk.internal.event.VirtualThreadEndEvent;
45 import jdk.internal.event.VirtualThreadPinnedEvent;
46 import jdk.internal.event.VirtualThreadStartEvent;
47 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
53 import jdk.internal.vm.StackableScope;
54 import jdk.internal.vm.ThreadContainer;
55 import jdk.internal.vm.ThreadContainers;
56 import jdk.internal.vm.annotation.ChangesCurrentThread;
57 import jdk.internal.vm.annotation.Hidden;
58 import jdk.internal.vm.annotation.IntrinsicCandidate;
59 import jdk.internal.vm.annotation.JvmtiMountTransition;
60 import jdk.internal.vm.annotation.ReservedStackAccess;
61 import sun.nio.ch.Interruptible;
62 import sun.security.action.GetPropertyAction;
63 import static java.util.concurrent.TimeUnit.*;
64
65 /**
66 * A thread that is scheduled by the Java virtual machine rather than the operating
67 * system.
68 */
69 final class VirtualThread extends BaseVirtualThread {
70 private static final Unsafe U = Unsafe.getUnsafe();
71 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
72 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
73 private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
74 private static final int TRACE_PINNING_MODE = tracePinningMode();
75
76 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
77 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
78 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
79 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
80
81 // scheduler and continuation
82 private final Executor scheduler;
83 private final Continuation cont;
84 private final Runnable runContinuation;
85
86 // virtual thread state, accessed by VM
87 private volatile int state;
88
89 /*
90 * Virtual thread state transitions:
91 *
92 * NEW -> STARTED // Thread.start, schedule to run
93 * STARTED -> TERMINATED // failed to start
94 * STARTED -> RUNNING // first run
95 * RUNNING -> TERMINATED // done
96 *
97 * RUNNING -> PARKING // Thread parking with LockSupport.park
98 * PARKING -> PARKED // cont.yield successful, parked indefinitely
99 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier
100 * PARKED -> UNPARKED // unparked, may be scheduled to continue
101 * PINNED -> RUNNING // unparked, continue execution on same carrier
102 * UNPARKED -> RUNNING // continue execution after park
103 *
104 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos
105 * TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked
106 * TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier
107 * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue
108 * TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
109 *
110 * RUNNING -> BLOCKING // blocking on monitor enter
111 * BLOCKING -> BLOCKED // blocked on monitor enter
112 * BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue
113 * UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter
114 *
115 * RUNNING -> YIELDING // Thread.yield
116 * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue
117 * YIELDING -> RUNNING // cont.yield failed
118 * YIELDED -> RUNNING // continue execution after Thread.yield
119 */
120 private static final int NEW = 0;
121 private static final int STARTED = 1;
122 private static final int RUNNING = 2; // runnable-mounted
123
124 // untimed and timed parking
125 private static final int PARKING = 3;
126 private static final int PARKED = 4; // unmounted
127 private static final int PINNED = 5; // mounted
128 private static final int TIMED_PARKING = 6;
129 private static final int TIMED_PARKED = 7; // unmounted
130 private static final int TIMED_PINNED = 8; // mounted
131 private static final int UNPARKED = 9; // unmounted but runnable
132
133 // Thread.yield
134 private static final int YIELDING = 10;
135 private static final int YIELDED = 11; // unmounted but runnable
136
137 // monitor enter
138 private static final int BLOCKING = 12;
139 private static final int BLOCKED = 13; // unmounted
140 private static final int UNBLOCKED = 14; // unmounted but runnable
141
142 private static final int TERMINATED = 99; // final state
143
144 // can be suspended from scheduling when unmounted
145 private static final int SUSPENDED = 1 << 8;
146
147 // parking permit
148 private volatile boolean parkPermit;
149
150 // unblocked
151 private volatile boolean unblocked;
152
153 // carrier thread when mounted, accessed by VM
154 private volatile Thread carrierThread;
155
156 // termination object when joining, created lazily if needed
157 private volatile CountDownLatch termination;
158
159 /**
160 * Returns the continuation scope used for virtual threads.
161 */
162 static ContinuationScope continuationScope() {
163 return VTHREAD_SCOPE;
164 }
165
166 /**
167 * Creates a new {@code VirtualThread} to run the given task with the given
168 * scheduler. If the given scheduler is {@code null} and the current thread
169 * is a platform thread then the newly created virtual thread will use the
170 * default scheduler. If given scheduler is {@code null} and the current
171 * thread is a virtual thread then the current thread's scheduler is used.
172 *
188 scheduler = DEFAULT_SCHEDULER;
189 }
190 }
191
192 this.scheduler = scheduler;
193 this.cont = new VThreadContinuation(this, task);
194 this.runContinuation = this::runContinuation;
195 }
196
197 /**
198 * The continuation that a virtual thread executes.
199 */
200 private static class VThreadContinuation extends Continuation {
201 VThreadContinuation(VirtualThread vthread, Runnable task) {
202 super(VTHREAD_SCOPE, wrap(vthread, task));
203 }
204 @Override
205 protected void onPinned(Continuation.Pinned reason) {
206 if (TRACE_PINNING_MODE > 0) {
207 boolean printAll = (TRACE_PINNING_MODE == 1);
208 VirtualThread vthread = (VirtualThread) Thread.currentThread();
209 int oldState = vthread.state();
210 try {
211 // avoid printing when in transition states
212 vthread.setState(RUNNING);
213 PinnedThreadPrinter.printStackTrace(System.out, printAll);
214 } finally {
215 vthread.setState(oldState);
216 }
217 }
218 }
219 private static Runnable wrap(VirtualThread vthread, Runnable task) {
220 return new Runnable() {
221 @Hidden
222 public void run() {
223 vthread.run(task);
224 }
225 };
226 }
227 }
228
229 /**
230 * Runs or continues execution on the current thread. The virtual thread is mounted
231 * on the current thread before the task runs or continues. It unmounts when the
232 * task completes or yields.
233 */
234 @ChangesCurrentThread
235 private void runContinuation() {
236 // the carrier must be a platform thread
237 if (Thread.currentThread().isVirtual()) {
238 throw new WrongThreadException();
239 }
240
241 // set state to RUNNING
242 int initialState = state();
243 if (initialState == STARTED || initialState == UNPARKED
244 || initialState == UNBLOCKED || initialState == YIELDED) {
245 // newly started or continue after parking/blocking/Thread.yield
246 if (!compareAndSetState(initialState, RUNNING)) {
247 return;
248 }
249 // consume parking permit when continuing after parking
250 if (initialState == UNPARKED) {
251 setParkPermit(false);
252 }
253 } else {
254 // not runnable
255 return;
256 }
257
258 mount();
259 try {
260 cont.run();
261 } finally {
262 unmount();
263 if (cont.isDone()) {
264 afterDone();
265 } else {
266 afterYield();
267 }
268 }
269 }
270
271 /**
272 * Submits the runContinuation task to the scheduler. For the default scheduler,
273 * and calling it on a worker thread, the task will be pushed to the local queue,
274 * otherwise it will be pushed to an external submission queue.
275 * @throws RejectedExecutionException
276 */
277 private void submitRunContinuation() {
278 try {
279 scheduler.execute(runContinuation);
280 } catch (RejectedExecutionException ree) {
281 submitFailed(ree);
282 throw ree;
283 }
284 }
285
286 /**
287 * Submits the runContinuation task the scheduler. For the default scheduler, the task
288 * will be pushed to an external submission queue.
289 * @throws RejectedExecutionException
290 */
291 private void externalSubmitRunContinuation() {
292 if (scheduler == DEFAULT_SCHEDULER
293 && currentCarrierThread() instanceof CarrierThread ct) {
294 externalSubmitRunContinuation(ct.getPool());
295 } else {
296 submitRunContinuation();
297 }
298 }
299
300 /**
301 * Submits the runContinuation task to given scheduler with a lazy submit.
302 * @throws RejectedExecutionException
303 * @see ForkJoinPool#lazySubmit(ForkJoinTask)
304 */
305 private void lazySubmitRunContinuation(ForkJoinPool pool) {
306 try {
307 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
308 } catch (RejectedExecutionException ree) {
309 submitFailed(ree);
310 throw ree;
311 }
312 }
313
314 /**
315 * Submits the runContinuation task to the given scheduler as an external submit.
316 * @throws RejectedExecutionException
317 * @see ForkJoinPool#externalSubmit(ForkJoinTask)
318 */
319 private void externalSubmitRunContinuation(ForkJoinPool pool) {
320 try {
321 pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
322 } catch (RejectedExecutionException ree) {
323 submitFailed(ree);
324 throw ree;
325 }
326 }
327
328 /**
329 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
330 */
331 private void submitFailed(RejectedExecutionException ree) {
332 var event = new VirtualThreadSubmitFailedEvent();
333 if (event.isEnabled()) {
334 event.javaThreadId = threadId();
335 event.exceptionMessage = ree.getMessage();
397 } else if (carrier.isInterrupted()) {
398 synchronized (interruptLock) {
399 // need to recheck interrupt status
400 if (!interrupted) {
401 carrier.clearInterrupt();
402 }
403 }
404 }
405
406 // set Thread.currentThread() to return this virtual thread
407 carrier.setCurrentThread(this);
408 }
409
410 /**
411 * Unmounts this virtual thread from the carrier. On return, the
412 * current thread is the current platform thread.
413 */
414 @ChangesCurrentThread
415 @ReservedStackAccess
416 private void unmount() {
417 assert !Thread.holdsLock(interruptLock);
418
419 // set Thread.currentThread() to return the platform thread
420 Thread carrier = this.carrierThread;
421 carrier.setCurrentThread(carrier);
422
423 // break connection to carrier thread, synchronized with interrupt
424 synchronized (interruptLock) {
425 setCarrierThread(null);
426 }
427 carrier.clearInterrupt();
428
429 // notify JVMTI after unmount
430 notifyJvmtiUnmount(/*hide*/false);
431 }
432
433 /**
434 * Sets the current thread to the current carrier thread.
435 */
436 @ChangesCurrentThread
437 @JvmtiMountTransition
438 private void switchToCarrierThread() {
482 notifyJvmtiMount(/*hide*/false);
483 }
484 }
485
486 /**
487 * Invoked after the continuation yields. If parking then it sets the state
488 * and also re-submits the task to continue if unparked while parking.
489 * If yielding due to Thread.yield then it just submits the task to continue.
490 */
491 private void afterYield() {
492 assert carrierThread == null;
493
494 int s = state();
495
496 // LockSupport.park/parkNanos
497 if (s == PARKING || s == TIMED_PARKING) {
498 int newState = (s == PARKING) ? PARKED : TIMED_PARKED;
499 setState(newState);
500
501 // may have been unparked while parking
502 if (parkPermit && compareAndSetState(newState, UNPARKED)) {
503 // lazy submit to continue on the current thread as carrier if possible
504 if (currentThread() instanceof CarrierThread ct) {
505 lazySubmitRunContinuation(ct.getPool());
506 } else {
507 submitRunContinuation();
508 }
509
510 }
511 return;
512 }
513
514 // Thread.yield
515 if (s == YIELDING) {
516 setState(YIELDED);
517
518 // external submit if there are no tasks in the local task queue
519 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
520 externalSubmitRunContinuation(ct.getPool());
521 } else {
522 submitRunContinuation();
523 }
524 return;
525 }
526
527 // blocking on monitorenter
528 if (s == BLOCKING) {
529 setState(BLOCKED);
530 if (unblocked && compareAndSetState(BLOCKED, UNBLOCKED)) {
531 unblocked = false;
532 submitRunContinuation();
533 }
534 return;
535 }
536
537 assert false;
538 }
539
540 /**
541 * Invoked after the continuation completes.
542 */
543 private void afterDone() {
544 afterDone(true);
545 }
546
547 /**
548 * Invoked after the continuation completes (or start failed). Sets the thread
549 * state to TERMINATED and notifies anyone waiting for the thread to terminate.
550 *
551 * @param notifyContainer true if its container should be notified
552 */
553 private void afterDone(boolean notifyContainer) {
554 assert carrierThread == null;
555 setState(TERMINATED);
556
580 @Override
581 void start(ThreadContainer container) {
582 if (!compareAndSetState(NEW, STARTED)) {
583 throw new IllegalThreadStateException("Already started");
584 }
585
586 // bind thread to container
587 assert threadContainer() == null;
588 setThreadContainer(container);
589
590 // start thread
591 boolean addedToContainer = false;
592 boolean started = false;
593 try {
594 container.onStart(this); // may throw
595 addedToContainer = true;
596
597 // scoped values may be inherited
598 inheritScopedValueBindings(container);
599
600 // submit task to run thread, using externalSubmit if possible
601 externalSubmitRunContinuation();
602 started = true;
603 } finally {
604 if (!started) {
605 afterDone(addedToContainer);
606 }
607 }
608 }
609
610 @Override
611 public void start() {
612 start(ThreadContainers.root());
613 }
614
615 @Override
616 public void run() {
617 // do nothing
618 }
619
620 /**
621 * Parks until unparked or interrupted. If already unparked then the parking
653 * Parks up to the given waiting time or until unparked or interrupted.
654 * If already unparked then the parking permit is consumed and this method
655 * completes immediately (meaning it doesn't yield). It also completes immediately
656 * if the interrupt status is set or the waiting time is {@code <= 0}.
657 *
658 * @param nanos the maximum number of nanoseconds to wait.
659 */
660 @Override
661 void parkNanos(long nanos) {
662 assert Thread.currentThread() == this;
663
664 // complete immediately if parking permit available or interrupted
665 if (getAndSetParkPermit(false) || interrupted)
666 return;
667
668 // park the thread for the waiting time
669 if (nanos > 0) {
670 long startTime = System.nanoTime();
671
672 boolean yielded = false;
673 Future<?> unparker = scheduleUnpark(nanos); // may throw OOME
674 setState(TIMED_PARKING);
675 try {
676 yielded = yieldContinuation(); // may throw
677 } finally {
678 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
679 if (!yielded) {
680 assert state() == TIMED_PARKING;
681 setState(RUNNING);
682 }
683 cancel(unparker);
684 }
685
686 // park on carrier thread for remaining time when pinned
687 if (!yielded) {
688 long remainingNanos = nanos - (System.nanoTime() - startTime);
689 parkOnCarrierThread(true, remainingNanos);
690 }
691 }
692 }
693
718 U.park(false, nanos);
719 }
720 }
721 } finally {
722 setState(RUNNING);
723 }
724
725 // consume parking permit
726 setParkPermit(false);
727
728 if (event != null) {
729 try {
730 event.commit();
731 } catch (OutOfMemoryError e) {
732 // ignore
733 }
734 }
735 }
736
737 /**
738 * Schedule this virtual thread to be unparked after a given delay.
739 */
740 @ChangesCurrentThread
741 private Future<?> scheduleUnpark(long nanos) {
742 assert Thread.currentThread() == this;
743 // need to switch to current carrier thread to avoid nested parking
744 switchToCarrierThread();
745 try {
746 return delayedTaskScheduler().schedule(this::unpark, nanos, NANOSECONDS);
747 } finally {
748 switchToVirtualThread(this);
749 }
750 }
751
752 /**
753 * Cancels a task if it has not completed.
754 */
755 @ChangesCurrentThread
756 private void cancel(Future<?> future) {
757 assert Thread.currentThread() == this;
758 if (!future.isDone()) {
759 // need to switch to current carrier thread to avoid nested parking
760 switchToCarrierThread();
761 try {
762 future.cancel(false);
763 } finally {
764 switchToVirtualThread(this);
765 }
766 }
767 }
768
769 /**
770 * Re-enables this virtual thread for scheduling. If the virtual thread was
771 * {@link #park() parked} then it will be unblocked, otherwise its next call
772 * to {@code park} or {@linkplain #parkNanos(long) parkNanos} is guaranteed
773 * not to block.
774 * @throws RejectedExecutionException if the scheduler cannot accept a task
775 */
776 @Override
777 @ChangesCurrentThread
778 void unpark() {
779 Thread currentThread = Thread.currentThread();
780 if (!getAndSetParkPermit(true) && currentThread != this) {
781 int s = state();
782 boolean parked = (s == PARKED) || (s == TIMED_PARKED);
783 if (parked && compareAndSetState(s, UNPARKED)) {
784 if (currentThread instanceof VirtualThread vthread) {
785 vthread.switchToCarrierThread();
786 try {
787 submitRunContinuation();
788 } finally {
789 switchToVirtualThread(vthread);
790 }
791 } else {
792 submitRunContinuation();
793 }
794 } else if ((s == PINNED) || (s == TIMED_PINNED)) {
795 // unpark carrier thread when pinned
796 synchronized (carrierThreadAccessLock()) {
797 Thread carrier = carrierThread;
798 if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
799 U.unpark(carrier);
800 }
801 }
802 }
803 }
804 }
805
806 /**
807 * Re-enables this virtual thread for scheduling after blocking on monitor enter.
808 * @throws RejectedExecutionException if the scheduler cannot accept a task
809 */
810 private void unblock() {
811 assert !Thread.currentThread().isVirtual();
812 unblocked = true;
813 if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
814 unblocked = false;
815 submitRunContinuation();
816 }
817 }
818
819 /**
820 * Attempts to yield the current virtual thread (Thread.yield).
821 */
822 void tryYield() {
823 assert Thread.currentThread() == this;
824 setState(YIELDING);
825 boolean yielded = false;
826 try {
827 yielded = yieldContinuation(); // may throw
828 } finally {
829 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
830 if (!yielded) {
831 assert state() == YIELDING;
832 setState(RUNNING);
833 }
834 }
835 }
836
837 /**
838 * Sleep the current thread for the given sleep time (in nanoseconds). If
894 } else {
895 boolean terminated = termination.await(nanos, NANOSECONDS);
896 if (!terminated) {
897 // waiting time elapsed
898 return false;
899 }
900 }
901 assert state() == TERMINATED;
902 return true;
903 }
904
905 @Override
906 @SuppressWarnings("removal")
907 public void interrupt() {
908 if (Thread.currentThread() != this) {
909 checkAccess();
910 synchronized (interruptLock) {
911 interrupted = true;
912 Interruptible b = nioBlocker;
913 if (b != null) {
914 // ensure current thread doesn't unmount while holding interruptLock
915 Continuation.pin();
916 try {
917 b.interrupt(this);
918 } finally {
919 Continuation.unpin();
920 }
921 }
922
923 // interrupt carrier thread if mounted
924 Thread carrier = carrierThread;
925 if (carrier != null) carrier.setInterrupt();
926 }
927 } else {
928 interrupted = true;
929 carrierThread.setInterrupt();
930 }
931 unpark();
932 }
933
934 @Override
935 public boolean isInterrupted() {
936 return interrupted;
937 }
938
939 @Override
940 boolean getAndClearInterrupt() {
941 assert Thread.currentThread() == this;
942 boolean oldValue = interrupted;
943 if (oldValue) {
944 // ensure current thread doesn't unmount trying to enter interruptLock
945 Continuation.pin();
946 try {
947 synchronized (interruptLock) {
948 interrupted = false;
949 carrierThread.clearInterrupt();
950 }
951 } finally {
952 Continuation.unpin();
953 }
954 }
955 return oldValue;
956 }
957
958 @Override
959 Thread.State threadState() {
960 int s = state();
961 switch (s & ~SUSPENDED) {
962 case NEW:
963 return Thread.State.NEW;
964 case STARTED:
965 // return NEW if thread container not yet set
966 if (threadContainer() == null) {
967 return Thread.State.NEW;
968 } else {
969 return Thread.State.RUNNABLE;
970 }
971 case UNPARKED:
972 case UNBLOCKED:
973 case YIELDED:
974 // runnable, not mounted
975 return Thread.State.RUNNABLE;
976 case RUNNING:
977 // if mounted then return state of carrier thread
978 if (Thread.currentThread() != this) {
979 synchronized (carrierThreadAccessLock()) {
980 Thread carrier = this.carrierThread;
981 if (carrier != null) {
982 return carrier.threadState();
983 }
984 }
985 }
986 // runnable, mounted
987 return Thread.State.RUNNABLE;
988 case PARKING:
989 case TIMED_PARKING:
990 case YIELDING:
991 case BLOCKING:
992 // runnable, not yet waiting/blocked
993 return Thread.State.RUNNABLE;
994 case PARKED:
995 case PINNED:
996 return State.WAITING;
997 case TIMED_PARKED:
998 case TIMED_PINNED:
999 return State.TIMED_WAITING;
1000 case BLOCKED:
1001 return State.BLOCKED;
1002 case TERMINATED:
1003 return Thread.State.TERMINATED;
1004 default:
1005 throw new InternalError();
1006 }
1007 }
1008
1009 @Override
1010 boolean alive() {
1011 int s = state;
1012 return (s != NEW && s != TERMINATED);
1013 }
1014
1015 @Override
1016 boolean isTerminated() {
1017 return (state == TERMINATED);
1018 }
1019
1020 @Override
1021 StackTraceElement[] asyncGetStackTrace() {
1022 StackTraceElement[] stackTrace;
1023 do {
1024 stackTrace = (carrierThread != null)
1025 ? super.asyncGetStackTrace() // mounted
1026 : tryGetStackTrace(); // unmounted
1027 if (stackTrace == null) {
1028 Thread.yield();
1029 }
1030 } while (stackTrace == null);
1031 return stackTrace;
1032 }
1033
1034 /**
1035 * Returns the stack trace for this virtual thread if it is unmounted.
1036 * Returns null if the thread is mounted or in transition.
1037 */
1038 private StackTraceElement[] tryGetStackTrace() {
1039 int initialState = state();
1040 switch (initialState) {
1041 case NEW, STARTED, TERMINATED -> {
1042 return new StackTraceElement[0]; // unmounted, empty stack
1043 }
1044 case RUNNING, PINNED -> {
1045 return null; // mounted
1046 }
1047 case PARKED, TIMED_PARKED, BLOCKED -> {
1048 // unmounted, not runnable
1049 }
1050 case UNPARKED, UNBLOCKED, YIELDED -> {
1051 // unmounted, runnable
1052 }
1053 case PARKING, TIMED_PARKING, BLOCKING, YIELDING -> {
1054 return null; // in transition
1055 }
1056 default -> throw new InternalError();
1057 }
1058
1059 // thread is unmounted, prevent it from continuing
1060 int suspendedState = initialState | SUSPENDED;
1061 if (!compareAndSetState(initialState, suspendedState)) {
1062 return null;
1063 }
1064
1065 // get stack trace and restore state
1066 StackTraceElement[] stack;
1067 try {
1068 stack = cont.getStackTrace();
1069 } finally {
1070 assert state == suspendedState;
1071 setState(initialState);
1072 }
1073 boolean resubmit = switch (initialState) {
1074 case UNPARKED, UNBLOCKED, YIELDED -> {
1075 // resubmit as task may have run while suspended
1076 yield true;
1077 }
1078 case PARKED, TIMED_PARKED -> {
1079 // resubmit if unparked while suspended
1080 yield parkPermit && compareAndSetState(initialState, UNPARKED);
1081 }
1082 case BLOCKED -> {
1083 // resubmit if unblocked while suspended
1084 yield unblocked && compareAndSetState(BLOCKED, UNBLOCKED);
1085 }
1086 default -> throw new InternalError();
1087 };
1088 if (resubmit) {
1089 submitRunContinuation();
1090 }
1091 return stack;
1092 }
1093
1094 @Override
1095 public String toString() {
1096 StringBuilder sb = new StringBuilder("VirtualThread[#");
1097 sb.append(threadId());
1098 String name = getName();
1099 if (!name.isEmpty()) {
1100 sb.append(",");
1101 sb.append(name);
1102 }
1103 sb.append("]/");
1104
1105 // include the carrier thread state and name when mounted
1106 Thread carrier = carrierThread;
1107 if (Thread.currentThread() == this) {
1108 appendCarrierInfo(sb, carrier);
1109 } else if (carrier != null) {
1110 if (Thread.currentThread() != this) {
1111 synchronized (carrierThreadAccessLock()) {
1112 carrier = carrierThread; // re-read
1113 appendCarrierInfo(sb, carrier);
1114 }
1115 }
1116 }
1117
1118 // include virtual thread state when not mounted
1119 if (carrier == null) {
1120 String stateAsString = threadState().toString();
1121 sb.append(stateAsString.toLowerCase(Locale.ROOT));
1122 }
1123 return sb.toString();
1124 }
1125
1126 /**
1127 * Appends the carrier's state and name to the given string builder when mounted.
1128 */
1129 private void appendCarrierInfo(StringBuilder sb, Thread carrier) {
1130 if (carrier != null) {
1131 String stateAsString = carrier.threadState().toString();
1132 sb.append(stateAsString.toLowerCase(Locale.ROOT));
1133 sb.append('@');
1134 sb.append(carrier.getName());
1135 }
1136 }
1137
1138 @Override
1139 public int hashCode() {
1140 return (int) threadId();
1141 }
1142
1143 @Override
1144 public boolean equals(Object obj) {
1145 return obj == this;
1146 }
1147
1148 /**
1149 * Returns a ScheduledExecutorService to execute a delayed task.
1150 */
1151 private ScheduledExecutorService delayedTaskScheduler() {
1152 long tid = Thread.currentThread().threadId();
1153 int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1);
1154 return DELAYED_TASK_SCHEDULERS[index];
1155 }
1156
1157 /**
1158 * Returns the termination object, creating it if needed.
1159 */
1160 private CountDownLatch getTermination() {
1161 CountDownLatch termination = this.termination;
1162 if (termination == null) {
1163 termination = new CountDownLatch(1);
1164 if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
1165 termination = this.termination;
1166 }
1167 }
1168 return termination;
1169 }
1170
1171 /**
1172 * Returns the lock object to synchronize on when accessing carrierThread.
1173 * The lock prevents carrierThread from being reset to null during unmount.
1174 */
1175 private Object carrierThreadAccessLock() {
1176 // return interruptLock as unmount has to coordinate with interrupt
1235 private static native void registerNatives(); // implemented in native code
1236 static {
1237 registerNatives(); // invoked when this class is initialized
1238 }
1239
1240 /**
1241 * Creates the default scheduler.
1242 */
1243 @SuppressWarnings("removal")
1244 private static ForkJoinPool createDefaultScheduler() {
1245 ForkJoinWorkerThreadFactory factory = pool -> {
1246 PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
1247 return AccessController.doPrivileged(pa);
1248 };
1249 PrivilegedAction<ForkJoinPool> pa = () -> {
1250 int parallelism, maxPoolSize, minRunnable;
1251 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1252 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1253 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1254 if (parallelismValue != null) {
1255 parallelism = Integer.max(Integer.parseInt(parallelismValue), 1);
1256 } else {
1257 parallelism = Runtime.getRuntime().availableProcessors();
1258 }
1259 if (maxPoolSizeValue != null) {
1260 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1261 if (maxPoolSize > 0) {
1262 parallelism = Integer.min(parallelism, maxPoolSize);
1263 } else {
1264 maxPoolSize = parallelism; // no spares
1265 }
1266 } else {
1267 maxPoolSize = Integer.max(parallelism, 256);
1268 }
1269 if (minRunnableValue != null) {
1270 minRunnable = Integer.parseInt(minRunnableValue);
1271 } else {
1272 minRunnable = Integer.max(parallelism / 2, 1);
1273 }
1274 Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1275 boolean asyncMode = true; // FIFO
1276 return new ForkJoinPool(parallelism, factory, handler, asyncMode,
1277 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
1278 };
1279 return AccessController.doPrivileged(pa);
1280 }
1281
1282 /**
1283 * Invoked by the VM for the Thread.vthread_scheduler diagnostic command.
1284 */
1285 private static byte[] printDefaultScheduler() {
1286 return String.format("%s%n", DEFAULT_SCHEDULER.toString())
1287 .getBytes(StandardCharsets.UTF_8);
1288 }
1289
1290 /**
1291 * Creates the ScheduledThreadPoolExecutors used to execute delayed tasks.
1292 */
1293 private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
1294 String propName = "jdk.virtualThreadScheduler.timerQueues";
1295 String propValue = GetPropertyAction.privilegedGetProperty(propName);
1296 int queueCount;
1297 if (propValue != null) {
1298 queueCount = Integer.parseInt(propValue);
1299 if (queueCount != Integer.highestOneBit(queueCount)) {
1300 throw new RuntimeException("Value of " + propName + " must be power of 2");
1301 }
1302 } else {
1303 int ncpus = Runtime.getRuntime().availableProcessors();
1304 queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
1305 }
1306 var schedulers = new ScheduledExecutorService[queueCount];
1307 for (int i = 0; i < queueCount; i++) {
1308 ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1309 Executors.newScheduledThreadPool(1, task -> {
1310 Thread t = InnocuousThread.newThread("VirtualThread-unparker", task);
1311 t.setDaemon(true);
1312 return t;
1313 });
1314 stpe.setRemoveOnCancelPolicy(true);
1315 schedulers[i] = stpe;
1316 }
1317 return schedulers;
1318 }
1319
1320 /**
1321 * Reads the value of the jdk.tracePinnedThreads property to determine if stack
1322 * traces should be printed when a carrier thread is pinned when a virtual thread
1323 * attempts to park.
1324 */
1325 private static int tracePinningMode() {
1326 String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads");
1327 if (propValue != null) {
1328 if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue))
1329 return 1;
1330 if ("short".equalsIgnoreCase(propValue))
1331 return 2;
1332 }
1333 return 0;
1334 }
1335
1336 /**
1337 * Unblocks virtual threads that are ready to be scheduled again. Placeholder: the body is currently empty, so it returns immediately without unblocking anything.
1338 */
1339 private static void processPendingList() {
1340 // TBD invoke unblock
1341 }
1342
// starts the daemon thread, named "VirtualThread-unblocker", that runs processPendingList
static {
    Thread unblockerThread = InnocuousThread.newThread("VirtualThread-unblocker",
                                                       VirtualThread::processPendingList);
    unblockerThread.setDaemon(true);
    unblockerThread.start();
}
1349 }
|