1 /*
2 * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.lang.ref.Reference;
28 import java.security.AccessController;
29 import java.security.PrivilegedAction;
30 import java.util.Locale;
31 import java.util.Objects;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ForkJoinPool;
36 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
37 import java.util.concurrent.ForkJoinTask;
38 import java.util.concurrent.ForkJoinWorkerThread;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.RejectedExecutionException;
41 import java.util.concurrent.ScheduledExecutorService;
42 import java.util.concurrent.ScheduledThreadPoolExecutor;
43 import jdk.internal.event.ThreadSleepEvent;
44 import jdk.internal.event.VirtualThreadEndEvent;
45 import jdk.internal.event.VirtualThreadPinnedEvent;
46 import jdk.internal.event.VirtualThreadStartEvent;
47 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
48 import jdk.internal.misc.CarrierThread;
49 import jdk.internal.misc.InnocuousThread;
50 import jdk.internal.misc.Unsafe;
51 import jdk.internal.vm.Continuation;
52 import jdk.internal.vm.ContinuationScope;
53 import jdk.internal.vm.StackableScope;
54 import jdk.internal.vm.ThreadContainer;
55 import jdk.internal.vm.ThreadContainers;
56 import jdk.internal.vm.annotation.ChangesCurrentThread;
57 import jdk.internal.vm.annotation.ForceInline;
58 import jdk.internal.vm.annotation.Hidden;
59 import jdk.internal.vm.annotation.IntrinsicCandidate;
60 import jdk.internal.vm.annotation.JvmtiMountTransition;
61 import sun.nio.ch.Interruptible;
62 import sun.security.action.GetPropertyAction;
63 import static java.util.concurrent.TimeUnit.*;
64
65 /**
66 * A thread that is scheduled by the Java virtual machine rather than the operating
67 * system.
68 */
69 final class VirtualThread extends BaseVirtualThread {
// Unsafe instance; used below to resolve field offsets and, elsewhere in this
// class, to park and unpark carrier threads
70 private static final Unsafe U = Unsafe.getUnsafe();
// scope handed to the continuation constructor and to Continuation.yield
71 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
// scheduler picked when none is specified and the creating thread is not a virtual thread
72 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
// executor on which the delayed unpark tasks for timed parking are scheduled
73 private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
// when > 0 a stack trace is printed on pinning; the value 1 selects "print all"
74 private static final int TRACE_PINNING_MODE = tracePinningMode();
75
// offsets of the named instance fields as reported by Unsafe.objectFieldOffset
// NOTE(review): presumably consumed by the state/permit/carrier accessors; those are not shown here
76 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
77 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
78 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
79 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
80
158 // choose scheduler if not specified
159 if (scheduler == null) {
160 Thread parent = Thread.currentThread();
161 if (parent instanceof VirtualThread vparent) {
162 scheduler = vparent.scheduler;
163 } else {
164 scheduler = DEFAULT_SCHEDULER;
165 }
166 }
167
168 this.scheduler = scheduler;
169 this.cont = new VThreadContinuation(this, task);
170 this.runContinuation = this::runContinuation;
171 }
172
173 /**
174 * The continuation that a virtual thread executes.
175 */
176 private static class VThreadContinuation extends Continuation {
177 VThreadContinuation(VirtualThread vthread, Runnable task) {
178 super(VTHREAD_SCOPE, () -> vthread.run(task));
179 }
180 @Override
181 protected void onPinned(Continuation.Pinned reason) {
182 if (TRACE_PINNING_MODE > 0) {
183 boolean printAll = (TRACE_PINNING_MODE == 1);
184 PinnedThreadPrinter.printStackTrace(System.out, printAll);
185 }
186 }
187 }
188
189 /**
190 * Runs or continues execution of the continuation on the current thread.
191 */
192 private void runContinuation() {
193 // the carrier must be a platform thread
194 if (Thread.currentThread().isVirtual()) {
195 throw new WrongThreadException();
196 }
197
198 // set state to RUNNING
199 boolean firstRun;
200 int initialState = state();
201 if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
202 // first run
203 firstRun = true;
204 } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
205 // consume parking permit
206 setParkPermit(false);
207 firstRun = false;
208 } else {
209 // not runnable
210 return;
211 }
212
213 // notify JVMTI before mount
214 notifyJvmtiMount(true, firstRun);
215
216 try {
217 cont.run();
218 } finally {
219 if (cont.isDone()) {
220 afterTerminate(/*executed*/ true);
221 } else {
222 afterYield();
223 }
224 }
225 }
226
227 /**
228 * Submits the runContinuation task to the scheduler. For the default scheduler,
229 * and calling it on a worker thread, the task will be pushed to the local queue,
230 * otherwise it will be pushed to a submission queue.
231 *
232 * @throws RejectedExecutionException
233 */
234 private void submitRunContinuation() {
235 try {
236 scheduler.execute(runContinuation);
237 } catch (RejectedExecutionException ree) {
238 submitFailed(ree);
239 throw ree;
240 }
274 private void submitFailed(RejectedExecutionException ree) {
275 var event = new VirtualThreadSubmitFailedEvent();
276 if (event.isEnabled()) {
277 event.javaThreadId = threadId();
278 event.exceptionMessage = ree.getMessage();
279 event.commit();
280 }
281 }
282
283 /**
284 * Runs a task in the context of this virtual thread. The virtual thread is
285 * mounted on the current (carrier) thread before the task runs. It unmounts
286 * from its carrier thread when the task completes.
287 */
288 @ChangesCurrentThread
289 private void run(Runnable task) {
290 assert state == RUNNING;
291
292 // first mount
293 mount();
294 notifyJvmtiMount(false, true);
295
296 // emit JFR event if enabled
297 if (VirtualThreadStartEvent.isTurnedOn()) {
298 var event = new VirtualThreadStartEvent();
299 event.javaThreadId = threadId();
300 event.commit();
301 }
302
303 Object bindings = scopedValueBindings();
304 try {
305 runWith(bindings, task);
306 } catch (Throwable exc) {
307 dispatchUncaughtException(exc);
308 } finally {
309 try {
310 // pop any remaining scopes from the stack, this may block
311 StackableScope.popAll();
312
313 // emit JFR event if enabled
314 if (VirtualThreadEndEvent.isTurnedOn()) {
315 var event = new VirtualThreadEndEvent();
316 event.javaThreadId = threadId();
317 event.commit();
318 }
319
320 } finally {
321 // last unmount
322 notifyJvmtiUnmount(true, true);
323 unmount();
324
325 // final state
326 setState(TERMINATED);
327 }
328 }
329 }
330
/**
 * Runs {@code op} while keeping {@code bindings} reachable for the duration
 * of the call. Invoked from run(Runnable) with the scoped value bindings.
 */
331 @Hidden
332 @ForceInline
333 private void runWith(Object bindings, Runnable op) {
// NOTE(review): helper is defined elsewhere; presumably exposes the bindings to stack walkers - confirm
334 ensureMaterializedForStackWalk(bindings);
335 op.run();
// keeps bindings strongly reachable until op.run() has returned
336 Reference.reachabilityFence(bindings);
337 }
338
339 /**
340 * Mounts this virtual thread onto the current platform thread. On
341 * return, the current thread is the virtual thread.
342 */
343 @ChangesCurrentThread
344 private void mount() {
345 // sets the carrier thread
346 Thread carrier = Thread.currentCarrierThread();
347 setCarrierThread(carrier);
348
349 // sync up carrier thread interrupt status if needed
350 if (interrupted) {
351 carrier.setInterrupt();
352 } else if (carrier.isInterrupted()) {
353 synchronized (interruptLock) {
354 // need to recheck interrupt status
355 if (!interrupted) {
356 carrier.clearInterrupt();
357 }
358 }
359 }
360
361 // set Thread.currentThread() to return this virtual thread
362 carrier.setCurrentThread(this);
363 }
364
365 /**
366 * Unmounts this virtual thread from the carrier. On return, the
367 * current thread is the current platform thread.
368 */
369 @ChangesCurrentThread
370 private void unmount() {
371 // set Thread.currentThread() to return the platform thread
372 Thread carrier = this.carrierThread;
373 carrier.setCurrentThread(carrier);
374
375 // break connection to carrier thread, synchronized with interrupt
376 synchronized (interruptLock) {
377 setCarrierThread(null);
378 }
379 carrier.clearInterrupt();
380 }
381
382 /**
383 * Sets the current thread to the current carrier thread.
384 */
385 @ChangesCurrentThread
386 @JvmtiMountTransition
387 private void switchToCarrierThread() {
388 notifyJvmtiHideFrames(true);
389 Thread carrier = this.carrierThread;
390 assert Thread.currentThread() == this
391 && carrier == Thread.currentCarrierThread();
392 carrier.setCurrentThread(carrier);
393 }
394
395 /**
396 * Sets the current thread to the given virtual thread.
397 */
398 @ChangesCurrentThread
399 @JvmtiMountTransition
400 private void switchToVirtualThread(VirtualThread vthread) {
401 Thread carrier = vthread.carrierThread;
402 assert carrier == Thread.currentCarrierThread();
403 carrier.setCurrentThread(vthread);
404 notifyJvmtiHideFrames(false);
405 }
406
407 /**
408 * Unmounts this virtual thread, invokes Continuation.yield, and re-mounts the
409 * thread when continued. When enabled, JVMTI must be notified from this method.
410 * @return true if the yield was successful
411 */
412 @ChangesCurrentThread
413 private boolean yieldContinuation() {
414 // unmount
// JVMTI is notified before the carrier link is broken
415 notifyJvmtiUnmount(true, false);
416 unmount();
417 try {
// a false result is handled by callers such as park(), which then parks on the carrier instead
418 return Continuation.yield(VTHREAD_SCOPE);
419 } finally {
420 // re-mount
// in a finally block, so the thread is mounted again whether or not the yield succeeded
421 mount();
422 notifyJvmtiMount(false, false);
423 }
424 }
425
426 /**
427 * Invoked after the continuation yields. If parking then it sets the state
428 * and also re-submits the task to continue if unparked while parking.
429 * If yielding due to Thread.yield then it just submits the task to continue.
430 */
431 private void afterYield() {
432 int s = state();
433 assert (s == PARKING || s == YIELDING) && (carrierThread == null);
434
435 if (s == PARKING) {
436 setState(PARKED);
437
438 // notify JVMTI that unmount has completed, thread is parked
439 notifyJvmtiUnmount(false, false);
440
441 // may have been unparked while parking
442 if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
443 // lazy submit to continue on the current thread as carrier if possible
444 if (currentThread() instanceof CarrierThread ct) {
445 lazySubmitRunContinuation(ct.getPool());
446 } else {
447 submitRunContinuation();
448 }
449
450 }
451 } else if (s == YIELDING) { // Thread.yield
452 setState(RUNNABLE);
453
454 // notify JVMTI that unmount has completed, thread is runnable
455 notifyJvmtiUnmount(false, false);
456
457 // external submit if there are no tasks in the local task queue
458 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
459 externalSubmitRunContinuation(ct.getPool());
460 } else {
461 submitRunContinuation();
462 }
463 }
464 }
465
466 /**
467 * Invoked after the thread terminates (or start failed). This method
468 * notifies anyone waiting for the thread to terminate.
469 *
470 * @param executed true if the thread executed, false if it failed to start
471 */
472 private void afterTerminate(boolean executed) {
473 assert (state() == TERMINATED) && (carrierThread == null);
474
475 if (executed) {
476 notifyJvmtiUnmount(false, true);
477 }
478
479 // notify anyone waiting for this virtual thread to terminate
480 CountDownLatch termination = this.termination;
481 if (termination != null) {
482 assert termination.getCount() == 1;
483 termination.countDown();
484 }
485
486 if (executed) {
487 // notify container if thread executed
488 threadContainer().onExit(this);
489
490 // clear references to thread locals
491 clearReferences();
492 }
493 }
494
495 /**
496 * Schedules this {@code VirtualThread} to execute.
497 *
498 * @throws IllegalStateException if the container is shutdown or closed
499 * @throws IllegalThreadStateException if the thread has already been started
500 * @throws RejectedExecutionException if the scheduler cannot accept a task
501 */
502 @Override
503 void start(ThreadContainer container) {
504 if (!compareAndSetState(NEW, STARTED)) {
505 throw new IllegalThreadStateException("Already started");
506 }
507
508 // bind thread to container
509 setThreadContainer(container);
510
511 // start thread
512 boolean started = false;
513 container.onStart(this); // may throw
514 try {
515 // scoped values may be inherited
516 inheritScopedValueBindings(container);
517
518 // submit task to run thread
519 submitRunContinuation();
520 started = true;
521 } finally {
522 if (!started) {
523 setState(TERMINATED);
524 container.onExit(this);
525 afterTerminate(/*executed*/ false);
526 }
527 }
528 }
529
530 @Override
531 public void start() {
532 start(ThreadContainers.root());
533 }
534
535 @Override
536 public void run() {
537 // do nothing
538 }
539
540 /**
541 * Parks until unparked or interrupted. If already unparked then the parking
542 * permit is consumed and this method completes immediately (meaning it doesn't
543 * yield). It also completes immediately if the interrupt status is set.
544 */
545 @Override
546 void park() {
547 assert Thread.currentThread() == this;
548
549 // complete immediately if parking permit available or interrupted
550 if (getAndSetParkPermit(false) || interrupted)
551 return;
552
553 // park the thread
554 setState(PARKING);
555 try {
556 if (!yieldContinuation()) {
557 // park on the carrier thread when pinned
558 parkOnCarrierThread(false, 0);
559 }
560 } finally {
561 assert (Thread.currentThread() == this) && (state() == RUNNING);
562 }
563 }
564
565 /**
566 * Parks up to the given waiting time or until unparked or interrupted.
567 * If already unparked then the parking permit is consumed and this method
568 * completes immediately (meaning it doesn't yield). It also completes immediately
569 * if the interrupt status is set or the waiting time is {@code <= 0}.
570 *
571 * @param nanos the maximum number of nanoseconds to wait.
572 */
573 @Override
574 void parkNanos(long nanos) {
575 assert Thread.currentThread() == this;
576
577 // complete immediately if parking permit available or interrupted
578 if (getAndSetParkPermit(false) || interrupted)
579 return;
580
581 // park the thread for the waiting time
582 if (nanos > 0) {
583 long startTime = System.nanoTime();
584
585 boolean yielded;
586 Future<?> unparker = scheduleUnpark(this::unpark, nanos);
587 setState(PARKING);
588 try {
589 yielded = yieldContinuation();
590 } finally {
591 assert (Thread.currentThread() == this)
592 && (state() == RUNNING || state() == PARKING);
593 cancel(unparker);
594 }
595
596 // park on carrier thread for remaining time when pinned
597 if (!yielded) {
598 long deadline = startTime + nanos;
599 if (deadline < 0L)
600 deadline = Long.MAX_VALUE;
601 parkOnCarrierThread(true, deadline - System.nanoTime());
602 }
603 }
604 }
605
606 /**
607 * Parks the current carrier thread up to the given waiting time or until
608 * unparked or interrupted. If the virtual thread is interrupted then the
609 * interrupt status will be propagated to the carrier thread.
610 * @param timed true for a timed park, false for untimed
611 * @param nanos the waiting time in nanoseconds
612 */
613 private void parkOnCarrierThread(boolean timed, long nanos) {
614 assert state() == PARKING;
615
616 var pinnedEvent = new VirtualThreadPinnedEvent();
617 pinnedEvent.begin();
618
619 setState(PINNED);
620 try {
621 if (!parkPermit) {
622 if (!timed) {
623 U.park(false, 0);
624 } else if (nanos > 0) {
625 U.park(false, nanos);
626 }
627 }
628 } finally {
629 setState(RUNNING);
630 }
631
632 // consume parking permit
633 setParkPermit(false);
634
635 pinnedEvent.commit();
636 }
637
638 /**
639 * Schedule an unpark task to run after a given delay.
640 */
641 @ChangesCurrentThread
642 private Future<?> scheduleUnpark(Runnable unparker, long nanos) {
643 // need to switch to current carrier thread to avoid nested parking
644 switchToCarrierThread();
645 try {
646 return UNPARKER.schedule(unparker, nanos, NANOSECONDS);
647 } finally {
648 switchToVirtualThread(this);
649 }
650 }
651
652 /**
653 * Cancels a task if it has not completed.
654 */
655 @ChangesCurrentThread
690 submitRunContinuation();
691 }
692 } else if (s == PINNED) {
693 // unpark carrier thread when pinned.
694 synchronized (carrierThreadAccessLock()) {
695 Thread carrier = carrierThread;
696 if (carrier != null && state() == PINNED) {
697 U.unpark(carrier);
698 }
699 }
700 }
701 }
702 }
703
704 /**
705 * Attempts to yield the current virtual thread (Thread.yield).
706 */
707 void tryYield() {
708 assert Thread.currentThread() == this;
709 setState(YIELDING);
710 try {
711 yieldContinuation();
712 } finally {
713 assert Thread.currentThread() == this;
714 if (state() != RUNNING) {
715 assert state() == YIELDING;
716 setState(RUNNING);
717 }
718 }
719 }
720
721 /**
722 * Sleep the current virtual thread for the given sleep time.
723 *
724 * @param nanos the maximum number of nanoseconds to sleep
725 * @throws InterruptedException if interrupted while sleeping
726 */
727 void sleepNanos(long nanos) throws InterruptedException {
728 assert Thread.currentThread() == this;
729 if (nanos >= 0) {
730 if (ThreadSleepEvent.isTurnedOn()) {
731 ThreadSleepEvent event = new ThreadSleepEvent();
732 try {
733 event.time = nanos;
734 event.begin();
735 doSleepNanos(nanos);
736 } finally {
737 event.commit();
738 }
739 } else {
740 doSleepNanos(nanos);
741 }
742 }
743 }
744
745 /**
746 * Sleep the current thread for the given sleep time (in nanoseconds). If
747 * nanos is 0 then the thread will attempt to yield.
748 *
749 * @implNote This implementation parks the thread for the given sleeping time
750 * and will therefore be observed in PARKED state during the sleep. Parking
751 * will consume the parking permit so this method makes available the parking
752 * permit after the sleep. This may be observed as a spurious, but benign,
753 * wakeup when the thread subsequently attempts to park.
754 */
755 private void doSleepNanos(long nanos) throws InterruptedException {
756 assert nanos >= 0;
757 if (getAndClearInterrupt())
758 throw new InterruptedException();
759 if (nanos == 0) {
760 tryYield();
761 } else {
762 // park for the sleep time
763 try {
764 long remainingNanos = nanos;
765 long startNanos = System.nanoTime();
766 while (remainingNanos > 0) {
767 parkNanos(remainingNanos);
768 if (getAndClearInterrupt()) {
769 throw new InterruptedException();
770 }
771 remainingNanos = nanos - (System.nanoTime() - startNanos);
772 }
773 } finally {
774 // may have been unparked while sleeping
775 setParkPermit(true);
776 }
|
1 /*
2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.lang.ref.Reference;
28 import java.security.AccessController;
29 import java.security.PrivilegedAction;
30 import java.util.Locale;
31 import java.util.Objects;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ForkJoinPool;
37 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
38 import java.util.concurrent.ForkJoinTask;
39 import java.util.concurrent.ForkJoinWorkerThread;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.RejectedExecutionException;
42 import java.util.concurrent.ScheduledExecutorService;
43 import java.util.concurrent.ScheduledThreadPoolExecutor;
44 import jdk.internal.event.VirtualThreadEndEvent;
45 import jdk.internal.event.VirtualThreadPinnedEvent;
46 import jdk.internal.event.VirtualThreadStartEvent;
47 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
48 import jdk.internal.misc.CarrierThread;
49 import jdk.internal.misc.InnocuousThread;
50 import jdk.internal.misc.Unsafe;
51 import jdk.internal.vm.Continuation;
52 import jdk.internal.vm.ContinuationScope;
53 import jdk.internal.vm.StackableScope;
54 import jdk.internal.vm.ThreadContainer;
55 import jdk.internal.vm.ThreadContainers;
56 import jdk.internal.vm.annotation.ChangesCurrentThread;
57 import jdk.internal.vm.annotation.ForceInline;
58 import jdk.internal.vm.annotation.Hidden;
59 import jdk.internal.vm.annotation.IntrinsicCandidate;
60 import jdk.internal.vm.annotation.JvmtiMountTransition;
61 import jdk.internal.vm.annotation.ReservedStackAccess;
62 import sun.nio.ch.Interruptible;
63 import sun.security.action.GetPropertyAction;
64 import static java.util.concurrent.TimeUnit.*;
65
66 /**
67 * A thread that is scheduled by the Java virtual machine rather than the operating
68 * system.
69 */
70 final class VirtualThread extends BaseVirtualThread {
// Unsafe instance; used below to resolve the field offsets
71 private static final Unsafe U = Unsafe.getUnsafe();
// scope handed to the continuation constructor and to Continuation.yield
72 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
// scheduler picked when none is specified and the creating thread is not a virtual thread
73 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
// NOTE(review): presumably runs the delayed unpark tasks for timed parking - confirm against its users
74 private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
// when > 0 a stack trace is printed on pinning; the value 1 selects "print all"
75 private static final int TRACE_PINNING_MODE = tracePinningMode();
76
// offsets of the named instance fields as reported by Unsafe.objectFieldOffset
// NOTE(review): presumably consumed by the state/permit/carrier accessors; those are not shown here
77 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
78 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
79 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
80 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
81
159 // choose scheduler if not specified
160 if (scheduler == null) {
161 Thread parent = Thread.currentThread();
162 if (parent instanceof VirtualThread vparent) {
163 scheduler = vparent.scheduler;
164 } else {
165 scheduler = DEFAULT_SCHEDULER;
166 }
167 }
168
169 this.scheduler = scheduler;
170 this.cont = new VThreadContinuation(this, task);
171 this.runContinuation = this::runContinuation;
172 }
173
174 /**
175 * The continuation that a virtual thread executes.
176 */
177 private static class VThreadContinuation extends Continuation {
178 VThreadContinuation(VirtualThread vthread, Runnable task) {
179 super(VTHREAD_SCOPE, wrap(vthread, task));
180 }
181 @Override
182 protected void onPinned(Continuation.Pinned reason) {
183 if (TRACE_PINNING_MODE > 0) {
184 boolean printAll = (TRACE_PINNING_MODE == 1);
185 PinnedThreadPrinter.printStackTrace(System.out, printAll);
186 }
187 }
188 private static Runnable wrap(VirtualThread vthread, Runnable task) {
189 return new Runnable() {
190 @Hidden
191 public void run() {
192 vthread.run(task);
193 }
194 };
195 }
196 }
197
198 /**
199 * Runs or continues execution of the continuation on the current thread.
200 */
201 private void runContinuation() {
202 // the carrier must be a platform thread
203 if (Thread.currentThread().isVirtual()) {
204 throw new WrongThreadException();
205 }
206
207 // set state to RUNNING
208 boolean firstRun;
209 int initialState = state();
210 if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
211 // first run
212 firstRun = true;
213 } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
214 // consume parking permit
215 setParkPermit(false);
216 firstRun = false;
217 } else {
218 // not runnable
219 return;
220 }
221
222 // notify JVMTI before mount
223 notifyJvmtiMount(/*hide*/true, firstRun);
224
225 try {
226 cont.run();
227 } finally {
228 if (cont.isDone()) {
229 afterTerminate();
230 } else {
231 afterYield();
232 }
233 }
234 }
235
236 /**
237 * Submits the runContinuation task to the scheduler. For the default scheduler,
238 * and calling it on a worker thread, the task will be pushed to the local queue,
239 * otherwise it will be pushed to a submission queue.
240 *
241 * @throws RejectedExecutionException
242 */
243 private void submitRunContinuation() {
244 try {
245 scheduler.execute(runContinuation);
246 } catch (RejectedExecutionException ree) {
247 submitFailed(ree);
248 throw ree;
249 }
283 private void submitFailed(RejectedExecutionException ree) {
284 var event = new VirtualThreadSubmitFailedEvent();
285 if (event.isEnabled()) {
286 event.javaThreadId = threadId();
287 event.exceptionMessage = ree.getMessage();
288 event.commit();
289 }
290 }
291
292 /**
293 * Runs a task in the context of this virtual thread. The virtual thread is
294 * mounted on the current (carrier) thread before the task runs. It unmounts
295 * from its carrier thread when the task completes.
296 */
297 @ChangesCurrentThread
298 private void run(Runnable task) {
299 assert state == RUNNING;
300
301 // first mount
302 mount();
303 notifyJvmtiMount(/*hide*/false, /*first*/true);
304
305 // emit JFR event if enabled
306 if (VirtualThreadStartEvent.isTurnedOn()) {
307 var event = new VirtualThreadStartEvent();
308 event.javaThreadId = threadId();
309 event.commit();
310 }
311
312 Object bindings = scopedValueBindings();
313 try {
314 runWith(bindings, task);
315 } catch (Throwable exc) {
316 dispatchUncaughtException(exc);
317 } finally {
318 try {
319 // pop any remaining scopes from the stack, this may block
320 StackableScope.popAll();
321
322 // emit JFR event if enabled
323 if (VirtualThreadEndEvent.isTurnedOn()) {
324 var event = new VirtualThreadEndEvent();
325 event.javaThreadId = threadId();
326 event.commit();
327 }
328
329 } finally {
330 // last unmount
331 notifyJvmtiUnmount(/*hide*/true, /*last*/true);
332 unmount();
333
334 // final state
335 setState(TERMINATED);
336 }
337 }
338 }
339
/**
 * Runs {@code op} while keeping {@code bindings} reachable for the duration
 * of the call. Invoked from run(Runnable) with the scoped value bindings.
 */
340 @Hidden
341 @ForceInline
342 private void runWith(Object bindings, Runnable op) {
// NOTE(review): helper is defined elsewhere; presumably exposes the bindings to stack walkers - confirm
343 ensureMaterializedForStackWalk(bindings);
344 op.run();
// keeps bindings strongly reachable until op.run() has returned
345 Reference.reachabilityFence(bindings);
346 }
347
348 /**
349 * Mounts this virtual thread onto the current platform thread. On
350 * return, the current thread is the virtual thread.
351 */
352 @ChangesCurrentThread
353 @ReservedStackAccess
354 private void mount() {
355 // sets the carrier thread
356 Thread carrier = Thread.currentCarrierThread();
357 setCarrierThread(carrier);
358
359 // sync up carrier thread interrupt status if needed
360 if (interrupted) {
361 carrier.setInterrupt();
362 } else if (carrier.isInterrupted()) {
363 synchronized (interruptLock) {
364 // need to recheck interrupt status
365 if (!interrupted) {
366 carrier.clearInterrupt();
367 }
368 }
369 }
370
371 // set Thread.currentThread() to return this virtual thread
372 carrier.setCurrentThread(this);
373 }
374
375 /**
376 * Unmounts this virtual thread from the carrier. On return, the
377 * current thread is the current platform thread.
378 */
379 @ChangesCurrentThread
380 @ReservedStackAccess
381 private void unmount() {
382 // set Thread.currentThread() to return the platform thread
383 Thread carrier = this.carrierThread;
384 carrier.setCurrentThread(carrier);
385
386 // break connection to carrier thread, synchronized with interrupt
387 synchronized (interruptLock) {
388 setCarrierThread(null);
389 }
390 carrier.clearInterrupt();
391 }
392
393 /**
394 * Sets the current thread to the current carrier thread.
395 */
396 @ChangesCurrentThread
397 @JvmtiMountTransition
398 private void switchToCarrierThread() {
399 notifyJvmtiHideFrames(true);
400 Thread carrier = this.carrierThread;
401 assert Thread.currentThread() == this
402 && carrier == Thread.currentCarrierThread();
403 carrier.setCurrentThread(carrier);
404 }
405
406 /**
407 * Sets the current thread to the given virtual thread.
408 */
409 @ChangesCurrentThread
410 @JvmtiMountTransition
411 private void switchToVirtualThread(VirtualThread vthread) {
412 Thread carrier = vthread.carrierThread;
413 assert carrier == Thread.currentCarrierThread();
414 carrier.setCurrentThread(vthread);
415 notifyJvmtiHideFrames(false);
416 }
417
418 /**
419 * Executes the given value returning task on the current carrier thread.
420 */
421 @ChangesCurrentThread
422 <V> V executeOnCarrierThread(Callable<V> task) throws Exception {
423 assert Thread.currentThread() == this;
424 switchToCarrierThread();
425 try {
426 return task.call();
427 } finally {
428 switchToVirtualThread(this);
429 }
430 }
431
432 /**
433 * Unmounts this virtual thread, invokes Continuation.yield, and re-mounts the
434 * thread when continued. When enabled, JVMTI must be notified from this method.
435 * @return true if the yield was successful
436 */
437 @Hidden
438 @ChangesCurrentThread
439 private boolean yieldContinuation() {
440 // unmount
// JVMTI is notified before the carrier link is broken
441 notifyJvmtiUnmount(/*hide*/true, /*last*/false);
442 unmount();
443 try {
444 return Continuation.yield(VTHREAD_SCOPE);
445 } finally {
446 // re-mount
// in a finally block, so the thread is mounted again whether or not the yield succeeded
447 mount();
448 notifyJvmtiMount(/*hide*/false, /*first*/false);
449 }
450 }
451
452 /**
453 * Invoked after the continuation yields. If parking then it sets the state
454 * and also re-submits the task to continue if unparked while parking.
455 * If yielding due to Thread.yield then it just submits the task to continue.
456 */
457 private void afterYield() {
458 int s = state();
459 assert (s == PARKING || s == YIELDING) && (carrierThread == null);
460
461 if (s == PARKING) {
462 setState(PARKED);
463
464 // notify JVMTI that unmount has completed, thread is parked
465 notifyJvmtiUnmount(/*hide*/false, /*last*/false);
466
467 // may have been unparked while parking
468 if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
469 // lazy submit to continue on the current thread as carrier if possible
470 if (currentThread() instanceof CarrierThread ct) {
471 lazySubmitRunContinuation(ct.getPool());
472 } else {
473 submitRunContinuation();
474 }
475
476 }
477 } else if (s == YIELDING) { // Thread.yield
478 setState(RUNNABLE);
479
480 // notify JVMTI that unmount has completed, thread is runnable
481 notifyJvmtiUnmount(/*hide*/false, /*last*/false);
482
483 // external submit if there are no tasks in the local task queue
484 if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
485 externalSubmitRunContinuation(ct.getPool());
486 } else {
487 submitRunContinuation();
488 }
489 }
490 }
491
492 /**
493 * Invoked after the thread terminates execution. It notifies anyone
494 * waiting for the thread to terminate.
495 */
496 private void afterTerminate() {
497 afterTerminate(true, true);
498 }
499
500 /**
501 * Invoked after the thread terminates (or start failed). This method
502 * notifies anyone waiting for the thread to terminate.
503 *
504 * @param notifyContainer true if its container should be notified
505 * @param executed true if the thread executed, false if it failed to start
506 */
507 private void afterTerminate(boolean notifyContainer, boolean executed) {
508 assert (state() == TERMINATED) && (carrierThread == null);
509
510 if (executed) {
511 notifyJvmtiUnmount(/*hide*/false, /*last*/true);
512 }
513
514 // notify anyone waiting for this virtual thread to terminate
515 CountDownLatch termination = this.termination;
516 if (termination != null) {
517 assert termination.getCount() == 1;
518 termination.countDown();
519 }
520
521 // notify container
522 if (notifyContainer) {
523 threadContainer().onExit(this);
524 }
525
526 // clear references to thread locals
527 clearReferences();
528 }
529
530 /**
531 * Schedules this {@code VirtualThread} to execute.
532 *
533 * @throws IllegalStateException if the container is shutdown or closed
534 * @throws IllegalThreadStateException if the thread has already been started
535 * @throws RejectedExecutionException if the scheduler cannot accept a task
536 */
537 @Override
538 void start(ThreadContainer container) {
539 if (!compareAndSetState(NEW, STARTED)) {
540 throw new IllegalThreadStateException("Already started");
541 }
542
543 // bind thread to container
544 setThreadContainer(container);
545
546 // start thread
547 boolean addedToContainer = false;
548 boolean started = false;
549 try {
550 container.onStart(this); // may throw
551 addedToContainer = true;
552
553 // scoped values may be inherited
554 inheritScopedValueBindings(container);
555
556 // submit task to run thread
557 submitRunContinuation();
558 started = true;
559 } finally {
560 if (!started) {
561 setState(TERMINATED);
562 afterTerminate(addedToContainer, /*executed*/false);
563 }
564 }
565 }
566
567 @Override
568 public void start() {
569 start(ThreadContainers.root());
570 }
571
/**
 * This override has an empty body; invoking it performs no work.
 */
@Override
public void run() {
    // do nothing
}
576
/**
 * Parks until unparked or interrupted. If already unparked then the parking
 * permit is consumed and this method completes immediately (meaning it doesn't
 * yield). It also completes immediately if the interrupt status is set.
 *
 * <p> The thread first sets its state to PARKING and attempts to yield its
 * continuation. If the yield is not successful then the state is restored to
 * RUNNING and the thread parks on its carrier thread instead. If the yield
 * attempt throws then the state is restored to RUNNING and the exception
 * propagates without parking on the carrier.
 */
@Override
void park() {
    assert Thread.currentThread() == this;

    // complete immediately if parking permit available or interrupted
    if (getAndSetParkPermit(false) || interrupted)
        return;

    // park the thread
    boolean yielded = false;
    setState(PARKING);
    try {
        yielded = yieldContinuation(); // may throw
    } finally {
        // back on this thread: the state is RUNNING exactly when the yield succeeded
        assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
        if (!yielded) {
            // yield failed or threw: state is still PARKING, so restore RUNNING
            assert state() == PARKING;
            setState(RUNNING);
        }
    }

    // park on the carrier thread when pinned
    if (!yielded) {
        // untimed park, the nanos argument is not used
        parkOnCarrierThread(false, 0);
    }
}
608
609 /**
610 * Parks up to the given waiting time or until unparked or interrupted.
611 * If already unparked then the parking permit is consumed and this method
612 * completes immediately (meaning it doesn't yield). It also completes immediately
613 * if the interrupt status is set or the waiting time is {@code <= 0}.
614 *
615 * @param nanos the maximum number of nanoseconds to wait.
616 */
617 @Override
618 void parkNanos(long nanos) {
619 assert Thread.currentThread() == this;
620
621 // complete immediately if parking permit available or interrupted
622 if (getAndSetParkPermit(false) || interrupted)
623 return;
624
625 // park the thread for the waiting time
626 if (nanos > 0) {
627 long startTime = System.nanoTime();
628
629 boolean yielded = false;
630 Future<?> unparker = scheduleUnpark(this::unpark, nanos);
631 setState(PARKING);
632 try {
633 yielded = yieldContinuation(); // may throw
634 } finally {
635 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
636 if (!yielded) {
637 assert state() == PARKING;
638 setState(RUNNING);
639 }
640 cancel(unparker);
641 }
642
643 // park on carrier thread for remaining time when pinned
644 if (!yielded) {
645 long deadline = startTime + nanos;
646 if (deadline < 0L)
647 deadline = Long.MAX_VALUE;
648 parkOnCarrierThread(true, deadline - System.nanoTime());
649 }
650 }
651 }
652
653 /**
654 * Parks the current carrier thread up to the given waiting time or until
655 * unparked or interrupted. If the virtual thread is interrupted then the
656 * interrupt status will be propagated to the carrier thread.
657 * @param timed true for a timed park, false for untimed
658 * @param nanos the waiting time in nanoseconds
659 */
660 private void parkOnCarrierThread(boolean timed, long nanos) {
661 assert state() == RUNNING;
662
663 VirtualThreadPinnedEvent event;
664 try {
665 event = new VirtualThreadPinnedEvent();
666 event.begin();
667 } catch (OutOfMemoryError e) {
668 event = null;
669 }
670
671 setState(PINNED);
672 try {
673 if (!parkPermit) {
674 if (!timed) {
675 U.park(false, 0);
676 } else if (nanos > 0) {
677 U.park(false, nanos);
678 }
679 }
680 } finally {
681 setState(RUNNING);
682 }
683
684 // consume parking permit
685 setParkPermit(false);
686
687 if (event != null) {
688 try {
689 event.commit();
690 } catch (OutOfMemoryError e) {
691 // ignore
692 }
693 }
694 }
695
/**
 * Schedule an unpark task to run after a given delay.
 *
 * @param unparker the task to schedule
 * @param nanos the delay, in nanoseconds, passed to {@code UNPARKER.schedule}
 * @return the {@code Future} returned by {@code UNPARKER.schedule}
 */
@ChangesCurrentThread
private Future<?> scheduleUnpark(Runnable unparker, long nanos) {
    // need to switch to current carrier thread to avoid nested parking
    switchToCarrierThread();
    try {
        return UNPARKER.schedule(unparker, nanos, NANOSECONDS);
    } finally {
        // always switch back, whether scheduling succeeded or threw
        switchToVirtualThread(this);
    }
}
709
710 /**
711 * Cancels a task if it has not completed.
712 */
713 @ChangesCurrentThread
748 submitRunContinuation();
749 }
750 } else if (s == PINNED) {
751 // unpark carrier thread when pinned.
752 synchronized (carrierThreadAccessLock()) {
753 Thread carrier = carrierThread;
754 if (carrier != null && state() == PINNED) {
755 U.unpark(carrier);
756 }
757 }
758 }
759 }
760 }
761
/**
 * Attempts to yield the current virtual thread (Thread.yield).
 *
 * <p> The state is set to YIELDING before the continuation is yielded. If the
 * yield is not successful, or the attempt throws, then the state is restored
 * to RUNNING.
 */
void tryYield() {
    assert Thread.currentThread() == this;
    setState(YIELDING);
    boolean yielded = false;
    try {
        yielded = yieldContinuation(); // may throw
    } finally {
        // back on this thread: the state is RUNNING exactly when the yield succeeded
        assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
        if (!yielded) {
            // yield failed or threw: state is still YIELDING, so restore RUNNING
            assert state() == YIELDING;
            setState(RUNNING);
        }
    }
}
779
780 /**
781 * Sleep the current thread for the given sleep time (in nanoseconds). If
782 * nanos is 0 then the thread will attempt to yield.
783 *
784 * @implNote This implementation parks the thread for the given sleeping time
785 * and will therefore be observed in PARKED state during the sleep. Parking
786 * will consume the parking permit so this method makes available the parking
787 * permit after the sleep. This may be observed as a spurious, but benign,
788 * wakeup when the thread subsequently attempts to park.
789 *
790 * @param nanos the maximum number of nanoseconds to sleep
791 * @throws InterruptedException if interrupted while sleeping
792 */
793 void sleepNanos(long nanos) throws InterruptedException {
794 assert Thread.currentThread() == this && nanos >= 0;
795 if (getAndClearInterrupt())
796 throw new InterruptedException();
797 if (nanos == 0) {
798 tryYield();
799 } else {
800 // park for the sleep time
801 try {
802 long remainingNanos = nanos;
803 long startNanos = System.nanoTime();
804 while (remainingNanos > 0) {
805 parkNanos(remainingNanos);
806 if (getAndClearInterrupt()) {
807 throw new InterruptedException();
808 }
809 remainingNanos = nanos - (System.nanoTime() - startNanos);
810 }
811 } finally {
812 // may have been unparked while sleeping
813 setParkPermit(true);
814 }
|