< prev index next >

test/jdk/java/nio/channels/vthread/BlockingChannelOps.java

Print this page
@@ -60,23 +60,55 @@
  import java.nio.channels.Pipe;
  import java.nio.channels.ReadableByteChannel;
  import java.nio.channels.ServerSocketChannel;
  import java.nio.channels.SocketChannel;
  import java.nio.channels.WritableByteChannel;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.stream.Stream;
  
  import jdk.test.lib.thread.VThreadRunner;
- import org.junit.jupiter.api.Test;
+ import jdk.test.lib.thread.VThreadScheduler;
+ 
+ import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.api.AfterAll;
+ import org.junit.jupiter.params.ParameterizedTest;
+ import org.junit.jupiter.params.provider.MethodSource;
  import static org.junit.jupiter.api.Assertions.*;
  
  class BlockingChannelOps {
  
+     private static ExecutorService customScheduler;
+ 
+     @BeforeAll
+     static void setup() {
+         customScheduler = Executors.newCachedThreadPool();  // passed to Thread.ofVirtual().scheduler(...) in threadBuilders()
+     }
+ 
+     @AfterAll
+     static void finish() {
+         customScheduler.shutdown();  // shut down the pool created in setup()
+     }
+ 
+     /**
+      * Returns the Thread.Builders each test is run with: the default builder, plus one using customScheduler when supported.
+      */
+     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
+         if (VThreadScheduler.supportsCustomScheduler()) {
+             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
+         } else {
+             return Stream.of(Thread.ofVirtual());  // no custom scheduler support: default builder only
+         }
+     }
+ 
      /**
       * SocketChannel read/write, no blocking.
       */
-     @Test
-     void testSocketChannelReadWrite1() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1

@@ -94,13 +126,14 @@
      }
  
      /**
       * Virtual thread blocks in SocketChannel read.
       */
-     @Test
-     void testSocketChannelRead() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1 when current thread blocks in sc2.read

@@ -117,13 +150,14 @@
      }
  
      /**
       * Virtual thread blocks in SocketChannel write.
       */
-     @Test
-     void testSocketChannelWrite() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // read from sc2 to EOF when current thread blocks in sc1.write

@@ -145,13 +179,14 @@
      }
  
      /**
       * SocketChannel close while virtual thread blocked in read.
       */
-     @Test
-     void testSocketChannelReadAsyncClose() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
                  runAfterParkedAsync(sc::close);
                  try {
                      int n = sc.read(ByteBuffer.allocate(100));

@@ -159,16 +194,34 @@
                  } catch (AsynchronousCloseException expected) { }
              }
          });
      }
  
+     /**
+      * SocketChannel shutdownInput while virtual thread blocked in read; read should return -1, channel stays open.
+      */
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
+             try (var connection = new Connection()) {
+                 SocketChannel sc = connection.channel1();
+                 runAfterParkedAsync(sc::shutdownInput);  // helper not shown here; presumably runs once this thread has parked
+                 int n = sc.read(ByteBuffer.allocate(100));
+                 assertEquals(-1, n);  // end-of-stream rather than an exception
+                 assertTrue(sc.isOpen());  // only the input side was shut down
+             }
+         });
+     }
+ 
      /**
       * Virtual thread interrupted while blocked in SocketChannel read.
       */
-     @Test
-     void testSocketChannelReadInterrupt() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc = connection.channel1();
  
                  // interrupt current thread when it blocks in read
                  Thread thisThread = Thread.currentThread();

@@ -185,13 +238,14 @@
      }
  
      /**
       * SocketChannel close while virtual thread blocked in write.
       */
-     @Test
-     void testSocketChannelWriteAsyncClose() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              boolean retry = true;
              while (retry) {
                  try (var connection = new Connection()) {
                      SocketChannel sc = connection.channel1();
  

@@ -213,16 +267,45 @@
                  }
              }
          });
      }
  
+ 
+     /**
+      * SocketChannel shutdownOutput while virtual thread blocked in write; write should fail, channel stays open.
+      */
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
+             try (var connection = new Connection()) {
+                 SocketChannel sc = connection.channel1();
+ 
+                 // shutdown output when current thread blocks in write
+                 runAfterParkedAsync(sc::shutdownOutput);
+                 try {
+                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
+                     for (;;) {  // no exit condition: the loop only ends when write throws
+                         int n = sc.write(bb);
+                         assertTrue(n > 0);
+                         bb.clear();  // reset position/limit so the buffer is written again
+                     }
+                 } catch (ClosedChannelException e) {
+                     // expected once output has been shut down
+                 }
+                 assertTrue(sc.isOpen());  // only the output side was shut down
+             }
+         });
+     }
+ 
      /**
       * Virtual thread interrupted while blocked in SocketChannel write.
       */
-     @Test
-     void testSocketChannelWriteInterrupt() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              boolean retry = true;
              while (retry) {
                  try (var connection = new Connection()) {
                      SocketChannel sc = connection.channel1();
  

@@ -250,25 +333,28 @@
      }
  
      /**
       * Virtual thread blocks in SocketChannel adaptor read.
       */
-     @Test
-     void testSocketAdaptorRead1() throws Exception {
-         testSocketAdaptorRead(0);
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
+         testSocketAdaptorRead(builder, 0);  // timeout 0: the "no timeout" variant
      }
  
      /**
       * Virtual thread blocks in SocketChannel adaptor read with timeout.
       */
-     @Test
-     void testSocketAdaptorRead2() throws Exception {
-         testSocketAdaptorRead(60_000);
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
+         testSocketAdaptorRead(builder, 60_000);  // non-zero timeout variant (unit not shown here; presumably ms)
      }
  
-     private void testSocketAdaptorRead(int timeout) throws Exception {
-         VThreadRunner.run(() -> {
+     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
+                                        int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var connection = new Connection()) {
                  SocketChannel sc1 = connection.channel1();
                  SocketChannel sc2 = connection.channel2();
  
                  // write to sc1 when current thread blocks reading from sc2

@@ -287,13 +373,14 @@
      }
  
      /**
       * ServerSocketChannel accept, no blocking.
       */
-     @Test
-     void testServerSocketChannelAccept1() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc1 = SocketChannel.open(ssc.getLocalAddress());
                  // accept should not block
                  var sc2 = ssc.accept();

@@ -304,13 +391,14 @@
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel accept.
       */
-     @Test
-     void testServerSocketChannelAccept2() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc1 = SocketChannel.open();
  
                  // connect when current thread blocks in accept

@@ -325,13 +413,14 @@
      }
  
      /**
       * ServerSocketChannel close while virtual thread blocked in accept.
       */
-     @Test
-     void testServerSocketChannelAcceptAsyncClose() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  ssc.bind(new InetSocketAddress(lh, 0));
                  runAfterParkedAsync(ssc::close);
                  try {

@@ -344,13 +433,14 @@
      }
  
      /**
       * Virtual thread interrupted while blocked in ServerSocketChannel accept.
       */
-     @Test
-     void testServerSocketChannelAcceptInterrupt() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  ssc.bind(new InetSocketAddress(lh, 0));
  
                  // interrupt current thread when it blocks in accept

@@ -369,25 +459,28 @@
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel adaptor accept.
       */
-     @Test
-     void testSocketChannelAdaptorAccept1() throws Exception {
-         testSocketChannelAdaptorAccept(0);
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
+         testSocketChannelAdaptorAccept(builder, 0);  // timeout 0: the "no timeout" variant
      }
  
      /**
       * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
       */
-     @Test
-     void testSocketChannelAdaptorAccept2() throws Exception {
-         testSocketChannelAdaptorAccept(60_000);
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
+         testSocketChannelAdaptorAccept(builder, 60_000);  // non-zero timeout variant (unit not shown here; presumably ms)
      }
  
-     private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
-         VThreadRunner.run(() -> {
+     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
+                                                 int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (var ssc = ServerSocketChannel.open()) {
                  ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                  var sc = SocketChannel.open();
  
                  // interrupt current thread when it blocks in accept

@@ -404,13 +497,14 @@
      }
  
      /**
       * DatagramChannel receive/send, no blocking.
       */
-     @Test
-     void testDatagramChannelSendReceive1() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));

@@ -429,13 +523,14 @@
      }
  
      /**
       * Virtual thread blocks in DatagramChannel receive.
       */
-     @Test
-     void testDatagramChannelSendReceive2() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));

@@ -453,13 +548,14 @@
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in receive.
       */
-     @Test
-     void testDatagramChannelReceiveAsyncClose() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
                  runAfterParkedAsync(dc::close);
                  try {

@@ -471,13 +567,14 @@
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramChannel receive.
       */
-     @Test
-     void testDatagramChannelReceiveInterrupt() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  // interrupt current thread when it blocks in receive

@@ -495,25 +592,28 @@
      }
  
      /**
       * Virtual thread blocks in DatagramSocket adaptor receive.
       */
-     @Test
-     void testDatagramSocketAdaptorReceive1() throws Exception {
-         testDatagramSocketAdaptorReceive(0);
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceive(builder, 0);  // timeout 0: the "no timeout" variant
      }
  
      /**
       * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
       */
-     @Test
-     void testDatagramSocketAdaptorReceive2() throws Exception {
-         testDatagramSocketAdaptorReceive(60_000);
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceive(builder, 60_000);  // non-zero timeout variant (unit not shown here; presumably ms)
      }
  
-     private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
-         VThreadRunner.run(() -> {
+     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
+                                                   int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc1 = DatagramChannel.open();
                   DatagramChannel dc2 = DatagramChannel.open()) {
  
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc2.bind(new InetSocketAddress(lh, 0));

@@ -534,26 +634,29 @@
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in adaptor receive.
       */
-     @Test
-     void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
-         testDatagramSocketAdaptorReceiveAsyncClose(0);
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);  // timeout 0: the "no timeout" variant
      }
  
      /**
       * DatagramChannel close while virtual thread blocked in adaptor receive
       * with timeout.
       */
-     @Test
-     void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
-         testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_000);  // was 60_1000 (== 601000); 60_000 matches the other timeout variants
      }
  
-     private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
-         VThreadRunner.run(() -> {
+     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
+                                                             int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  byte[] array = new byte[100];

@@ -570,26 +673,29 @@
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
       */
-     @Test
-     void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
-         testDatagramSocketAdaptorReceiveInterrupt(0);
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);  // timeout 0: the "no timeout" variant
      }
  
      /**
       * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
       * with timeout.
       */
-     @Test
-     void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
-         testDatagramSocketAdaptorReceiveInterrupt(60_1000);
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
+         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_000);  // was 60_1000 (== 601000); 60_000 matches the other timeout variants
      }
  
-     private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
-         VThreadRunner.run(() -> {
+     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
+                                                            int timeout) throws Exception {
+         VThreadRunner.run(builder, () -> {
              try (DatagramChannel dc = DatagramChannel.open()) {
                  InetAddress lh = InetAddress.getLoopbackAddress();
                  dc.bind(new InetSocketAddress(lh, 0));
  
                  byte[] array = new byte[100];

@@ -612,13 +718,14 @@
      }
  
      /**
       * Pipe read/write, no blocking.
       */
-     @Test
-     void testPipeReadWrite1() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // write should not block

@@ -636,13 +743,14 @@
      }
  
      /**
       * Virtual thread blocks in Pipe.SourceChannel read.
       */
-     @Test
-     void testPipeReadWrite2() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // write from sink when current thread blocks reading from source

@@ -659,13 +767,14 @@
      }
  
      /**
       * Virtual thread blocks in Pipe.SinkChannel write.
       */
-     @Test
-     void testPipeReadWrite3() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // read from source to EOF when current thread blocking in write

@@ -687,13 +796,14 @@
      }
  
      /**
       * Pipe.SourceChannel close while virtual thread blocked in read.
       */
-     @Test
-     void testPipeReadAsyncClose() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
                  runAfterParkedAsync(source::close);
                  try {

@@ -705,13 +815,14 @@
      }
  
      /**
       * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
       */
-     @Test
-     void testPipeReadInterrupt() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              Pipe p = Pipe.open();
              try (Pipe.SinkChannel sink = p.sink();
                   Pipe.SourceChannel source = p.source()) {
  
                  // interrupt current thread when it blocks reading from source

@@ -729,13 +840,14 @@
      }
  
      /**
       * Pipe.SinkChannel close while virtual thread blocked in write.
       */
-     @Test
-     void testPipeWriteAsyncClose() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              boolean retry = true;
              while (retry) {
                  Pipe p = Pipe.open();
                  try (Pipe.SinkChannel sink = p.sink();
                       Pipe.SourceChannel source = p.source()) {

@@ -761,13 +873,14 @@
      }
  
      /**
       * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
       */
-     @Test
-     void testPipeWriteInterrupt() throws Exception {
-         VThreadRunner.run(() -> {
+     @ParameterizedTest
+     @MethodSource("threadBuilders")
+     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+         VThreadRunner.run(builder, () -> {
              boolean retry = true;
              while (retry) {
                  Pipe p = Pipe.open();
                  try (Pipe.SinkChannel sink = p.sink();
                       Pipe.SourceChannel source = p.source()) {
< prev index next >