7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.util.Locale;
28 import java.util.Objects;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ForkJoinPool;
33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
34 import java.util.concurrent.ForkJoinTask;
35 import java.util.concurrent.ForkJoinWorkerThread;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.RejectedExecutionException;
38 import java.util.concurrent.ScheduledExecutorService;
39 import java.util.concurrent.ScheduledThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41 import jdk.internal.event.VirtualThreadEndEvent;
42 import jdk.internal.event.VirtualThreadStartEvent;
43 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
44 import jdk.internal.misc.CarrierThread;
45 import jdk.internal.misc.InnocuousThread;
46 import jdk.internal.misc.Unsafe;
47 import jdk.internal.vm.Continuation;
48 import jdk.internal.vm.ContinuationScope;
49 import jdk.internal.vm.StackableScope;
50 import jdk.internal.vm.ThreadContainer;
51 import jdk.internal.vm.ThreadContainers;
52 import jdk.internal.vm.annotation.ChangesCurrentThread;
53 import jdk.internal.vm.annotation.Hidden;
54 import jdk.internal.vm.annotation.IntrinsicCandidate;
55 import jdk.internal.vm.annotation.JvmtiHideEvents;
56 import jdk.internal.vm.annotation.JvmtiMountTransition;
57 import jdk.internal.vm.annotation.ReservedStackAccess;
58 import sun.nio.ch.Interruptible;
59 import static java.util.concurrent.TimeUnit.*;
60
61 /**
62 * A thread that is scheduled by the Java virtual machine rather than the operating system.
63 */
64 final class VirtualThread extends BaseVirtualThread {
65 private static final Unsafe U = Unsafe.getUnsafe();
66 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
67 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
68 private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
69
70 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
71 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
72 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
73 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
74 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
75
76 // scheduler and continuation
77 private final Executor scheduler;
78 private final Continuation cont;
79 private final Runnable runContinuation;
80
81 // virtual thread state, accessed by VM
82 private volatile int state;
83
84 /*
85 * Virtual thread state transitions:
86 *
87 * NEW -> STARTED // Thread.start, schedule to run
175
176 // timeout for timed-park and timed-wait, only accessed on current/carrier thread
177 private long timeout;
178
179 // timer task for timed-park and timed-wait, only accessed on current/carrier thread
180 private Future<?> timeoutTask;
181
182 // carrier thread when mounted, accessed by VM
183 private volatile Thread carrierThread;
184
185 // termination object when joining, created lazily if needed
186 private volatile CountDownLatch termination;
187
188 /**
189 * Returns the default scheduler.
190 */
191 static Executor defaultScheduler() {
192 return DEFAULT_SCHEDULER;
193 }
194
195 /**
196 * Returns the continuation scope used for virtual threads.
197 */
198 static ContinuationScope continuationScope() {
199 return VTHREAD_SCOPE;
200 }
201
202 /**
203 * Creates a new {@code VirtualThread} to run the given task with the given
204 * scheduler. If the given scheduler is {@code null} and the current thread
205 * is a platform thread then the newly created virtual thread will use the
206 * default scheduler. If given scheduler is {@code null} and the current
207 * thread is a virtual thread then the current thread's scheduler is used.
208 *
209 * @param scheduler the scheduler or null
210 * @param name thread name
211 * @param characteristics characteristics
212 * @param task the task to execute
213 */
214 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
1385
1386 @IntrinsicCandidate
1387 @JvmtiMountTransition
1388 private native void notifyJvmtiMount(boolean hide);
1389
1390 @IntrinsicCandidate
1391 @JvmtiMountTransition
1392 private native void notifyJvmtiUnmount(boolean hide);
1393
1394 @IntrinsicCandidate
1395 private static native void notifyJvmtiDisableSuspend(boolean enter);
1396
1397 private static native void registerNatives();
1398 static {
1399 registerNatives();
1400
1401 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1402 var group = Thread.virtualThreadGroup();
1403 }
1404
1405 /**
1406 * Creates the default ForkJoinPool scheduler.
1407 */
1408 private static ForkJoinPool createDefaultScheduler() {
1409 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1410 int parallelism, maxPoolSize, minRunnable;
1411 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1412 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1413 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1414 if (parallelismValue != null) {
1415 parallelism = Integer.parseInt(parallelismValue);
1416 } else {
1417 parallelism = Runtime.getRuntime().availableProcessors();
1418 }
1419 if (maxPoolSizeValue != null) {
1420 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1421 parallelism = Integer.min(parallelism, maxPoolSize);
1422 } else {
1423 maxPoolSize = Integer.max(parallelism, 256);
1424 }
1425 if (minRunnableValue != null) {
1426 minRunnable = Integer.parseInt(minRunnableValue);
1427 } else {
1428 minRunnable = Integer.max(parallelism / 2, 1);
|
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.lang.reflect.Constructor;
28 import java.util.Arrays;
29 import java.util.Locale;
30 import java.util.Objects;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.ForkJoinPool;
35 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
36 import java.util.concurrent.ForkJoinTask;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.RejectedExecutionException;
39 import java.util.concurrent.ScheduledExecutorService;
40 import java.util.concurrent.ScheduledThreadPoolExecutor;
41 import java.util.concurrent.TimeUnit;
42 import java.util.stream.Stream;
43 import jdk.internal.event.VirtualThreadEndEvent;
44 import jdk.internal.event.VirtualThreadStartEvent;
45 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
46 import jdk.internal.misc.CarrierThread;
47 import jdk.internal.misc.InnocuousThread;
48 import jdk.internal.misc.Unsafe;
49 import jdk.internal.vm.Continuation;
50 import jdk.internal.vm.ContinuationScope;
51 import jdk.internal.vm.StackableScope;
52 import jdk.internal.vm.ThreadContainer;
53 import jdk.internal.vm.ThreadContainers;
54 import jdk.internal.vm.annotation.ChangesCurrentThread;
55 import jdk.internal.vm.annotation.Hidden;
56 import jdk.internal.vm.annotation.IntrinsicCandidate;
57 import jdk.internal.vm.annotation.JvmtiHideEvents;
58 import jdk.internal.vm.annotation.JvmtiMountTransition;
59 import jdk.internal.vm.annotation.ReservedStackAccess;
60 import sun.nio.ch.Interruptible;
61 import static java.util.concurrent.TimeUnit.*;
62
63 /**
64 * A thread that is scheduled by the Java virtual machine rather than the operating system.
65 */
66 final class VirtualThread extends BaseVirtualThread {
67 private static final Unsafe U = Unsafe.getUnsafe();
68 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
69 private static final Executor DEFAULT_SCHEDULER = createDefaultScheduler();
70 private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
71
72 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
73 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
74 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
75 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
76 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
77
78 // scheduler and continuation
79 private final Executor scheduler;
80 private final Continuation cont;
81 private final Runnable runContinuation;
82
83 // virtual thread state, accessed by VM
84 private volatile int state;
85
86 /*
87 * Virtual thread state transitions:
88 *
89 * NEW -> STARTED // Thread.start, schedule to run
177
178 // timeout for timed-park and timed-wait, only accessed on current/carrier thread
179 private long timeout;
180
181 // timer task for timed-park and timed-wait, only accessed on current/carrier thread
182 private Future<?> timeoutTask;
183
184 // carrier thread when mounted, accessed by VM
185 private volatile Thread carrierThread;
186
187 // termination object when joining, created lazily if needed
188 private volatile CountDownLatch termination;
189
190 /**
191 * Returns the default scheduler.
192 */
193 static Executor defaultScheduler() {
194 return DEFAULT_SCHEDULER;
195 }
196
197 /**
198 * Returns a stream of the delayed task schedulers used to support timed operations.
199 */
200 static Stream<ScheduledExecutorService> delayedTaskSchedulers() {
201 return Arrays.stream(DELAYED_TASK_SCHEDULERS);
202 }
203
204 /**
205 * Returns the continuation scope used for virtual threads.
206 */
207 static ContinuationScope continuationScope() {
208 return VTHREAD_SCOPE;
209 }
210
211 /**
212 * Creates a new {@code VirtualThread} to run the given task with the given
213 * scheduler. If the given scheduler is {@code null} and the current thread
214 * is a platform thread then the newly created virtual thread will use the
215 * default scheduler. If given scheduler is {@code null} and the current
216 * thread is a virtual thread then the current thread's scheduler is used.
217 *
218 * @param scheduler the scheduler or null
219 * @param name thread name
220 * @param characteristics characteristics
221 * @param task the task to execute
222 */
223 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
1394
1395 @IntrinsicCandidate
1396 @JvmtiMountTransition
1397 private native void notifyJvmtiMount(boolean hide);
1398
1399 @IntrinsicCandidate
1400 @JvmtiMountTransition
1401 private native void notifyJvmtiUnmount(boolean hide);
1402
1403 @IntrinsicCandidate
1404 private static native void notifyJvmtiDisableSuspend(boolean enter);
1405
1406 private static native void registerNatives();
1407 static {
1408 registerNatives();
1409
1410 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1411 var group = Thread.virtualThreadGroup();
1412 }
1413
1414 /**
1415 * Creates the default scheduler.
1416 * If the system property {@code jdk.virtualThreadScheduler.implClass} is set then
1417 * its value is the name of a class that implements java.util.concurrent.Executor.
1418 * The class is public in an exported package, has a public no-arg constructor,
1419 * and is visible to the system class loader.
1420 * If the system property is not set then the default scheduler will be a
1421 * ForkJoinPool instance.
1422 */
1423 private static Executor createDefaultScheduler() {
1424 String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
1425 if (propValue != null) {
1426 try {
1427 Class<?> clazz = Class.forName(propValue, true,
1428 ClassLoader.getSystemClassLoader());
1429 Constructor<?> ctor = clazz.getConstructor();
1430 var scheduler = (Executor) ctor.newInstance();
1431 System.err.println("""
1432 WARNING: Using custom scheduler, this is an experimental feature.""");
1433 return scheduler;
1434 } catch (Exception ex) {
1435 throw new Error(ex);
1436 }
1437 } else {
1438 return createDefaultForkJoinPoolScheduler();
1439 }
1440 }
1441
1442 /**
1443 * Creates the default ForkJoinPool scheduler.
1444 */
1445 private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
1446 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1447 int parallelism, maxPoolSize, minRunnable;
1448 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1449 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1450 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1451 if (parallelismValue != null) {
1452 parallelism = Integer.parseInt(parallelismValue);
1453 } else {
1454 parallelism = Runtime.getRuntime().availableProcessors();
1455 }
1456 if (maxPoolSizeValue != null) {
1457 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1458 parallelism = Integer.min(parallelism, maxPoolSize);
1459 } else {
1460 maxPoolSize = Integer.max(parallelism, 256);
1461 }
1462 if (minRunnableValue != null) {
1463 minRunnable = Integer.parseInt(minRunnableValue);
1464 } else {
1465 minRunnable = Integer.max(parallelism / 2, 1);
|