< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 

  27 import java.util.Locale;
  28 import java.util.Objects;
  29 import java.util.concurrent.CountDownLatch;
  30 import java.util.concurrent.Executor;
  31 import java.util.concurrent.Executors;
  32 import java.util.concurrent.ForkJoinPool;
  33 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  34 import java.util.concurrent.ForkJoinTask;
  35 import java.util.concurrent.Future;
  36 import java.util.concurrent.RejectedExecutionException;
  37 import java.util.concurrent.ScheduledExecutorService;
  38 import java.util.concurrent.ScheduledThreadPoolExecutor;
  39 import java.util.concurrent.TimeUnit;
  40 import jdk.internal.event.VirtualThreadEndEvent;
  41 import jdk.internal.event.VirtualThreadStartEvent;
  42 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  43 import jdk.internal.misc.CarrierThread;
  44 import jdk.internal.misc.InnocuousThread;
  45 import jdk.internal.misc.Unsafe;
  46 import jdk.internal.vm.Continuation;
  47 import jdk.internal.vm.ContinuationScope;
  48 import jdk.internal.vm.StackableScope;
  49 import jdk.internal.vm.ThreadContainer;
  50 import jdk.internal.vm.ThreadContainers;
  51 import jdk.internal.vm.annotation.ChangesCurrentThread;
  52 import jdk.internal.vm.annotation.Hidden;
  53 import jdk.internal.vm.annotation.IntrinsicCandidate;
  54 import jdk.internal.vm.annotation.JvmtiHideEvents;
  55 import jdk.internal.vm.annotation.JvmtiMountTransition;
  56 import jdk.internal.vm.annotation.ReservedStackAccess;
  57 import sun.nio.ch.Interruptible;
  58 import static java.util.concurrent.TimeUnit.*;
  59 
  60 /**
  61  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  62  */
  63 final class VirtualThread extends BaseVirtualThread {
  64     private static final Unsafe U = Unsafe.getUnsafe();
  65     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  66     private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
  67 
  68     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
  69     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  70     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  71     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
  72     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  73 
  74     // scheduler and continuation
  75     private final Executor scheduler;
  76     private final Continuation cont;
  77     private final Runnable runContinuation;
  78 
  79     // virtual thread state, accessed by VM
  80     private volatile int state;
  81 
  82     /*
  83      * Virtual thread state transitions:
  84      *
  85      *      NEW -> STARTED         // Thread.start, schedule to run
  86      *  STARTED -> TERMINATED      // failed to start

 151     private static final int TERMINATED = 99;  // final state
 152 
 153     // can be suspended from scheduling when unmounted
 154     private static final int SUSPENDED = 1 << 8;
 155 
 156     // parking permit made available by LockSupport.unpark
 157     private volatile boolean parkPermit;
 158 
 159     // blocking permit made available by unblocker thread when another thread exits monitor
 160     private volatile boolean blockPermit;
 161 
 162     // true when on the list of virtual threads waiting to be unblocked
 163     private volatile boolean onWaitingList;
 164 
 165     // next virtual thread on the list of virtual threads waiting to be unblocked
 166     private volatile VirtualThread next;
 167 
 168     // notified by Object.notify/notifyAll while waiting in Object.wait
 169     private volatile boolean notified;
 170 



 171     // timed-wait support
 172     private byte timedWaitSeqNo;
 173 
 174     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 175     private long timeout;
 176 
 177     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 178     private Future<?> timeoutTask;
 179 
 180     // carrier thread when mounted, accessed by VM
 181     private volatile Thread carrierThread;
 182 
 183     // termination object when joining, created lazily if needed
 184     private volatile CountDownLatch termination;
 185 
 186     /**
 187      * Returns the default scheduler.
 188      */
 189     static Executor defaultScheduler() {
 190         return DEFAULT_SCHEDULER;

 582                 submitRunContinuation();
 583             }
 584             return;
 585         }
 586 
 587         // blocking on monitorenter
 588         if (s == BLOCKING) {
 589             setState(BLOCKED);
 590 
 591             // may have been unblocked while blocking
 592             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 593                 // lazy submit if local queue is empty
 594                 lazySubmitRunContinuation();
 595             }
 596             return;
 597         }
 598 
 599         // Object.wait
 600         if (s == WAITING || s == TIMED_WAITING) {
 601             int newState;

 602             if (s == WAITING) {
 603                 setState(newState = WAIT);
 604             } else {
 605                 // For timed-wait, a timeout task is scheduled to execute. The timeout
 606                 // task will change the thread state to UNBLOCKED and submit the thread
 607                 // to the scheduler. A sequence number is used to ensure that the timeout
 608                 // task only unblocks the thread for this timed-wait. We synchronize with
 609                 // the timeout task to coordinate access to the sequence number and to
 610                 // ensure the timeout task doesn't execute until the thread has got to
 611                 // the TIMED_WAIT state.
 612                 long timeout = this.timeout;
 613                 assert timeout > 0;
 614                 synchronized (timedWaitLock()) {
 615                     byte seqNo = ++timedWaitSeqNo;
 616                     timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
 617                     setState(newState = TIMED_WAIT);
 618                 }
 619             }
 620 
 621             // may have been notified while in transition to wait state
 622             if (notified && compareAndSetState(newState, BLOCKED)) {
 623                 // may have even been unblocked already
 624                 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 625                     submitRunContinuation();
 626                 }
 627                 return;
 628             }
 629 
 630             // may have been interrupted while in transition to wait state
 631             if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
 632                 submitRunContinuation();
 633                 return;
 634             }
 635             return;
 636         }
 637 
 638         assert false;
 639     }
 640 
 641     /**
 642      * Invoked after the continuation completes.
 643      */
 644     private void afterDone() {
 645         afterDone(true);
 646     }
 647 
 648     /**
 649      * Invoked after the continuation completes (or start failed). Sets the thread
 650      * state to TERMINATED and notifies anyone waiting for the thread to terminate.
 651      *
 652      * @param notifyContainer true if its container should be notified
 653      */
 654     private void afterDone(boolean notifyContainer) {
 655         assert carrierThread == null;
 656         setState(TERMINATED);
 657 
 658         // notify anyone waiting for this virtual thread to terminate
 659         CountDownLatch termination = this.termination;
 660         if (termination != null) {
 661             assert termination.getCount() == 1;
 662             termination.countDown();
 663         }
 664 
 665         // notify container
 666         if (notifyContainer) {
 667             threadContainer().onExit(this);
 668         }
 669 
 670         // clear references to thread locals
 671         clearReferences();
 672     }
 673 
 674     /**
 675      * Schedules this {@code VirtualThread} to execute.
 676      *
 677      * @throws IllegalStateException if the container is shutdown or closed
 678      * @throws IllegalThreadStateException if the thread has already been started
 679      * @throws RejectedExecutionException if the scheduler cannot accept a task
 680      */
 681     @Override
 682     void start(ThreadContainer container) {
 683         if (!compareAndSetState(NEW, STARTED)) {
 684             throw new IllegalThreadStateException("Already started");
 685         }
 686 
 687         // bind thread to container
 688         assert threadContainer() == null;
 689         setThreadContainer(container);
 690 
 691         // start thread
 692         boolean addedToContainer = false;
 693         boolean started = false;
 694         try {
 695             container.onStart(this);  // may throw
 696             addedToContainer = true;
 697 
 698             // scoped values may be inherited
 699             inheritScopedValueBindings(container);
 700 
 701             // submit task to run thread, using externalSubmit if possible
 702             externalSubmitRunContinuationOrThrow();
 703             started = true;
 704         } finally {
 705             if (!started) {
 706                 afterDone(addedToContainer);
 707             }
 708         }
 709     }
 710 
 711     @Override
 712     public void start() {
 713         start(ThreadContainers.root());
 714     }
 715 

1333     }
1334 
1335     /**
1336      * Disallow the current thread be suspended or preempted.
1337      */
1338     private void disableSuspendAndPreempt() {
1339         notifyJvmtiDisableSuspend(true);
1340         Continuation.pin();
1341     }
1342 
1343     /**
1344      * Allow the current thread be suspended or preempted.
1345      */
1346     private void enableSuspendAndPreempt() {
1347         Continuation.unpin();
1348         notifyJvmtiDisableSuspend(false);
1349     }
1350 
1351     // -- wrappers for get/set of state, parking permit, and carrier thread --
1352 
1353     private int state() {
1354         return state;  // volatile read
1355     }
1356 
1357     private void setState(int newValue) {
1358         state = newValue;  // volatile write
1359     }
1360 
1361     private boolean compareAndSetState(int expectedValue, int newValue) {
1362         return U.compareAndSetInt(this, STATE, expectedValue, newValue);
1363     }
1364 
1365     private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) {
1366         return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue);
1367     }
1368 
1369     private void setParkPermit(boolean newValue) {
1370         if (parkPermit != newValue) {
1371             parkPermit = newValue;
1372         }
1373     }
1374 
1375     private boolean getAndSetParkPermit(boolean newValue) {
1376         if (parkPermit != newValue) {
1377             return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
1378         } else {
1379             return newValue;
1380         }
1381     }
1382 




1383     private void setCarrierThread(Thread carrier) {
1384         // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
1385         this.carrierThread = carrier;
1386     }
1387 
1388     // -- JVM TI support --
1389 
1390     @IntrinsicCandidate
1391     @JvmtiMountTransition
1392     private native void notifyJvmtiStart();
1393 
1394     @IntrinsicCandidate
1395     @JvmtiMountTransition
1396     private native void notifyJvmtiEnd();
1397 
1398     @IntrinsicCandidate
1399     @JvmtiMountTransition
1400     private native void notifyJvmtiMount(boolean hide);
1401 
1402     @IntrinsicCandidate
1403     @JvmtiMountTransition
1404     private native void notifyJvmtiUnmount(boolean hide);
1405 
1406     @IntrinsicCandidate
1407     private static native void notifyJvmtiDisableSuspend(boolean enter);
1408 
1409     private static native void registerNatives();
1410     static {
1411         registerNatives();
1412 
1413         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1414         var group = Thread.virtualThreadGroup();
1415     }
1416 




























1417     /**
1418      * Creates the default ForkJoinPool scheduler.
1419      */
1420     private static ForkJoinPool createDefaultScheduler() {
1421         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1422         int parallelism, maxPoolSize, minRunnable;
1423         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1424         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1425         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1426         if (parallelismValue != null) {
1427             parallelism = Integer.parseInt(parallelismValue);
1428         } else {
1429             parallelism = Runtime.getRuntime().availableProcessors();
1430         }
1431         if (maxPoolSizeValue != null) {
1432             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1433             parallelism = Integer.min(parallelism, maxPoolSize);
1434         } else {
1435             maxPoolSize = Integer.max(parallelism, 256);
1436         }
1437         if (minRunnableValue != null) {
1438             minRunnable = Integer.parseInt(minRunnableValue);
1439         } else {
1440             minRunnable = Integer.max(parallelism / 2, 1);

1515                 assert changed;
1516                 vthread.unblock();
1517 
1518                 vthread = nextThread;
1519             }
1520         }
1521     }
1522 
1523     /**
1524      * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1525      * if necessary until a list of one or more threads becomes available.
1526      */
1527     private static native VirtualThread takeVirtualThreadListToUnblock();
1528 
1529     static {
1530         var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1531                 VirtualThread::unblockVirtualThreads);
1532         unblocker.setDaemon(true);
1533         unblocker.start();
1534     }
1535 }

   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 
  27 import java.lang.reflect.Constructor;
  28 import java.util.Locale;
  29 import java.util.Objects;
  30 import java.util.concurrent.CountDownLatch;
  31 import java.util.concurrent.Executor;
  32 import java.util.concurrent.Executors;
  33 import java.util.concurrent.ForkJoinPool;
  34 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
  35 import java.util.concurrent.ForkJoinTask;
  36 import java.util.concurrent.Future;
  37 import java.util.concurrent.RejectedExecutionException;
  38 import java.util.concurrent.ScheduledExecutorService;
  39 import java.util.concurrent.ScheduledThreadPoolExecutor;
  40 import java.util.concurrent.TimeUnit;
  41 import jdk.internal.event.VirtualThreadEndEvent;
  42 import jdk.internal.event.VirtualThreadStartEvent;
  43 import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  44 import jdk.internal.misc.CarrierThread;
  45 import jdk.internal.misc.InnocuousThread;
  46 import jdk.internal.misc.Unsafe;
  47 import jdk.internal.vm.Continuation;
  48 import jdk.internal.vm.ContinuationScope;
  49 import jdk.internal.vm.StackableScope;
  50 import jdk.internal.vm.ThreadContainer;
  51 import jdk.internal.vm.ThreadContainers;
  52 import jdk.internal.vm.annotation.ChangesCurrentThread;
  53 import jdk.internal.vm.annotation.Hidden;
  54 import jdk.internal.vm.annotation.IntrinsicCandidate;
  55 import jdk.internal.vm.annotation.JvmtiHideEvents;
  56 import jdk.internal.vm.annotation.JvmtiMountTransition;
  57 import jdk.internal.vm.annotation.ReservedStackAccess;
  58 import sun.nio.ch.Interruptible;
  59 import static java.util.concurrent.TimeUnit.*;
  60 
  61 /**
  62  * A thread that is scheduled by the Java virtual machine rather than the operating system.
  63  */
  64 final class VirtualThread extends BaseVirtualThread {
  65     private static final Unsafe U = Unsafe.getUnsafe();
  66     private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
  67     private static final Executor DEFAULT_SCHEDULER = createDefaultScheduler();
  68 
  69     private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
  70     private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
  71     private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
  72     private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
  73     private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
  74 
  75     // scheduler and continuation
  76     private final Executor scheduler;
  77     private final Continuation cont;
  78     private final Runnable runContinuation;
  79 
  80     // virtual thread state, accessed by VM
  81     private volatile int state;
  82 
  83     /*
  84      * Virtual thread state transitions:
  85      *
  86      *      NEW -> STARTED         // Thread.start, schedule to run
  87      *  STARTED -> TERMINATED      // failed to start

 152     private static final int TERMINATED = 99;  // final state
 153 
 154     // can be suspended from scheduling when unmounted
 155     private static final int SUSPENDED = 1 << 8;
 156 
 157     // parking permit made available by LockSupport.unpark
 158     private volatile boolean parkPermit;
 159 
 160     // blocking permit made available by unblocker thread when another thread exits monitor
 161     private volatile boolean blockPermit;
 162 
 163     // true when on the list of virtual threads waiting to be unblocked
 164     private volatile boolean onWaitingList;
 165 
 166     // next virtual thread on the list of virtual threads waiting to be unblocked
 167     private volatile VirtualThread next;
 168 
 169     // notified by Object.notify/notifyAll while waiting in Object.wait
 170     private volatile boolean notified;
 171 
 172     // true when virtual thread is executing Java level Object.wait, false on VM internal Object.wait
 173     private volatile boolean interruptableWait;
 174 
 175     // timed-wait support
 176     private byte timedWaitSeqNo;
 177 
 178     // timeout for timed-park and timed-wait, only accessed on current/carrier thread
 179     private long timeout;
 180 
 181     // timer task for timed-park and timed-wait, only accessed on current/carrier thread
 182     private Future<?> timeoutTask;
 183 
 184     // carrier thread when mounted, accessed by VM
 185     private volatile Thread carrierThread;
 186 
 187     // termination object when joining, created lazily if needed
 188     private volatile CountDownLatch termination;
 189 
 190     /**
 191      * Returns the default scheduler.
 192      */
 193     static Executor defaultScheduler() {
 194         return DEFAULT_SCHEDULER;

 586                 submitRunContinuation();
 587             }
 588             return;
 589         }
 590 
 591         // blocking on monitorenter
 592         if (s == BLOCKING) {
 593             setState(BLOCKED);
 594 
 595             // may have been unblocked while blocking
 596             if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 597                 // lazy submit if local queue is empty
 598                 lazySubmitRunContinuation();
 599             }
 600             return;
 601         }
 602 
 603         // Object.wait
 604         if (s == WAITING || s == TIMED_WAITING) {
 605             int newState;
 606             boolean interruptable = interruptableWait;
 607             if (s == WAITING) {
 608                 setState(newState = WAIT);
 609             } else {
 610                 // For timed-wait, a timeout task is scheduled to execute. The timeout
 611                 // task will change the thread state to UNBLOCKED and submit the thread
 612                 // to the scheduler. A sequence number is used to ensure that the timeout
 613                 // task only unblocks the thread for this timed-wait. We synchronize with
 614                 // the timeout task to coordinate access to the sequence number and to
 615                 // ensure the timeout task doesn't execute until the thread has got to
 616                 // the TIMED_WAIT state.
 617                 long timeout = this.timeout;
 618                 assert timeout > 0;
 619                 synchronized (timedWaitLock()) {
 620                     byte seqNo = ++timedWaitSeqNo;
 621                     timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
 622                     setState(newState = TIMED_WAIT);
 623                 }
 624             }
 625 
 626             // may have been notified while in transition to wait state
 627             if (notified && compareAndSetState(newState, BLOCKED)) {
 628                 // may have even been unblocked already
 629                 if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
 630                     submitRunContinuation();
 631                 }
 632                 return;
 633             }
 634 
 635             // may have been interrupted while in transition to wait state
 636             if (interruptable && interrupted && compareAndSetState(newState, UNBLOCKED)) {
 637                 submitRunContinuation();
 638                 return;
 639             }
 640             return;
 641         }
 642 
 643         assert false;
 644     }
 645 
 646     /**
 647      * Invoked after the continuation completes.
 648      */
 649     private void afterDone() {
 650         afterDone(true);
 651     }
 652 
 653     /**
 654      * Invoked after the continuation completes (or start failed). Sets the thread
 655      * state to TERMINATED and notifies anyone waiting for the thread to terminate.
 656      *
 657      * @param notifyContainer true if its container should be notified
 658      */
 659     private void afterDone(boolean notifyContainer) {
 660         assert carrierThread == null;
 661         setState(TERMINATED);
 662 
 663         // notify anyone waiting for this virtual thread to terminate
 664         CountDownLatch termination = this.termination;
 665         if (termination != null) {
 666             assert termination.getCount() == 1;
 667             termination.countDown();
 668         }
 669 
 670         // notify container
 671         if (notifyContainer) {
 672             threadContainer().remove(this);
 673         }
 674 
 675         // clear references to thread locals
 676         clearReferences();
 677     }
 678 
 679     /**
 680      * Schedules this {@code VirtualThread} to execute.
 681      *
 682      * @throws IllegalStateException if the container is shutdown or closed
 683      * @throws IllegalThreadStateException if the thread has already been started
 684      * @throws RejectedExecutionException if the scheduler cannot accept a task
 685      */
 686     @Override
 687     void start(ThreadContainer container) {
 688         if (!compareAndSetState(NEW, STARTED)) {
 689             throw new IllegalThreadStateException("Already started");
 690         }
 691 
 692         // bind thread to container
 693         assert threadContainer() == null;
 694         setThreadContainer(container);
 695 
 696         // start thread
 697         boolean addedToContainer = false;
 698         boolean started = false;
 699         try {
 700             container.add(this);  // may throw
 701             addedToContainer = true;
 702 
 703             // scoped values may be inherited
 704             inheritScopedValueBindings(container);
 705 
 706             // submit task to run thread, using externalSubmit if possible
 707             externalSubmitRunContinuationOrThrow();
 708             started = true;
 709         } finally {
 710             if (!started) {
 711                 afterDone(addedToContainer);
 712             }
 713         }
 714     }
 715 
 716     @Override
 717     public void start() {
 718         start(ThreadContainers.root());
 719     }
 720 

1338     }
1339 
1340     /**
1341      * Disallow the current thread be suspended or preempted.
1342      */
1343     private void disableSuspendAndPreempt() {
1344         notifyJvmtiDisableSuspend(true);
1345         Continuation.pin();
1346     }
1347 
1348     /**
1349      * Allow the current thread be suspended or preempted.
1350      */
1351     private void enableSuspendAndPreempt() {
1352         Continuation.unpin();
1353         notifyJvmtiDisableSuspend(false);
1354     }
1355 
1356     // -- wrappers for get/set of state, parking permit, and carrier thread --
1357 
1358     int state() {
1359         return state;  // volatile read
1360     }
1361 
1362     private void setState(int newValue) {
1363         state = newValue;  // volatile write
1364     }
1365 
1366     private boolean compareAndSetState(int expectedValue, int newValue) {
1367         return U.compareAndSetInt(this, STATE, expectedValue, newValue);
1368     }
1369 
1370     private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) {
1371         return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue);
1372     }
1373 
1374     private void setParkPermit(boolean newValue) {
1375         if (parkPermit != newValue) {
1376             parkPermit = newValue;
1377         }
1378     }
1379 
1380     private boolean getAndSetParkPermit(boolean newValue) {
1381         if (parkPermit != newValue) {
1382             return U.getAndSetBoolean(this, PARK_PERMIT, newValue);
1383         } else {
1384             return newValue;
1385         }
1386     }
1387 
1388     Thread carrierThread() {
1389         return carrierThread;
1390     }
1391 
1392     private void setCarrierThread(Thread carrier) {
1393         // U.putReferenceRelease(this, CARRIER_THREAD, carrier);
1394         this.carrierThread = carrier;
1395     }
1396 
1397     // -- JVM TI support --
1398 
1399     @IntrinsicCandidate
1400     @JvmtiMountTransition
1401     private native void notifyJvmtiStart();
1402 
1403     @IntrinsicCandidate
1404     @JvmtiMountTransition
1405     private native void notifyJvmtiEnd();
1406 
1407     @IntrinsicCandidate
1408     @JvmtiMountTransition
1409     private native void notifyJvmtiMount(boolean hide);
1410 
1411     @IntrinsicCandidate
1412     @JvmtiMountTransition
1413     private native void notifyJvmtiUnmount(boolean hide);
1414 
1415     @IntrinsicCandidate
1416     private static native void notifyJvmtiDisableSuspend(boolean enter);
1417 
1418     private static native void registerNatives();
1419     static {
1420         registerNatives();
1421 
1422         // ensure VTHREAD_GROUP is created, may be accessed by JVMTI
1423         var group = Thread.virtualThreadGroup();
1424     }
1425 
1426     /**
1427      * Creates the default scheduler.
1428      * If the system property {@code jdk.virtualThreadScheduler.implClass} is set then
1429      * its value is the name of a class that implements java.util.concurrent.Executor.
1430      * The class is public in an exported package, has a public no-arg constructor,
1431      * and is visible to the system class loader.
1432      * If the system property is not set then the default scheduler will be a
1433      * ForkJoinPool instance.
1434      */
1435     private static Executor createDefaultScheduler() {
1436         String propValue = System.getProperty("jdk.virtualThreadScheduler.implClass");
1437         if (propValue != null) {
1438             try {
1439                 Class<?> clazz = Class.forName(propValue, true,
1440                         ClassLoader.getSystemClassLoader());
1441                 Constructor<?> ctor = clazz.getConstructor();
1442                 var scheduler = (Executor) ctor.newInstance();
1443                 System.err.println("""
1444                     WARNING: Using custom scheduler, this is an experimental feature.""");
1445                 return scheduler;
1446             } catch (Exception ex) {
1447                 throw new Error(ex);
1448             }
1449         } else {
1450             return createDefaultForkJoinPoolScheduler();
1451         }
1452     }
1453 
1454     /**
1455      * Creates the default ForkJoinPool scheduler.
1456      */
1457     private static ForkJoinPool createDefaultForkJoinPoolScheduler() {
1458         ForkJoinWorkerThreadFactory factory = pool -> new CarrierThread(pool);
1459         int parallelism, maxPoolSize, minRunnable;
1460         String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
1461         String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
1462         String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
1463         if (parallelismValue != null) {
1464             parallelism = Integer.parseInt(parallelismValue);
1465         } else {
1466             parallelism = Runtime.getRuntime().availableProcessors();
1467         }
1468         if (maxPoolSizeValue != null) {
1469             maxPoolSize = Integer.parseInt(maxPoolSizeValue);
1470             parallelism = Integer.min(parallelism, maxPoolSize);
1471         } else {
1472             maxPoolSize = Integer.max(parallelism, 256);
1473         }
1474         if (minRunnableValue != null) {
1475             minRunnable = Integer.parseInt(minRunnableValue);
1476         } else {
1477             minRunnable = Integer.max(parallelism / 2, 1);

1552                 assert changed;
1553                 vthread.unblock();
1554 
1555                 vthread = nextThread;
1556             }
1557         }
1558     }
1559 
1560     /**
1561      * Retrieves the list of virtual threads that are waiting to be unblocked, waiting
1562      * if necessary until a list of one or more threads becomes available.
1563      */
1564     private static native VirtualThread takeVirtualThreadListToUnblock();
1565 
1566     static {
1567         var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
1568                 VirtualThread::unblockVirtualThreads);
1569         unblocker.setDaemon(true);
1570         unblocker.start();
1571     }
1572 }
< prev index next >