< prev index next > src/java.base/share/classes/java/lang/VirtualThread.java
Print this page
/*
- * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
import java.lang.ref.Reference;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Locale;
import java.util.Objects;
+ import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
- import jdk.internal.event.ThreadSleepEvent;
import jdk.internal.event.VirtualThreadEndEvent;
import jdk.internal.event.VirtualThreadPinnedEvent;
import jdk.internal.event.VirtualThreadStartEvent;
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
import jdk.internal.misc.CarrierThread;
import jdk.internal.vm.annotation.ChangesCurrentThread;
import jdk.internal.vm.annotation.ForceInline;
import jdk.internal.vm.annotation.Hidden;
import jdk.internal.vm.annotation.IntrinsicCandidate;
import jdk.internal.vm.annotation.JvmtiMountTransition;
+ import jdk.internal.vm.annotation.ReservedStackAccess;
import sun.nio.ch.Interruptible;
import sun.security.action.GetPropertyAction;
import static java.util.concurrent.TimeUnit.*;
/**
/**
* The continuation that a virtual thread executes.
*
* The continuation is created in VTHREAD_SCOPE and its target invokes
* {@code vthread.run(task)}. When the continuation is pinned and
* TRACE_PINNING_MODE is greater than 0, a stack trace is printed to
* System.out via PinnedThreadPrinter.
*/
private static class VThreadContinuation extends Continuation {
VThreadContinuation(VirtualThread vthread, Runnable task) {
- super(VTHREAD_SCOPE, () -> vthread.run(task));
+ super(VTHREAD_SCOPE, wrap(vthread, task));
}
@Override
protected void onPinned(Continuation.Pinned reason) {
if (TRACE_PINNING_MODE > 0) {
boolean printAll = (TRACE_PINNING_MODE == 1); // only mode 1 passes printAll=true to the printer
PinnedThreadPrinter.printStackTrace(System.out, printAll);
}
}
+ private static Runnable wrap(VirtualThread vthread, Runnable task) { // builds the continuation's target
+ return new Runnable() { // anonymous class instead of the former lambda; presumably so run() can be annotated @Hidden
+ @Hidden
+ public void run() {
+ vthread.run(task);
+ }
+ };
+ }
}
/**
* Runs or continues execution of the continuation on the current thread.
*/
// not runnable
return;
}
// notify JVMTI before mount
- notifyJvmtiMount(true, firstRun);
+ notifyJvmtiMount(/*hide*/true, firstRun);
try {
cont.run();
} finally {
if (cont.isDone()) {
- afterTerminate(/*executed*/ true);
+ afterTerminate();
} else {
afterYield();
}
}
}
private void run(Runnable task) {
assert state == RUNNING;
// first mount
mount();
- notifyJvmtiMount(false, true);
+ notifyJvmtiMount(/*hide*/false, /*first*/true);
// emit JFR event if enabled
if (VirtualThreadStartEvent.isTurnedOn()) {
var event = new VirtualThreadStartEvent();
event.javaThreadId = threadId();
event.commit();
}
} finally {
// last unmount
- notifyJvmtiUnmount(true, true);
+ notifyJvmtiUnmount(/*hide*/true, /*last*/true);
unmount();
// final state
setState(TERMINATED);
}
/**
* Mounts this virtual thread onto the current platform thread. On
* return, the current thread is the virtual thread.
*/
@ChangesCurrentThread
+ @ReservedStackAccess
private void mount() {
// sets the carrier thread
Thread carrier = Thread.currentCarrierThread();
setCarrierThread(carrier);
/**
* Unmounts this virtual thread from the carrier. On return, the
* current thread is the current platform thread.
*/
@ChangesCurrentThread
+ @ReservedStackAccess
private void unmount() {
// set Thread.currentThread() to return the platform thread
Thread carrier = this.carrierThread;
carrier.setCurrentThread(carrier);
assert carrier == Thread.currentCarrierThread();
carrier.setCurrentThread(vthread);
notifyJvmtiHideFrames(false);
}
+ /**
+ * Executes the given value-returning task on the current carrier thread.
+ *
+ * This method calls switchToCarrierThread() before invoking the task and
+ * calls switchToVirtualThread(this) in a finally block, so the switch back
+ * is performed whether the task returns normally or throws.
+ *
+ * @param <V> the type of the task's result
+ * @param task the task to call; this virtual thread must be the current
+ *        thread when this method is invoked (checked by an assertion)
+ * @return the value returned by {@code task.call()}
+ * @throws Exception if {@code task.call()} throws
+ */
+ @ChangesCurrentThread
+ <V> V executeOnCarrierThread(Callable<V> task) throws Exception {
+ assert Thread.currentThread() == this;
+ switchToCarrierThread();
+ try {
+ return task.call();
+ } finally {
+ switchToVirtualThread(this);
+ }
+ }
+
/**
* Unmounts this virtual thread, invokes Continuation.yield, and re-mounts the
* thread when continued. When enabled, JVMTI must be notified from this method.
*
* The re-mount and its JVMTI mount notification are performed in a finally
* block, so they also run when Continuation.yield throws.
*
* @return true if the yield was successful
*/
+ @Hidden
@ChangesCurrentThread
private boolean yieldContinuation() {
// unmount
- notifyJvmtiUnmount(true, false);
+ notifyJvmtiUnmount(/*hide*/true, /*last*/false);
unmount();
try {
return Continuation.yield(VTHREAD_SCOPE);
} finally {
// re-mount
mount();
- notifyJvmtiMount(false, false);
+ notifyJvmtiMount(/*hide*/false, /*first*/false);
}
}
/**
* Invoked after the continuation yields. If parking then it sets the state
if (s == PARKING) {
setState(PARKED);
// notify JVMTI that unmount has completed, thread is parked
- notifyJvmtiUnmount(false, false);
+ notifyJvmtiUnmount(/*hide*/false, /*last*/false);
// may have been unparked while parking
if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
// lazy submit to continue on the current thread as carrier if possible
if (currentThread() instanceof CarrierThread ct) {
}
} else if (s == YIELDING) { // Thread.yield
setState(RUNNABLE);
// notify JVMTI that unmount has completed, thread is runnable
- notifyJvmtiUnmount(false, false);
+ notifyJvmtiUnmount(/*hide*/false, /*last*/false);
// external submit if there are no tasks in the local task queue
if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
externalSubmitRunContinuation(ct.getPool());
} else {
submitRunContinuation();
}
}
}
+ /**
+ * Invoked after the thread terminates execution. It notifies anyone
+ * waiting for the thread to terminate.
+ *
+ * Delegates to {@code afterTerminate(true, true)}: the container is
+ * notified and the thread is treated as having executed.
+ */
+ private void afterTerminate() {
+ afterTerminate(true, true);
+ }
+
/**
* Invoked after the thread terminates (or start failed). This method
* notifies anyone waiting for the thread to terminate.
*
+ * In order: when {@code executed} is true, notifyJvmtiUnmount is called
+ * with last=true; the termination latch, if one has been created, is
+ * counted down; the thread container's onExit is invoked only when
+ * {@code notifyContainer} is true; finally the references to thread
+ * locals are cleared, regardless of either flag.
+ *
+ * @param notifyContainer true if its container should be notified
* @param executed true if the thread executed, false if it failed to start
*/
- private void afterTerminate(boolean executed) {
+ private void afterTerminate(boolean notifyContainer, boolean executed) {
assert (state() == TERMINATED) && (carrierThread == null);
if (executed) {
- notifyJvmtiUnmount(false, true);
+ notifyJvmtiUnmount(/*hide*/false, /*last*/true);
}
// notify anyone waiting for this virtual thread to terminate
CountDownLatch termination = this.termination;
if (termination != null) {
assert termination.getCount() == 1;
termination.countDown();
}
- if (executed) {
- // notify container if thread executed
+ // notify container
+ if (notifyContainer) {
threadContainer().onExit(this);
-
- // clear references to thread locals
- clearReferences();
}
+
+ // clear references to thread locals
+ clearReferences();
}
/**
* Schedules this {@code VirtualThread} to execute.
*
if (!compareAndSetState(NEW, STARTED)) {
throw new IllegalThreadStateException("Already started");
}
// bind thread to container
+ assert threadContainer() == null;
setThreadContainer(container);
// start thread
+ boolean addedToContainer = false;
boolean started = false;
- container.onStart(this); // may throw
try {
+ container.onStart(this); // may throw
+ addedToContainer = true;
+
// scoped values may be inherited
inheritScopedValueBindings(container);
// submit task to run thread
submitRunContinuation();
started = true;
} finally {
if (!started) {
setState(TERMINATED);
- container.onExit(this);
- afterTerminate(/*executed*/ false);
+ afterTerminate(addedToContainer, /*executed*/false);
}
}
}
@Override
// complete immediately if parking permit available or interrupted
if (getAndSetParkPermit(false) || interrupted)
return;
// park the thread
+ boolean yielded = false;
setState(PARKING);
try {
- if (!yieldContinuation()) {
- // park on the carrier thread when pinned
- parkOnCarrierThread(false, 0);
- }
+ yielded = yieldContinuation(); // may throw
} finally {
- assert (Thread.currentThread() == this) && (state() == RUNNING);
+ assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
+ if (!yielded) {
+ assert state() == PARKING;
+ setState(RUNNING);
+ }
+ }
+
+ // park on the carrier thread when pinned
+ if (!yielded) {
+ parkOnCarrierThread(false, 0);
}
}
/**
* Parks up to the given waiting time or until unparked or interrupted.
// park the thread for the waiting time
if (nanos > 0) {
long startTime = System.nanoTime();
- boolean yielded;
+ boolean yielded = false;
Future<?> unparker = scheduleUnpark(this::unpark, nanos);
setState(PARKING);
try {
- yielded = yieldContinuation();
+ yielded = yieldContinuation(); // may throw
} finally {
- assert (Thread.currentThread() == this)
- && (state() == RUNNING || state() == PARKING);
+ assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
+ if (!yielded) {
+ assert state() == PARKING;
+ setState(RUNNING);
+ }
cancel(unparker);
}
// park on carrier thread for remaining time when pinned
if (!yielded) {
* interrupt status will be propagated to the carrier thread.
* @param timed true for a timed park, false for untimed
* @param nanos the waiting time in nanoseconds
*/
private void parkOnCarrierThread(boolean timed, long nanos) {
- assert state() == PARKING;
+ assert state() == RUNNING;
- var pinnedEvent = new VirtualThreadPinnedEvent();
- pinnedEvent.begin();
+ VirtualThreadPinnedEvent event;
+ try {
+ event = new VirtualThreadPinnedEvent();
+ event.begin();
+ } catch (OutOfMemoryError e) {
+ event = null;
+ }
setState(PINNED);
try {
if (!parkPermit) {
if (!timed) {
}
// consume parking permit
setParkPermit(false);
- pinnedEvent.commit();
+ if (event != null) {
+ try {
+ event.commit();
+ } catch (OutOfMemoryError e) {
+ // ignore
+ }
+ }
}
/**
* Schedule an unpark task to run after a given delay.
*/
* Attempts to yield the current virtual thread (Thread.yield).
*/
void tryYield() {
assert Thread.currentThread() == this;
setState(YIELDING);
+ boolean yielded = false; // stays false if yieldContinuation() throws before returning
try {
- yieldContinuation();
+ yielded = yieldContinuation(); // may throw
} finally {
- assert Thread.currentThread() == this;
- if (state() != RUNNING) {
+ assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
+ if (!yielded) {
assert state() == YIELDING;
setState(RUNNING); // the yield did not succeed; restore RUNNING from YIELDING
}
}
}
- /**
- * Sleep the current virtual thread for the given sleep time.
- *
- * @param nanos the maximum number of nanoseconds to sleep
- * @throws InterruptedException if interrupted while sleeping
- */
- void sleepNanos(long nanos) throws InterruptedException {
- assert Thread.currentThread() == this;
- if (nanos >= 0) {
- if (ThreadSleepEvent.isTurnedOn()) {
- ThreadSleepEvent event = new ThreadSleepEvent();
- try {
- event.time = nanos;
- event.begin();
- doSleepNanos(nanos);
- } finally {
- event.commit();
- }
- } else {
- doSleepNanos(nanos);
- }
- }
- }
-
/**
* Sleep the current thread for the given sleep time (in nanoseconds). If
* nanos is 0 then the thread will attempt to yield.
*
* @implNote This implementation parks the thread for the given sleeping time
* and will therefore be observed in PARKED state during the sleep. Parking
* will consume the parking permit so this method makes available the parking
* permit after the sleep. This may be observed as a spurious, but benign,
* wakeup when the thread subsequently attempts to park.
+ *
+ * @param nanos the maximum number of nanoseconds to sleep
+ * @throws InterruptedException if interrupted while sleeping
*/
- private void doSleepNanos(long nanos) throws InterruptedException {
- assert nanos >= 0;
+ void sleepNanos(long nanos) throws InterruptedException {
+ assert Thread.currentThread() == this && nanos >= 0;
if (getAndClearInterrupt())
throw new InterruptedException();
if (nanos == 0) {
tryYield();
} else {
< prev index next >