< prev index next >

test/jdk/java/nio/channels/vthread/BlockingChannelOps.java

Print this page
*** 24,19 ***
  /*
   * @test id=default
   * @bug 8284161
   * @summary Test virtual threads doing blocking I/O on NIO channels
   * @library /test/lib
!  * @run junit/timeout=480 BlockingChannelOps
   */
  
  /*
   * @test id=poller-modes
   * @requires (os.family == "linux") | (os.family == "mac")
   * @library /test/lib
   * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
   * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
   */
  
  /*
   * @test id=no-vmcontinuations
   * @requires vm.continuations
--- 24,29 ---
  /*
   * @test id=default
   * @bug 8284161
   * @summary Test virtual threads doing blocking I/O on NIO channels
   * @library /test/lib
!  * @run junit/othervm/timeout=480 BlockingChannelOps
   */
  
  /*
   * @test id=poller-modes
   * @requires (os.family == "linux") | (os.family == "mac")
   * @library /test/lib
   * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
   * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
+  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
+  */
+ 
+ /*
+  * @test id=io_uring
+  * @requires os.family == "linux"
+  * @library /test/lib
+  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
+  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
+  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
   */
  
  /*
   * @test id=no-vmcontinuations
   * @requires vm.continuations

*** 60,24 ***
  import java.nio.channels.Pipe;
  import java.nio.channels.ReadableByteChannel;
  import java.nio.channels.ServerSocketChannel;
  import java.nio.channels.SocketChannel;
  import java.nio.channels.WritableByteChannel;
  import java.util.concurrent.locks.LockSupport;
  
  import jdk.test.lib.thread.VThreadRunner;
! import org.junit.jupiter.api.Test;
  import static org.junit.jupiter.api.Assertions.*;
  
  class BlockingChannelOps {
  
      /**
       * SocketChannel read/write, no blocking.
       */
!     @Test
!     void testSocketChannelReadWrite1() throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1
--- 70,53 ---
  import java.nio.channels.Pipe;
  import java.nio.channels.ReadableByteChannel;
  import java.nio.channels.ServerSocketChannel;
  import java.nio.channels.SocketChannel;
  import java.nio.channels.WritableByteChannel;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.ThreadFactory;
  import java.util.concurrent.locks.LockSupport;
+ import java.util.stream.Stream;
  
  import jdk.test.lib.thread.VThreadRunner;
! import jdk.test.lib.thread.VThreadScheduler;
+ 
+ import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.params.ParameterizedTest;
+ import org.junit.jupiter.params.provider.MethodSource;
  import static org.junit.jupiter.api.Assertions.*;
  
  class BlockingChannelOps {
+     private static ExecutorService threadPool;
+     private static Thread.VirtualThreadScheduler customScheduler;
+ 
+     @BeforeAll
+     static void setup() {
+         ThreadFactory factory = Thread.ofPlatform().daemon().factory();
+         threadPool = Executors.newCachedThreadPool(factory);
+         customScheduler = (_, task) -> threadPool.execute(task);
+     }
+ 
+     /**
+      * Returns the Thread.Builder to create the virtual thread.
+      */
+     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
+         if (VThreadScheduler.supportsCustomScheduler()) {
+             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
+         } else {
+             return Stream.of(Thread.ofVirtual());
+         }
+     }
  
      /**
       * SocketChannel read/write, no blocking.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1

*** 95,13 ***
      }
  
      /**
       * Virtual thread blocks in SocketChannel read.
       */
!     @Test
!     void testSocketChannelRead() throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1 when current thread blocks in sc2.read
--- 134,14 ---
      }
  
      /**
       * Virtual thread blocks in SocketChannel read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1 when current thread blocks in sc2.read

*** 118,13 ***
      }
  
      /**
       * Virtual thread blocks in SocketChannel write.
       */
!     @Test
!     void testSocketChannelWrite() throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // read from sc2 to EOF when current thread blocks in sc1.write
--- 158,14 ---
      }
  
      /**
       * Virtual thread blocks in SocketChannel write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // read from sc2 to EOF when current thread blocks in sc1.write

*** 146,13 ***
      }
  
      /**
       * SocketChannel close while virtual thread blocked in read.
       */
!     @Test
!     void testSocketChannelReadAsyncClose() throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
                  runAfterParkedAsync(sc::close);
                  try {
                      int n = sc.read(ByteBuffer.allocate(100));
--- 187,14 ---
      }
  
      /**
       * SocketChannel close while virtual thread blocked in read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
                  runAfterParkedAsync(sc::close);
                  try {
                      int n = sc.read(ByteBuffer.allocate(100));

*** 163,13 ***
      }
  
      /**
       * SocketChannel shutdownInput while virtual thread blocked in read.
       */
!     @Test
!     void testSocketChannelReadAsyncShutdownInput() throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
                  runAfterParkedAsync(sc::shutdownInput);
                  int n = sc.read(ByteBuffer.allocate(100));
                  assertEquals(-1, n);
--- 205,14 ---
      }
  
      /**
       * SocketChannel shutdownInput while virtual thread blocked in read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
                  runAfterParkedAsync(sc::shutdownInput);
                  int n = sc.read(ByteBuffer.allocate(100));
                  assertEquals(-1, n);

*** 179,13 ***
      }
  
      /**
       * Virtual thread interrupted while blocked in SocketChannel read.
       */
!     @Test
!     void testSocketChannelReadInterrupt() throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
  
                  // interrupt current thread when it blocks in read
                  Thread thisThread = Thread.currentThread();
--- 222,14 ---
      }
  
      /**
       * Virtual thread interrupted while blocked in SocketChannel read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
  
                  // interrupt current thread when it blocks in read
                  Thread thisThread = Thread.currentThread();

*** 202,13 ***
      }
  
      /**
       * SocketChannel close while virtual thread blocked in write.
       */
!     @Test
!     void testSocketChannelWriteAsyncClose() throws Exception {
!         VThreadRunner.run(() -> {
              boolean done = false;
              while (!done) {
                  try (var connection = new Connection()) {
                      SocketChannel sc = connection.channel1();
  
--- 246,14 ---
      }
  
      /**
       * SocketChannel close while virtual thread blocked in write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              boolean done = false;
              while (!done) {
                  try (var connection = new Connection()) {
                      SocketChannel sc = connection.channel1();
  

*** 233,17 ***
                  }
              }
          });
      }
  
- 
      /**
       * SocketChannel shutdownOutput while virtual thread blocked in write.
       */
!     @Test
!     void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
  
                  // shutdown output when current thread blocks in write
                  runAfterParkedAsync(sc::shutdownOutput);
--- 278,17 ---
                  }
              }
          });
      }
  
      /**
       * SocketChannel shutdownOutput while virtual thread blocked in write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
  
                  // shutdown output when current thread blocks in write
                  runAfterParkedAsync(sc::shutdownOutput);

*** 263,13 ***
      }
  
      /**
       * Virtual thread interrupted while blocked in SocketChannel write.
       */
!     @Test
!     void testSocketChannelWriteInterrupt() throws Exception {
!         VThreadRunner.run(() -> {
              boolean done = false;
              while (!done) {
                  try (var connection = new Connection()) {
                      SocketChannel sc = connection.channel1();
  
--- 308,14 ---
      }
  
      /**
       * Virtual thread interrupted while blocked in SocketChannel write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              boolean done = false;
              while (!done) {
                  try (var connection = new Connection()) {
                      SocketChannel sc = connection.channel1();
  

*** 299,25 ***
      }
  
      /**
       * Virtual thread blocks in SocketChannel adaptor read.
       */
!     @Test
!     void testSocketAdaptorRead1() throws Exception {
!         testSocketAdaptorRead(0);
      }
  
      /**
       * Virtual thread blocks in SocketChannel adaptor read with timeout.
       */
!     @Test
!     void testSocketAdaptorRead2() throws Exception {
!         testSocketAdaptorRead(60_000);
      }
  
!     private void testSocketAdaptorRead(int timeout) throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1 when currnet thread blocks reading from sc2
--- 345,28 ---
      }
  
      /**
       * Virtual thread blocks in SocketChannel adaptor read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
+         testSocketAdaptorRead(builder, 0);
      }
  
      /**
       * Virtual thread blocks in SocketChannel adaptor read with timeout.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
+         testSocketAdaptorRead(builder, 60_000);
      }
  
!     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
!                                        int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1 when currnet thread blocks reading from sc2

*** 336,13 ***
      }
  
      /**
       * ServerSocketChannel accept, no blocking.
       */
!     @Test
!     void testServerSocketChannelAccept1() throws Exception {
!         VThreadRunner.run(() -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc1 = SocketChannel.open(ssc.getLocalAddress());
                  // accept should not block
                  var sc2 = ssc.accept();
--- 385,14 ---
      }
  
      /**
       * ServerSocketChannel accept, no blocking.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc1 = SocketChannel.open(ssc.getLocalAddress());
                  // accept should not block
                  var sc2 = ssc.accept();

*** 353,13 ***
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel accept.
       */
!     @Test
!     void testServerSocketChannelAccept2() throws Exception {
!         VThreadRunner.run(() -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc1 = SocketChannel.open();
  
                  // connect when current thread when it blocks in accept
--- 403,14 ---
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel accept.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc1 = SocketChannel.open();
  
                  // connect when current thread when it blocks in accept

*** 374,13 ***
      }
  
      /**
       * SeverSocketChannel close while virtual thread blocked in accept.
       */
!     @Test
!     void testServerSocketChannelAcceptAsyncClose() throws Exception {
!         VThreadRunner.run(() -> {
              try (var ssc = ServerSocketChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  ssc.bind(new InetSocketAddress(lh, 0));
                  runAfterParkedAsync(ssc::close);
                  try {
--- 425,14 ---
      }
  
      /**
       * SeverSocketChannel close while virtual thread blocked in accept.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  ssc.bind(new InetSocketAddress(lh, 0));
                  runAfterParkedAsync(ssc::close);
                  try {

*** 393,13 ***
      }
  
      /**
       * Virtual thread interrupted while blocked in ServerSocketChannel accept.
       */
!     @Test
!     void testServerSocketChannelAcceptInterrupt() throws Exception {
!         VThreadRunner.run(() -> {
              try (var ssc = ServerSocketChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  ssc.bind(new InetSocketAddress(lh, 0));
  
                  // interrupt current thread when it blocks in accept
--- 445,14 ---
      }
  
      /**
       * Virtual thread interrupted while blocked in ServerSocketChannel accept.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  ssc.bind(new InetSocketAddress(lh, 0));
  
                  // interrupt current thread when it blocks in accept

*** 418,25 ***
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel adaptor accept.
       */
!     @Test
!     void testSocketChannelAdaptorAccept1() throws Exception {
!         testSocketChannelAdaptorAccept(0);
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
       */
!     @Test
!     void testSocketChannelAdaptorAccept2() throws Exception {
!         testSocketChannelAdaptorAccept(60_000);
      }
  
!     private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
!         VThreadRunner.run(() -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc = SocketChannel.open();
  
                  // interrupt current thread when it blocks in accept
--- 471,28 ---
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel adaptor accept.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
+         testSocketChannelAdaptorAccept(builder, 0);
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
+         testSocketChannelAdaptorAccept(builder, 60_000);
      }
  
!     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
!                                                 int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc = SocketChannel.open();
  
                  // interrupt current thread when it blocks in accept

*** 453,13 ***
      }
  
      /**
       * DatagramChannel receive/send, no blocking.
       */
!     @Test
!     void testDatagramChannelSendReceive1() throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));
--- 509,14 ---
      }
  
      /**
       * DatagramChannel receive/send, no blocking.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));

*** 478,13 ***
      }
  
      /**
       * Virtual thread blocks in DatagramChannel receive.
       */
!     @Test
!     void testDatagramChannelSendReceive2() throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));
--- 535,14 ---
      }
  
      /**
       * Virtual thread blocks in DatagramChannel receive.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));

*** 502,13 ***
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in receive.
       */
!     @Test
!     void testDatagramChannelReceiveAsyncClose() throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
                  runAfterParkedAsync(dc::close);
                  try {
--- 560,14 ---
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in receive.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
                  runAfterParkedAsync(dc::close);
                  try {

*** 520,13 ***
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramChannel receive.
       */
!     @Test
!     void testDatagramChannelReceiveInterrupt() throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  // interrupt current thread when it blocks in receive
--- 579,14 ---
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramChannel receive.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  // interrupt current thread when it blocks in receive

*** 544,25 ***
      }
  
      /**
       * Virtual thread blocks in DatagramSocket adaptor receive.
       */
!     @Test
!     void testDatagramSocketAdaptorReceive1() throws Exception {
!         testDatagramSocketAdaptorReceive(0);
      }
  
      /**
       * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
       */
!     @Test
!     void testDatagramSocketAdaptorReceive2() throws Exception {
!         testDatagramSocketAdaptorReceive(60_000);
      }
  
!     private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));
--- 604,28 ---
      }
  
      /**
       * Virtual thread blocks in DatagramSocket adaptor receive.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceive(builder, 0);
      }
  
      /**
       * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceive(builder, 60_000);
      }
  
!     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
!                                                   int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));

*** 583,26 ***
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in adaptor receive.
       */
!     @Test
!     void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
!         testDatagramSocketAdaptorReceiveAsyncClose(0);
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in adaptor receive
       * with timeout.
       */
!     @Test
!     void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
!         testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
      }
  
!     private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  byte[] array = new byte[100];
--- 646,29 ---
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in adaptor receive.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in adaptor receive
       * with timeout.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
      }
  
!     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
!                                                             int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  byte[] array = new byte[100];

*** 619,26 ***
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
       */
!     @Test
!     void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
!         testDatagramSocketAdaptorReceiveInterrupt(0);
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
       * with timeout.
       */
!     @Test
!     void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
!         testDatagramSocketAdaptorReceiveInterrupt(60_1000);
      }
  
!     private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  byte[] array = new byte[100];
--- 685,29 ---
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
       * with timeout.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
      }
  
!     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
!                                                            int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  byte[] array = new byte[100];

*** 661,13 ***
      }
  
      /**
       * Pipe read/write, no blocking.
       */
!     @Test
!     void testPipeReadWrite1() throws Exception {
!         VThreadRunner.run(() -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // write should not block
--- 730,14 ---
      }
  
      /**
       * Pipe read/write, no blocking.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // write should not block

*** 685,13 ***
      }
  
      /**
       * Virtual thread blocks in Pipe.SourceChannel read.
       */
!     @Test
!     void testPipeReadWrite2() throws Exception {
!         VThreadRunner.run(() -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // write from sink when current thread blocks reading from source
--- 755,14 ---
      }
  
      /**
       * Virtual thread blocks in Pipe.SourceChannel read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // write from sink when current thread blocks reading from source

*** 708,13 ***
      }
  
      /**
       * Virtual thread blocks in Pipe.SinkChannel write.
       */
!     @Test
!     void testPipeReadWrite3() throws Exception {
!         VThreadRunner.run(() -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // read from source to EOF when current thread blocking in write
--- 779,14 ---
      }
  
      /**
       * Virtual thread blocks in Pipe.SinkChannel write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // read from source to EOF when current thread blocking in write

*** 736,13 ***
      }
  
      /**
       * Pipe.SourceChannel close while virtual thread blocked in read.
       */
!     @Test
!     void testPipeReadAsyncClose() throws Exception {
!         VThreadRunner.run(() -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
                  runAfterParkedAsync(source::close);
                  try {
--- 808,14 ---
      }
  
      /**
       * Pipe.SourceChannel close while virtual thread blocked in read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
                  runAfterParkedAsync(source::close);
                  try {

*** 754,13 ***
      }
  
      /**
       * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
       */
!     @Test
!     void testPipeReadInterrupt() throws Exception {
!         VThreadRunner.run(() -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // interrupt current thread when it blocks reading from source
--- 827,14 ---
      }
  
      /**
       * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // interrupt current thread when it blocks reading from source

*** 778,13 ***
      }
  
      /**
       * Pipe.SinkChannel close while virtual thread blocked in write.
       */
!     @Test
!     void testPipeWriteAsyncClose() throws Exception {
!         VThreadRunner.run(() -> {
              boolean done = false;
              while (!done) {
                  Pipe p = Pipe.open();
                  try (Pipe.SinkChannel sink = p.sink();
                       Pipe.SourceChannel source = p.source()) {
--- 852,14 ---
      }
  
      /**
       * Pipe.SinkChannel close while virtual thread blocked in write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              boolean done = false;
              while (!done) {
                  Pipe p = Pipe.open();
                  try (Pipe.SinkChannel sink = p.sink();
                       Pipe.SourceChannel source = p.source()) {

*** 813,13 ***
      }
  
      /**
       * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
       */
!     @Test
!     void testPipeWriteInterrupt() throws Exception {
!         VThreadRunner.run(() -> {
              boolean done = false;
              while (!done) {
                  Pipe p = Pipe.open();
                  try (Pipe.SinkChannel sink = p.sink();
                       Pipe.SourceChannel source = p.source()) {
--- 888,14 ---
      }
  
      /**
       * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              boolean done = false;
              while (!done) {
                  Pipe p = Pipe.open();
                  try (Pipe.SinkChannel sink = p.sink();
                       Pipe.SourceChannel source = p.source()) {
< prev index next >