7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.util.Locale;
28 import java.util.Objects;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ForkJoinPool;
33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
34 import java.util.concurrent.ForkJoinTask;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.RejectedExecutionException;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ScheduledThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40 import jdk.internal.event.VirtualThreadEndEvent;
41 import jdk.internal.event.VirtualThreadStartEvent;
42 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
43 import jdk.internal.misc.CarrierThread;
44 import jdk.internal.misc.InnocuousThread;
45 import jdk.internal.misc.Unsafe;
46 import jdk.internal.vm.Continuation;
47 import jdk.internal.vm.ContinuationScope;
48 import jdk.internal.vm.StackableScope;
49 import jdk.internal.vm.ThreadContainer;
50 import jdk.internal.vm.ThreadContainers;
51 import jdk.internal.vm.annotation.ChangesCurrentThread;
52 import jdk.internal.vm.annotation.Hidden;
53 import jdk.internal.vm.annotation.IntrinsicCandidate;
54 import jdk.internal.vm.annotation.JvmtiHideEvents;
55 import jdk.internal.vm.annotation.JvmtiMountTransition;
56 import jdk.internal.vm.annotation.ReservedStackAccess;
57 import sun.nio.ch.Interruptible;
58 import static java.util.concurrent.TimeUnit.*;
59
60 /**
61 * A thread that is scheduled by the Java virtual machine rather than the operating system.
62 */
63 final class VirtualThread extends BaseVirtualThread {
64 private static final Unsafe U = Unsafe.getUnsafe();
65 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
66 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
67
68 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
69 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
70 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
71 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
72 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
73
74 // scheduler and continuation
75 private final Executor scheduler;
76 private final Continuation cont;
77 private final Runnable runContinuation;
78
79 // virtual thread state, accessed by VM
80 private volatile int state;
81
82 /*
83 * Virtual thread state transitions:
84 *
85 * NEW -> STARTED // Thread.start, schedule to run
86 * STARTED -> TERMINATED // failed to start
87 * STARTED -> RUNNING // first run
88 * RUNNING -> TERMINATED // done
89 *
90 * RUNNING -> PARKING // Thread parking with LockSupport.park
91 * PARKING -> PARKED // cont.yield successful, parked indefinitely
92 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier
93 * PARKED -> UNPARKED // unparked, may be scheduled to continue
94 * PINNED -> RUNNING // unparked, continue execution on same carrier
95 * UNPARKED -> RUNNING // continue execution after park
96 *
97 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos
172 private volatile boolean interruptibleWait;
173
174 // timed-wait support
175 private byte timedWaitSeqNo;
176
177 // timeout for timed-park and timed-wait, only accessed on current/carrier thread
178 private long timeout;
179
180 // timer task for timed-park and timed-wait, only accessed on current/carrier thread
181 private Future<?> timeoutTask;
182
183 // carrier thread when mounted, accessed by VM
184 private volatile Thread carrierThread;
185
186 // termination object when joining, created lazily if needed
187 private volatile CountDownLatch termination;
188
189 /**
190 * Returns the default scheduler.
191 */
192 static Executor defaultScheduler() {
193 return DEFAULT_SCHEDULER;
194 }
195
196 /**
197 * Returns the continuation scope used for virtual threads.
198 */
199 static ContinuationScope continuationScope() {
200 return VTHREAD_SCOPE;
201 }
202
203 /**
204 * Creates a new {@code VirtualThread} to run the given task with the given
205 * scheduler. If the given scheduler is {@code null} and the current thread
206 * is a platform thread then the newly created virtual thread will use the
207 * default scheduler. If given scheduler is {@code null} and the current
208 * thread is a virtual thread then the current thread's scheduler is used.
209 *
210 * @param scheduler the scheduler or null
211 * @param name thread name
212 * @param characteristics characteristics
213 * @param task the task to execute
214 */
215 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
216 super(name, characteristics, /*bound*/ false);
217 Objects.requireNonNull(task);
218
219 // choose scheduler if not specified
220 if (scheduler == null) {
221 Thread parent = Thread.currentThread();
222 if (parent instanceof VirtualThread vparent) {
223 scheduler = vparent.scheduler;
224 } else {
225 scheduler = DEFAULT_SCHEDULER;
226 }
227 }
228
229 this.scheduler = scheduler;
230 this.cont = new VThreadContinuation(this, task);
231 this.runContinuation = this::runContinuation;
232 }
233
234 /**
235 * The continuation that a virtual thread executes.
236 */
237 private static class VThreadContinuation extends Continuation {
238 VThreadContinuation(VirtualThread vthread, Runnable task) {
239 super(VTHREAD_SCOPE, wrap(vthread, task));
240 }
241 @Override
242 protected void onPinned(Continuation.Pinned reason) {
243 }
244 private static Runnable wrap(VirtualThread vthread, Runnable task) {
245 return new Runnable() {
246 @Hidden
247 @JvmtiHideEvents
248 public void run() {
249 vthread.notifyJvmtiStart(); // notify JVMTI
250 try {
251 vthread.run(task);
299 if (cont.isDone()) {
300 afterDone();
301 } else {
302 afterYield();
303 }
304 }
305 }
306
307 /**
308 * Cancel timeout task when continuing after timed-park or timed-wait.
309 * The timeout task may be executing, or may have already completed.
310 */
311 private void cancelTimeoutTask() {
312 if (timeoutTask != null) {
313 timeoutTask.cancel(false);
314 timeoutTask = null;
315 }
316 }
317
318 /**
319 * Submits the runContinuation task to the scheduler. For the default scheduler,
320 * and calling it on a worker thread, the task will be pushed to the local queue,
321 * otherwise it will be pushed to an external submission queue.
322 * @param scheduler the scheduler
323 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
324 * @throws RejectedExecutionException
325 */
326 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
327 boolean done = false;
328 while (!done) {
329 try {
330 // Pin the continuation to prevent the virtual thread from unmounting
331 // when submitting a task. For the default scheduler this ensures that
332 // the carrier doesn't change when pushing a task. For other schedulers
333 // it avoids deadlock that could arise due to carriers and virtual
334 // threads contending for a lock.
335 if (currentThread().isVirtual()) {
336 Continuation.pin();
337 try {
338 scheduler.execute(runContinuation);
339 } finally {
340 Continuation.unpin();
341 }
342 } else {
343 scheduler.execute(runContinuation);
344 }
345 done = true;
346 } catch (RejectedExecutionException ree) {
347 submitFailed(ree);
348 throw ree;
349 } catch (OutOfMemoryError e) {
350 if (retryOnOOME) {
351 U.park(false, 100_000_000); // 100ms
352 } else {
353 throw e;
354 }
355 }
356 }
357 }
358
359 /**
360 * Submits the runContinuation task to the given scheduler as an external submit.
361 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
362 * @throws RejectedExecutionException
363 * @see ForkJoinPool#externalSubmit(ForkJoinTask)
364 */
365 private void externalSubmitRunContinuation(ForkJoinPool pool) {
366 assert Thread.currentThread() instanceof CarrierThread;
367 try {
368 pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
369 } catch (RejectedExecutionException ree) {
370 submitFailed(ree);
371 throw ree;
372 } catch (OutOfMemoryError e) {
373 submitRunContinuation(pool, true);
374 }
375 }
376
377 /**
378 * Submits the runContinuation task to the scheduler. For the default scheduler,
379 * and calling it on a worker thread, the task will be pushed to the local queue,
380 * otherwise it will be pushed to an external submission queue.
381 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
382 * @throws RejectedExecutionException
383 */
384 private void submitRunContinuation() {
385 submitRunContinuation(scheduler, true);
386 }
387
388 /**
389 * Lazy submit the runContinuation task if invoked on a carrier thread and its local
390 * queue is empty. If not empty, or invoked by another thread, then this method works
391 * like submitRunContinuation and just submits the task to the scheduler.
392 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
393 * @throws RejectedExecutionException
394 * @see ForkJoinPool#lazySubmit(ForkJoinTask)
395 */
396 private void lazySubmitRunContinuation() {
397 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
398 ForkJoinPool pool = ct.getPool();
399 try {
400 pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
401 } catch (RejectedExecutionException ree) {
402 submitFailed(ree);
403 throw ree;
404 } catch (OutOfMemoryError e) {
405 submitRunContinuation();
406 }
407 } else {
408 submitRunContinuation();
409 }
410 }
411
412 /**
413 * Submits the runContinuation task to the scheduler. For the default scheduler, and
414 * calling it a virtual thread that uses the default scheduler, the task will be
415 * pushed to an external submission queue. This method may throw OutOfMemoryError.
416 * @throws RejectedExecutionException
417 * @throws OutOfMemoryError
418 */
419 private void externalSubmitRunContinuationOrThrow() {
420 if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {
421 try {
422 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
423 } catch (RejectedExecutionException ree) {
424 submitFailed(ree);
425 throw ree;
426 }
427 } else {
428 submitRunContinuation(scheduler, false);
429 }
430 }
431
432 /**
433 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
434 */
435 private void submitFailed(RejectedExecutionException ree) {
436 var event = new VirtualThreadSubmitFailedEvent();
437 if (event.isEnabled()) {
438 event.javaThreadId = threadId();
439 event.exceptionMessage = ree.getMessage();
440 event.commit();
441 }
442 }
443
444 /**
445 * Runs a task in the context of this virtual thread.
446 */
447 private void run(Runnable task) {
448 assert Thread.currentThread() == this && state == RUNNING;
563 long timeout = this.timeout;
564 assert timeout > 0;
565 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
566 setState(newState = TIMED_PARKED);
567 }
568
569 // may have been unparked while parking
570 if (parkPermit && compareAndSetState(newState, UNPARKED)) {
571 // lazy submit if local queue is empty
572 lazySubmitRunContinuation();
573 }
574 return;
575 }
576
577 // Thread.yield
578 if (s == YIELDING) {
579 setState(YIELDED);
580
581 // external submit if there are no tasks in the local task queue
582 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
583 externalSubmitRunContinuation(ct.getPool());
584 } else {
585 submitRunContinuation();
586 }
587 return;
588 }
589
590 // blocking on monitorenter
591 if (s == BLOCKING) {
592 setState(BLOCKED);
593
594 // may have been unblocked while blocking
595 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
596 // lazy submit if local queue is empty
597 lazySubmitRunContinuation();
598 }
599 return;
600 }
601
602 // Object.wait
603 if (s == WAITING || s == TIMED_WAITING) {
720 @Override
721 public void run() {
722 // do nothing
723 }
724
725 /**
726 * Parks until unparked or interrupted. If already unparked then the parking
727 * permit is consumed and this method completes immediately (meaning it doesn't
728 * yield). It also completes immediately if the interrupted status is set.
729 */
730 @Override
731 void park() {
732 assert Thread.currentThread() == this;
733
734 // complete immediately if parking permit available or interrupted
735 if (getAndSetParkPermit(false) || interrupted)
736 return;
737
738 // park the thread
739 boolean yielded = false;
740 setState(PARKING);
741 try {
742 yielded = yieldContinuation();
743 } catch (OutOfMemoryError e) {
744 // park on carrier
745 } finally {
746 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
747 if (!yielded) {
748 assert state() == PARKING;
749 setState(RUNNING);
750 }
751 }
752
753 // park on the carrier thread when pinned
754 if (!yielded) {
755 parkOnCarrierThread(false, 0);
756 }
757 }
758
759 /**
760 * Parks up to the given waiting time or until unparked or interrupted.
761 * If already unparked then the parking permit is consumed and this method
762 * completes immediately (meaning it doesn't yield). It also completes immediately
763 * if the interrupted status is set or the waiting time is {@code <= 0}.
764 *
765 * @param nanos the maximum number of nanoseconds to wait.
766 */
767 @Override
768 void parkNanos(long nanos) {
769 assert Thread.currentThread() == this;
770
771 // complete immediately if parking permit available or interrupted
772 if (getAndSetParkPermit(false) || interrupted)
773 return;
774
775 // park the thread for the waiting time
776 if (nanos > 0) {
777 long startTime = System.nanoTime();
778
779 // park the thread, afterYield will schedule the thread to unpark
780 boolean yielded = false;
781 timeout = nanos;
782 setState(TIMED_PARKING);
783 try {
784 yielded = yieldContinuation();
785 } catch (OutOfMemoryError e) {
786 // park on carrier
787 } finally {
788 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
789 if (!yielded) {
790 assert state() == TIMED_PARKING;
791 setState(RUNNING);
792 }
793 }
794
795 // park on carrier thread for remaining time when pinned (or OOME)
796 if (!yielded) {
797 long remainingNanos = nanos - (System.nanoTime() - startTime);
798 parkOnCarrierThread(true, remainingNanos);
799 }
800 }
801 }
802
803 /**
804 * Parks the current carrier thread up to the given waiting time or until
805 * unparked or interrupted. If the virtual thread is interrupted then the
806 * interrupted status will be propagated to the carrier thread.
807 * @param timed true for a timed park, false for untimed
808 * @param nanos the waiting time in nanoseconds
809 */
1399 @JvmtiMountTransition
1400 private native void notifyJvmtiEnd();
1401
1402 @IntrinsicCandidate
1403 @JvmtiMountTransition
1404 private native void notifyJvmtiMount(boolean hide);
1405
1406 @IntrinsicCandidate
1407 @JvmtiMountTransition
1408 private native void notifyJvmtiUnmount(boolean hide);
1409
1410 @IntrinsicCandidate
1411 private static native void notifyJvmtiDisableSuspend(boolean enter);
1412
1413 private static native void registerNatives();
1414 static {
1415 registerNatives();
1416
1417 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1418 var group = Thread.virtualThreadGroup();
1419 }
1420
1421 /**
1422 * Creates the default ForkJoinPool scheduler.
1423 */
1424 private static ForkJoinPool createDefaultScheduler() {
1425 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1426 int parallelism, maxPoolSize, minRunnable;
1427 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1428 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1429 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1430 if (parallelismValue != null) {
1431 parallelism = Integer.parseInt(parallelismValue);
1432 } else {
1433 parallelism = Runtime.getRuntime().availableProcessors();
1434 }
1435 if (maxPoolSizeValue != null) {
1436 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1437 parallelism = Integer.min(parallelism, maxPoolSize);
1438 } else {
1439 maxPoolSize = Integer.max(parallelism, 256);
1440 }
1441 if (minRunnableValue != null) {
1442 minRunnable = Integer.parseInt(minRunnableValue);
1443 } else {
1444 minRunnable = Integer.max(parallelism / 2, 1);
1445 }
1446 Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1447 boolean asyncMode = true; // FIFO
1448 return new ForkJoinPool(parallelism, factory, handler, asyncMode,
1449 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
1450 }
1451
1452 /**
1453 * Schedule a runnable task to run after a delay.
1454 */
1455 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1456 if (scheduler instanceof ForkJoinPool pool) {
1457 return pool.schedule(command, delay, unit);
1458 } else {
1459 return DelayedTaskSchedulers.schedule(command, delay, unit);
1460 }
1461 }
1462
1463 /**
1464 * Supports scheduling a runnable task to run after a delay. It uses a number
1465 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1466 * work queue used. This class is used when using a custom scheduler.
1467 */
1468 private static class DelayedTaskSchedulers {
1469 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();
1519 assert changed;
1520 vthread.unblock();
1521
1522 vthread = nextThread;
1523 }
1524 }
1525 }
1526
1527 /**
1528 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1529 * if necessary until a list of one or more threads becomes available.
1530 */
1531 private static native VirtualThread takeVirtualThreadListToUnblock();
1532
1533 static {
1534 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1535 VirtualThread::unblockVirtualThreads);
1536 unblocker.setDaemon(true);
1537 unblocker.start();
1538 }
1539 }
|
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.lang.reflect.Constructor;
30 import java.util.Locale;
31 import java.util.Objects;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.ForkJoinPool;
35 import java.util.concurrent.ForkJoinTask;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.RejectedExecutionException;
38 import java.util.concurrent.ScheduledExecutorService;
39 import java.util.concurrent.ScheduledThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41 import jdk.internal.event.VirtualThreadEndEvent;
42 import jdk.internal.event.VirtualThreadParkEvent;
43 import jdk.internal.event.VirtualThreadStartEvent;
44 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
45 import jdk.internal.invoke.MhUtil;
46 import jdk.internal.misc.CarrierThread;
47 import jdk.internal.misc.InnocuousThread;
48 import jdk.internal.misc.Unsafe;
49 import jdk.internal.vm.Continuation;
50 import jdk.internal.vm.ContinuationScope;
51 import jdk.internal.vm.StackableScope;
52 import jdk.internal.vm.ThreadContainer;
53 import jdk.internal.vm.ThreadContainers;
54 import jdk.internal.vm.annotation.ChangesCurrentThread;
55 import jdk.internal.vm.annotation.Hidden;
56 import jdk.internal.vm.annotation.IntrinsicCandidate;
57 import jdk.internal.vm.annotation.JvmtiHideEvents;
58 import jdk.internal.vm.annotation.JvmtiMountTransition;
59 import jdk.internal.vm.annotation.ReservedStackAccess;
60 import sun.nio.ch.Interruptible;
61 import static java.util.concurrent.TimeUnit.*;
62
63 /**
64 * A thread that is scheduled by the Java virtual machine rather than the operating system.
65 */
66 final class VirtualThread extends BaseVirtualThread {
67 private static final Unsafe U = Unsafe.getUnsafe();
68 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
69
70 private static final ForkJoinPool BUILTIN_SCHEDULER;
71 private static final VirtualThreadScheduler DEFAULT_SCHEDULER;
72 static {
73 // experimental
74 String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
75 if (propValue != null) {
76 var builtinScheduler = createBuiltinDefaultScheduler(true);
77 VirtualThreadScheduler defaultScheduler = builtinScheduler.externalView();
78 for (String cn: propValue.split(",")) {
79 defaultScheduler = loadCustomScheduler(defaultScheduler, cn);
80 }
81 BUILTIN_SCHEDULER = builtinScheduler;
82 DEFAULT_SCHEDULER = defaultScheduler;
83 } else {
84 var builtinScheduler = createBuiltinDefaultScheduler(false);
85 BUILTIN_SCHEDULER = builtinScheduler;
86 DEFAULT_SCHEDULER = builtinScheduler;
87 }
88 }
89
90 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
91 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
92 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
93 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
94 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
95
96 // scheduler and continuation
97 private final VirtualThreadScheduler scheduler;
98 private final Continuation cont;
99 private final VirtualThreadTask runContinuation;
100
101 // virtual thread state, accessed by VM
102 private volatile int state;
103
104 /*
105 * Virtual thread state transitions:
106 *
107 * NEW -> STARTED // Thread.start, schedule to run
108 * STARTED -> TERMINATED // failed to start
109 * STARTED -> RUNNING // first run
110 * RUNNING -> TERMINATED // done
111 *
112 * RUNNING -> PARKING // Thread parking with LockSupport.park
113 * PARKING -> PARKED // cont.yield successful, parked indefinitely
114 * PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier
115 * PARKED -> UNPARKED // unparked, may be scheduled to continue
116 * PINNED -> RUNNING // unparked, continue execution on same carrier
117 * UNPARKED -> RUNNING // continue execution after park
118 *
119 * RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos
194 private volatile boolean interruptibleWait;
195
196 // timed-wait support
197 private byte timedWaitSeqNo;
198
199 // timeout for timed-park and timed-wait, only accessed on current/carrier thread
200 private long timeout;
201
202 // timer task for timed-park and timed-wait, only accessed on current/carrier thread
203 private Future<?> timeoutTask;
204
205 // carrier thread when mounted, accessed by VM
206 private volatile Thread carrierThread;
207
208 // termination object when joining, created lazily if needed
209 private volatile CountDownLatch termination;
210
211 /**
212 * Returns the default scheduler.
213 */
214 static VirtualThreadScheduler defaultScheduler() {
215 return DEFAULT_SCHEDULER;
216 }
217
218 /**
219 * Returns true if using a custom default scheduler.
220 */
221 static boolean isCustomDefaultScheduler() {
222 return DEFAULT_SCHEDULER != BUILTIN_SCHEDULER;
223 }
224
225 /**
226 * Returns the continuation scope used for virtual threads.
227 */
228 static ContinuationScope continuationScope() {
229 return VTHREAD_SCOPE;
230 }
231
232 /**
233 * Return the scheduler for this thread.
234 * @param revealBuiltin true to reveal the built-in scheduler, false to hide
235 */
236 VirtualThreadScheduler scheduler(boolean revealBuiltin) {
237 if (scheduler instanceof BuiltinDefaultScheduler builtin && !revealBuiltin) {
238 return builtin.externalView();
239 } else {
240 return scheduler;
241 }
242 }
243
244 /**
245 * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
246 *
247 * @param scheduler the scheduler or null for default scheduler
248 * @param name thread name
249 * @param characteristics characteristics
250 * @param task the task to execute
251 */
252 VirtualThread(VirtualThreadScheduler scheduler,
253 String name,
254 int characteristics,
255 Runnable task) {
256 super(name, characteristics, /*bound*/ false);
257 Objects.requireNonNull(task);
258
259 // use default scheduler if not provided
260 if (scheduler == null) {
261 scheduler = DEFAULT_SCHEDULER;
262 }
263
264 this.scheduler = scheduler;
265 this.cont = new VThreadContinuation(this, task);
266
267 if (scheduler == BUILTIN_SCHEDULER) {
268 this.runContinuation = new BuiltinSchedulerTask(this);
269 } else {
270 this.runContinuation = new CustomSchedulerTask(this);
271 }
272 }
273
274 /**
275 * The task to execute when using the built-in scheduler.
276 */
277 static final class BuiltinSchedulerTask implements VirtualThreadTask {
278 private final VirtualThread vthread;
279 BuiltinSchedulerTask(VirtualThread vthread) {
280 this.vthread = vthread;
281 }
282 @Override
283 public Object attach(Object att) {
284 throw new UnsupportedOperationException();
285 }
286 @Override
287 public Object attachment() {
288 throw new UnsupportedOperationException();
289 }
290 @Override
291 public Thread thread() {
292 return vthread;
293 }
294 @Override
295 public void run() {
296 vthread.runContinuation();;
297 }
298 }
299
300 /**
301 * The task to execute when using a custom scheduler.
302 */
303 static final class CustomSchedulerTask implements VirtualThreadTask {
304 private static final VarHandle ATT =
305 MhUtil.findVarHandle(MethodHandles.lookup(), "att", Object.class);
306 private final VirtualThread vthread;
307 private volatile Object att;
308 CustomSchedulerTask(VirtualThread vthread) {
309 this.vthread = vthread;
310 }
311 @Override
312 public Object attach(Object att) {
313 return ATT.getAndSet(this, att);
314 }
315 @Override
316 public Object attachment() {
317 return att;
318 }
319 @Override
320 public Thread thread() {
321 return vthread;
322 }
323 @Override
324 public void run() {
325 vthread.runContinuation();;
326 }
327 }
328
329 /**
330 * The continuation that a virtual thread executes.
331 */
332 private static class VThreadContinuation extends Continuation {
333 VThreadContinuation(VirtualThread vthread, Runnable task) {
334 super(VTHREAD_SCOPE, wrap(vthread, task));
335 }
336 @Override
337 protected void onPinned(Continuation.Pinned reason) {
338 }
339 private static Runnable wrap(VirtualThread vthread, Runnable task) {
340 return new Runnable() {
341 @Hidden
342 @JvmtiHideEvents
343 public void run() {
344 vthread.notifyJvmtiStart(); // notify JVMTI
345 try {
346 vthread.run(task);
394 if (cont.isDone()) {
395 afterDone();
396 } else {
397 afterYield();
398 }
399 }
400 }
401
402 /**
403 * Cancel timeout task when continuing after timed-park or timed-wait.
404 * The timeout task may be executing, or may have already completed.
405 */
406 private void cancelTimeoutTask() {
407 if (timeoutTask != null) {
408 timeoutTask.cancel(false);
409 timeoutTask = null;
410 }
411 }
412
413 /**
414 * Submits the runContinuation task to the scheduler. For the built-in scheduler,
415 * the task will be pushed to the local queue if possible, otherwise it will be
416 * pushed to an external submission queue.
417 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
418 * @throws RejectedExecutionException
419 */
420 private void submitRunContinuation(boolean retryOnOOME) {
421 boolean done = false;
422 while (!done) {
423 try {
424 // Pin the continuation to prevent the virtual thread from unmounting
425 // when submitting a task. For the default scheduler this ensures that
426 // the carrier doesn't change when pushing a task. For other schedulers
427 // it avoids deadlock that could arise due to carriers and virtual
428 // threads contending for a lock.
429 if (currentThread().isVirtual()) {
430 Continuation.pin();
431 try {
432 scheduler.onContinue(runContinuation);
433 } finally {
434 Continuation.unpin();
435 }
436 } else {
437 scheduler.onContinue(runContinuation);
438 }
439 done = true;
440 } catch (RejectedExecutionException ree) {
441 submitFailed(ree);
442 throw ree;
443 } catch (OutOfMemoryError e) {
444 if (retryOnOOME) {
445 U.park(false, 100_000_000); // 100ms
446 } else {
447 throw e;
448 }
449 }
450 }
451 }
452
453 /**
454 * Submits the runContinuation task to the scheduler. For the default scheduler,
455 * and calling it on a worker thread, the task will be pushed to the local queue,
456 * otherwise it will be pushed to an external submission queue.
457 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
458 * @throws RejectedExecutionException
459 */
460 private void submitRunContinuation() {
461 submitRunContinuation(true);
462 }
463
464 /**
465 * Invoked from a carrier thread to lazy submit the runContinuation task to the
466 * carrier's local queue if the queue is empty. If not empty, or invoked by a thread
467 * for a custom scheduler, then it just submits the task to the scheduler.
468 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
469 * @throws RejectedExecutionException
470 * @see ForkJoinPool#lazySubmit(ForkJoinTask)
471 */
472 private void lazySubmitRunContinuation() {
473 assert !currentThread().isVirtual();
474 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
475 try {
476 ct.getPool().lazySubmit(ForkJoinTask.adapt(runContinuation));
477 } catch (RejectedExecutionException ree) {
478 submitFailed(ree);
479 throw ree;
480 } catch (OutOfMemoryError e) {
481 submitRunContinuation();
482 }
483 } else {
484 submitRunContinuation();
485 }
486 }
487
488 /**
489 * Invoked from a carrier thread to externally submit the runContinuation task to the
490 * scheduler. If invoked by a thread for a custom scheduler, then it just submits the
491 * task to the scheduler.
492 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
493 * @throws RejectedExecutionException
494 * @see ForkJoinPool#externalSubmit(ForkJoinTask)
495 */
496 private void externalSubmitRunContinuation() {
497 assert !currentThread().isVirtual();
498 if (currentThread() instanceof CarrierThread ct) {
499 try {
500 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
501 } catch (RejectedExecutionException ree) {
502 submitFailed(ree);
503 throw ree;
504 } catch (OutOfMemoryError e) {
505 submitRunContinuation();
506 }
507 } else {
508 submitRunContinuation();
509 }
510 }
511
512 /**
513 * Invoked from Thread.start to externally submit the runContinuation task to the
514 * scheduler. If this virtual thread is scheduled by the built-in scheduler,
515 * and this method is called from a virtual thread scheduled by the built-in
516 * scheduler, then it uses externalSubmit to ensure that the task is pushed to an
517 * external submission queue rather than the local queue.
518 * @throws RejectedExecutionException
519 * @throws OutOfMemoryError
520 * @see ForkJoinPool#externalSubmit(ForkJoinTask)
521 */
522 private void externalSubmitRunContinuationOrThrow() {
523 try {
524 if (currentThread().isVirtual()) {
525 // Pin the continuation to prevent the virtual thread from unmounting
526 // when submitting a task. This avoids deadlock that could arise due to
527 // carriers and virtual threads contending for a lock.
528 Continuation.pin();
529 try {
530 if (scheduler == BUILTIN_SCHEDULER
531 && currentCarrierThread() instanceof CarrierThread ct) {
532 ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
533 } else {
534 scheduler.onStart(runContinuation);
535 }
536 } finally {
537 Continuation.unpin();
538 }
539 } else {
540 scheduler.onStart(runContinuation);
541 }
542 } catch (RejectedExecutionException ree) {
543 submitFailed(ree);
544 throw ree;
545 }
546 }
547
548 /**
549 * If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
550 */
551 private void submitFailed(RejectedExecutionException ree) {
552 var event = new VirtualThreadSubmitFailedEvent();
553 if (event.isEnabled()) {
554 event.javaThreadId = threadId();
555 event.exceptionMessage = ree.getMessage();
556 event.commit();
557 }
558 }
559
560 /**
561 * Runs a task in the context of this virtual thread.
562 */
563 private void run(Runnable task) {
564 assert Thread.currentThread() == this && state == RUNNING;
679 long timeout = this.timeout;
680 assert timeout > 0;
681 timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
682 setState(newState = TIMED_PARKED);
683 }
684
685 // may have been unparked while parking
686 if (parkPermit && compareAndSetState(newState, UNPARKED)) {
687 // lazy submit if local queue is empty
688 lazySubmitRunContinuation();
689 }
690 return;
691 }
692
693 // Thread.yield
694 if (s == YIELDING) {
695 setState(YIELDED);
696
697 // external submit if there are no tasks in the local task queue
698 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
699 externalSubmitRunContinuation();
700 } else {
701 submitRunContinuation();
702 }
703 return;
704 }
705
706 // blocking on monitorenter
707 if (s == BLOCKING) {
708 setState(BLOCKED);
709
710 // may have been unblocked while blocking
711 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
712 // lazy submit if local queue is empty
713 lazySubmitRunContinuation();
714 }
715 return;
716 }
717
718 // Object.wait
719 if (s == WAITING || s == TIMED_WAITING) {
836 @Override
837 public void run() {
838 // do nothing
839 }
840
841 /**
842 * Parks until unparked or interrupted. If already unparked then the parking
843 * permit is consumed and this method completes immediately (meaning it doesn't
844 * yield). It also completes immediately if the interrupted status is set.
845 */
846 @Override
847 void park() {
848 assert Thread.currentThread() == this;
849
850 // complete immediately if parking permit available or interrupted
851 if (getAndSetParkPermit(false) || interrupted)
852 return;
853
854 // park the thread
855 boolean yielded = false;
856 long eventStartTime = VirtualThreadParkEvent.eventStartTime();
857 setState(PARKING);
858 try {
859 yielded = yieldContinuation();
860 } catch (OutOfMemoryError e) {
861 // park on carrier
862 } finally {
863 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
864 if (yielded) {
865 VirtualThreadParkEvent.offer(eventStartTime, Long.MIN_VALUE);
866 } else {
867 assert state() == PARKING;
868 setState(RUNNING);
869 }
870 }
871
872 // park on the carrier thread when pinned
873 if (!yielded) {
874 parkOnCarrierThread(false, 0);
875 }
876 }
877
878 /**
879 * Parks up to the given waiting time or until unparked or interrupted.
880 * If already unparked then the parking permit is consumed and this method
881 * completes immediately (meaning it doesn't yield). It also completes immediately
882 * if the interrupted status is set or the waiting time is {@code <= 0}.
883 *
884 * @param nanos the maximum number of nanoseconds to wait.
885 */
886 @Override
887 void parkNanos(long nanos) {
888 assert Thread.currentThread() == this;
889
890 // complete immediately if parking permit available or interrupted
891 if (getAndSetParkPermit(false) || interrupted)
892 return;
893
894 // park the thread for the waiting time
895 if (nanos > 0) {
896 long startTime = System.nanoTime();
897
898 // park the thread, afterYield will schedule the thread to unpark
899 boolean yielded = false;
900 long eventStartTime = VirtualThreadParkEvent.eventStartTime();
901 timeout = nanos;
902 setState(TIMED_PARKING);
903 try {
904 yielded = yieldContinuation();
905 } catch (OutOfMemoryError e) {
906 // park on carrier
907 } finally {
908 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
909 if (yielded) {
910 VirtualThreadParkEvent.offer(eventStartTime, nanos);
911 } else {
912 assert state() == TIMED_PARKING;
913 setState(RUNNING);
914 }
915 }
916
917 // park on carrier thread for remaining time when pinned (or OOME)
918 if (!yielded) {
919 long remainingNanos = nanos - (System.nanoTime() - startTime);
920 parkOnCarrierThread(true, remainingNanos);
921 }
922 }
923 }
924
925 /**
926 * Parks the current carrier thread up to the given waiting time or until
927 * unparked or interrupted. If the virtual thread is interrupted then the
928 * interrupted status will be propagated to the carrier thread.
929 * @param timed true for a timed park, false for untimed
930 * @param nanos the waiting time in nanoseconds
931 */
1521 @JvmtiMountTransition
1522 private native void notifyJvmtiEnd();
1523
1524 @IntrinsicCandidate
1525 @JvmtiMountTransition
1526 private native void notifyJvmtiMount(boolean hide);
1527
1528 @IntrinsicCandidate
1529 @JvmtiMountTransition
1530 private native void notifyJvmtiUnmount(boolean hide);
1531
1532 @IntrinsicCandidate
1533 private static native void notifyJvmtiDisableSuspend(boolean enter);
1534
1535 private static native void registerNatives();
1536 static {
1537 registerNatives();
1538
1539 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1540 var group = Thread.virtualThreadGroup();
1541
1542 // ensure event class is initialized
1543 try {
1544 MethodHandles.lookup().ensureInitialized(VirtualThreadParkEvent.class);
1545 } catch (IllegalAccessException e) {
1546 throw new ExceptionInInitializerError(e);
1547 }
1548 }
1549
1550 /**
1551 * Loads a VirtualThreadScheduler with the given class name. The class must be public
1552 * in an exported package, with public one-arg or no-arg constructor, and be visible
1553 * to the system class loader.
1554 * @param delegate the scheduler that the custom scheduler may delegate to
1555 * @param cn the class name of the custom scheduler
1556 */
1557 private static VirtualThreadScheduler loadCustomScheduler(VirtualThreadScheduler delegate, String cn) {
1558 try {
1559 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
1560 VirtualThreadScheduler scheduler;
1561 try {
1562 // 1-arg constructor
1563 Constructor<?> ctor = clazz.getConstructor(VirtualThreadScheduler.class);
1564 scheduler = (VirtualThreadScheduler) ctor.newInstance(delegate);
1565 } catch (NoSuchMethodException e) {
1566 // 0-arg constructor
1567 Constructor<?> ctor = clazz.getConstructor();
1568 scheduler = (VirtualThreadScheduler) ctor.newInstance();
1569 }
1570 System.err.println("""
1571 WARNING: Using custom default scheduler, this is an experimental feature!""");
1572 return scheduler;
1573 } catch (Exception ex) {
1574 throw new Error(ex);
1575 }
1576 }
1577
1578 /**
1579 * Creates the built-in ForkJoinPool scheduler.
1580 * @param wrapped true if wrapped by a custom default scheduler
1581 */
1582 private static BuiltinDefaultScheduler createBuiltinDefaultScheduler(boolean wrapped) {
1583 int parallelism, maxPoolSize, minRunnable;
1584 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1585 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1586 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1587 if (parallelismValue != null) {
1588 parallelism = Integer.parseInt(parallelismValue);
1589 } else {
1590 parallelism = Runtime.getRuntime().availableProcessors();
1591 }
1592 if (maxPoolSizeValue != null) {
1593 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1594 parallelism = Integer.min(parallelism, maxPoolSize);
1595 } else {
1596 maxPoolSize = Integer.max(parallelism, 256);
1597 }
1598 if (minRunnableValue != null) {
1599 minRunnable = Integer.parseInt(minRunnableValue);
1600 } else {
1601 minRunnable = Integer.max(parallelism / 2, 1);
1602 }
1603 return new BuiltinDefaultScheduler(parallelism, maxPoolSize, minRunnable, wrapped);
1604 }
1605
1606 /**
1607 * The built-in ForkJoinPool scheduler.
1608 */
1609 private static class BuiltinDefaultScheduler
1610 extends ForkJoinPool implements VirtualThreadScheduler {
1611
1612 private static final StableValue<VirtualThreadScheduler> VIEW = StableValue.of();
1613
1614 BuiltinDefaultScheduler(int parallelism, int maxPoolSize, int minRunnable, boolean wrapped) {
1615 ForkJoinWorkerThreadFactory factory = wrapped
1616 ? ForkJoinPool.defaultForkJoinWorkerThreadFactory
1617 : CarrierThread::new;
1618 Thread.UncaughtExceptionHandler handler = (t, e) -> { };
1619 boolean asyncMode = true; // FIFO
1620 super(parallelism, factory, handler, asyncMode,
1621 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
1622 }
1623
1624 private void adaptAndExecute(Runnable task) {
1625 execute(ForkJoinTask.adapt(task));
1626 }
1627
1628 @Override
1629 public void onStart(VirtualThreadTask task) {
1630 adaptAndExecute(task);
1631 }
1632
1633 @Override
1634 public void onContinue(VirtualThreadTask task) {
1635 adaptAndExecute(task);
1636 }
1637
1638 /**
1639 * Wraps the scheduler to avoid leaking a direct reference.
1640 */
1641 VirtualThreadScheduler externalView() {
1642 BuiltinDefaultScheduler builtin = this;
1643 return VIEW.orElseSet(() -> {
1644 return new VirtualThreadScheduler() {
1645 private void execute(VirtualThreadTask task) {
1646 var vthread = (VirtualThread) task.thread();
1647 VirtualThreadScheduler scheduler = vthread.scheduler;
1648 if (scheduler == this || scheduler == DEFAULT_SCHEDULER) {
1649 builtin.adaptAndExecute(task);
1650 } else {
1651 throw new IllegalArgumentException();
1652 }
1653 }
1654 @Override
1655 public void onStart(VirtualThreadTask task) {
1656 execute(task);
1657 }
1658 @Override
1659 public void onContinue(VirtualThreadTask task) {
1660 execute(task);
1661 }
1662 };
1663 });
1664 }
1665 }
1666
1667 /**
1668 * Schedule a runnable task to run after a delay.
1669 */
1670 private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1671 if (scheduler instanceof ForkJoinPool pool) {
1672 return pool.schedule(command, delay, unit);
1673 } else {
1674 return DelayedTaskSchedulers.schedule(command, delay, unit);
1675 }
1676 }
1677
1678 /**
1679 * Supports scheduling a runnable task to run after a delay. It uses a number
1680 * of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1681 * work queue used. This class is used when using a custom scheduler.
1682 */
1683 private static class DelayedTaskSchedulers {
1684 private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();
1734 assert changed;
1735 vthread.unblock();
1736
1737 vthread = nextThread;
1738 }
1739 }
1740 }
1741
1742 /**
1743 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1744 * if necessary until a list of one or more threads becomes available.
1745 */
1746 private static native VirtualThread takeVirtualThreadListToUnblock();
1747
1748 static {
1749 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1750 VirtualThread::unblockVirtualThreads);
1751 unblocker.setDaemon(true);
1752 unblocker.start();
1753 }
1754 }
|