7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.util.Locale;
28 import java.util.Objects;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ForkJoinPool;
33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
34 import java.util.concurrent.ForkJoinTask;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.RejectedExecutionException;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ScheduledThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40 import jdk.internal.event.VirtualThreadEndEvent;
41 import jdk.internal.event.VirtualThreadStartEvent;
42 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
43 import jdk.internal.misc.CarrierThread;
44 import jdk.internal.misc.InnocuousThread;
45 import jdk.internal.misc.Unsafe;
46 import jdk.internal.vm.Continuation;
47 import jdk.internal.vm.ContinuationScope;
48 import jdk.internal.vm.StackableScope;
49 import jdk.internal.vm.ThreadContainer;
50 import jdk.internal.vm.ThreadContainers;
51 import jdk.internal.vm.annotation.ChangesCurrentThread;
52 import jdk.internal.vm.annotation.Hidden;
53 import jdk.internal.vm.annotation.IntrinsicCandidate;
54 import jdk.internal.vm.annotation.JvmtiHideEvents;
55 import jdk.internal.vm.annotation.JvmtiMountTransition;
56 import jdk.internal.vm.annotation.ReservedStackAccess;
57 import sun.nio.ch.Interruptible;
58 import static java.util.concurrent.TimeUnit.*;
59
60 /**
61 * A thread that is scheduled by the Java virtual machine rather than the operating system.
62 */
63 final class VirtualThread extends BaseVirtualThread {
64 private static final Unsafe U = Unsafe.getUnsafe();
65 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
66 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
67
68 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
69 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
70 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
71 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
72 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
73
74 // scheduler and continuation
75 private final Executor scheduler;
76 private final Continuation cont;
77 private final Runnable runContinuation;
78
79 // virtual thread state, accessed by VM
80 private volatile int state;
81
82 /*
83 * Virtual thread state transitions:
84 *
85 * NEW -> STARTED // Thread.start, schedule to run
86 * STARTED -> TERMINATED // failed to start
151 private static final int TERMINATED = 99; // final state
152
153 // can be suspended from scheduling when unmounted
154 private static final int SUSPENDED = 1 << 8;
155
156 // parking permit made available by LockSupport.unpark
157 private volatile boolean parkPermit;
158
159 // blocking permit made available by unblocker thread when another thread exits monitor
160 private volatile boolean blockPermit;
161
162 // true when on the list of virtual threads waiting to be unblocked
163 private volatile boolean onWaitingList;
164
165 // next virtual thread on the list of virtual threads waiting to be unblocked
166 private volatile VirtualThread next;
167
168 // notified by Object.notify/notifyAll while waiting in Object.wait
169 private volatile boolean notified;
170
171 // timed-wait support
172 private byte timedWaitSeqNo;
173
174 // timeout for timed-park and timed-wait, only accessed on current/carrier thread
175 private long timeout;
176
177 // timer task for timed-park and timed-wait, only accessed on current/carrier thread
178 private Future<?> timeoutTask;
179
180 // carrier thread when mounted, accessed by VM
181 private volatile Thread carrierThread;
182
183 // termination object when joining, created lazily if needed
184 private volatile CountDownLatch termination;
185
186 /**
187 * Returns the default scheduler.
188 */
189 static Executor defaultScheduler() {
190 return DEFAULT_SCHEDULER;
191 }
192
193 /**
194 * Returns the continuation scope used for virtual threads.
195 */
196 static ContinuationScope continuationScope() {
197 return VTHREAD_SCOPE;
198 }
199
200 /**
201 * Creates a new {@code VirtualThread} to run the given task with the given
202 * scheduler. If the given scheduler is {@code null} and the current thread
203 * is a platform thread then the newly created virtual thread will use the
204 * default scheduler. If given scheduler is {@code null} and the current
205 * thread is a virtual thread then the current thread's scheduler is used.
206 *
207 * @param scheduler the scheduler or null
208 * @param name thread name
209 * @param characteristics characteristics
210 * @param task the task to execute
211 */
212 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
213 super(name, characteristics, /*bound*/ false);
214 Objects.requireNonNull(task);
215
216 // choose scheduler if not specified
217 if (scheduler == null) {
218 Thread parent = Thread.currentThread();
219 if (parent instanceof VirtualThread vparent) {
220 scheduler = vparent.scheduler;
221 } else {
222 scheduler = DEFAULT_SCHEDULER;
223 }
224 }
225
226 this.scheduler = scheduler;
227 this.cont = new VThreadContinuation(this, task);
228 this.runContinuation = this::runContinuation;
229 }
230
231 /**
232 * The continuation that a virtual thread executes.
233 */
234 private static class VThreadContinuation extends Continuation {
235 VThreadContinuation(VirtualThread vthread, Runnable task) {
236 super(VTHREAD_SCOPE, wrap(vthread, task));
237 }
238 @Override
239 protected void onPinned(Continuation.Pinned reason) {
240 }
241 private static Runnable wrap(VirtualThread vthread, Runnable task) {
242 return new Runnable() {
243 @Hidden
244 @JvmtiHideEvents
245 public void run() {
246 vthread.notifyJvmtiStart(); // notify JVMTI
247 try {
248 vthread.run(task);
249 } finally {
250 vthread.notifyJvmtiEnd(); // notify JVMTI
251 }
252 }
253 };
254 }
255 }
256
257 /**
258 * Runs or continues execution on the current thread. The virtual thread is mounted
259 * on the current thread before the task runs or continues. It unmounts when the
260 * task completes or yields.
261 */
262 @ChangesCurrentThread // allow mount/unmount to be inlined
263 private void runContinuation() {
264 // the carrier must be a platform thread
265 if (Thread.currentThread().isVirtual()) {
266 throw new WrongThreadException();
267 }
268
269 // set state to RUNNING
270 int initialState = state();
271 if (initialState == STARTED || initialState == UNPARKED
272 || initialState == UNBLOCKED || initialState == YIELDED) {
273 // newly started or continue after parking/blocking/Thread.yield
274 if (!compareAndSetState(initialState, RUNNING)) {
275 return;
276 }
295 unmount();
296 if (cont.isDone()) {
297 afterDone();
298 } else {
299 afterYield();
300 }
301 }
302 }
303
304 /**
305 * Cancel timeout task when continuing after timed-park or timed-wait.
306 * The timeout task may be executing, or may have already completed.
307 */
308 private void cancelTimeoutTask() {
309 if (timeoutTask != null) {
310 timeoutTask.cancel(false);
311 timeoutTask = null;
312 }
313 }
314
315 /**
316 * Submits the runContinuation task to the scheduler. For the default scheduler,
317 * and calling it on a worker thread, the task will be pushed to the local queue,
318 * otherwise it will be pushed to an external submission queue.
319 * @param scheduler the scheduler
320 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
321 * @throws RejectedExecutionException
322 */
323 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
324 boolean done = false;
325 while (!done) {
326 try {
327 // Pin the continuation to prevent the virtual thread from unmounting
328 // when submitting a task. For the default scheduler this ensures that
329 // the carrier doesn't change when pushing a task. For other schedulers
330 // it avoids deadlock that could arise due to carriers and virtual
331 // threads contending for a lock.
332 if (currentThread().isVirtual()) {
333 Continuation.pin();
334 try {
335 scheduler.execute(runContinuation);
336 } finally {
337 Continuation.unpin();
338 }
339 } else {
340 scheduler.execute(runContinuation);
341 }
342 done = true;
343 } catch (RejectedExecutionException ree) {
344 submitFailed(ree);
345 throw ree;
346 } catch (OutOfMemoryError e) {
347 if (retryOnOOME) {
348 U.park(false, 100_000_000); // 100ms
349 } else {
350 throw e;
351 }
352 }
353 }
354 }
355
356 /**
357 * Submits the runContinuation task to the given scheduler as an external submit.
358 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
359 * @throws RejectedExecutionException
360 * @see ForkJoinPool#externalSubmit(ForkJoinTask)
582 submitRunContinuation();
583 }
584 return;
585 }
586
587 // blocking on monitorenter
588 if (s == BLOCKING) {
589 setState(BLOCKED);
590
591 // may have been unblocked while blocking
592 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
593 // lazy submit if local queue is empty
594 lazySubmitRunContinuation();
595 }
596 return;
597 }
598
599 // Object.wait
600 if (s == WAITING || s == TIMED_WAITING) {
601 int newState;
602 if (s == WAITING) {
603 setState(newState = WAIT);
604 } else {
605 // For timed-wait, a timeout task is scheduled to execute. The timeout
606 // task will change the thread state to UNBLOCKED and submit the thread
607 // to the scheduler. A sequence number is used to ensure that the timeout
608 // task only unblocks the thread for this timed-wait. We synchronize with
609 // the timeout task to coordinate access to the sequence number and to
610 // ensure the timeout task doesn't execute until the thread has got to
611 // the TIMED_WAIT state.
612 long timeout = this.timeout;
613 assert timeout > 0;
614 synchronized (timedWaitLock()) {
615 byte seqNo = ++timedWaitSeqNo;
616 timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
617 setState(newState = TIMED_WAIT);
618 }
619 }
620
621 // may have been notified while in transition to wait state
622 if (notified && compareAndSetState(newState, BLOCKED)) {
623 // may have even been unblocked already
624 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
625 submitRunContinuation();
626 }
627 return;
628 }
629
630 // may have been interrupted while in transition to wait state
631 if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
632 submitRunContinuation();
633 return;
634 }
635 return;
636 }
637
638 assert false;
639 }
640
641 /**
642 * Invoked after the continuation completes.
643 */
644 private void afterDone() {
645 afterDone(true);
646 }
647
648 /**
649 * Invoked after the continuation completes (or start failed). Sets the thread
650 * state to TERMINATED and notifies anyone waiting for the thread to terminate.
651 *
1397
1398 @IntrinsicCandidate
1399 @JvmtiMountTransition
1400 private native void notifyJvmtiMount(boolean hide);
1401
1402 @IntrinsicCandidate
1403 @JvmtiMountTransition
1404 private native void notifyJvmtiUnmount(boolean hide);
1405
1406 @IntrinsicCandidate
1407 private static native void notifyJvmtiDisableSuspend(boolean enter);
1408
1409 private static native void registerNatives();
1410 static {
1411 registerNatives();
1412
1413 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1414 var group = Thread.virtualThreadGroup();
1415 }
1416
1417 /**
1418 * Creates the default ForkJoinPool scheduler.
1419 */
1420 private static ForkJoinPool createDefaultScheduler() {
1421 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1422 int parallelism, maxPoolSize, minRunnable;
1423 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1424 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1425 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1426 if (parallelismValue != null) {
1427 parallelism = Integer.parseInt(parallelismValue);
1428 } else {
1429 parallelism = Runtime.getRuntime().availableProcessors();
1430 }
1431 if (maxPoolSizeValue != null) {
1432 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1433 parallelism = Integer.min(parallelism, maxPoolSize);
1434 } else {
1435 maxPoolSize = Integer.max(parallelism, 256);
1436 }
1437 if (minRunnableValue != null) {
1438 minRunnable = Integer.parseInt(minRunnableValue);
1439 } else {
1440 minRunnable = Integer.max(parallelism / 2, 1);
1515 assert changed;
1516 vthread.unblock();
1517
1518 vthread = nextThread;
1519 }
1520 }
1521 }
1522
1523 /**
1524 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1525 * if necessary until a list of one or more threads becomes available.
1526 */
1527 private static native VirtualThread takeVirtualThreadListToUnblock();
1528
1529 static {
1530 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1531 VirtualThread::unblockVirtualThreads);
1532 unblocker.setDaemon(true);
1533 unblocker.start();
1534 }
1535 }
|
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.lang.invoke.MethodHandles;
28 import java.lang.invoke.VarHandle;
29 import java.lang.reflect.Constructor;
30 import java.util.Locale;
31 import java.util.Objects;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ForkJoinPool;
36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
37 import java.util.concurrent.ForkJoinTask;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ScheduledExecutorService;
41 import java.util.concurrent.ScheduledThreadPoolExecutor;
42 import java.util.concurrent.TimeUnit;
43 import jdk.internal.event.VirtualThreadEndEvent;
44 import jdk.internal.event.VirtualThreadStartEvent;
45 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
46 import jdk.internal.invoke.MhUtil;
47 import jdk.internal.misc.CarrierThread;
48 import jdk.internal.misc.InnocuousThread;
49 import jdk.internal.misc.Unsafe;
50 import jdk.internal.vm.Continuation;
51 import jdk.internal.vm.ContinuationScope;
52 import jdk.internal.vm.StackableScope;
53 import jdk.internal.vm.ThreadContainer;
54 import jdk.internal.vm.ThreadContainers;
55 import jdk.internal.vm.annotation.ChangesCurrentThread;
56 import jdk.internal.vm.annotation.Hidden;
57 import jdk.internal.vm.annotation.IntrinsicCandidate;
58 import jdk.internal.vm.annotation.JvmtiHideEvents;
59 import jdk.internal.vm.annotation.JvmtiMountTransition;
60 import jdk.internal.vm.annotation.ReservedStackAccess;
61 import sun.nio.ch.Interruptible;
62 import static java.util.concurrent.TimeUnit.*;
63
64 /**
65 * A thread that is scheduled by the Java virtual machine rather than the operating system.
66 */
67 final class VirtualThread extends BaseVirtualThread {
68 private static final Unsafe U = Unsafe.getUnsafe();
69 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
70
71 private static final Executor DEFAULT_SCHEDULER;
72 private static final boolean USE_CUSTOM_RUNNER;
73 static {
74 // experimental
75 String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
76 if (propValue != null) {
77 DEFAULT_SCHEDULER = createCustomDefaultScheduler(propValue);
78 USE_CUSTOM_RUNNER = true;
79 } else {
80 DEFAULT_SCHEDULER = createDefaultForkJoinPoolScheduler();
81 USE_CUSTOM_RUNNER = false;
82 }
83 }
84
85 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
86 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
87 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
88 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
89 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
90
91 // scheduler and continuation
92 private final Executor scheduler;
93 private final Continuation cont;
94 private final Runnable runContinuation;
95
96 // virtual thread state, accessed by VM
97 private volatile int state;
98
99 /*
100 * Virtual thread state transitions:
101 *
102 * NEW -> STARTED // Thread.start, schedule to run
103 * STARTED -> TERMINATED // failed to start
168 private static final int TERMINATED = 99; // final state
169
170 // can be suspended from scheduling when unmounted
171 private static final int SUSPENDED = 1 << 8;
172
173 // parking permit made available by LockSupport.unpark
174 private volatile boolean parkPermit;
175
176 // blocking permit made available by unblocker thread when another thread exits monitor
177 private volatile boolean blockPermit;
178
179 // true when on the list of virtual threads waiting to be unblocked
180 private volatile boolean onWaitingList;
181
182 // next virtual thread on the list of virtual threads waiting to be unblocked
183 private volatile VirtualThread next;
184
185 // notified by Object.notify/notifyAll while waiting in Object.wait
186 private volatile boolean notified;
187
188 // true when waiting in Object.wait, false for VM internal uninterruptible Object.wait
189 private volatile boolean interruptableWait;
190
191 // timed-wait support
192 private byte timedWaitSeqNo;
193
194 // timeout for timed-park and timed-wait, only accessed on current/carrier thread
195 private long timeout;
196
197 // timer task for timed-park and timed-wait, only accessed on current/carrier thread
198 private Future<?> timeoutTask;
199
200 // carrier thread when mounted, accessed by VM
201 private volatile Thread carrierThread;
202
203 // termination object when joining, created lazily if needed
204 private volatile CountDownLatch termination;
205
206 /**
207 * Returns the default scheduler.
208 */
209 static Executor defaultScheduler() {
210 return DEFAULT_SCHEDULER;
211 }
212
213 /**
214 * Returns the continuation scope used for virtual threads.
215 */
216 static ContinuationScope continuationScope() {
217 return VTHREAD_SCOPE;
218 }
219
220 /**
221 * Return the scheduler for this thread.
222 */
223 Executor scheduler() {
224 return scheduler;
225 }
226
227 /**
228 * Creates a new {@code VirtualThread} to run the given task with the given scheduler.
229 *
230 * @param scheduler the scheduler or null for default scheduler
231 * @param name thread name
232 * @param characteristics characteristics
233 * @param task the task to execute
234 */
235 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
236 super(name, characteristics, /*bound*/ false);
237 Objects.requireNonNull(task);
238
239 // use default scheduler if not provided
240 if (scheduler == null) {
241 scheduler = DEFAULT_SCHEDULER;
242 }
243
244 this.scheduler = scheduler;
245 this.cont = new VThreadContinuation(this, task);
246 if (USE_CUSTOM_RUNNER || (scheduler != DEFAULT_SCHEDULER)) {
247 this.runContinuation = new CustomRunner(this);
248 } else {
249 this.runContinuation = this::runContinuation;
250 }
251 }
252
253 /**
254 * The continuation that a virtual thread executes.
255 */
256 private static class VThreadContinuation extends Continuation {
257 VThreadContinuation(VirtualThread vthread, Runnable task) {
258 super(VTHREAD_SCOPE, wrap(vthread, task));
259 }
260 @Override
261 protected void onPinned(Continuation.Pinned reason) {
262 }
263 private static Runnable wrap(VirtualThread vthread, Runnable task) {
264 return new Runnable() {
265 @Hidden
266 @JvmtiHideEvents
267 public void run() {
268 vthread.notifyJvmtiStart(); // notify JVMTI
269 try {
270 vthread.run(task);
271 } finally {
272 vthread.notifyJvmtiEnd(); // notify JVMTI
273 }
274 }
275 };
276 }
277 }
278
279 /**
280 * The task to execute when using a custom scheduler.
281 */
282 private static class CustomRunner implements VirtualThreadTask {
283 private static final VarHandle ATTACHMENT =
284 MhUtil.findVarHandle(MethodHandles.lookup(), "attachment", Object.class);
285 private final VirtualThread vthread;
286 private volatile Object attachment;
287 CustomRunner(VirtualThread vthread) {
288 this.vthread = vthread;
289 }
290 @Override
291 public void run() {
292 vthread.runContinuation();
293 }
294 @Override
295 public Thread thread() {
296 return vthread;
297 }
298 @Override
299 public Object attach(Object ob) {
300 return ATTACHMENT.getAndSet(this, ob);
301 }
302 @Override
303 public Object attachment() {
304 return attachment;
305 }
306 @Override
307 public String toString() {
308 return vthread.toString();
309 }
310 }
311
312 /**
313 * Returns the object attached to the virtual thread's task.
314 */
315 Object currentTaskAttachment() {
316 assert Thread.currentThread() == this;
317 if (runContinuation instanceof CustomRunner runner) {
318 return runner.attachment();
319 } else {
320 return null;
321 }
322 }
323
324 /**
325 * Runs or continues execution on the current thread. The virtual thread is mounted
326 * on the current thread before the task runs or continues. It unmounts when the
327 * task completes or yields.
328 */
329 @ChangesCurrentThread // allow mount/unmount to be inlined
330 private void runContinuation() {
331 // the carrier must be a platform thread
332 if (Thread.currentThread().isVirtual()) {
333 throw new WrongThreadException();
334 }
335
336 // set state to RUNNING
337 int initialState = state();
338 if (initialState == STARTED || initialState == UNPARKED
339 || initialState == UNBLOCKED || initialState == YIELDED) {
340 // newly started or continue after parking/blocking/Thread.yield
341 if (!compareAndSetState(initialState, RUNNING)) {
342 return;
343 }
362 unmount();
363 if (cont.isDone()) {
364 afterDone();
365 } else {
366 afterYield();
367 }
368 }
369 }
370
371 /**
372 * Cancel timeout task when continuing after timed-park or timed-wait.
373 * The timeout task may be executing, or may have already completed.
374 */
375 private void cancelTimeoutTask() {
376 if (timeoutTask != null) {
377 timeoutTask.cancel(false);
378 timeoutTask = null;
379 }
380 }
381
382 /**
383 * Submits the given task to the given executor. If the scheduler is a
384 * ForkJoinPool then the task is first adapted to a ForkJoinTask.
385 */
386 private void submit(Executor executor, Runnable task) {
387 if (executor instanceof ForkJoinPool pool) {
388 pool.submit(ForkJoinTask.adapt(task));
389 } else {
390 executor.execute(task);
391 }
392 }
393
394 /**
395 * Submits the runContinuation task to the scheduler. For the default scheduler,
396 * and calling it on a worker thread, the task will be pushed to the local queue,
397 * otherwise it will be pushed to an external submission queue.
398 * @param scheduler the scheduler
399 * @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
400 * @throws RejectedExecutionException
401 */
402 private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
403 boolean done = false;
404 while (!done) {
405 try {
406 // Pin the continuation to prevent the virtual thread from unmounting
407 // when submitting a task. For the default scheduler this ensures that
408 // the carrier doesn't change when pushing a task. For other schedulers
409 // it avoids deadlock that could arise due to carriers and virtual
410 // threads contending for a lock.
411 if (currentThread().isVirtual()) {
412 Continuation.pin();
413 try {
414 submit(scheduler, runContinuation);
415 } finally {
416 Continuation.unpin();
417 }
418 } else {
419 submit(scheduler, runContinuation);
420 }
421 done = true;
422 } catch (RejectedExecutionException ree) {
423 submitFailed(ree);
424 throw ree;
425 } catch (OutOfMemoryError e) {
426 if (retryOnOOME) {
427 U.park(false, 100_000_000); // 100ms
428 } else {
429 throw e;
430 }
431 }
432 }
433 }
434
435 /**
436 * Submits the runContinuation task to the given scheduler as an external submit.
437 * If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
438 * @throws RejectedExecutionException
439 * @see ForkJoinPool#externalSubmit(ForkJoinTask)
661 submitRunContinuation();
662 }
663 return;
664 }
665
666 // blocking on monitorenter
667 if (s == BLOCKING) {
668 setState(BLOCKED);
669
670 // may have been unblocked while blocking
671 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
672 // lazy submit if local queue is empty
673 lazySubmitRunContinuation();
674 }
675 return;
676 }
677
678 // Object.wait
679 if (s == WAITING || s == TIMED_WAITING) {
680 int newState;
681 boolean interruptable = interruptableWait;
682 if (s == WAITING) {
683 setState(newState = WAIT);
684 } else {
685 // For timed-wait, a timeout task is scheduled to execute. The timeout
686 // task will change the thread state to UNBLOCKED and submit the thread
687 // to the scheduler. A sequence number is used to ensure that the timeout
688 // task only unblocks the thread for this timed-wait. We synchronize with
689 // the timeout task to coordinate access to the sequence number and to
690 // ensure the timeout task doesn't execute until the thread has got to
691 // the TIMED_WAIT state.
692 long timeout = this.timeout;
693 assert timeout > 0;
694 synchronized (timedWaitLock()) {
695 byte seqNo = ++timedWaitSeqNo;
696 timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
697 setState(newState = TIMED_WAIT);
698 }
699 }
700
701 // may have been notified while in transition to wait state
702 if (notified && compareAndSetState(newState, BLOCKED)) {
703 // may have even been unblocked already
704 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
705 submitRunContinuation();
706 }
707 return;
708 }
709
710 // may have been interrupted while in transition to wait state
711 if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
712 submitRunContinuation();
713 return;
714 }
715 return;
716 }
717
718 assert false;
719 }
720
721 /**
722 * Invoked after the continuation completes.
723 */
724 private void afterDone() {
725 afterDone(true);
726 }
727
728 /**
729 * Invoked after the continuation completes (or start failed). Sets the thread
730 * state to TERMINATED and notifies anyone waiting for the thread to terminate.
731 *
1477
1478 @IntrinsicCandidate
1479 @JvmtiMountTransition
1480 private native void notifyJvmtiMount(boolean hide);
1481
1482 @IntrinsicCandidate
1483 @JvmtiMountTransition
1484 private native void notifyJvmtiUnmount(boolean hide);
1485
1486 @IntrinsicCandidate
1487 private static native void notifyJvmtiDisableSuspend(boolean enter);
1488
1489 private static native void registerNatives();
1490 static {
1491 registerNatives();
1492
1493 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1494 var group = Thread.virtualThreadGroup();
1495 }
1496
1497 /**
1498 * Loads a java.util.concurrent.Executor with the given class name to use at the
1499 * default scheduler. The class is public in an exported package, has a public
1500 * no-arg constructor, and is visible to the system class loader.
1501 */
1502 private static Executor createCustomDefaultScheduler(String cn) {
1503 try {
1504 Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
1505 Constructor<?> ctor = clazz.getConstructor();
1506 var scheduler = (Executor) ctor.newInstance();
1507 System.err.println("""
1508 WARNING: Using custom default scheduler, this is an experimental feature!""");
1509 return scheduler;
1510 } catch (Exception ex) {
1511 throw new Error(ex);
1512 }
1513 }
1514
1515 /**
1516 * Creates the default ForkJoinPool scheduler.
1517 */
1518 private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
1519 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1520 int parallelism, maxPoolSize, minRunnable;
1521 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1522 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1523 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1524 if (parallelismValue != null) {
1525 parallelism = Integer.parseInt(parallelismValue);
1526 } else {
1527 parallelism = Runtime.getRuntime().availableProcessors();
1528 }
1529 if (maxPoolSizeValue != null) {
1530 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1531 parallelism = Integer.min(parallelism, maxPoolSize);
1532 } else {
1533 maxPoolSize = Integer.max(parallelism, 256);
1534 }
1535 if (minRunnableValue != null) {
1536 minRunnable = Integer.parseInt(minRunnableValue);
1537 } else {
1538 minRunnable = Integer.max(parallelism / 2, 1);
1613 assert changed;
1614 vthread.unblock();
1615
1616 vthread = nextThread;
1617 }
1618 }
1619 }
1620
1621 /**
1622 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1623 * if necessary until a list of one or more threads becomes available.
1624 */
1625 private static native VirtualThread takeVirtualThreadListToUnblock();
1626
1627 static {
1628 var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1629 VirtualThread::unblockVirtualThreads);
1630 unblocker.setDaemon(true);
1631 unblocker.start();
1632 }
1633 }
|