7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.util.Locale;
28 import java.util.Objects;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ForkJoinPool;
33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
34 import java.util.concurrent.ForkJoinTask;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.RejectedExecutionException;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ScheduledThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40 import jdk.internal.event.VirtualThreadEndEvent;
41 import jdk.internal.event.VirtualThreadStartEvent;
42 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
43 import jdk.internal.misc.CarrierThread;
44 import jdk.internal.misc.InnocuousThread;
45 import jdk.internal.misc.Unsafe;
46 import jdk.internal.vm.Continuation;
47 import jdk.internal.vm.ContinuationScope;
48 import jdk.internal.vm.StackableScope;
49 import jdk.internal.vm.ThreadContainer;
50 import jdk.internal.vm.ThreadContainers;
51 import jdk.internal.vm.annotation.ChangesCurrentThread;
52 import jdk.internal.vm.annotation.Hidden;
53 import jdk.internal.vm.annotation.IntrinsicCandidate;
54 import jdk.internal.vm.annotation.JvmtiHideEvents;
55 import jdk.internal.vm.annotation.JvmtiMountTransition;
56 import jdk.internal.vm.annotation.ReservedStackAccess;
57 import sun.nio.ch.Interruptible;
58 import static java.util.concurrent.TimeUnit.*;
59
60 /**
61 * A thread that is scheduled by the Java virtual machine rather than the operating system.
62 */
63 final class VirtualThread extends BaseVirtualThread {
// Unsafe instance used to resolve the field offsets below and for the atomic
// field updates performed by the state/permit wrapper methods of this class.
64 private static final Unsafe U = Unsafe.getUnsafe();
// continuation scope named "VirtualThreads"
65 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
// scheduler created once during class initialization; returned by defaultScheduler()
66 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
67
// Offsets of instance fields, looked up by field name, for use with Unsafe.
// STATE is used by compareAndSetState, PARK_PERMIT by getAndSetParkPermit and
// ON_WAITING_LIST by compareAndSetOnWaitingList.
68 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
69 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
70 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
71 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
72 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
73
74 // scheduler and continuation
75 private final Executor scheduler;
76 private final Continuation cont;
77 private final Runnable runContinuation;
78
79 // virtual thread state, accessed by VM
80 private volatile int state;
81
82 /*
83 * Virtual thread state transitions:
84 *
85 * NEW -> STARTED // Thread.start, schedule to run
86 * STARTED -> TERMINATED // failed to start
151 private static final int TERMINATED = 99; // final state
152
153 // can be suspended from scheduling when unmounted
154 private static final int SUSPENDED = 1 << 8;
155
156 // parking permit made available by LockSupport.unpark
157 private volatile boolean parkPermit;
158
159 // blocking permit made available by unblocker thread when another thread exits monitor
160 private volatile boolean blockPermit;
161
162 // true when on the list of virtual threads waiting to be unblocked
163 private volatile boolean onWaitingList;
164
165 // next virtual thread on the list of virtual threads waiting to be unblocked
166 private volatile VirtualThread next;
167
168 // notified by Object.notify/notifyAll while waiting in Object.wait
169 private volatile boolean notified;
170
171 // timed-wait support
172 private byte timedWaitSeqNo;
173
174 // timeout for timed-park and timed-wait, only accessed on current/carrier thread
175 private long timeout;
176
177 // timer task for timed-park and timed-wait, only accessed on current/carrier thread
178 private Future<?> timeoutTask;
179
180 // carrier thread when mounted, accessed by VM
181 private volatile Thread carrierThread;
182
183 // termination object when joining, created lazily if needed
184 private volatile CountDownLatch termination;
185
186 /**
187 * Returns the default scheduler.
188 */
189 static Executor defaultScheduler() {
190 return DEFAULT_SCHEDULER;
582 submitRunContinuation();
583 }
584 return;
585 }
586
587 // blocking on monitorenter
588 if (s == BLOCKING) {
589 setState(BLOCKED);
590
591 // may have been unblocked while blocking
592 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
593 // lazy submit if local queue is empty
594 lazySubmitRunContinuation();
595 }
596 return;
597 }
598
599 // Object.wait
600 if (s == WAITING || s == TIMED_WAITING) {
601 int newState;
602 if (s == WAITING) {
603 setState(newState = WAIT);
604 } else {
605 // For timed-wait, a timeout task is scheduled to execute. The timeout
606 // task will change the thread state to UNBLOCKED and submit the thread
607 // to the scheduler. A sequence number is used to ensure that the timeout
608 // task only unblocks the thread for this timed-wait. We synchronize with
609 // the timeout task to coordinate access to the sequence number and to
610 // ensure the timeout task doesn't execute until the thread has got to
611 // the TIMED_WAIT state.
612 long timeout = this.timeout;
613 assert timeout > 0;
614 synchronized (timedWaitLock()) {
615 byte seqNo = ++timedWaitSeqNo;
616 timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
617 setState(newState = TIMED_WAIT);
618 }
619 }
620
621 // may have been notified while in transition to wait state
622 if (notified && compareAndSetState(newState, BLOCKED)) {
623 // may have even been unblocked already
624 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
625 submitRunContinuation();
626 }
627 return;
628 }
629
630 // may have been interrupted while in transition to wait state
631 if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
632 submitRunContinuation();
633 return;
634 }
635 return;
636 }
637
638 assert false;
639 }
640
641 /**
642 * Invoked after the continuation completes.
643 */
644 private void afterDone() {
645 afterDone(true);
646 }
647
648 /**
649 * Invoked after the continuation completes (or start failed). Sets the thread
650 * state to TERMINATED and notifies anyone waiting for the thread to terminate.
651 *
652 * @param notifyContainer true if its container should be notified
653 */
654 private void afterDone(boolean notifyContainer) {
655 assert carrierThread == null;
656 setState(TERMINATED);
657
658 // notify anyone waiting for this virtual thread to terminate
659 CountDownLatch termination = this.termination;
660 if (termination != null) {
661 assert termination.getCount() == 1;
662 termination.countDown();
663 }
664
665 // notify container
666 if (notifyContainer) {
667 threadContainer().onExit(this);
668 }
669
670 // clear references to thread locals
671 clearReferences();
672 }
673
674 /**
675 * Schedules this {@code VirtualThread} to execute.
676 *
677 * @throws IllegalStateException if the container is shutdown or closed
678 * @throws IllegalThreadStateException if the thread has already been started
679 * @throws RejectedExecutionException if the scheduler cannot accept a task
680 */
681 @Override
682 void start(ThreadContainer container) {
683 if (!compareAndSetState(NEW, STARTED)) {
684 throw new IllegalThreadStateException("Already started");
685 }
686
687 // bind thread to container
688 assert threadContainer() == null;
689 setThreadContainer(container);
690
691 // start thread
692 boolean addedToContainer = false;
693 boolean started = false;
694 try {
695 container.onStart(this); // may throw
696 addedToContainer = true;
697
698 // scoped values may be inherited
699 inheritScopedValueBindings(container);
700
701 // submit task to run thread, using externalSubmit if possible
702 externalSubmitRunContinuationOrThrow();
703 started = true;
704 } finally {
705 if (!started) {
706 afterDone(addedToContainer);
707 }
708 }
709 }
710
711 @Override
712 public void start() {
713 start(ThreadContainers.root());
714 }
715
1333 }
1334
1335 /**
1336 * Disallow the current thread be suspended or preempted.
1337 */
1338 private void disableSuspendAndPreempt() {
1339 notifyJvmtiDisableSuspend(true);
1340 Continuation.pin();
1341 }
1342
1343 /**
1344 * Allow the current thread be suspended or preempted.
1345 */
1346 private void enableSuspendAndPreempt() {
1347 Continuation.unpin();
1348 notifyJvmtiDisableSuspend(false);
1349 }
1350
1351 // -- wrappers for get/set of state, parking permit, and carrier thread --
1352
1353 private int state() {
1354 return state; // volatile read
1355 }
1356
1357 private void setState(int newValue) {
1358 state = newValue; // volatile write
1359 }
1360
1361 private boolean compareAndSetState(int expectedValue, int newValue) {
1362 return U.compareAndSetInt(this, STATE, expectedValue, newValue);
1363 }
1364
1365 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) {
1366 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue);
1367 }
1368
1369 private void setParkPermit(boolean newValue) {
1370 if (parkPermit != newValue) {
1371 parkPermit = newValue;
1372 }
1373 }
1374
1375 private boolean getAndSetParkPermit(boolean newValue) {
1376 if (parkPermit != newValue) {
1377 return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
1378 } else {
1379 return newValue;
1380 }
1381 }
1382
1383 private void setCarrierThread(Thread carrier) {
1384 // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
1385 this.carrierThread = carrier;
1386 }
1387
1388 // -- JVM TI support --
1389
// The declarations below are native methods; their implementations are not part
// of this file. NOTE(review): judging by the names they report thread start/end
// and mount/unmount transitions to JVM TI -- confirm against the VM sources.
1390 @IntrinsicCandidate
1391 @JvmtiMountTransition
1392 private native void notifyJvmtiStart();
1393
1394 @IntrinsicCandidate
1395 @JvmtiMountTransition
1396 private native void notifyJvmtiEnd();
1397
// NOTE(review): meaning of the 'hide' flag is not visible here -- confirm with the VM side.
1398 @IntrinsicCandidate
1399 @JvmtiMountTransition
1400 private native void notifyJvmtiMount(boolean hide);
1401
1402 @IntrinsicCandidate
1403 @JvmtiMountTransition
1404 private native void notifyJvmtiUnmount(boolean hide);
1405
// Static hook; called with true by disableSuspendAndPreempt() and with false by
// enableSuspendAndPreempt().
1406 @IntrinsicCandidate
1407 private static native void notifyJvmtiDisableSuspend(boolean enter);
1408
1409 private static native void registerNatives();
1410 static {
1411 registerNatives();
1412
1413 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1414 var group = Thread.virtualThreadGroup();
1415 }
1416
1417 /**
1418 * Creates the default ForkJoinPool scheduler.
1419 */
1420 private static ForkJoinPool createDefaultScheduler() {
1421 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1422 int parallelism, maxPoolSize, minRunnable;
1423 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1424 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1425 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1426 if (parallelismValue != null) {
1427 parallelism = Integer.parseInt(parallelismValue);
1428 } else {
1429 parallelism = Runtime.getRuntime().availableProcessors();
1430 }
1431 if (maxPoolSizeValue != null) {
1432 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1433 parallelism = Integer.min(parallelism, maxPoolSize);
1434 } else {
1435 maxPoolSize = Integer.max(parallelism, 256);
1436 }
1437 if (minRunnableValue != null) {
1438 minRunnable = Integer.parseInt(minRunnableValue);
1439 } else {
1440 minRunnable = Integer.max(parallelism / 2, 1);
1515 assert changed;
1516 vthread.unblock();
1517
1518 vthread = nextThread;
1519 }
1520 }
1521 }
1522
1523 /**
1524 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1525 * if necessary until a list of one or more threads becomes available.
 *
 * Implemented natively; the implementation is not part of this file.
 *
 * @return NOTE(review): presumably the first thread of the list, with the
 *         remaining threads linked through the {@code next} field -- confirm
1526 */
1527 private static native VirtualThread takeVirtualThreadListToUnblock();
1528
// Creates and starts the daemon thread named "VirtualThread-unblocker" that
// runs unblockVirtualThreads.
static {
    Thread unblockerThread = InnocuousThread.newThread(
            "VirtualThread-unblocker", VirtualThread::unblockVirtualThreads);
    unblockerThread.setDaemon(true);
    unblockerThread.start();
}
1535 }
|
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package java.lang;
26
27 import java.lang.reflect.Constructor;
28 import java.util.Locale;
29 import java.util.Objects;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.ForkJoinPool;
34 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
35 import java.util.concurrent.ForkJoinTask;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.RejectedExecutionException;
38 import java.util.concurrent.ScheduledExecutorService;
39 import java.util.concurrent.ScheduledThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41 import jdk.internal.event.VirtualThreadEndEvent;
42 import jdk.internal.event.VirtualThreadStartEvent;
43 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
44 import jdk.internal.misc.CarrierThread;
45 import jdk.internal.misc.InnocuousThread;
46 import jdk.internal.misc.Unsafe;
47 import jdk.internal.vm.Continuation;
48 import jdk.internal.vm.ContinuationScope;
49 import jdk.internal.vm.StackableScope;
50 import jdk.internal.vm.ThreadContainer;
51 import jdk.internal.vm.ThreadContainers;
52 import jdk.internal.vm.annotation.ChangesCurrentThread;
53 import jdk.internal.vm.annotation.Hidden;
54 import jdk.internal.vm.annotation.IntrinsicCandidate;
55 import jdk.internal.vm.annotation.JvmtiHideEvents;
56 import jdk.internal.vm.annotation.JvmtiMountTransition;
57 import jdk.internal.vm.annotation.ReservedStackAccess;
58 import sun.nio.ch.Interruptible;
59 import static java.util.concurrent.TimeUnit.*;
60
61 /**
62 * A thread that is scheduled by the Java virtual machine rather than the operating system.
63 */
64 final class VirtualThread extends BaseVirtualThread {
// Unsafe instance used to resolve the field offsets below and for the atomic
// field updates performed by the state/permit wrapper methods of this class.
65 private static final Unsafe U = Unsafe.getUnsafe();
// continuation scope named "VirtualThreads"
66 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
// scheduler created once during class initialization; returned by defaultScheduler().
// Typed as Executor because createDefaultScheduler() may return a custom implementation.
67 private static final Executor DEFAULT_SCHEDULER = createDefaultScheduler();
68
// Offsets of instance fields, looked up by field name, for use with Unsafe.
// STATE is used by compareAndSetState, PARK_PERMIT by getAndSetParkPermit and
// ON_WAITING_LIST by compareAndSetOnWaitingList.
69 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
70 private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
71 private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
72 private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
73 private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
74
75 // scheduler and continuation
76 private final Executor scheduler;
77 private final Continuation cont;
78 private final Runnable runContinuation;
79
80 // virtual thread state, accessed by VM
81 private volatile int state;
82
83 /*
84 * Virtual thread state transitions:
85 *
86 * NEW -> STARTED // Thread.start, schedule to run
87 * STARTED -> TERMINATED // failed to start
152 private static final int TERMINATED = 99; // final state
153
154 // can be suspended from scheduling when unmounted
155 private static final int SUSPENDED = 1 << 8;
156
157 // parking permit made available by LockSupport.unpark
158 private volatile boolean parkPermit;
159
160 // blocking permit made available by unblocker thread when another thread exits monitor
161 private volatile boolean blockPermit;
162
163 // true when on the list of virtual threads waiting to be unblocked
164 private volatile boolean onWaitingList;
165
166 // next virtual thread on the list of virtual threads waiting to be unblocked
167 private volatile VirtualThread next;
168
169 // notified by Object.notify/notifyAll while waiting in Object.wait
170 private volatile boolean notified;
171
172 // true when virtual thread is executing Java level Object.wait, false on VM internal Object.wait
173 private volatile boolean interruptableWait;
174
175 // timed-wait support
176 private byte timedWaitSeqNo;
177
178 // timeout for timed-park and timed-wait, only accessed on current/carrier thread
179 private long timeout;
180
181 // timer task for timed-park and timed-wait, only accessed on current/carrier thread
182 private Future<?> timeoutTask;
183
184 // carrier thread when mounted, accessed by VM
185 private volatile Thread carrierThread;
186
187 // termination object when joining, created lazily if needed
188 private volatile CountDownLatch termination;
189
190 /**
191 * Returns the default scheduler.
192 */
193 static Executor defaultScheduler() {
194 return DEFAULT_SCHEDULER;
586 submitRunContinuation();
587 }
588 return;
589 }
590
591 // blocking on monitorenter
592 if (s == BLOCKING) {
593 setState(BLOCKED);
594
595 // may have been unblocked while blocking
596 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
597 // lazy submit if local queue is empty
598 lazySubmitRunContinuation();
599 }
600 return;
601 }
602
603 // Object.wait
604 if (s == WAITING || s == TIMED_WAITING) {
605 int newState;
606 boolean interruptable = interruptableWait;
607 if (s == WAITING) {
608 setState(newState = WAIT);
609 } else {
610 // For timed-wait, a timeout task is scheduled to execute. The timeout
611 // task will change the thread state to UNBLOCKED and submit the thread
612 // to the scheduler. A sequence number is used to ensure that the timeout
613 // task only unblocks the thread for this timed-wait. We synchronize with
614 // the timeout task to coordinate access to the sequence number and to
615 // ensure the timeout task doesn't execute until the thread has got to
616 // the TIMED_WAIT state.
617 long timeout = this.timeout;
618 assert timeout > 0;
619 synchronized (timedWaitLock()) {
620 byte seqNo = ++timedWaitSeqNo;
621 timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
622 setState(newState = TIMED_WAIT);
623 }
624 }
625
626 // may have been notified while in transition to wait state
627 if (notified && compareAndSetState(newState, BLOCKED)) {
628 // may have even been unblocked already
629 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
630 submitRunContinuation();
631 }
632 return;
633 }
634
635 // may have been interrupted while in transition to wait state
636 if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
637 submitRunContinuation();
638 return;
639 }
640 return;
641 }
642
643 assert false;
644 }
645
646 /**
647 * Invoked after the continuation completes.
648 */
649 private void afterDone() {
650 afterDone(true);
651 }
652
653 /**
654 * Invoked after the continuation completes (or start failed). Sets the thread
655 * state to TERMINATED and notifies anyone waiting for the thread to terminate.
656 *
657 * @param notifyContainer true if its container should be notified
658 */
659 private void afterDone(boolean notifyContainer) {
660 assert carrierThread == null;
661 setState(TERMINATED);
662
663 // notify anyone waiting for this virtual thread to terminate
664 CountDownLatch termination = this.termination;
665 if (termination != null) {
666 assert termination.getCount() == 1;
667 termination.countDown();
668 }
669
670 // notify container
671 if (notifyContainer) {
672 threadContainer().remove(this);
673 }
674
675 // clear references to thread locals
676 clearReferences();
677 }
678
679 /**
680 * Schedules this {@code VirtualThread} to execute.
681 *
682 * @throws IllegalStateException if the container is shutdown or closed
683 * @throws IllegalThreadStateException if the thread has already been started
684 * @throws RejectedExecutionException if the scheduler cannot accept a task
685 */
686 @Override
687 void start(ThreadContainer container) {
688 if (!compareAndSetState(NEW, STARTED)) {
689 throw new IllegalThreadStateException("Already started");
690 }
691
692 // bind thread to container
693 assert threadContainer() == null;
694 setThreadContainer(container);
695
696 // start thread
697 boolean addedToContainer = false;
698 boolean started = false;
699 try {
700 container.add(this); // may throw
701 addedToContainer = true;
702
703 // scoped values may be inherited
704 inheritScopedValueBindings(container);
705
706 // submit task to run thread, using externalSubmit if possible
707 externalSubmitRunContinuationOrThrow();
708 started = true;
709 } finally {
710 if (!started) {
711 afterDone(addedToContainer);
712 }
713 }
714 }
715
716 @Override
717 public void start() {
718 start(ThreadContainers.root());
719 }
720
1338 }
1339
1340 /**
1341 * Disallow the current thread be suspended or preempted.
1342 */
1343 private void disableSuspendAndPreempt() {
1344 notifyJvmtiDisableSuspend(true);
1345 Continuation.pin();
1346 }
1347
1348 /**
1349 * Allow the current thread be suspended or preempted.
1350 */
1351 private void enableSuspendAndPreempt() {
1352 Continuation.unpin();
1353 notifyJvmtiDisableSuspend(false);
1354 }
1355
1356 // -- wrappers for get/set of state, parking permit, and carrier thread --
1357
1358 int state() {
1359 return state; // volatile read
1360 }
1361
1362 private void setState(int newValue) {
1363 state = newValue; // volatile write
1364 }
1365
1366 private boolean compareAndSetState(int expectedValue, int newValue) {
1367 return U.compareAndSetInt(this, STATE, expectedValue, newValue);
1368 }
1369
1370 private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) {
1371 return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue);
1372 }
1373
1374 private void setParkPermit(boolean newValue) {
1375 if (parkPermit != newValue) {
1376 parkPermit = newValue;
1377 }
1378 }
1379
1380 private boolean getAndSetParkPermit(boolean newValue) {
1381 if (parkPermit != newValue) {
1382 return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
1383 } else {
1384 return newValue;
1385 }
1386 }
1387
1388 Thread carrierThread() {
1389 return carrierThread;
1390 }
1391
1392 private void setCarrierThread(Thread carrier) {
1393 // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
1394 this.carrierThread = carrier;
1395 }
1396
1397 // -- JVM TI support --
1398
// The declarations below are native methods; their implementations are not part
// of this file. NOTE(review): judging by the names they report thread start/end
// and mount/unmount transitions to JVM TI -- confirm against the VM sources.
1399 @IntrinsicCandidate
1400 @JvmtiMountTransition
1401 private native void notifyJvmtiStart();
1402
1403 @IntrinsicCandidate
1404 @JvmtiMountTransition
1405 private native void notifyJvmtiEnd();
1406
// NOTE(review): meaning of the 'hide' flag is not visible here -- confirm with the VM side.
1407 @IntrinsicCandidate
1408 @JvmtiMountTransition
1409 private native void notifyJvmtiMount(boolean hide);
1410
1411 @IntrinsicCandidate
1412 @JvmtiMountTransition
1413 private native void notifyJvmtiUnmount(boolean hide);
1414
// Static hook; called with true by disableSuspendAndPreempt() and with false by
// enableSuspendAndPreempt().
1415 @IntrinsicCandidate
1416 private static native void notifyJvmtiDisableSuspend(boolean enter);
1417
1418 private static native void registerNatives();
1419 static {
1420 registerNatives();
1421
1422 // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1423 var group = Thread.virtualThreadGroup();
1424 }
1425
1426 /**
1427 * Creates the default scheduler.
1428 * If the system property {@code jdk.virtualThreadScheduler.implClass} is set then
1429 * its value is the name of a class that implements java.util.concurrent.Executor.
1430 * The class is public in an exported package, has a public no-arg constructor,
1431 * and is visible to the system class loader.
1432 * If the system property is not set then the default scheduler will be a
1433 * ForkJoinPool instance.
1434 */
1435 private static Executor createDefaultScheduler() {
1436 String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
1437 if (propValue != null) {
1438 try {
1439 Class<?> clazz = Class.forName(propValue, true,
1440 ClassLoader.getSystemClassLoader());
1441 Constructor<?> ctor = clazz.getConstructor();
1442 var scheduler = (Executor) ctor.newInstance();
1443 System.err.println("""
1444 WARNING: Using custom scheduler, this is an experimental feature.""");
1445 return scheduler;
1446 } catch (Exception ex) {
1447 throw new Error(ex);
1448 }
1449 } else {
1450 return createDefaultForkJoinPoolScheduler();
1451 }
1452 }
1453
1454 /**
1455 * Creates the default ForkJoinPool scheduler.
1456 */
1457 private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
1458 ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1459 int parallelism, maxPoolSize, minRunnable;
1460 String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1461 String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1462 String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1463 if (parallelismValue != null) {
1464 parallelism = Integer.parseInt(parallelismValue);
1465 } else {
1466 parallelism = Runtime.getRuntime().availableProcessors();
1467 }
1468 if (maxPoolSizeValue != null) {
1469 maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1470 parallelism = Integer.min(parallelism, maxPoolSize);
1471 } else {
1472 maxPoolSize = Integer.max(parallelism, 256);
1473 }
1474 if (minRunnableValue != null) {
1475 minRunnable = Integer.parseInt(minRunnableValue);
1476 } else {
1477 minRunnable = Integer.max(parallelism / 2, 1);
1552 assert changed;
1553 vthread.unblock();
1554
1555 vthread = nextThread;
1556 }
1557 }
1558 }
1559
1560 /**
1561 * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1562 * if necessary until a list of one or more threads becomes available.
 *
 * Implemented natively; the implementation is not part of this file.
 *
 * @return NOTE(review): presumably the first thread of the list, with the
 *         remaining threads linked through the {@code next} field -- confirm
1563 */
1564 private static native VirtualThread takeVirtualThreadListToUnblock();
1565
// Creates and starts the daemon thread named "VirtualThread-unblocker" that
// runs unblockVirtualThreads.
static {
    Thread unblockerThread = InnocuousThread.newThread(
            "VirtualThread-unblocker", VirtualThread::unblockVirtualThreads);
    unblockerThread.setDaemon(true);
    unblockerThread.start();
}
1572 }
|