< prev index next >

src/java.base/share/classes/java/lang/VirtualThread.java

Print this page
@@ -1,7 +1,7 @@
  /*
-  * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
+  * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
   * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   *
   * This code is free software; you can redistribute it and/or modify it
   * under the terms of the GNU General Public License version 2 only, as
   * published by the Free Software Foundation.  Oracle designates this

@@ -27,10 +27,11 @@
  import java.lang.ref.Reference;
  import java.security.AccessController;
  import java.security.PrivilegedAction;
  import java.util.Locale;
  import java.util.Objects;
+ import java.util.concurrent.Callable;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.Executor;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ForkJoinPool;
  import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;

@@ -38,11 +39,10 @@
  import java.util.concurrent.ForkJoinWorkerThread;
  import java.util.concurrent.Future;
  import java.util.concurrent.RejectedExecutionException;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ScheduledThreadPoolExecutor;
- import jdk.internal.event.ThreadSleepEvent;
  import jdk.internal.event.VirtualThreadEndEvent;
  import jdk.internal.event.VirtualThreadPinnedEvent;
  import jdk.internal.event.VirtualThreadStartEvent;
  import jdk.internal.event.VirtualThreadSubmitFailedEvent;
  import jdk.internal.misc.CarrierThread;

@@ -56,10 +56,11 @@
  import jdk.internal.vm.annotation.ChangesCurrentThread;
  import jdk.internal.vm.annotation.ForceInline;
  import jdk.internal.vm.annotation.Hidden;
  import jdk.internal.vm.annotation.IntrinsicCandidate;
  import jdk.internal.vm.annotation.JvmtiMountTransition;
+ import jdk.internal.vm.annotation.ReservedStackAccess;
  import sun.nio.ch.Interruptible;
  import sun.security.action.GetPropertyAction;
  import static java.util.concurrent.TimeUnit.*;
  
  /**

@@ -173,19 +174,27 @@
      /**
       * The continuation that a virtual thread executes.
       */
      private static class VThreadContinuation extends Continuation {
          VThreadContinuation(VirtualThread vthread, Runnable task) {
-             super(VTHREAD_SCOPE, () -> vthread.run(task));
+             super(VTHREAD_SCOPE, wrap(vthread, task));  // continuation target is the Runnable built by wrap()
          }
          @Override
          protected void onPinned(Continuation.Pinned reason) {
              if (TRACE_PINNING_MODE > 0) {
                  boolean printAll = (TRACE_PINNING_MODE == 1);
                  PinnedThreadPrinter.printStackTrace(System.out, printAll);
              }
          }
+         private static Runnable wrap(VirtualThread vthread, Runnable task) {  // builds the continuation's entry point
+             return new Runnable() {  // anonymous class instead of the previous lambda; lets run() carry @Hidden
+                 @Hidden
+                 public void run() {
+                     vthread.run(task);  // delegates to VirtualThread.run(Runnable)
+                 }
+             };
+         }
      }
  
      /**
       * Runs or continues execution of the continuation on the current thread.
       */

@@ -209,17 +218,17 @@
              // not runnable
              return;
          }
  
          // notify JVMTI before mount
-         notifyJvmtiMount(true, firstRun);
+         notifyJvmtiMount(/*hide*/true, firstRun);
  
          try {
              cont.run();
          } finally {
              if (cont.isDone()) {
-                 afterTerminate(/*executed*/ true);
+                 afterTerminate();
              } else {
                  afterYield();
              }
          }
      }

@@ -289,11 +298,11 @@
      private void run(Runnable task) {
          assert state == RUNNING;
  
          // first mount
          mount();
-         notifyJvmtiMount(false, true);
+         notifyJvmtiMount(/*hide*/false, /*first*/true);
  
          // emit JFR event if enabled
          if (VirtualThreadStartEvent.isTurnedOn()) {
              var event = new VirtualThreadStartEvent();
              event.javaThreadId = threadId();

@@ -317,11 +326,11 @@
                      event.commit();
                  }
  
              } finally {
                  // last unmount
-                 notifyJvmtiUnmount(true, true);
+                 notifyJvmtiUnmount(/*hide*/true, /*last*/true);
                  unmount();
  
                  // final state
                  setState(TERMINATED);
              }

@@ -339,10 +348,11 @@
      /**
       * Mounts this virtual thread onto the current platform thread. On
       * return, the current thread is the virtual thread.
       */
      @ChangesCurrentThread
+     @ReservedStackAccess
      private void mount() {
          // sets the carrier thread
          Thread carrier = Thread.currentCarrierThread();
          setCarrierThread(carrier);
  

@@ -365,10 +375,11 @@
      /**
       * Unmounts this virtual thread from the carrier. On return, the
       * current thread is the current platform thread.
       */
      @ChangesCurrentThread
+     @ReservedStackAccess
      private void unmount() {
          // set Thread.currentThread() to return the platform thread
          Thread carrier = this.carrierThread;
          carrier.setCurrentThread(carrier);
  

@@ -402,26 +413,41 @@
          assert carrier == Thread.currentCarrierThread();
          carrier.setCurrentThread(vthread);
          notifyJvmtiHideFrames(false);
      }
  
+     /**
+      * Executes the given value-returning task on the current carrier thread.
+      */
+     @ChangesCurrentThread
+     <V> V executeOnCarrierThread(Callable<V> task) throws Exception {  // returns the task's result; its exception propagates
+         assert Thread.currentThread() == this;  // must be invoked by this virtual thread
+         switchToCarrierThread();
+         try {
+             return task.call();
+         } finally {
+             switchToVirtualThread(this);  // always switch back, even if the task throws
+         }
+     }
+ 
      /**
       * Unmounts this virtual thread, invokes Continuation.yield, and re-mounts the
       * thread when continued. When enabled, JVMTI must be notified from this method.
       * @return true if the yield was successful
       */
+     @Hidden
      @ChangesCurrentThread
      private boolean yieldContinuation() {
          // unmount
-         notifyJvmtiUnmount(true, false);
+         notifyJvmtiUnmount(/*hide*/true, /*last*/false);  // not the last unmount: the thread is re-mounted when continued
          unmount();
          try {
              return Continuation.yield(VTHREAD_SCOPE);
          } finally {
              // re-mount
              mount();
-             notifyJvmtiMount(false, false);
+             notifyJvmtiMount(/*hide*/false, /*first*/false);  // not the first mount: this is a re-mount after the yield
          }
      }
  
      /**
       * Invoked after the continuation yields. If parking then it sets the state

@@ -434,11 +460,11 @@
  
          if (s == PARKING) {
              setState(PARKED);
  
              // notify JVMTI that unmount has completed, thread is parked
-             notifyJvmtiUnmount(false, false);
+             notifyJvmtiUnmount(/*hide*/false, /*last*/false);
  
              // may have been unparked while parking
              if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
                  // lazy submit to continue on the current thread as carrier if possible
                  if (currentThread() instanceof CarrierThread ct) {

@@ -450,48 +476,57 @@
              }
          } else if (s == YIELDING) {   // Thread.yield
              setState(RUNNABLE);
  
              // notify JVMTI that unmount has completed, thread is runnable
-             notifyJvmtiUnmount(false, false);
+             notifyJvmtiUnmount(/*hide*/false, /*last*/false);
  
              // external submit if there are no tasks in the local task queue
              if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                  externalSubmitRunContinuation(ct.getPool());
              } else {
                  submitRunContinuation();
              }
          }
      }
  
+     /**
+      * Invoked after the thread terminates execution. It notifies anyone
+      * waiting for the thread to terminate, and the thread's container.
+      */
+     private void afterTerminate() {
+         afterTerminate(/*notifyContainer*/true, /*executed*/true);
+     }
+ 
      /**
       * Invoked after the thread terminates (or start failed). This method
       * notifies anyone waiting for the thread to terminate.
       *
+      * @param notifyContainer true if its container should be notified (its onExit is invoked)
       * @param executed true if the thread executed, false if it failed to start
       */
-     private void afterTerminate(boolean executed) {
+     private void afterTerminate(boolean notifyContainer, boolean executed) {
          assert (state() == TERMINATED) && (carrierThread == null);
  
          if (executed) {
-             notifyJvmtiUnmount(false, true);
+             notifyJvmtiUnmount(/*hide*/false, /*last*/true);  // only when the thread ran; skipped if start failed
          }
  
          // notify anyone waiting for this virtual thread to terminate
          CountDownLatch termination = this.termination;
          if (termination != null) {
              assert termination.getCount() == 1;
              termination.countDown();
          }
  
-         if (executed) {
-             // notify container if thread executed
+         // notify container that the thread has exited
+         if (notifyContainer) {
              threadContainer().onExit(this);
- 
-             // clear references to thread locals
-             clearReferences();
          }
+ 
+         // clear references to thread locals, whether or not the thread executed
+         clearReferences();
      }
  
      /**
       * Schedules this {@code VirtualThread} to execute.
       *

@@ -504,27 +539,30 @@
          if (!compareAndSetState(NEW, STARTED)) {
              throw new IllegalThreadStateException("Already started");
          }
  
          // bind thread to container
+         assert threadContainer() == null;
          setThreadContainer(container);
  
          // start thread
+         boolean addedToContainer = false;
          boolean started = false;
-         container.onStart(this); // may throw
          try {
+             container.onStart(this);  // may throw
+             addedToContainer = true;
+ 
              // scoped values may be inherited
              inheritScopedValueBindings(container);
  
              // submit task to run thread
              submitRunContinuation();
              started = true;
          } finally {
              if (!started) {
                  setState(TERMINATED);
-                 container.onExit(this);
-                 afterTerminate(/*executed*/ false);
+                 afterTerminate(addedToContainer, /*executed*/false);
              }
          }
      }
  
      @Override

@@ -549,18 +587,25 @@
          // complete immediately if parking permit available or interrupted
          if (getAndSetParkPermit(false) || interrupted)
              return;
  
          // park the thread
+         boolean yielded = false;
          setState(PARKING);
          try {
-             if (!yieldContinuation()) {
-                 // park on the carrier thread when pinned
-                 parkOnCarrierThread(false, 0);
-             }
+             yielded = yieldContinuation();  // may throw
          } finally {
-             assert (Thread.currentThread() == this) && (state() == RUNNING);
+             assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
+             if (!yielded) {
+                 assert state() == PARKING;
+                 setState(RUNNING);
+             }
+         }
+ 
+         // park on the carrier thread when pinned
+         if (!yielded) {
+             parkOnCarrierThread(false, 0);
          }
      }
  
      /**
       * Parks up to the given waiting time or until unparked or interrupted.

@@ -580,18 +625,21 @@
  
          // park the thread for the waiting time
          if (nanos > 0) {
              long startTime = System.nanoTime();
  
-             boolean yielded;
+             boolean yielded = false;
              Future<?> unparker = scheduleUnpark(this::unpark, nanos);
              setState(PARKING);
              try {
-                 yielded = yieldContinuation();
+                 yielded = yieldContinuation();  // may throw
              } finally {
-                 assert (Thread.currentThread() == this)
-                         && (state() == RUNNING || state() == PARKING);
+                 assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
+                 if (!yielded) {
+                     assert state() == PARKING;
+                     setState(RUNNING);
+                 }
                  cancel(unparker);
              }
  
              // park on carrier thread for remaining time when pinned
              if (!yielded) {

@@ -609,14 +657,19 @@
       * interrupt status will be propagated to the carrier thread.
       * @param timed true for a timed park, false for untimed
       * @param nanos the waiting time in nanoseconds
       */
      private void parkOnCarrierThread(boolean timed, long nanos) {
-         assert state() == PARKING;
+         assert state() == RUNNING;
  
-         var pinnedEvent = new VirtualThreadPinnedEvent();
-         pinnedEvent.begin();
+         VirtualThreadPinnedEvent event;
+         try {
+             event = new VirtualThreadPinnedEvent();
+             event.begin();
+         } catch (OutOfMemoryError e) {
+             event = null;
+         }
  
          setState(PINNED);
          try {
              if (!parkPermit) {
                  if (!timed) {

@@ -630,11 +683,17 @@
          }
  
          // consume parking permit
          setParkPermit(false);
  
-         pinnedEvent.commit();
+         if (event != null) {
+             try {
+                 event.commit();
+             } catch (OutOfMemoryError e) {
+                 // ignore
+             }
+         }
      }
  
      /**
       * Schedule an unpark task to run after a given delay.
       */

@@ -705,57 +764,37 @@
       * Attempts to yield the current virtual thread (Thread.yield).
       */
      void tryYield() {
          assert Thread.currentThread() == this;
          setState(YIELDING);
+         boolean yielded = false;  // stays false if the yield is unsuccessful or throws
          try {
-             yieldContinuation();
+             yielded = yieldContinuation();  // may throw
          } finally {
-             assert Thread.currentThread() == this;
-             if (state() != RUNNING) {
+             assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
+             if (!yielded) {  // yield unsuccessful: state is still YIELDING, so restore RUNNING
                  assert state() == YIELDING;
                  setState(RUNNING);
              }
          }
      }
  
-     /**
-      * Sleep the current virtual thread for the given sleep time.
-      *
-      * @param nanos the maximum number of nanoseconds to sleep
-      * @throws InterruptedException if interrupted while sleeping
-      */
-     void sleepNanos(long nanos) throws InterruptedException {
-         assert Thread.currentThread() == this;
-         if (nanos >= 0) {
-             if (ThreadSleepEvent.isTurnedOn()) {
-                 ThreadSleepEvent event = new ThreadSleepEvent();
-                 try {
-                     event.time = nanos;
-                     event.begin();
-                     doSleepNanos(nanos);
-                 } finally {
-                     event.commit();
-                 }
-             } else {
-                 doSleepNanos(nanos);
-             }
-         }
-     }
- 
      /**
       * Sleep the current thread for the given sleep time (in nanoseconds). If
       * nanos is 0 then the thread will attempt to yield.
       *
       * @implNote This implementation parks the thread for the given sleeping time
       * and will therefore be observed in PARKED state during the sleep. Parking
       * will consume the parking permit so this method makes available the parking
       * permit after the sleep. This may be observed as a spurious, but benign,
       * wakeup when the thread subsequently attempts to park.
+      *
+      * @param nanos the maximum number of nanoseconds to sleep
+      * @throws InterruptedException if interrupted while sleeping
       */
-     private void doSleepNanos(long nanos) throws InterruptedException {
-         assert nanos >= 0;
+     void sleepNanos(long nanos) throws InterruptedException {
+         assert Thread.currentThread() == this && nanos >= 0;
          if (getAndClearInterrupt())
              throw new InterruptedException();
          if (nanos == 0) {
              tryYield();
          } else {
< prev index next >