< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page
@@ -84,45 +84,56 @@
  
      // virtual thread state, accessed by VM
      private volatile int state;
  
      /*
-      * Virtual thread state and transitions:
+      * Virtual thread state transitions:
       *
-      *      NEW -> STARTED         // Thread.start
+      *      NEW -> STARTED         // Thread.start, schedule to run
       *  STARTED -> TERMINATED      // failed to start
       *  STARTED -> RUNNING         // first run
+      *  RUNNING -> TERMINATED      // done
       *
-      *  RUNNING -> PARKING         // Thread attempts to park
-      *  PARKING -> PARKED          // cont.yield successful, thread is parked
-      *  PARKING -> PINNED          // cont.yield failed, thread is pinned
-      *
-      *   PARKED -> RUNNABLE        // unpark or interrupted
-      *   PINNED -> RUNNABLE        // unpark or interrupted
+      *  RUNNING -> PARKING         // Thread parking with LockSupport.park
+      *  PARKING -> PARKED          // cont.yield successful, parked indefinitely
+      *  PARKING -> PINNED          // cont.yield failed, parked indefinitely on carrier
+      *   PARKED -> UNPARKED        // unparked, may be scheduled to continue
+      *   PINNED -> RUNNING         // unparked, continue execution on same carrier
+      * UNPARKED -> RUNNING         // continue execution after park
       *
-      * RUNNABLE -> RUNNING         // continue execution
+      *       RUNNING -> TIMED_PARKING   // Thread parking with LockSupport.parkNanos
+      * TIMED_PARKING -> TIMED_PARKED    // cont.yield successful, timed-parked
+      * TIMED_PARKING -> TIMED_PINNED    // cont.yield failed, timed-parked on carrier
+      *  TIMED_PARKED -> UNPARKED        // unparked, may be scheduled to continue
+      *  TIMED_PINNED -> RUNNING         // unparked, continue execution on same carrier
       *
       *  RUNNING -> YIELDING        // Thread.yield
-      * YIELDING -> RUNNABLE        // yield successful
-      * YIELDING -> RUNNING         // yield failed
-      *
-      *  RUNNING -> TERMINATED      // done
+      * YIELDING -> YIELDED         // cont.yield successful, may be scheduled to continue
+      * YIELDING -> RUNNING         // cont.yield failed
+      *  YIELDED -> RUNNING         // continue execution after Thread.yield
       */
      private static final int NEW      = 0;
      private static final int STARTED  = 1;
-     private static final int RUNNABLE = 2;     // runnable-unmounted
-     private static final int RUNNING  = 3;     // runnable-mounted
-     private static final int PARKING  = 4;
-     private static final int PARKED   = 5;     // unmounted
-     private static final int PINNED   = 6;     // mounted
-     private static final int YIELDING = 7;     // Thread.yield
+     private static final int RUNNING  = 2;     // runnable-mounted
+ 
+     // untimed and timed parking
+     private static final int PARKING       = 3;
+     private static final int PARKED        = 4;     // unmounted
+     private static final int PINNED        = 5;     // mounted
+     private static final int TIMED_PARKING = 6;
+     private static final int TIMED_PARKED  = 7;     // unmounted
+     private static final int TIMED_PINNED  = 8;     // mounted
+     private static final int UNPARKED      = 9;     // unmounted but runnable
+ 
+     // Thread.yield
+     private static final int YIELDING = 10;
+     private static final int YIELDED  = 11;         // unmounted but runnable
+ 
      private static final int TERMINATED = 99;  // final state
  
      // can be suspended from scheduling when unmounted
      private static final int SUSPENDED = 1 << 8;
-     private static final int RUNNABLE_SUSPENDED = (RUNNABLE | SUSPENDED);
-     private static final int PARKED_SUSPENDED   = (PARKED | SUSPENDED);
  
      // parking permit
      private volatile boolean parkPermit;
  
      // carrier thread when mounted, accessed by VM

@@ -178,11 +189,19 @@
          }
          @Override
          protected void onPinned(Continuation.Pinned reason) {
              if (TRACE_PINNING_MODE > 0) {
                  boolean printAll = (TRACE_PINNING_MODE == 1);
-                 PinnedThreadPrinter.printStackTrace(System.out, printAll);
+                 VirtualThread vthread = (VirtualThread) Thread.currentThread();
+                 int oldState = vthread.state();
+                 try {
+                     // temporarily set RUNNING to avoid printing while in a transition state
+                     vthread.setState(RUNNING);
+                     PinnedThreadPrinter.printStackTrace(System.out, reason, printAll);
+                 } finally {
+                     vthread.setState(oldState);
+                 }
              }
          }
          private static Runnable wrap(VirtualThread vthread, Runnable task) {
              return new Runnable() {
                  @Hidden

@@ -192,49 +211,54 @@
              };
          }
      }
  
      /**
-      * Runs or continues execution of the continuation on the current thread.
+      * Runs or continues execution on the current thread. The virtual thread is mounted
+      * on the current thread before the task runs or continues. It unmounts when the
+      * task completes or yields.
       */
+     @ChangesCurrentThread
      private void runContinuation() {
          // the carrier must be a platform thread
          if (Thread.currentThread().isVirtual()) {
              throw new WrongThreadException();
          }
  
          // set state to RUNNING
          int initialState = state();
-         if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
-             // first run
-         } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
-             // consume parking permit
-             setParkPermit(false);
+         if (initialState == STARTED || initialState == UNPARKED || initialState == YIELDED) {
+             // newly started or continue after parking/blocking/Thread.yield
+             if (!compareAndSetState(initialState, RUNNING)) {
+                 return;
+             }
+             // consume parking permit when continuing after parking
+             if (initialState == UNPARKED) {
+                 setParkPermit(false);
+             }
          } else {
              // not runnable
              return;
          }
  
-         // notify JVMTI before mount
-         notifyJvmtiMount(/*hide*/true);
- 
+         mount();
          try {
              cont.run();
          } finally {
+             unmount();
              if (cont.isDone()) {
-                 afterTerminate();
+                 afterDone();
              } else {
                  afterYield();
              }
          }
      }
  
      /**
       * Submits the runContinuation task to the scheduler. For the default scheduler,
       * and calling it on a worker thread, the task will be pushed to the local queue,
-      * otherwise it will be pushed to a submission queue.
-      *
+      * otherwise it will be pushed to an external submission queue.
       * @throws RejectedExecutionException
       */
      private void submitRunContinuation() {
          try {
              scheduler.execute(runContinuation);

@@ -243,11 +267,11 @@
              throw ree;
          }
      }
  
      /**
-      * Submits the runContinuation task to the scheduler with a lazy submit.
+      * Submits the runContinuation task to the given scheduler with a lazy submit.
       * @throws RejectedExecutionException
       * @see ForkJoinPool#lazySubmit(ForkJoinTask)
       */
      private void lazySubmitRunContinuation(ForkJoinPool pool) {
          try {

@@ -257,11 +281,11 @@
              throw ree;
          }
      }
  
      /**
-      * Submits the runContinuation task to the scheduler as an external submit.
+      * Submits the runContinuation task to the given scheduler as an external submit.
       * @throws RejectedExecutionException
       * @see ForkJoinPool#externalSubmit(ForkJoinTask)
       */
      private void externalSubmitRunContinuation(ForkJoinPool pool) {
          try {

@@ -283,20 +307,16 @@
              event.commit();
          }
      }
  
      /**
-      * Runs a task in the context of this virtual thread. The virtual thread is
-      * mounted on the current (carrier) thread before the task runs. It unmounts
-      * from its carrier thread when the task completes.
+      * Runs a task in the context of this virtual thread.
       */
-     @ChangesCurrentThread
      private void run(Runnable task) {
-         assert state == RUNNING;
+         assert Thread.currentThread() == this && state == RUNNING;
  
-         // first mount
-         mount();
+         // notify JVMTI, may post VirtualThreadStart event
          notifyJvmtiStart();
  
          // emit JFR event if enabled
          if (VirtualThreadStartEvent.isTurnedOn()) {
              var event = new VirtualThreadStartEvent();

@@ -320,16 +340,12 @@
                      event.javaThreadId = threadId();
                      event.commit();
                  }
  
              } finally {
-                 // last unmount
+                 // notify JVMTI, may post VirtualThreadEnd event
                  notifyJvmtiEnd();
-                 unmount();
- 
-                 // final state
-                 setState(TERMINATED);
              }
          }
      }
  
      /**

@@ -337,10 +353,13 @@
       * return, the current thread is the virtual thread.
       */
      @ChangesCurrentThread
      @ReservedStackAccess
      private void mount() {
+         // notify JVMTI before mount
+         notifyJvmtiMount(/*hide*/true);
+ 
          // sets the carrier thread
          Thread carrier = Thread.currentCarrierThread();
          setCarrierThread(carrier);
  
          // sync up carrier thread interrupt status if needed

@@ -373,10 +392,13 @@
          // break connection to carrier thread, synchronized with interrupt
          synchronized (interruptLock) {
              setCarrierThread(null);
          }
          carrier.clearInterrupt();
+ 
+         // notify JVMTI after unmount
+         notifyJvmtiUnmount(/*hide*/false);
      }
  
      /**
       * Sets the current thread to the current carrier thread.
       */

@@ -415,90 +437,83 @@
              switchToVirtualThread(this);
          }
       }
  
      /**
-      * Unmounts this virtual thread, invokes Continuation.yield, and re-mounts the
-      * thread when continued. When enabled, JVMTI must be notified from this method.
-      * @return true if the yield was successful
+      * Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
+      * the continuation continues.
       */
      @Hidden
-     @ChangesCurrentThread
      private boolean yieldContinuation() {
-         // unmount
          notifyJvmtiUnmount(/*hide*/true);
-         unmount();
          try {
              return Continuation.yield(VTHREAD_SCOPE);
          } finally {
-             // re-mount
-             mount();
              notifyJvmtiMount(/*hide*/false);
          }
      }
  
      /**
       * Invoked after the continuation yields. If parking then it sets the state
       * and also re-submits the task to continue if unparked while parking.
       * If yielding due to Thread.yield then it just submits the task to continue.
       */
      private void afterYield() {
-         int s = state();
-         assert (s == PARKING || s == YIELDING) && (carrierThread == null);
+         assert carrierThread == null;
  
-         if (s == PARKING) {
-             setState(PARKED);
+         int s = state();
  
-             // notify JVMTI that unmount has completed, thread is parked
-             notifyJvmtiUnmount(/*hide*/false);
+         // LockSupport.park/parkNanos
+         if (s == PARKING || s == TIMED_PARKING) {
+             int newState = (s == PARKING) ? PARKED : TIMED_PARKED;
+             setState(newState);
  
              // may have been unparked while parking
-             if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
+             if (parkPermit && compareAndSetState(newState, UNPARKED)) {
                  // lazy submit to continue on the current thread as carrier if possible
                  if (currentThread() instanceof CarrierThread ct) {
                      lazySubmitRunContinuation(ct.getPool());
                  } else {
                      submitRunContinuation();
                  }
  
              }
-         } else if (s == YIELDING) {   // Thread.yield
-             setState(RUNNABLE);
+             return;
+         }
  
-             // notify JVMTI that unmount has completed, thread is runnable
-             notifyJvmtiUnmount(/*hide*/false);
+         // Thread.yield
+         if (s == YIELDING) {
+             setState(YIELDED);
  
              // external submit if there are no tasks in the local task queue
              if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                  externalSubmitRunContinuation(ct.getPool());
              } else {
                  submitRunContinuation();
              }
+             return;
          }
+ 
+         assert false;
      }
  
      /**
-      * Invoked after the thread terminates execution. It notifies anyone
-      * waiting for the thread to terminate.
+      * Invoked after the continuation completes.
       */
-     private void afterTerminate() {
-         afterTerminate(true, true);
+     private void afterDone() {
+         afterDone(true);
      }
  
      /**
-      * Invoked after the thread terminates (or start failed). This method
-      * notifies anyone waiting for the thread to terminate.
+      * Invoked after the continuation completes (or start failed). Sets the thread
+      * state to TERMINATED and notifies anyone waiting for the thread to terminate.
       *
       * @param notifyContainer true if its container should be notified
-      * @param executed true if the thread executed, false if it failed to start
       */
-     private void afterTerminate(boolean notifyContainer, boolean executed) {
-         assert (state() == TERMINATED) && (carrierThread == null);
- 
-         if (executed) {
-             notifyJvmtiUnmount(/*hide*/false);
-         }
+     private void afterDone(boolean notifyContainer) {
+         assert carrierThread == null;
+         setState(TERMINATED);
  
          // notify anyone waiting for this virtual thread to terminate
          CountDownLatch termination = this.termination;
          if (termination != null) {
              assert termination.getCount() == 1;

@@ -544,12 +559,11 @@
              // submit task to run thread
              submitRunContinuation();
              started = true;
          } finally {
              if (!started) {
-                 setState(TERMINATED);
-                 afterTerminate(addedToContainer, /*executed*/false);
+                 afterDone(addedToContainer);
              }
          }
      }
  
      @Override

@@ -613,18 +627,18 @@
          // park the thread for the waiting time
          if (nanos > 0) {
              long startTime = System.nanoTime();
  
              boolean yielded = false;
-             Future<?> unparker = scheduleUnpark(this::unpark, nanos);
-             setState(PARKING);
+             Future<?> unparker = scheduleUnpark(nanos);  // may throw OOME
+             setState(TIMED_PARKING);
              try {
                  yielded = yieldContinuation();  // may throw
              } finally {
                  assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
                  if (!yielded) {
-                     assert state() == PARKING;
+                     assert state() == TIMED_PARKING;
                      setState(RUNNING);
                  }
                  cancel(unparker);
              }
  

@@ -652,11 +666,11 @@
              event.begin();
          } catch (OutOfMemoryError e) {
              event = null;
          }
  
-         setState(PINNED);
+         setState(timed ? TIMED_PINNED : PINNED);
          try {
              if (!parkPermit) {
                  if (!timed) {
                      U.park(false, 0);
                  } else if (nanos > 0) {

@@ -678,18 +692,19 @@
              }
          }
      }
  
      /**
-      * Schedule an unpark task to run after a given delay.
+      * Schedules this virtual thread to be unparked after a given delay.
       */
      @ChangesCurrentThread
-     private Future<?> scheduleUnpark(Runnable unparker, long nanos) {
+     private Future<?> scheduleUnpark(long nanos) {
+         assert Thread.currentThread() == this;
          // need to switch to current carrier thread to avoid nested parking
          switchToCarrierThread();
          try {
-             return UNPARKER.schedule(unparker, nanos, NANOSECONDS);
+             return UNPARKER.schedule(this::unpark, nanos, NANOSECONDS);
          } finally {
              switchToVirtualThread(this);
          }
      }
  

@@ -720,26 +735,27 @@
      @ChangesCurrentThread
      void unpark() {
          Thread currentThread = Thread.currentThread();
          if (!getAndSetParkPermit(true) && currentThread != this) {
              int s = state();
-             if (s == PARKED && compareAndSetState(PARKED, RUNNABLE)) {
+             boolean parked = (s == PARKED) || (s == TIMED_PARKED);
+             if (parked && compareAndSetState(s, UNPARKED)) {
                  if (currentThread instanceof VirtualThread vthread) {
                      vthread.switchToCarrierThread();
                      try {
                          submitRunContinuation();
                      } finally {
                          switchToVirtualThread(vthread);
                      }
                  } else {
                      submitRunContinuation();
                  }
-             } else if (s == PINNED) {
-                 // unpark carrier thread when pinned.
+             } else if ((s == PINNED) || (s == TIMED_PINNED)) {
+                 // unpark carrier thread when pinned
                  synchronized (carrierThreadAccessLock()) {
                      Thread carrier = carrierThread;
-                     if (carrier != null && state() == PINNED) {
+                     if (carrier != null && ((s = state()) == PINNED || s == TIMED_PINNED)) {
                          U.unpark(carrier);
                      }
                  }
              }
          }

@@ -872,22 +888,23 @@
          return oldValue;
      }
  
      @Override
      Thread.State threadState() {
-         switch (state()) {
+         int s = state();
+         switch (s & ~SUSPENDED) {
              case NEW:
                  return Thread.State.NEW;
              case STARTED:
                  // return NEW if thread container not yet set
                  if (threadContainer() == null) {
                      return Thread.State.NEW;
                  } else {
                      return Thread.State.RUNNABLE;
                  }
-             case RUNNABLE:
-             case RUNNABLE_SUSPENDED:
+             case UNPARKED:
+             case YIELDED:
                  // runnable, not mounted
                  return Thread.State.RUNNABLE;
              case RUNNING:
                  // if mounted then return state of carrier thread
                  synchronized (carrierThreadAccessLock()) {

@@ -897,17 +914,20 @@
                      }
                  }
                  // runnable, mounted
                  return Thread.State.RUNNABLE;
              case PARKING:
+             case TIMED_PARKING:
              case YIELDING:
-                 // runnable, mounted, not yet waiting
+                 // runnable, in transition
                  return Thread.State.RUNNABLE;
              case PARKED:
-             case PARKED_SUSPENDED:
              case PINNED:
-                 return Thread.State.WAITING;
+                 return State.WAITING;
+             case TIMED_PARKED:
+             case TIMED_PINNED:
+                 return State.TIMED_WAITING;
              case TERMINATED:
                  return Thread.State.TERMINATED;
              default:
                  throw new InternalError();
          }

@@ -938,39 +958,62 @@
          return stackTrace;
      }
  
      /**
       * Returns the stack trace for this virtual thread if it is unmounted.
-      * Returns null if the thread is in another state.
+      * Returns null if the thread is mounted, in transition, or changes state concurrently.
       */
      private StackTraceElement[] tryGetStackTrace() {
-         int initialState = state();
-         return switch (initialState) {
-             case RUNNABLE, PARKED -> {
-                 int suspendedState = initialState | SUSPENDED;
-                 if (compareAndSetState(initialState, suspendedState)) {
-                     try {
-                         yield cont.getStackTrace();
-                     } finally {
-                         assert state == suspendedState;
-                         setState(initialState);
- 
-                         // re-submit if runnable
-                         // re-submit if unparked while suspended
-                         if (initialState == RUNNABLE
-                             || (parkPermit && compareAndSetState(PARKED, RUNNABLE))) {
-                             try {
-                                 submitRunContinuation();
-                             } catch (RejectedExecutionException ignore) { }
-                         }
-                     }
-                 }
-                 yield null;
+         int initialState = state() & ~SUSPENDED;
+         switch (initialState) {
+             case NEW, STARTED, TERMINATED -> {
+                 return new StackTraceElement[0];  // unmounted, empty stack
+             }
+             case RUNNING, PINNED, TIMED_PINNED -> {
+                 return null;   // mounted
+             }
+             case PARKED, TIMED_PARKED -> {
+                 // unmounted, not runnable
              }
-             case NEW, STARTED, TERMINATED ->  new StackTraceElement[0];  // empty stack
-             default -> null;
+             case UNPARKED, YIELDED -> {
+                 // unmounted, runnable
+             }
+             case PARKING, TIMED_PARKING, YIELDING -> {
+                 return null;  // in transition
+             }
+             default -> throw new InternalError("" + initialState);
+         }
+ 
+         // thread is unmounted, prevent it from continuing
+         int suspendedState = initialState | SUSPENDED;
+         if (!compareAndSetState(initialState, suspendedState)) {
+             return null;
+         }
+ 
+         // get stack trace and restore state
+         StackTraceElement[] stack;
+         try {
+             stack = cont.getStackTrace();
+         } finally {
+             assert state == suspendedState;
+             setState(initialState);
+         }
+         boolean resubmit = switch (initialState) {
+             case UNPARKED, YIELDED -> {
+                 // resubmit as task may have run while suspended
+                 yield true;
+             }
+             case PARKED, TIMED_PARKED -> {
+                 // resubmit if unparked while suspended
+                 yield parkPermit && compareAndSetState(initialState, UNPARKED);
+             }
+             default -> throw new InternalError();
          };
+         if (resubmit) {
+             submitRunContinuation();
+         }
+         return stack;
      }
  
      @Override
      public String toString() {
          StringBuilder sb = new StringBuilder("VirtualThread[#");
< prev index next >