< prev index next >

test/jdk/java/nio/channels/vthread/BlockingChannelOps.java

Print this page
*** 60,23 ***
  import java.nio.channels.Pipe;
  import java.nio.channels.ReadableByteChannel;
  import java.nio.channels.ServerSocketChannel;
  import java.nio.channels.SocketChannel;
  import java.nio.channels.WritableByteChannel;
  
  import jdk.test.lib.thread.VThreadRunner;
! import org.junit.jupiter.api.Test;
  import static org.junit.jupiter.api.Assertions.*;
  
  class BlockingChannelOps {
  
      /**
       * SocketChannel read/write, no blocking.
       */
!     @Test
!     void testSocketChannelReadWrite1() throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1
--- 60,55 ---
  import java.nio.channels.Pipe;
  import java.nio.channels.ReadableByteChannel;
  import java.nio.channels.ServerSocketChannel;
  import java.nio.channels.SocketChannel;
  import java.nio.channels.WritableByteChannel;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.stream.Stream;
  
  import jdk.test.lib.thread.VThreadRunner;
! import jdk.test.lib.thread.VThreadScheduler;
+ 
+ import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.api.AfterAll;
+ import org.junit.jupiter.params.ParameterizedTest;
+ import org.junit.jupiter.params.provider.MethodSource;
  import static org.junit.jupiter.api.Assertions.*;
  
  class BlockingChannelOps {
  
+     private static ExecutorService customScheduler;
+ 
+     @BeforeAll
+     static void setup() {
+         customScheduler = Executors.newCachedThreadPool();  // handed to Thread.ofVirtual().scheduler(...) in threadBuilders()
+     }
+ 
+     @AfterAll
+     static void finish() {
+         customScheduler.shutdown();  // initiates shutdown of the pool created in setup(); does not wait for termination
+     }
+ 
+     /**
+      * Returns the Thread.Builders that the parameterized tests use to create the virtual thread.
+      */
+     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
+         if (VThreadScheduler.supportsCustomScheduler()) {
+             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));  // default builder + builder using customScheduler
+         } else {
+             return Stream.of(Thread.ofVirtual());  // default builder only
+         }
+     }
+ 
      /**
       * SocketChannel read/write, no blocking.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1

*** 94,13 ***
      }
  
      /**
       * Virtual thread blocks in SocketChannel read.
       */
!     @Test
!     void testSocketChannelRead() throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1 when current thread blocks in sc2.read
--- 126,14 ---
      }
  
      /**
       * Virtual thread blocks in SocketChannel read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1 when current thread blocks in sc2.read

*** 117,13 ***
      }
  
      /**
       * Virtual thread blocks in SocketChannel write.
       */
!     @Test
!     void testSocketChannelWrite() throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // read from sc2 to EOF when current thread blocks in sc1.write
--- 150,14 ---
      }
  
      /**
       * Virtual thread blocks in SocketChannel write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // read from sc2 to EOF when current thread blocks in sc1.write

*** 145,13 ***
      }
  
      /**
       * SocketChannel close while virtual thread blocked in read.
       */
!     @Test
!     void testSocketChannelReadAsyncClose() throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
                  runAfterParkedAsync(sc::close);
                  try {
                      int n = sc.read(ByteBuffer.allocate(100));
--- 179,14 ---
      }
  
      /**
       * SocketChannel close while virtual thread blocked in read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
                  runAfterParkedAsync(sc::close);
                  try {
                      int n = sc.read(ByteBuffer.allocate(100));

*** 159,16 ***
                  } catch (AsynchronousCloseException expected) { }
              }
          });
      }
  
      /**
       * Virtual thread interrupted while blocked in SocketChannel read.
       */
!     @Test
!     void testSocketChannelReadInterrupt() throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
  
                  // interrupt current thread when it blocks in read
                  Thread thisThread = Thread.currentThread();
--- 194,34 ---
                  } catch (AsynchronousCloseException expected) { }
              }
          });
      }
  
+     /**
+      * SocketChannel shutdownInput while virtual thread blocked in read.
+      */
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
+             try (var connection = new Connection()) {
+                 SocketChannel sc = connection.channel1();
+                 runAfterParkedAsync(sc::shutdownInput);  // helper not shown here; by its name, runs the task once this thread has parked - confirm
+                 int n = sc.read(ByteBuffer.allocate(100));  // this test writes nothing to the connection beforehand
+                 assertEquals(-1, n);  // read must report EOF rather than throw
+                 assertTrue(sc.isOpen());  // shutting down input must not close the channel
+             }
+         });
+     }
+ 
      /**
       * Virtual thread interrupted while blocked in SocketChannel read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
  
                  // interrupt current thread when it blocks in read
                  Thread thisThread = Thread.currentThread();

*** 185,13 ***
      }
  
      /**
       * SocketChannel close while virtual thread blocked in write.
       */
!     @Test
!     void testSocketChannelWriteAsyncClose() throws Exception {
!         VThreadRunner.run(() -> {
              boolean retry = true;
              while (retry) {
                  try (var connection = new Connection()) {
                      SocketChannel sc = connection.channel1();
  
--- 238,14 ---
      }
  
      /**
       * SocketChannel close while virtual thread blocked in write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              boolean retry = true;
              while (retry) {
                  try (var connection = new Connection()) {
                      SocketChannel sc = connection.channel1();
  

*** 213,16 ***
                  }
              }
          });
      }
  
      /**
       * Virtual thread interrupted while blocked in SocketChannel write.
       */
!     @Test
!     void testSocketChannelWriteInterrupt() throws Exception {
!         VThreadRunner.run(() -> {
              boolean retry = true;
              while (retry) {
                  try (var connection = new Connection()) {
                      SocketChannel sc = connection.channel1();
  
--- 267,45 ---
                  }
              }
          });
      }
  
+ 
+     /**
+      * SocketChannel shutdownOutput while virtual thread blocked in write.
+      */
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
+             try (var connection = new Connection()) {
+                 SocketChannel sc = connection.channel1();
+ 
+                 // shutdown output when current thread blocks in write
+                 runAfterParkedAsync(sc::shutdownOutput);
+                 try {
+                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
+                     for (;;) {  // no normal exit: the loop ends only by exception or a failed assertion
+                         int n = sc.write(bb);
+                         assertTrue(n > 0);
+                         bb.clear();  // reset position/limit so the same buffer is written again
+                     }
+                 } catch (ClosedChannelException e) {
+                     // expected
+                 }
+                 assertTrue(sc.isOpen());  // shutting down output must not close the channel
+             }
+         });
+     }
+ 
      /**
       * Virtual thread interrupted while blocked in SocketChannel write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              boolean retry = true;
              while (retry) {
                  try (var connection = new Connection()) {
                      SocketChannel sc = connection.channel1();
  

*** 250,25 ***
      }
  
      /**
       * Virtual thread blocks in SocketChannel adaptor read.
       */
!     @Test
!     void testSocketAdaptorRead1() throws Exception {
!         testSocketAdaptorRead(0);
      }
  
      /**
       * Virtual thread blocks in SocketChannel adaptor read with timeout.
       */
!     @Test
!     void testSocketAdaptorRead2() throws Exception {
!         testSocketAdaptorRead(60_000);
      }
  
!     private void testSocketAdaptorRead(int timeout) throws Exception {
!         VThreadRunner.run(() -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1 when current thread blocks reading from sc2
--- 333,28 ---
      }
  
      /**
       * Virtual thread blocks in SocketChannel adaptor read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
+         testSocketAdaptorRead(builder, 0);
      }
  
      /**
       * Virtual thread blocks in SocketChannel adaptor read with timeout.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
+         testSocketAdaptorRead(builder, 60_000);
      }
  
!     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
!                                        int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1 when current thread blocks reading from sc2

*** 287,13 ***
      }
  
      /**
       * ServerSocketChannel accept, no blocking.
       */
!     @Test
!     void testServerSocketChannelAccept1() throws Exception {
!         VThreadRunner.run(() -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc1 = SocketChannel.open(ssc.getLocalAddress());
                  // accept should not block
                  var sc2 = ssc.accept();
--- 373,14 ---
      }
  
      /**
       * ServerSocketChannel accept, no blocking.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc1 = SocketChannel.open(ssc.getLocalAddress());
                  // accept should not block
                  var sc2 = ssc.accept();

*** 304,13 ***
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel accept.
       */
!     @Test
!     void testServerSocketChannelAccept2() throws Exception {
!         VThreadRunner.run(() -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc1 = SocketChannel.open();
  
                  // connect when current thread blocks in accept
--- 391,14 ---
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel accept.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc1 = SocketChannel.open();
  
                  // connect when current thread blocks in accept

*** 325,13 ***
      }
  
      /**
       * ServerSocketChannel close while virtual thread blocked in accept.
       */
!     @Test
!     void testServerSocketChannelAcceptAsyncClose() throws Exception {
!         VThreadRunner.run(() -> {
              try (var ssc = ServerSocketChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  ssc.bind(new InetSocketAddress(lh, 0));
                  runAfterParkedAsync(ssc::close);
                  try {
--- 413,14 ---
      }
  
      /**
       * SeverSocketChannel close while virtual thread blocked in accept.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  ssc.bind(new InetSocketAddress(lh, 0));
                  runAfterParkedAsync(ssc::close);
                  try {

*** 344,13 ***
      }
  
      /**
       * Virtual thread interrupted while blocked in ServerSocketChannel accept.
       */
!     @Test
!     void testServerSocketChannelAcceptInterrupt() throws Exception {
!         VThreadRunner.run(() -> {
              try (var ssc = ServerSocketChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  ssc.bind(new InetSocketAddress(lh, 0));
  
                  // interrupt current thread when it blocks in accept
--- 433,14 ---
      }
  
      /**
       * Virtual thread interrupted while blocked in ServerSocketChannel accept.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  ssc.bind(new InetSocketAddress(lh, 0));
  
                  // interrupt current thread when it blocks in accept

*** 369,25 ***
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel adaptor accept.
       */
!     @Test
!     void testSocketChannelAdaptorAccept1() throws Exception {
!         testSocketChannelAdaptorAccept(0);
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
       */
!     @Test
!     void testSocketChannelAdaptorAccept2() throws Exception {
!         testSocketChannelAdaptorAccept(60_000);
      }
  
!     private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
!         VThreadRunner.run(() -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc = SocketChannel.open();
  
                  // interrupt current thread when it blocks in accept
--- 459,28 ---
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel adaptor accept.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
+         testSocketChannelAdaptorAccept(builder, 0);
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
+         testSocketChannelAdaptorAccept(builder, 60_000);
      }
  
!     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
!                                                 int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc = SocketChannel.open();
  
                  // interrupt current thread when it blocks in accept

*** 404,13 ***
      }
  
      /**
       * DatagramChannel receive/send, no blocking.
       */
!     @Test
!     void testDatagramChannelSendReceive1() throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));
--- 497,14 ---
      }
  
      /**
       * DatagramChannel receive/send, no blocking.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));

*** 429,13 ***
      }
  
      /**
       * Virtual thread blocks in DatagramChannel receive.
       */
!     @Test
!     void testDatagramChannelSendReceive2() throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));
--- 523,14 ---
      }
  
      /**
       * Virtual thread blocks in DatagramChannel receive.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));

*** 453,13 ***
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in receive.
       */
!     @Test
!     void testDatagramChannelReceiveAsyncClose() throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
                  runAfterParkedAsync(dc::close);
                  try {
--- 548,14 ---
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in receive.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
                  runAfterParkedAsync(dc::close);
                  try {

*** 471,13 ***
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramChannel receive.
       */
!     @Test
!     void testDatagramChannelReceiveInterrupt() throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  // interrupt current thread when it blocks in receive
--- 567,14 ---
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramChannel receive.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  // interrupt current thread when it blocks in receive

*** 495,25 ***
      }
  
      /**
       * Virtual thread blocks in DatagramSocket adaptor receive.
       */
!     @Test
!     void testDatagramSocketAdaptorReceive1() throws Exception {
!         testDatagramSocketAdaptorReceive(0);
      }
  
      /**
       * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
       */
!     @Test
!     void testDatagramSocketAdaptorReceive2() throws Exception {
!         testDatagramSocketAdaptorReceive(60_000);
      }
  
!     private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));
--- 592,28 ---
      }
  
      /**
       * Virtual thread blocks in DatagramSocket adaptor receive.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceive(builder, 0);
      }
  
      /**
       * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceive(builder, 60_000);
      }
  
!     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
!                                                   int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));

*** 534,26 ***
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in adaptor receive.
       */
!     @Test
!     void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
!         testDatagramSocketAdaptorReceiveAsyncClose(0);
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in adaptor receive
       * with timeout.
       */
!     @Test
!     void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
!         testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
      }
  
!     private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  byte[] array = new byte[100];
--- 634,29 ---
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in adaptor receive.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in adaptor receive
       * with timeout.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_000);  // same timeout value as the other "with timeout" adaptor tests
      }
  
!     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
!                                                             int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  byte[] array = new byte[100];

*** 570,26 ***
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
       */
!     @Test
!     void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
!         testDatagramSocketAdaptorReceiveInterrupt(0);
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
       * with timeout.
       */
!     @Test
!     void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
!         testDatagramSocketAdaptorReceiveInterrupt(60_1000);
      }
  
!     private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
!         VThreadRunner.run(() -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  byte[] array = new byte[100];
--- 673,29 ---
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
       * with timeout.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_000);  // same timeout value as the other "with timeout" adaptor tests
      }
  
!     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
!                                                            int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  byte[] array = new byte[100];

*** 612,13 ***
      }
  
      /**
       * Pipe read/write, no blocking.
       */
!     @Test
!     void testPipeReadWrite1() throws Exception {
!         VThreadRunner.run(() -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // write should not block
--- 718,14 ---
      }
  
      /**
       * Pipe read/write, no blocking.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // write should not block

*** 636,13 ***
      }
  
      /**
       * Virtual thread blocks in Pipe.SourceChannel read.
       */
!     @Test
!     void testPipeReadWrite2() throws Exception {
!         VThreadRunner.run(() -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // write from sink when current thread blocks reading from source
--- 743,14 ---
      }
  
      /**
       * Virtual thread blocks in Pipe.SourceChannel read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // write from sink when current thread blocks reading from source

*** 659,13 ***
      }
  
      /**
       * Virtual thread blocks in Pipe.SinkChannel write.
       */
!     @Test
!     void testPipeReadWrite3() throws Exception {
!         VThreadRunner.run(() -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // read from source to EOF when current thread blocking in write
--- 767,14 ---
      }
  
      /**
       * Virtual thread blocks in Pipe.SinkChannel write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // read from source to EOF when current thread blocking in write

*** 687,13 ***
      }
  
      /**
       * Pipe.SourceChannel close while virtual thread blocked in read.
       */
!     @Test
!     void testPipeReadAsyncClose() throws Exception {
!         VThreadRunner.run(() -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
                  runAfterParkedAsync(source::close);
                  try {
--- 796,14 ---
      }
  
      /**
       * Pipe.SourceChannel close while virtual thread blocked in read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
                  runAfterParkedAsync(source::close);
                  try {

*** 705,13 ***
      }
  
      /**
       * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
       */
!     @Test
!     void testPipeReadInterrupt() throws Exception {
!         VThreadRunner.run(() -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // interrupt current thread when it blocks reading from source
--- 815,14 ---
      }
  
      /**
       * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // interrupt current thread when it blocks reading from source

*** 729,13 ***
      }
  
      /**
       * Pipe.SinkChannel close while virtual thread blocked in write.
       */
!     @Test
!     void testPipeWriteAsyncClose() throws Exception {
!         VThreadRunner.run(() -> {
              boolean retry = true;
              while (retry) {
                  Pipe p = Pipe.open();
                  try (Pipe.SinkChannel sink = p.sink();
                       Pipe.SourceChannel source = p.source()) {
--- 840,14 ---
      }
  
      /**
       * Pipe.SinkChannel close while virtual thread blocked in write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              boolean retry = true;
              while (retry) {
                  Pipe p = Pipe.open();
                  try (Pipe.SinkChannel sink = p.sink();
                       Pipe.SourceChannel source = p.source()) {

*** 761,13 ***
      }
  
      /**
       * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
       */
!     @Test
!     void testPipeWriteInterrupt() throws Exception {
!         VThreadRunner.run(() -> {
              boolean retry = true;
              while (retry) {
                  Pipe p = Pipe.open();
                  try (Pipe.SinkChannel sink = p.sink();
                       Pipe.SourceChannel source = p.source()) {
--- 873,14 ---
      }
  
      /**
       * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
       */
!     @ParameterizedTest
!     @MethodSource("threadBuilders")
!     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              boolean retry = true;
              while (retry) {
                  Pipe p = Pipe.open();
                  try (Pipe.SinkChannel sink = p.sink();
                       Pipe.SourceChannel source = p.source()) {
< prev index next >