< prev index next >

test/jdk/java/lang/Thread/virtual/CustomScheduler.java

Print this page
*** 23,60 ***
  
  /**
   * @test
   * @summary Test virtual threads using a custom scheduler
   * @requires vm.continuations
-  * @modules java.base/java.lang:+open
   * @library /test/lib
   * @run junit CustomScheduler
   */
  
  import java.lang.reflect.Field;
  import java.time.Duration;
  import java.util.ArrayList;
  import java.util.List;
! import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicReference;
  import java.util.concurrent.locks.LockSupport;
  
- import jdk.test.lib.thread.VThreadScheduler;
  import jdk.test.lib.thread.VThreadRunner;
  import org.junit.jupiter.api.Test;
  import org.junit.jupiter.api.BeforeAll;
  import org.junit.jupiter.api.AfterAll;
  import static org.junit.jupiter.api.Assertions.*;
  import static org.junit.jupiter.api.Assumptions.*;
  
  class CustomScheduler {
!     private static ExecutorService scheduler1;
!     private static ExecutorService scheduler2;
  
      @BeforeAll
!     static void setup() {
!         scheduler1 = Executors.newFixedThreadPool(1);
!         scheduler2 = Executors.newFixedThreadPool(1);
      }
  
      @AfterAll
      static void shutdown() {
!         scheduler1.shutdown();
!         scheduler2.shutdown();
      }
  
      /**
       * Test platform thread creating a virtual thread that uses a custom scheduler.
       */
      @Test
      void testCustomScheduler1() throws Exception {
!         var ref = new AtomicReference<Executor>();
!         ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler1);
!         Thread thread = factory.newThread(() -> {
-             ref.set(VThreadScheduler.scheduler(Thread.currentThread()));
          });
-         thread.start();
          thread.join();
          assertTrue(ref.get() == scheduler1);
      }
  
      /**
--- 23,67 ---
  
  /**
   * @test
   * @summary Test virtual threads using a custom scheduler
   * @requires vm.continuations
   * @library /test/lib
   * @run junit CustomScheduler
   */
  
  import java.lang.reflect.Field;
  import java.time.Duration;
  import java.util.ArrayList;
  import java.util.List;
! import java.util.concurrent.Executors;
+ import java.util.concurrent.ExecutorService;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicReference;
  import java.util.concurrent.locks.LockSupport;
  
  import jdk.test.lib.thread.VThreadRunner;
  import org.junit.jupiter.api.Test;
  import org.junit.jupiter.api.BeforeAll;
  import org.junit.jupiter.api.AfterAll;
  import static org.junit.jupiter.api.Assertions.*;
  import static org.junit.jupiter.api.Assumptions.*;
  
  class CustomScheduler {
!     private static Thread.VirtualThreadScheduler defaultScheduler;
!     private static ExecutorService threadPool1, threadPool2;
+     private static Thread.VirtualThreadScheduler scheduler1, scheduler2;
  
      @BeforeAll
!     static void setup() throws Exception {
!         var ref = new AtomicReference<Thread.VirtualThreadScheduler>();
!         Thread thread = Thread.startVirtualThread(() -> {
+             ref.set(Thread.VirtualThreadScheduler.current());
+         });
+         thread.join();
+         defaultScheduler = ref.get();
+ 
+         threadPool1 = Executors.newFixedThreadPool(1);
+         threadPool2 = Executors.newFixedThreadPool(1);
+         scheduler1 = Thread.VirtualThreadScheduler.adapt(threadPool1);
+         scheduler2 = Thread.VirtualThreadScheduler.adapt(threadPool2);
      }
  
      @AfterAll
      static void shutdown() {
!         threadPool1.shutdown();
!         threadPool2.shutdown();
      }
  
      /**
       * Test platform thread creating a virtual thread that uses a custom scheduler.
       */
      @Test
      void testCustomScheduler1() throws Exception {
!         var ref = new AtomicReference<Thread.VirtualThreadScheduler>();
!         Thread thread = Thread.ofVirtual().scheduler(scheduler1).start(() -> {
!             ref.set(Thread.VirtualThreadScheduler.current());
          });
          thread.join();
          assertTrue(ref.get() == scheduler1);
      }
  
      /**

*** 86,62 ***
      void testCustomScheduler2() throws Exception {
          VThreadRunner.run(this::testCustomScheduler1);
      }
  
      /**
!      * Test virtual thread using custom scheduler creating a virtual thread.
!      * The scheduler should be inherited.
       */
      @Test
      void testCustomScheduler3() throws Exception {
!         var ref = new AtomicReference<Executor>();
!         ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler1);
-         Thread thread = factory.newThread(() -> {
              try {
                  Thread.ofVirtual().start(() -> {
!                     ref.set(VThreadScheduler.scheduler(Thread.currentThread()));
                  }).join();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          });
-         thread.start();
          thread.join();
!         assertTrue(ref.get() == scheduler1);
      }
  
      /**
       * Test virtual thread using custom scheduler creating a virtual thread
       * that uses a different custom scheduler.
       */
      @Test
      void testCustomScheduler4() throws Exception {
!         var ref = new AtomicReference<Executor>();
!         ThreadFactory factory1 = VThreadScheduler.virtualThreadFactory(scheduler1);
-         ThreadFactory factory2 = VThreadScheduler.virtualThreadFactory(scheduler2);
-         Thread thread1 = factory1.newThread(() -> {
              try {
!                 Thread thread2 = factory2.newThread(() -> {
!                     ref.set(VThreadScheduler.scheduler(Thread.currentThread()));
                  });
-                 thread2.start();
                  thread2.join();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          });
-         thread1.start();
          thread1.join();
          assertTrue(ref.get() == scheduler2);
      }
  
      /**
       * Test running task on a virtual thread, should thrown WrongThreadException.
       */
      @Test
      void testBadCarrier() {
!         Executor scheduler = (task) -> {
              var exc = new AtomicReference<Throwable>();
              try {
                  Thread.ofVirtual().start(() -> {
                      try {
                          task.run();
--- 93,56 ---
      void testCustomScheduler2() throws Exception {
          VThreadRunner.run(this::testCustomScheduler1);
      }
  
      /**
!      * Test virtual thread using custom scheduler creating a virtual thread that uses
!      * the default scheduler.
       */
      @Test
      void testCustomScheduler3() throws Exception {
!         var ref = new AtomicReference<Thread.VirtualThreadScheduler>();
!         Thread thread = Thread.ofVirtual().scheduler(scheduler1).start(() -> {
              try {
                  Thread.ofVirtual().start(() -> {
!                     ref.set(Thread.VirtualThreadScheduler.current());
                  }).join();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          });
          thread.join();
!         assertTrue(ref.get() == defaultScheduler);
      }
  
      /**
       * Test virtual thread using custom scheduler creating a virtual thread
       * that uses a different custom scheduler.
       */
      @Test
      void testCustomScheduler4() throws Exception {
!         var ref = new AtomicReference<Thread.VirtualThreadScheduler>();
!         Thread thread1 = Thread.ofVirtual().scheduler(scheduler1).start(() -> {
              try {
!                 Thread thread2 = Thread.ofVirtual().scheduler(scheduler2).start(() -> {
!                     ref.set(Thread.VirtualThreadScheduler.current());
                  });
                  thread2.join();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          });
          thread1.join();
          assertTrue(ref.get() == scheduler2);
      }
  
      /**
       * Test running task on a virtual thread, should thrown WrongThreadException.
       */
      @Test
      void testBadCarrier() {
!         Thread.VirtualThreadScheduler scheduler = (_, task) -> {
              var exc = new AtomicReference<Throwable>();
              try {
                  Thread.ofVirtual().start(() -> {
                      try {
                          task.run();

*** 153,13 ***
              } catch (InterruptedException e) {
                  fail();
              }
              assertTrue(exc.get() instanceof WrongThreadException);
          };
!         ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler);
-         Thread thread = factory.newThread(LockSupport::park);
-         thread.start();
      }
  
      /**
       * Test parking with the virtual thread interrupt set, should not leak to the
       * carrier thread when the task completes.
--- 154,11 ---
              } catch (InterruptedException e) {
                  fail();
              }
              assertTrue(exc.get() instanceof WrongThreadException);
          };
!         Thread.ofVirtual().scheduler(scheduler).start(LockSupport::park);
      }
  
      /**
       * Test parking with the virtual thread interrupt set, should not leak to the
       * carrier thread when the task completes.

*** 167,16 ***
      @Test
      void testParkWithInterruptSet() {
          Thread carrier = Thread.currentThread();
          assumeFalse(carrier.isVirtual(), "Main thread is a virtual thread");
          try {
!             ThreadFactory factory = VThreadScheduler.virtualThreadFactory(Runnable::run);
!             Thread vthread = factory.newThread(() -> {
                  Thread.currentThread().interrupt();
                  Thread.yield();
              });
-             vthread.start();
              assertTrue(vthread.isInterrupted());
              assertFalse(carrier.isInterrupted());
          } finally {
              Thread.interrupted();
          }
--- 166,15 ---
      @Test
      void testParkWithInterruptSet() {
          Thread carrier = Thread.currentThread();
          assumeFalse(carrier.isVirtual(), "Main thread is a virtual thread");
          try {
!             var scheduler = Thread.VirtualThreadScheduler.adapt(Runnable::run);
!             Thread vthread = Thread.ofVirtual().scheduler(scheduler).start(() -> {
                  Thread.currentThread().interrupt();
                  Thread.yield();
              });
              assertTrue(vthread.isInterrupted());
              assertFalse(carrier.isInterrupted());
          } finally {
              Thread.interrupted();
          }

*** 189,15 ***
      @Test
      void testTerminateWithInterruptSet() {
          Thread carrier = Thread.currentThread();
          assumeFalse(carrier.isVirtual(), "Main thread is a virtual thread");
          try {
!             ThreadFactory factory = VThreadScheduler.virtualThreadFactory(Runnable::run);
!             Thread vthread = factory.newThread(() -> {
                  Thread.currentThread().interrupt();
              });
-             vthread.start();
              assertTrue(vthread.isInterrupted());
              assertFalse(carrier.isInterrupted());
          } finally {
              Thread.interrupted();
          }
--- 187,14 ---
      @Test
      void testTerminateWithInterruptSet() {
          Thread carrier = Thread.currentThread();
          assumeFalse(carrier.isVirtual(), "Main thread is a virtual thread");
          try {
!             var scheduler = Thread.VirtualThreadScheduler.adapt(Runnable::run);
!             Thread vthread = Thread.ofVirtual().scheduler(scheduler).start(() -> {
                  Thread.currentThread().interrupt();
              });
              assertTrue(vthread.isInterrupted());
              assertFalse(carrier.isInterrupted());
          } finally {
              Thread.interrupted();
          }

*** 207,21 ***
       * Test running task with the carrier interrupt status set.
       */
      @Test
      void testRunWithInterruptSet() throws Exception {
          assumeFalse(Thread.currentThread().isVirtual(), "Main thread is a virtual thread");
!         Executor scheduler = (task) -> {
              Thread.currentThread().interrupt();
              task.run();
!         };
-         ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler);
          try {
              AtomicBoolean interrupted = new AtomicBoolean();
!             Thread vthread = factory.newThread(() -> {
                  interrupted.set(Thread.currentThread().isInterrupted());
              });
-             vthread.start();
              assertFalse(vthread.isInterrupted());
          } finally {
              Thread.interrupted();
          }
      }
--- 204,19 ---
       * Test running task with the carrier interrupt status set.
       */
      @Test
      void testRunWithInterruptSet() throws Exception {
          assumeFalse(Thread.currentThread().isVirtual(), "Main thread is a virtual thread");
!         var scheduler = Thread.VirtualThreadScheduler.adapt(task -> {
              Thread.currentThread().interrupt();
              task.run();
!         });
          try {
              AtomicBoolean interrupted = new AtomicBoolean();
!             Thread vthread = Thread.ofVirtual().scheduler(scheduler).start(() -> {
                  interrupted.set(Thread.currentThread().isInterrupted());
              });
              assertFalse(vthread.isInterrupted());
          } finally {
              Thread.interrupted();
          }
      }

*** 229,42 ***
      /**
       * Test custom scheduler throwing OOME when starting a thread.
       */
      @Test
      void testThreadStartOOME() throws Exception {
!         Executor scheduler = task -> {
              System.err.println("OutOfMemoryError");
              throw new OutOfMemoryError();
!         };
!         ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler);
-         Thread thread = factory.newThread(() -> { });
          assertThrows(OutOfMemoryError.class, thread::start);
      }
  
      /**
       * Test custom scheduler throwing OOME when unparking a thread.
       */
      @Test
      void testThreadUnparkOOME() throws Exception {
          try (ExecutorService executor = Executors.newFixedThreadPool(1)) {
              AtomicInteger counter = new AtomicInteger();
!             Executor scheduler = task -> {
                  switch (counter.getAndIncrement()) {
                      case 0 -> executor.execute(task);             // Thread.start
                      case 1, 2 -> {                                // unpark attempt 1+2
                          System.err.println("OutOfMemoryError");
                          throw new OutOfMemoryError();
                      }
                      default -> executor.execute(task);
                  }
                  executor.execute(task);
!             };
  
              // start thread and wait for it to park
!             ThreadFactory factory = VThreadScheduler.virtualThreadFactory(scheduler);
-             var thread = factory.newThread(LockSupport::park);
-             thread.start();
              await(thread, Thread.State.WAITING);
  
              // unpark thread, this should retry until OOME is not thrown
              LockSupport.unpark(thread);
              thread.join();
--- 224,39 ---
      /**
       * Test custom scheduler throwing OOME when starting a thread.
       */
      @Test
      void testThreadStartOOME() throws Exception {
!         var scheduler = Thread.VirtualThreadScheduler.adapt(task -> {
              System.err.println("OutOfMemoryError");
              throw new OutOfMemoryError();
!         });
!         Thread thread = Thread.ofVirtual().scheduler(scheduler).unstarted(() -> { });
          assertThrows(OutOfMemoryError.class, thread::start);
      }
  
      /**
       * Test custom scheduler throwing OOME when unparking a thread.
       */
      @Test
      void testThreadUnparkOOME() throws Exception {
          try (ExecutorService executor = Executors.newFixedThreadPool(1)) {
              AtomicInteger counter = new AtomicInteger();
!             var scheduler = Thread.VirtualThreadScheduler.adapt(task -> {
                  switch (counter.getAndIncrement()) {
                      case 0 -> executor.execute(task);             // Thread.start
                      case 1, 2 -> {                                // unpark attempt 1+2
                          System.err.println("OutOfMemoryError");
                          throw new OutOfMemoryError();
                      }
                      default -> executor.execute(task);
                  }
                  executor.execute(task);
!             });
  
              // start thread and wait for it to park
!             var thread = Thread.ofVirtual().scheduler(scheduler).start(LockSupport::park);
              await(thread, Thread.State.WAITING);
  
              // unpark thread, this should retry until OOME is not thrown
              LockSupport.unpark(thread);
              thread.join();
< prev index next >