1 /*
  2  * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /**
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on NIO channels
 28  * @library /test/lib
 29  * @run junit BlockingChannelOps
 30  */
 31 
 32 /**
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps
 37  * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps
 38  */
 39 
 40 /**
 41  * @test id=no-vmcontinuations
 42  * @requires vm.continuations
 43  * @library /test/lib
 44  * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
 45  */
 46 
 47 import java.io.Closeable;
 48 import java.io.IOException;
 49 import java.net.DatagramPacket;
 50 import java.net.InetAddress;
 51 import java.net.InetSocketAddress;
 52 import java.net.Socket;
 53 import java.net.SocketAddress;
 54 import java.net.SocketException;
 55 import java.nio.ByteBuffer;
 56 import java.nio.channels.AsynchronousCloseException;
 57 import java.nio.channels.ClosedByInterruptException;
 58 import java.nio.channels.ClosedChannelException;
 59 import java.nio.channels.DatagramChannel;
 60 import java.nio.channels.Pipe;
 61 import java.nio.channels.ReadableByteChannel;
 62 import java.nio.channels.ServerSocketChannel;
 63 import java.nio.channels.SocketChannel;
 64 import java.nio.channels.WritableByteChannel;
 65 import java.util.concurrent.ExecutorService;
 66 import java.util.concurrent.Executors;
 67 import java.util.stream.Stream;
 68 
 69 import jdk.test.lib.thread.VThreadRunner;
 70 import jdk.test.lib.thread.VThreadScheduler;
 71 
 72 import org.junit.jupiter.api.BeforeAll;
 73 import org.junit.jupiter.api.AfterAll;
 74 import org.junit.jupiter.params.ParameterizedTest;
 75 import org.junit.jupiter.params.provider.MethodSource;
 76 import static org.junit.jupiter.api.Assertions.*;
 77 
 78 class BlockingChannelOps {
 79 
 80     private static ExecutorService customScheduler;
 81 
 82     @BeforeAll
 83     static void setup() {
 84         customScheduler = Executors.newCachedThreadPool();
 85     }
 86 
 87     @AfterAll
 88     static void finish() {
 89         customScheduler.shutdown();
 90     }
 91 
 92     /**
 93      * Returns the Thread.Builder to create the virtual thread.
 94      */
 95     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
 96         if (VThreadScheduler.supportsCustomScheduler()) {
 97             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
 98         } else {
 99             return Stream.of(Thread.ofVirtual());
100         }
101     }
102 
103     /**
104      * SocketChannel read/write, no blocking.
105      */
106     @ParameterizedTest
107     @MethodSource("threadBuilders")
108     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
109         VThreadRunner.run(builder, () -> {
110             try (var connection = new Connection()) {
111                 SocketChannel sc1 = connection.channel1();
112                 SocketChannel sc2 = connection.channel2();
113 
114                 // write to sc1
115                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
116                 int n = sc1.write(bb);
117                 assertTrue(n > 0);
118 
119                 // read from sc2 should not block
120                 bb = ByteBuffer.allocate(10);
121                 n = sc2.read(bb);
122                 assertTrue(n > 0);
123                 assertTrue(bb.get(0) == 'X');
124             }
125         });
126     }
127 
128     /**
129      * Virtual thread blocks in SocketChannel read.
130      */
131     @ParameterizedTest
132     @MethodSource("threadBuilders")
133     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
134         VThreadRunner.run(builder, () -> {
135             try (var connection = new Connection()) {
136                 SocketChannel sc1 = connection.channel1();
137                 SocketChannel sc2 = connection.channel2();
138 
139                 // write to sc1 when current thread blocks in sc2.read
140                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
141                 runAfterParkedAsync(() -> sc1.write(bb1));
142 
143                 // read from sc2 should block
144                 ByteBuffer bb2 = ByteBuffer.allocate(10);
145                 int n = sc2.read(bb2);
146                 assertTrue(n > 0);
147                 assertTrue(bb2.get(0) == 'X');
148             }
149         });
150     }
151 
152     /**
153      * Virtual thread blocks in SocketChannel write.
154      */
155     @ParameterizedTest
156     @MethodSource("threadBuilders")
157     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
158         VThreadRunner.run(builder, () -> {
159             try (var connection = new Connection()) {
160                 SocketChannel sc1 = connection.channel1();
161                 SocketChannel sc2 = connection.channel2();
162 
163                 // read from sc2 to EOF when current thread blocks in sc1.write
164                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
165 
166                 // write to sc1 should block
167                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
168                 for (int i=0; i<1000; i++) {
169                     int n = sc1.write(bb);
170                     assertTrue(n > 0);
171                     bb.clear();
172                 }
173                 sc1.close();
174 
175                 // wait for reader to finish
176                 reader.join();
177             }
178         });
179     }
180 
181     /**
182      * SocketChannel close while virtual thread blocked in read.
183      */
184     @ParameterizedTest
185     @MethodSource("threadBuilders")
186     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
187         VThreadRunner.run(builder, () -> {
188             try (var connection = new Connection()) {
189                 SocketChannel sc = connection.channel1();
190                 runAfterParkedAsync(sc::close);
191                 try {
192                     int n = sc.read(ByteBuffer.allocate(100));
193                     fail("read returned " + n);
194                 } catch (AsynchronousCloseException expected) { }
195             }
196         });
197     }
198 
199     /**
200      * SocketChannel shutdownInput while virtual thread blocked in read.
201      */
202     @ParameterizedTest
203     @MethodSource("threadBuilders")
204     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
205         VThreadRunner.run(builder, () -> {
206             try (var connection = new Connection()) {
207                 SocketChannel sc = connection.channel1();
208                 runAfterParkedAsync(sc::shutdownInput);
209                 int n = sc.read(ByteBuffer.allocate(100));
210                 assertEquals(-1, n);
211                 assertTrue(sc.isOpen());
212             }
213         });
214     }
215 
216     /**
217      * Virtual thread interrupted while blocked in SocketChannel read.
218      */
219     @ParameterizedTest
220     @MethodSource("threadBuilders")
221     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
222         VThreadRunner.run(builder, () -> {
223             try (var connection = new Connection()) {
224                 SocketChannel sc = connection.channel1();
225 
226                 // interrupt current thread when it blocks in read
227                 Thread thisThread = Thread.currentThread();
228                 runAfterParkedAsync(thisThread::interrupt);
229 
230                 try {
231                     int n = sc.read(ByteBuffer.allocate(100));
232                     fail("read returned " + n);
233                 } catch (ClosedByInterruptException expected) {
234                     assertTrue(Thread.interrupted());
235                 }
236             }
237         });
238     }
239 
240     /**
241      * SocketChannel close while virtual thread blocked in write.
242      */
243     @ParameterizedTest
244     @MethodSource("threadBuilders")
245     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
246         VThreadRunner.run(builder, () -> {
247             boolean retry = true;
248             while (retry) {
249                 try (var connection = new Connection()) {
250                     SocketChannel sc = connection.channel1();
251 
252                     // close sc when current thread blocks in write
253                     runAfterParkedAsync(sc::close);
254                     try {
255                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
256                         for (;;) {
257                             int n = sc.write(bb);
258                             assertTrue(n > 0);
259                             bb.clear();
260                         }
261                     } catch (AsynchronousCloseException expected) {
262                         // closed when blocked in write
263                         retry = false;
264                     } catch (ClosedChannelException e) {
265                         // closed when not blocked in write, need to retry test
266                     }
267                 }
268             }
269         });
270     }
271 
272 
273     /**
274      * SocketChannel shutdownOutput while virtual thread blocked in write.
275      */
276     @ParameterizedTest
277     @MethodSource("threadBuilders")
278     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
279         VThreadRunner.run(builder, () -> {
280             try (var connection = new Connection()) {
281                 SocketChannel sc = connection.channel1();
282 
283                 // shutdown output when current thread blocks in write
284                 runAfterParkedAsync(sc::shutdownOutput);
285                 try {
286                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
287                     for (;;) {
288                         int n = sc.write(bb);
289                         assertTrue(n > 0);
290                         bb.clear();
291                     }
292                 } catch (ClosedChannelException e) {
293                     // expected
294                 }
295                 assertTrue(sc.isOpen());
296             }
297         });
298     }
299 
300     /**
301      * Virtual thread interrupted while blocked in SocketChannel write.
302      */
303     @ParameterizedTest
304     @MethodSource("threadBuilders")
305     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
306         VThreadRunner.run(builder, () -> {
307             boolean retry = true;
308             while (retry) {
309                 try (var connection = new Connection()) {
310                     SocketChannel sc = connection.channel1();
311 
312                     // interrupt current thread when it blocks in write
313                     Thread thisThread = Thread.currentThread();
314                     runAfterParkedAsync(thisThread::interrupt);
315 
316                     try {
317                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
318                         for (;;) {
319                             int n = sc.write(bb);
320                             assertTrue(n > 0);
321                             bb.clear();
322                         }
323                     } catch (ClosedByInterruptException e) {
324                         // closed when blocked in write
325                         assertTrue(Thread.interrupted());
326                         retry = false;
327                     } catch (ClosedChannelException e) {
328                         // closed when not blocked in write, need to retry test
329                     }
330                 }
331             }
332         });
333     }
334 
335     /**
336      * Virtual thread blocks in SocketChannel adaptor read.
337      */
338     @ParameterizedTest
339     @MethodSource("threadBuilders")
340     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
341         testSocketAdaptorRead(builder, 0);
342     }
343 
344     /**
345      * Virtual thread blocks in SocketChannel adaptor read with timeout.
346      */
347     @ParameterizedTest
348     @MethodSource("threadBuilders")
349     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
350         testSocketAdaptorRead(builder, 60_000);
351     }
352 
353     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
354                                        int timeout) throws Exception {
355         VThreadRunner.run(builder, () -> {
356             try (var connection = new Connection()) {
357                 SocketChannel sc1 = connection.channel1();
358                 SocketChannel sc2 = connection.channel2();
359 
360                 // write to sc1 when currnet thread blocks reading from sc2
361                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
362                 runAfterParkedAsync(() -> sc1.write(bb));
363 
364                 // read from sc2 should block
365                 byte[] array = new byte[100];
366                 if (timeout > 0)
367                     sc2.socket().setSoTimeout(timeout);
368                 int n = sc2.socket().getInputStream().read(array);
369                 assertTrue(n > 0);
370                 assertTrue(array[0] == 'X');
371             }
372         });
373     }
374 
375     /**
376      * ServerSocketChannel accept, no blocking.
377      */
378     @ParameterizedTest
379     @MethodSource("threadBuilders")
380     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
381         VThreadRunner.run(builder, () -> {
382             try (var ssc = ServerSocketChannel.open()) {
383                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
384                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
385                 // accept should not block
386                 var sc2 = ssc.accept();
387                 sc1.close();
388                 sc2.close();
389             }
390         });
391     }
392 
393     /**
394      * Virtual thread blocks in ServerSocketChannel accept.
395      */
396     @ParameterizedTest
397     @MethodSource("threadBuilders")
398     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
399         VThreadRunner.run(builder, () -> {
400             try (var ssc = ServerSocketChannel.open()) {
401                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
402                 var sc1 = SocketChannel.open();
403 
404                 // connect when current thread when it blocks in accept
405                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
406 
407                 // accept should block
408                 var sc2 = ssc.accept();
409                 sc1.close();
410                 sc2.close();
411             }
412         });
413     }
414 
415     /**
416      * SeverSocketChannel close while virtual thread blocked in accept.
417      */
418     @ParameterizedTest
419     @MethodSource("threadBuilders")
420     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
421         VThreadRunner.run(builder, () -> {
422             try (var ssc = ServerSocketChannel.open()) {
423                 InetAddress lh = InetAddress.getLoopbackAddress();
424                 ssc.bind(new InetSocketAddress(lh, 0));
425                 runAfterParkedAsync(ssc::close);
426                 try {
427                     SocketChannel sc = ssc.accept();
428                     sc.close();
429                     fail("connection accepted???");
430                 } catch (AsynchronousCloseException expected) { }
431             }
432         });
433     }
434 
435     /**
436      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
437      */
438     @ParameterizedTest
439     @MethodSource("threadBuilders")
440     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
441         VThreadRunner.run(builder, () -> {
442             try (var ssc = ServerSocketChannel.open()) {
443                 InetAddress lh = InetAddress.getLoopbackAddress();
444                 ssc.bind(new InetSocketAddress(lh, 0));
445 
446                 // interrupt current thread when it blocks in accept
447                 Thread thisThread = Thread.currentThread();
448                 runAfterParkedAsync(thisThread::interrupt);
449 
450                 try {
451                     SocketChannel sc = ssc.accept();
452                     sc.close();
453                     fail("connection accepted???");
454                 } catch (ClosedByInterruptException expected) {
455                     assertTrue(Thread.interrupted());
456                 }
457             }
458         });
459     }
460 
461     /**
462      * Virtual thread blocks in ServerSocketChannel adaptor accept.
463      */
464     @ParameterizedTest
465     @MethodSource("threadBuilders")
466     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
467         testSocketChannelAdaptorAccept(builder, 0);
468     }
469 
470     /**
471      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
472      */
473     @ParameterizedTest
474     @MethodSource("threadBuilders")
475     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
476         testSocketChannelAdaptorAccept(builder, 60_000);
477     }
478 
479     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
480                                                 int timeout) throws Exception {
481         VThreadRunner.run(builder, () -> {
482             try (var ssc = ServerSocketChannel.open()) {
483                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
484                 var sc = SocketChannel.open();
485 
486                 // interrupt current thread when it blocks in accept
487                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
488 
489                 // accept should block
490                 if (timeout > 0)
491                     ssc.socket().setSoTimeout(timeout);
492                 Socket s = ssc.socket().accept();
493                 sc.close();
494                 s.close();
495             }
496         });
497     }
498 
499     /**
500      * DatagramChannel receive/send, no blocking.
501      */
502     @ParameterizedTest
503     @MethodSource("threadBuilders")
504     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
505         VThreadRunner.run(builder, () -> {
506             try (DatagramChannel dc1 = DatagramChannel.open();
507                  DatagramChannel dc2 = DatagramChannel.open()) {
508 
509                 InetAddress lh = InetAddress.getLoopbackAddress();
510                 dc2.bind(new InetSocketAddress(lh, 0));
511 
512                 // send should not block
513                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
514                 int n = dc1.send(bb, dc2.getLocalAddress());
515                 assertTrue(n > 0);
516 
517                 // receive should not block
518                 bb = ByteBuffer.allocate(10);
519                 dc2.receive(bb);
520                 assertTrue(bb.get(0) == 'X');
521             }
522         });
523     }
524 
525     /**
526      * Virtual thread blocks in DatagramChannel receive.
527      */
528     @ParameterizedTest
529     @MethodSource("threadBuilders")
530     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
531         VThreadRunner.run(builder, () -> {
532             try (DatagramChannel dc1 = DatagramChannel.open();
533                  DatagramChannel dc2 = DatagramChannel.open()) {
534 
535                 InetAddress lh = InetAddress.getLoopbackAddress();
536                 dc2.bind(new InetSocketAddress(lh, 0));
537 
538                 // send from dc1 when current thread blocked in dc2.receive
539                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
540                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
541 
542                 // read from dc2 should block
543                 ByteBuffer bb2 = ByteBuffer.allocate(10);
544                 dc2.receive(bb2);
545                 assertTrue(bb2.get(0) == 'X');
546             }
547         });
548     }
549 
550     /**
551      * DatagramChannel close while virtual thread blocked in receive.
552      */
553     @ParameterizedTest
554     @MethodSource("threadBuilders")
555     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
556         VThreadRunner.run(builder, () -> {
557             try (DatagramChannel dc = DatagramChannel.open()) {
558                 InetAddress lh = InetAddress.getLoopbackAddress();
559                 dc.bind(new InetSocketAddress(lh, 0));
560                 runAfterParkedAsync(dc::close);
561                 try {
562                     dc.receive(ByteBuffer.allocate(100));
563                     fail("receive returned");
564                 } catch (AsynchronousCloseException expected) { }
565             }
566         });
567     }
568 
569     /**
570      * Virtual thread interrupted while blocked in DatagramChannel receive.
571      */
572     @ParameterizedTest
573     @MethodSource("threadBuilders")
574     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
575         VThreadRunner.run(builder, () -> {
576             try (DatagramChannel dc = DatagramChannel.open()) {
577                 InetAddress lh = InetAddress.getLoopbackAddress();
578                 dc.bind(new InetSocketAddress(lh, 0));
579 
580                 // interrupt current thread when it blocks in receive
581                 Thread thisThread = Thread.currentThread();
582                 runAfterParkedAsync(thisThread::interrupt);
583 
584                 try {
585                     dc.receive(ByteBuffer.allocate(100));
586                     fail("receive returned");
587                 } catch (ClosedByInterruptException expected) {
588                     assertTrue(Thread.interrupted());
589                 }
590             }
591         });
592     }
593 
594     /**
595      * Virtual thread blocks in DatagramSocket adaptor receive.
596      */
597     @ParameterizedTest
598     @MethodSource("threadBuilders")
599     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
600         testDatagramSocketAdaptorReceive(builder, 0);
601     }
602 
603     /**
604      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
605      */
606     @ParameterizedTest
607     @MethodSource("threadBuilders")
608     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
609         testDatagramSocketAdaptorReceive(builder, 60_000);
610     }
611 
612     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
613                                                   int timeout) throws Exception {
614         VThreadRunner.run(builder, () -> {
615             try (DatagramChannel dc1 = DatagramChannel.open();
616                  DatagramChannel dc2 = DatagramChannel.open()) {
617 
618                 InetAddress lh = InetAddress.getLoopbackAddress();
619                 dc2.bind(new InetSocketAddress(lh, 0));
620 
621                 // send from dc1 when current thread blocks in dc2 receive
622                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
623                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
624 
625                 // receive should block
626                 byte[] array = new byte[100];
627                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
628                 if (timeout > 0)
629                     dc2.socket().setSoTimeout(timeout);
630                 dc2.socket().receive(p);
631                 assertTrue(p.getLength() == 3 && array[0] == 'X');
632             }
633         });
634     }
635 
636     /**
637      * DatagramChannel close while virtual thread blocked in adaptor receive.
638      */
639     @ParameterizedTest
640     @MethodSource("threadBuilders")
641     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
642         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
643     }
644 
645     /**
646      * DatagramChannel close while virtual thread blocked in adaptor receive
647      * with timeout.
648      */
649     @ParameterizedTest
650     @MethodSource("threadBuilders")
651     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
652         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
653     }
654 
655     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
656                                                             int timeout) throws Exception {
657         VThreadRunner.run(builder, () -> {
658             try (DatagramChannel dc = DatagramChannel.open()) {
659                 InetAddress lh = InetAddress.getLoopbackAddress();
660                 dc.bind(new InetSocketAddress(lh, 0));
661 
662                 byte[] array = new byte[100];
663                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
664                 if (timeout > 0)
665                     dc.socket().setSoTimeout(timeout);
666 
667                 // close channel/socket when current thread blocks in receive
668                 runAfterParkedAsync(dc::close);
669 
670                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
671             }
672         });
673     }
674 
675     /**
676      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
677      */
678     @ParameterizedTest
679     @MethodSource("threadBuilders")
680     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
681         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
682     }
683 
684     /**
685      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
686      * with timeout.
687      */
688     @ParameterizedTest
689     @MethodSource("threadBuilders")
690     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
691         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
692     }
693 
694     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
695                                                            int timeout) throws Exception {
696         VThreadRunner.run(builder, () -> {
697             try (DatagramChannel dc = DatagramChannel.open()) {
698                 InetAddress lh = InetAddress.getLoopbackAddress();
699                 dc.bind(new InetSocketAddress(lh, 0));
700 
701                 byte[] array = new byte[100];
702                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
703                 if (timeout > 0)
704                     dc.socket().setSoTimeout(timeout);
705 
706                 // interrupt current thread when it blocks in receive
707                 Thread thisThread = Thread.currentThread();
708                 runAfterParkedAsync(thisThread::interrupt);
709 
710                 try {
711                     dc.socket().receive(p);
712                     fail();
713                 } catch (ClosedByInterruptException expected) {
714                     assertTrue(Thread.interrupted());
715                 }
716             }
717         });
718     }
719 
720     /**
721      * Pipe read/write, no blocking.
722      */
723     @ParameterizedTest
724     @MethodSource("threadBuilders")
725     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
726         VThreadRunner.run(builder, () -> {
727             Pipe p = Pipe.open();
728             try (Pipe.SinkChannel sink = p.sink();
729                  Pipe.SourceChannel source = p.source()) {
730 
731                 // write should not block
732                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
733                 int n = sink.write(bb);
734                 assertTrue(n > 0);
735 
736                 // read should not block
737                 bb = ByteBuffer.allocate(10);
738                 n = source.read(bb);
739                 assertTrue(n > 0);
740                 assertTrue(bb.get(0) == 'X');
741             }
742         });
743     }
744 
745     /**
746      * Virtual thread blocks in Pipe.SourceChannel read.
747      */
748     @ParameterizedTest
749     @MethodSource("threadBuilders")
750     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
751         VThreadRunner.run(builder, () -> {
752             Pipe p = Pipe.open();
753             try (Pipe.SinkChannel sink = p.sink();
754                  Pipe.SourceChannel source = p.source()) {
755 
756                 // write from sink when current thread blocks reading from source
757                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
758                 runAfterParkedAsync(() -> sink.write(bb1));
759 
760                 // read should block
761                 ByteBuffer bb2 = ByteBuffer.allocate(10);
762                 int n = source.read(bb2);
763                 assertTrue(n > 0);
764                 assertTrue(bb2.get(0) == 'X');
765             }
766         });
767     }
768 
769     /**
770      * Virtual thread blocks writing to a Pipe.SinkChannel until another thread reads.
771      */
772     @ParameterizedTest
773     @MethodSource("threadBuilders")
774     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
775         VThreadRunner.run(builder, () -> {
776             Pipe pipe = Pipe.open();
777             try (Pipe.SinkChannel writeEnd = pipe.sink();
778                  Pipe.SourceChannel readEnd = pipe.source()) {
779 
780                 // helper thread drains the source to EOF once this thread is waiting in write
781                 Thread drainer = runAfterParkedAsync(() -> readToEOF(readEnd));
782 
783                 // 1000 writes of 100k each; with nothing reading yet a write should block
784                 ByteBuffer chunk = ByteBuffer.allocate(100*1024);
785                 int writesLeft = 1000;
786                 while (writesLeft-- > 0) {
787                     assertTrue(writeEnd.write(chunk) > 0);
788                     chunk.clear();
789                 }
790                 writeEnd.close();
791 
792                 // the sink is closed, so the drainer will see EOF; wait for it to finish
793                 drainer.join();
794             }
795         });
796     }
797 
798     /**
799      * A Pipe.SourceChannel is closed while a virtual thread is blocked reading from it.
800      */
801     @ParameterizedTest
802     @MethodSource("threadBuilders")
803     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
804         VThreadRunner.run(builder, () -> {
805             Pipe pipe = Pipe.open();
806             try (Pipe.SinkChannel writeEnd = pipe.sink();
807                  Pipe.SourceChannel readEnd = pipe.source()) {
808                 runAfterParkedAsync(() -> readEnd.close());  // close while blocked in read
809                 ByteBuffer dst = ByteBuffer.allocate(100);
810                 try {
811                     fail("read returned " + readEnd.read(dst));
812                 } catch (AsynchronousCloseException expected) { }
813             }
814         });
815     }
816 
817     /**
818      * A virtual thread is interrupted while it is blocked in Pipe.SourceChannel read.
819      */
820     @ParameterizedTest
821     @MethodSource("threadBuilders")
822     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
823         VThreadRunner.run(builder, () -> {
824             Pipe pipe = Pipe.open();
825             try (Pipe.SinkChannel writeEnd = pipe.sink();
826                  Pipe.SourceChannel readEnd = pipe.source()) {
827 
828                 // helper thread interrupts this thread once it is waiting in read
829                 Thread self = Thread.currentThread();
830                 runAfterParkedAsync(() -> self.interrupt());
831 
832                 ByteBuffer dst = ByteBuffer.allocate(100);
833                 try {
834                     fail("read returned " + readEnd.read(dst));
835                 } catch (ClosedByInterruptException expected) {
836                     assertTrue(Thread.interrupted());  // interrupt status must still be set
837                 }
838             }
839         });
840     }
841 
842     /**
843      * A Pipe.SinkChannel is closed while a virtual thread is blocked writing to it.
844      */
845     @ParameterizedTest
846     @MethodSource("threadBuilders")
847     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
848         VThreadRunner.run(builder, () -> {
849             boolean closedWhileBlocked = false;
850             while (!closedWhileBlocked) {
851                 Pipe pipe = Pipe.open();
852                 try (Pipe.SinkChannel writeEnd = pipe.sink();
853                      Pipe.SourceChannel readEnd = pipe.source()) {
854 
855                     // helper thread closes the sink once this thread is waiting
856                     runAfterParkedAsync(() -> writeEnd.close());
857                     ByteBuffer chunk = ByteBuffer.allocate(100*1024);
858                     try {
859                         // nothing reads the source, so keep writing until a write blocks
860                         while (true) {
861                             assertTrue(writeEnd.write(chunk) > 0);
862                             chunk.clear();
863                         }
864                     } catch (AsynchronousCloseException e) {
865                         // the close arrived while blocked in write: the test is done
866                         closedWhileBlocked = true;
867                     } catch (ClosedChannelException e) {
868                         // the close arrived between writes: retry with a fresh pipe
869                     }
870                 }
871             }
872         });
873     }
874 
875     /**
876      * A virtual thread is interrupted while it is blocked in Pipe.SinkChannel write.
877      */
878     @ParameterizedTest
879     @MethodSource("threadBuilders")
880     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
881         VThreadRunner.run(builder, () -> {
882             boolean interruptedWhileBlocked = false;
883             while (!interruptedWhileBlocked) {
884                 Pipe pipe = Pipe.open();
885                 try (Pipe.SinkChannel writeEnd = pipe.sink();
886                      Pipe.SourceChannel readEnd = pipe.source()) {
887 
888                     // helper thread interrupts this thread once it is waiting
889                     Thread self = Thread.currentThread();
890                     runAfterParkedAsync(() -> self.interrupt());
891 
892                     ByteBuffer chunk = ByteBuffer.allocate(100*1024);
893                     try {
894                         // nothing reads the source, so keep writing until a write blocks
895                         while (true) {
896                             assertTrue(writeEnd.write(chunk) > 0);
897                             chunk.clear();
898                         }
899                     } catch (ClosedByInterruptException expected) {
900                         // channel closed by the interrupt; the status must still be set
901                         assertTrue(Thread.interrupted());
902                         interruptedWhileBlocked = true;
903                     } catch (ClosedChannelException e) {
904                         // channel found closed without blocking in write: retry the test
905                     }
906                 }
907             }
908         });
909     }
910 
911     /**
912      * A pair of connected SocketChannels, created over the loopback interface.
913      */
914     static class Connection implements Closeable {
915         private final SocketChannel sc1;   // the end that connects
916         private final SocketChannel sc2;   // the end returned by accept
917         Connection() throws IOException {
918             var lh = InetAddress.getLoopbackAddress();
919             try (var listener = ServerSocketChannel.open()) {
920                 listener.bind(new InetSocketAddress(lh, 0));
921                 SocketChannel sc1 = SocketChannel.open();
922                 SocketChannel sc2 = null;
923                 // accept is the last step, so on failure only sc1 needs to be closed
924                 try {
925                     sc1.socket().connect(listener.getLocalAddress());
926                     sc2 = listener.accept();
927                 } catch (IOException ioe) {
928                     sc1.close();
929                     throw ioe;
930                 }
931                 this.sc1 = sc1;
932                 this.sc2 = sc2;
933             }
934         }
935         SocketChannel channel1() { return sc1; }
936         SocketChannel channel2() { return sc2; }
937         /** Closes both channels; sc2 is closed even when closing sc1 throws. */
938         @Override
939         public void close() throws IOException {
940             // the resource is closed after the body whether or not sc1.close() throws
941             try (SocketChannel second = sc2) {
942                 sc1.close();
943             }
944         }
945     }
946 
947     /**
948      * Reads and discards bytes from a channel until a read returns no data or throws.
949      */
950     static void readToEOF(ReadableByteChannel rbc) throws IOException {
951         ByteBuffer scratch = ByteBuffer.allocate(16*1024);
952         // the buffer is reset after each successful read; its contents are never inspected
953         for (int count = rbc.read(scratch); count > 0; count = rbc.read(scratch)) {
954             scratch.clear();
955         }
956     }
957 
958     @FunctionalInterface
959     interface ThrowingRunnable {  // a task, like Runnable, whose run method may throw
960         void run() throws Exception;  // declared to allow any checked exception
961     }
962 
963     /**
964      * Runs the task on a new daemon platform thread once the calling virtual thread is waiting.
965      * @return the thread started to run the task
966      */
967     static Thread runAfterParkedAsync(ThrowingRunnable task) {
968         Thread caller = Thread.currentThread();
969         if (!caller.isVirtual())
970             throw new WrongThreadException();  // only meaningful when called on a virtual thread
971         Runnable waitThenRun = () -> {
972             try {
973                 Thread.State seen;
974                 while ((seen = caller.getState()) != Thread.State.WAITING
975                         && seen != Thread.State.TIMED_WAITING) {
976                     Thread.sleep(20);  // caller is not waiting yet, poll again shortly
977                 }
978                 Thread.sleep(20);  // extra delay so the caller can release its carrier
979                 task.run();
980             } catch (Exception e) {
981                 e.printStackTrace();  // reported only; not propagated to the caller
982             }
983         };
984         return Thread.ofPlatform().daemon().start(waitThenRun);
985     }
986 }