1 /*
  2  * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on NIO channels
 28  * @library /test/lib
 29  * @run junit/othervm BlockingChannelOps
 30  */
 31 
 32 /*
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps
 37  * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps
 38  * @run junit/othervm -Djdk.pollerMode=3 BlockingChannelOps
 39  */
 40 
 41 /*
 42  * @test id=io_uring
 43  * @requires os.family == "linux"
 44  * @library /test/lib
 45  * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
 46  * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
 47  * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
 48  */
 49 
 50 /*
 51  * @test id=no-vmcontinuations
 52  * @requires vm.continuations
 53  * @library /test/lib
 54  * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
 55  */
 56 
 57 import java.io.Closeable;
 58 import java.io.IOException;
 59 import java.net.DatagramPacket;
 60 import java.net.InetAddress;
 61 import java.net.InetSocketAddress;
 62 import java.net.Socket;
 63 import java.net.SocketAddress;
 64 import java.net.SocketException;
 65 import java.nio.ByteBuffer;
 66 import java.nio.channels.AsynchronousCloseException;
 67 import java.nio.channels.ClosedByInterruptException;
 68 import java.nio.channels.ClosedChannelException;
 69 import java.nio.channels.DatagramChannel;
 70 import java.nio.channels.Pipe;
 71 import java.nio.channels.ReadableByteChannel;
 72 import java.nio.channels.ServerSocketChannel;
 73 import java.nio.channels.SocketChannel;
 74 import java.nio.channels.WritableByteChannel;
 75 import java.util.concurrent.ExecutorService;
 76 import java.util.concurrent.Executors;
 77 import java.util.concurrent.ThreadFactory;
 78 import java.util.stream.Stream;
 79 
 80 import jdk.test.lib.thread.VThreadRunner;
 81 import jdk.test.lib.thread.VThreadScheduler;
 82 
 83 import org.junit.jupiter.api.BeforeAll;
 84 import org.junit.jupiter.params.ParameterizedTest;
 85 import org.junit.jupiter.params.provider.MethodSource;
 86 import static org.junit.jupiter.api.Assertions.*;
 87 
 88 class BlockingChannelOps {
 89     private static ExecutorService threadPool;
 90     private static Thread.VirtualThreadScheduler customScheduler;
 91 
 92     @BeforeAll
 93     static void setup() {
 94         ThreadFactory factory = Thread.ofPlatform().daemon().factory();
 95         threadPool = Executors.newCachedThreadPool(factory);
 96         customScheduler = (_, task) -> threadPool.execute(task);
 97     }
 98 
 99     /**
100      * Returns the Thread.Builder to create the virtual thread.
101      */
102     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
103         if (VThreadScheduler.supportsCustomScheduler()) {
104             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
105         } else {
106             return Stream.of(Thread.ofVirtual());
107         }
108     }
109 
110     /**
111      * SocketChannel read/write, no blocking.
112      */
113     @ParameterizedTest
114     @MethodSource("threadBuilders")
115     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
116         VThreadRunner.run(builder, () -> {
117             try (var connection = new Connection()) {
118                 SocketChannel sc1 = connection.channel1();
119                 SocketChannel sc2 = connection.channel2();
120 
121                 // write to sc1
122                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
123                 int n = sc1.write(bb);
124                 assertTrue(n > 0);
125 
126                 // read from sc2 should not block
127                 bb = ByteBuffer.allocate(10);
128                 n = sc2.read(bb);
129                 assertTrue(n > 0);
130                 assertTrue(bb.get(0) == 'X');
131             }
132         });
133     }
134 
135     /**
136      * Virtual thread blocks in SocketChannel read.
137      */
138     @ParameterizedTest
139     @MethodSource("threadBuilders")
140     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
141         VThreadRunner.run(builder, () -> {
142             try (var connection = new Connection()) {
143                 SocketChannel sc1 = connection.channel1();
144                 SocketChannel sc2 = connection.channel2();
145 
146                 // write to sc1 when current thread blocks in sc2.read
147                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
148                 runAfterParkedAsync(() -> sc1.write(bb1));
149 
150                 // read from sc2 should block
151                 ByteBuffer bb2 = ByteBuffer.allocate(10);
152                 int n = sc2.read(bb2);
153                 assertTrue(n > 0);
154                 assertTrue(bb2.get(0) == 'X');
155             }
156         });
157     }
158 
159     /**
160      * Virtual thread blocks in SocketChannel write.
161      */
162     @ParameterizedTest
163     @MethodSource("threadBuilders")
164     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
165         VThreadRunner.run(builder, () -> {
166             try (var connection = new Connection()) {
167                 SocketChannel sc1 = connection.channel1();
168                 SocketChannel sc2 = connection.channel2();
169 
170                 // read from sc2 to EOF when current thread blocks in sc1.write
171                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
172 
173                 // write to sc1 should block
174                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
175                 for (int i=0; i<1000; i++) {
176                     int n = sc1.write(bb);
177                     assertTrue(n > 0);
178                     bb.clear();
179                 }
180                 sc1.close();
181 
182                 // wait for reader to finish
183                 reader.join();
184             }
185         });
186     }
187 
188     /**
189      * SocketChannel close while virtual thread blocked in read.
190      */
191     @ParameterizedTest
192     @MethodSource("threadBuilders")
193     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
194         VThreadRunner.run(builder, () -> {
195             try (var connection = new Connection()) {
196                 SocketChannel sc = connection.channel1();
197                 runAfterParkedAsync(sc::close);
198                 try {
199                     int n = sc.read(ByteBuffer.allocate(100));
200                     fail("read returned " + n);
201                 } catch (AsynchronousCloseException expected) { }
202             }
203         });
204     }
205 
206     /**
207      * SocketChannel shutdownInput while virtual thread blocked in read.
208      */
209     @ParameterizedTest
210     @MethodSource("threadBuilders")
211     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
212         VThreadRunner.run(builder, () -> {
213             try (var connection = new Connection()) {
214                 SocketChannel sc = connection.channel1();
215                 runAfterParkedAsync(sc::shutdownInput);
216                 int n = sc.read(ByteBuffer.allocate(100));
217                 assertEquals(-1, n);
218                 assertTrue(sc.isOpen());
219             }
220         });
221     }
222 
223     /**
224      * Virtual thread interrupted while blocked in SocketChannel read.
225      */
226     @ParameterizedTest
227     @MethodSource("threadBuilders")
228     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
229         VThreadRunner.run(builder, () -> {
230             try (var connection = new Connection()) {
231                 SocketChannel sc = connection.channel1();
232 
233                 // interrupt current thread when it blocks in read
234                 Thread thisThread = Thread.currentThread();
235                 runAfterParkedAsync(thisThread::interrupt);
236 
237                 try {
238                     int n = sc.read(ByteBuffer.allocate(100));
239                     fail("read returned " + n);
240                 } catch (ClosedByInterruptException expected) {
241                     assertTrue(Thread.interrupted());
242                 }
243             }
244         });
245     }
246 
247     /**
248      * SocketChannel close while virtual thread blocked in write.
249      */
250     @ParameterizedTest
251     @MethodSource("threadBuilders")
252     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
253         VThreadRunner.run(builder, () -> {
254             boolean retry = true;
255             while (retry) {
256                 try (var connection = new Connection()) {
257                     SocketChannel sc = connection.channel1();
258 
259                     // close sc when current thread blocks in write
260                     runAfterParkedAsync(sc::close);
261                     try {
262                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
263                         for (;;) {
264                             int n = sc.write(bb);
265                             assertTrue(n > 0);
266                             bb.clear();
267                         }
268                     } catch (AsynchronousCloseException expected) {
269                         // closed when blocked in write
270                         retry = false;
271                     } catch (ClosedChannelException e) {
272                         // closed when not blocked in write, need to retry test
273                     }
274                 }
275             }
276         });
277     }
278 
279 
280     /**
281      * SocketChannel shutdownOutput while virtual thread blocked in write.
282      */
283     @ParameterizedTest
284     @MethodSource("threadBuilders")
285     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
286         VThreadRunner.run(builder, () -> {
287             try (var connection = new Connection()) {
288                 SocketChannel sc = connection.channel1();
289 
290                 // shutdown output when current thread blocks in write
291                 runAfterParkedAsync(sc::shutdownOutput);
292                 try {
293                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
294                     for (;;) {
295                         int n = sc.write(bb);
296                         assertTrue(n > 0);
297                         bb.clear();
298                     }
299                 } catch (ClosedChannelException e) {
300                     // expected
301                 }
302                 assertTrue(sc.isOpen());
303             }
304         });
305     }
306 
307     /**
308      * Virtual thread interrupted while blocked in SocketChannel write.
309      */
310     @ParameterizedTest
311     @MethodSource("threadBuilders")
312     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
313         VThreadRunner.run(builder, () -> {
314             boolean retry = true;
315             while (retry) {
316                 try (var connection = new Connection()) {
317                     SocketChannel sc = connection.channel1();
318 
319                     // interrupt current thread when it blocks in write
320                     Thread thisThread = Thread.currentThread();
321                     runAfterParkedAsync(thisThread::interrupt);
322 
323                     try {
324                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
325                         for (;;) {
326                             int n = sc.write(bb);
327                             assertTrue(n > 0);
328                             bb.clear();
329                         }
330                     } catch (ClosedByInterruptException e) {
331                         // closed when blocked in write
332                         assertTrue(Thread.interrupted());
333                         retry = false;
334                     } catch (ClosedChannelException e) {
335                         // closed when not blocked in write, need to retry test
336                     }
337                 }
338             }
339         });
340     }
341 
342     /**
343      * Virtual thread blocks in SocketChannel adaptor read.
344      */
345     @ParameterizedTest
346     @MethodSource("threadBuilders")
347     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
348         testSocketAdaptorRead(builder, 0);
349     }
350 
351     /**
352      * Virtual thread blocks in SocketChannel adaptor read with timeout.
353      */
354     @ParameterizedTest
355     @MethodSource("threadBuilders")
356     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
357         testSocketAdaptorRead(builder, 60_000);
358     }
359 
360     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
361                                        int timeout) throws Exception {
362         VThreadRunner.run(builder, () -> {
363             try (var connection = new Connection()) {
364                 SocketChannel sc1 = connection.channel1();
365                 SocketChannel sc2 = connection.channel2();
366 
367                 // write to sc1 when currnet thread blocks reading from sc2
368                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
369                 runAfterParkedAsync(() -> sc1.write(bb));
370 
371                 // read from sc2 should block
372                 byte[] array = new byte[100];
373                 if (timeout > 0)
374                     sc2.socket().setSoTimeout(timeout);
375                 int n = sc2.socket().getInputStream().read(array);
376                 assertTrue(n > 0);
377                 assertTrue(array[0] == 'X');
378             }
379         });
380     }
381 
382     /**
383      * ServerSocketChannel accept, no blocking.
384      */
385     @ParameterizedTest
386     @MethodSource("threadBuilders")
387     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
388         VThreadRunner.run(builder, () -> {
389             try (var ssc = ServerSocketChannel.open()) {
390                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
391                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
392                 // accept should not block
393                 var sc2 = ssc.accept();
394                 sc1.close();
395                 sc2.close();
396             }
397         });
398     }
399 
400     /**
401      * Virtual thread blocks in ServerSocketChannel accept.
402      */
403     @ParameterizedTest
404     @MethodSource("threadBuilders")
405     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
406         VThreadRunner.run(builder, () -> {
407             try (var ssc = ServerSocketChannel.open()) {
408                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
409                 var sc1 = SocketChannel.open();
410 
411                 // connect when current thread when it blocks in accept
412                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
413 
414                 // accept should block
415                 var sc2 = ssc.accept();
416                 sc1.close();
417                 sc2.close();
418             }
419         });
420     }
421 
422     /**
423      * SeverSocketChannel close while virtual thread blocked in accept.
424      */
425     @ParameterizedTest
426     @MethodSource("threadBuilders")
427     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
428         VThreadRunner.run(builder, () -> {
429             try (var ssc = ServerSocketChannel.open()) {
430                 InetAddress lh = InetAddress.getLoopbackAddress();
431                 ssc.bind(new InetSocketAddress(lh, 0));
432                 runAfterParkedAsync(ssc::close);
433                 try {
434                     SocketChannel sc = ssc.accept();
435                     sc.close();
436                     fail("connection accepted???");
437                 } catch (AsynchronousCloseException expected) { }
438             }
439         });
440     }
441 
442     /**
443      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
444      */
445     @ParameterizedTest
446     @MethodSource("threadBuilders")
447     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
448         VThreadRunner.run(builder, () -> {
449             try (var ssc = ServerSocketChannel.open()) {
450                 InetAddress lh = InetAddress.getLoopbackAddress();
451                 ssc.bind(new InetSocketAddress(lh, 0));
452 
453                 // interrupt current thread when it blocks in accept
454                 Thread thisThread = Thread.currentThread();
455                 runAfterParkedAsync(thisThread::interrupt);
456 
457                 try {
458                     SocketChannel sc = ssc.accept();
459                     sc.close();
460                     fail("connection accepted???");
461                 } catch (ClosedByInterruptException expected) {
462                     assertTrue(Thread.interrupted());
463                 }
464             }
465         });
466     }
467 
468     /**
469      * Virtual thread blocks in ServerSocketChannel adaptor accept.
470      */
471     @ParameterizedTest
472     @MethodSource("threadBuilders")
473     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
474         testSocketChannelAdaptorAccept(builder, 0);
475     }
476 
477     /**
478      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
479      */
480     @ParameterizedTest
481     @MethodSource("threadBuilders")
482     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
483         testSocketChannelAdaptorAccept(builder, 60_000);
484     }
485 
486     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
487                                                 int timeout) throws Exception {
488         VThreadRunner.run(builder, () -> {
489             try (var ssc = ServerSocketChannel.open()) {
490                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
491                 var sc = SocketChannel.open();
492 
493                 // interrupt current thread when it blocks in accept
494                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
495 
496                 // accept should block
497                 if (timeout > 0)
498                     ssc.socket().setSoTimeout(timeout);
499                 Socket s = ssc.socket().accept();
500                 sc.close();
501                 s.close();
502             }
503         });
504     }
505 
506     /**
507      * DatagramChannel receive/send, no blocking.
508      */
509     @ParameterizedTest
510     @MethodSource("threadBuilders")
511     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
512         VThreadRunner.run(builder, () -> {
513             try (DatagramChannel dc1 = DatagramChannel.open();
514                  DatagramChannel dc2 = DatagramChannel.open()) {
515 
516                 InetAddress lh = InetAddress.getLoopbackAddress();
517                 dc2.bind(new InetSocketAddress(lh, 0));
518 
519                 // send should not block
520                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
521                 int n = dc1.send(bb, dc2.getLocalAddress());
522                 assertTrue(n > 0);
523 
524                 // receive should not block
525                 bb = ByteBuffer.allocate(10);
526                 dc2.receive(bb);
527                 assertTrue(bb.get(0) == 'X');
528             }
529         });
530     }
531 
532     /**
533      * Virtual thread blocks in DatagramChannel receive.
534      */
535     @ParameterizedTest
536     @MethodSource("threadBuilders")
537     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
538         VThreadRunner.run(builder, () -> {
539             try (DatagramChannel dc1 = DatagramChannel.open();
540                  DatagramChannel dc2 = DatagramChannel.open()) {
541 
542                 InetAddress lh = InetAddress.getLoopbackAddress();
543                 dc2.bind(new InetSocketAddress(lh, 0));
544 
545                 // send from dc1 when current thread blocked in dc2.receive
546                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
547                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
548 
549                 // read from dc2 should block
550                 ByteBuffer bb2 = ByteBuffer.allocate(10);
551                 dc2.receive(bb2);
552                 assertTrue(bb2.get(0) == 'X');
553             }
554         });
555     }
556 
557     /**
558      * DatagramChannel close while virtual thread blocked in receive.
559      */
560     @ParameterizedTest
561     @MethodSource("threadBuilders")
562     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
563         VThreadRunner.run(builder, () -> {
564             try (DatagramChannel dc = DatagramChannel.open()) {
565                 InetAddress lh = InetAddress.getLoopbackAddress();
566                 dc.bind(new InetSocketAddress(lh, 0));
567                 runAfterParkedAsync(dc::close);
568                 try {
569                     dc.receive(ByteBuffer.allocate(100));
570                     fail("receive returned");
571                 } catch (AsynchronousCloseException expected) { }
572             }
573         });
574     }
575 
576     /**
577      * Virtual thread interrupted while blocked in DatagramChannel receive.
578      */
579     @ParameterizedTest
580     @MethodSource("threadBuilders")
581     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
582         VThreadRunner.run(builder, () -> {
583             try (DatagramChannel dc = DatagramChannel.open()) {
584                 InetAddress lh = InetAddress.getLoopbackAddress();
585                 dc.bind(new InetSocketAddress(lh, 0));
586 
587                 // interrupt current thread when it blocks in receive
588                 Thread thisThread = Thread.currentThread();
589                 runAfterParkedAsync(thisThread::interrupt);
590 
591                 try {
592                     dc.receive(ByteBuffer.allocate(100));
593                     fail("receive returned");
594                 } catch (ClosedByInterruptException expected) {
595                     assertTrue(Thread.interrupted());
596                 }
597             }
598         });
599     }
600 
601     /**
602      * Virtual thread blocks in DatagramSocket adaptor receive.
603      */
604     @ParameterizedTest
605     @MethodSource("threadBuilders")
606     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
607         testDatagramSocketAdaptorReceive(builder, 0);
608     }
609 
610     /**
611      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
612      */
613     @ParameterizedTest
614     @MethodSource("threadBuilders")
615     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
616         testDatagramSocketAdaptorReceive(builder, 60_000);
617     }
618 
619     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
620                                                   int timeout) throws Exception {
621         VThreadRunner.run(builder, () -> {
622             try (DatagramChannel dc1 = DatagramChannel.open();
623                  DatagramChannel dc2 = DatagramChannel.open()) {
624 
625                 InetAddress lh = InetAddress.getLoopbackAddress();
626                 dc2.bind(new InetSocketAddress(lh, 0));
627 
628                 // send from dc1 when current thread blocks in dc2 receive
629                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
630                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
631 
632                 // receive should block
633                 byte[] array = new byte[100];
634                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
635                 if (timeout > 0)
636                     dc2.socket().setSoTimeout(timeout);
637                 dc2.socket().receive(p);
638                 assertTrue(p.getLength() == 3 && array[0] == 'X');
639             }
640         });
641     }
642 
643     /**
644      * DatagramChannel close while virtual thread blocked in adaptor receive.
645      */
646     @ParameterizedTest
647     @MethodSource("threadBuilders")
648     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
649         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
650     }
651 
652     /**
653      * DatagramChannel close while virtual thread blocked in adaptor receive
654      * with timeout.
655      */
656     @ParameterizedTest
657     @MethodSource("threadBuilders")
658     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
659         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
660     }
661 
662     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
663                                                             int timeout) throws Exception {
664         VThreadRunner.run(builder, () -> {
665             try (DatagramChannel dc = DatagramChannel.open()) {
666                 InetAddress lh = InetAddress.getLoopbackAddress();
667                 dc.bind(new InetSocketAddress(lh, 0));
668 
669                 byte[] array = new byte[100];
670                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
671                 if (timeout > 0)
672                     dc.socket().setSoTimeout(timeout);
673 
674                 // close channel/socket when current thread blocks in receive
675                 runAfterParkedAsync(dc::close);
676 
677                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
678             }
679         });
680     }
681 
682     /**
683      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
684      */
685     @ParameterizedTest
686     @MethodSource("threadBuilders")
687     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
688         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
689     }
690 
691     /**
692      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
693      * with timeout.
694      */
695     @ParameterizedTest
696     @MethodSource("threadBuilders")
697     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
698         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
699     }
700 
701     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
702                                                            int timeout) throws Exception {
703         VThreadRunner.run(builder, () -> {
704             try (DatagramChannel dc = DatagramChannel.open()) {
705                 InetAddress lh = InetAddress.getLoopbackAddress();
706                 dc.bind(new InetSocketAddress(lh, 0));
707 
708                 byte[] array = new byte[100];
709                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
710                 if (timeout > 0)
711                     dc.socket().setSoTimeout(timeout);
712 
713                 // interrupt current thread when it blocks in receive
714                 Thread thisThread = Thread.currentThread();
715                 runAfterParkedAsync(thisThread::interrupt);
716 
717                 try {
718                     dc.socket().receive(p);
719                     fail();
720                 } catch (ClosedByInterruptException expected) {
721                     assertTrue(Thread.interrupted());
722                 }
723             }
724         });
725     }
726 
727     /**
728      * Pipe read/write, no blocking.
729      */
730     @ParameterizedTest
731     @MethodSource("threadBuilders")
732     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
733         VThreadRunner.run(builder, () -> {
734             Pipe p = Pipe.open();
735             try (Pipe.SinkChannel sink = p.sink();
736                  Pipe.SourceChannel source = p.source()) {
737 
738                 // write should not block
739                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
740                 int n = sink.write(bb);
741                 assertTrue(n > 0);
742 
743                 // read should not block
744                 bb = ByteBuffer.allocate(10);
745                 n = source.read(bb);
746                 assertTrue(n > 0);
747                 assertTrue(bb.get(0) == 'X');
748             }
749         });
750     }
751 
752     /**
753      * Virtual thread blocks in Pipe.SourceChannel read.
754      */
755     @ParameterizedTest
756     @MethodSource("threadBuilders")
757     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
758         VThreadRunner.run(builder, () -> {
759             Pipe p = Pipe.open();
760             try (Pipe.SinkChannel sink = p.sink();
761                  Pipe.SourceChannel source = p.source()) {
762 
763                 // write from sink when current thread blocks reading from source
764                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
765                 runAfterParkedAsync(() -> sink.write(bb1));
766 
767                 // read should block
768                 ByteBuffer bb2 = ByteBuffer.allocate(10);
769                 int n = source.read(bb2);
770                 assertTrue(n > 0);
771                 assertTrue(bb2.get(0) == 'X');
772             }
773         });
774     }
775 
776     /**
777      * Virtual thread blocks in Pipe.SinkChannel write.
778      */
779     @ParameterizedTest
780     @MethodSource("threadBuilders")
781     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
782         VThreadRunner.run(builder, () -> {
783             Pipe p = Pipe.open();
784             try (Pipe.SinkChannel sink = p.sink();
785                  Pipe.SourceChannel source = p.source()) {
786 
787                 // read from source to EOF when current thread blocking in write
788                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
789 
790                 // write to sink should block
791                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
792                 for (int i=0; i<1000; i++) {
793                     int n = sink.write(bb);
794                     assertTrue(n > 0);
795                     bb.clear();
796                 }
797                 sink.close();
798 
799                 // wait for reader to finish
800                 reader.join();
801             }
802         });
803     }
804 
805     /**
806      * Pipe.SourceChannel close while virtual thread blocked in read.
807      */
808     @ParameterizedTest
809     @MethodSource("threadBuilders")
810     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
811         VThreadRunner.run(builder, () -> {
812             Pipe p = Pipe.open();
813             try (Pipe.SinkChannel sink = p.sink();
814                  Pipe.SourceChannel source = p.source()) {
815                 runAfterParkedAsync(source::close);
816                 try {
817                     int n = source.read(ByteBuffer.allocate(100));
818                     fail("read returned " + n);
819                 } catch (AsynchronousCloseException expected) { }
820             }
821         });
822     }
823 
824     /**
825      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
826      */
827     @ParameterizedTest
828     @MethodSource("threadBuilders")
829     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
830         VThreadRunner.run(builder, () -> {
831             Pipe p = Pipe.open();
832             try (Pipe.SinkChannel sink = p.sink();
833                  Pipe.SourceChannel source = p.source()) {
834 
835                 // interrupt current thread when it blocks reading from source
836                 Thread thisThread = Thread.currentThread();
837                 runAfterParkedAsync(thisThread::interrupt);
838 
839                 try {
840                     int n = source.read(ByteBuffer.allocate(100));
841                     fail("read returned " + n);
842                 } catch (ClosedByInterruptException expected) {
843                     assertTrue(Thread.interrupted());
844                 }
845             }
846         });
847     }
848 
849     /**
850      * Pipe.SinkChannel close while virtual thread blocked in write.
851      */
852     @ParameterizedTest
853     @MethodSource("threadBuilders")
854     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
855         VThreadRunner.run(builder, () -> {
856             boolean retry = true;
857             while (retry) {
858                 Pipe p = Pipe.open();
859                 try (Pipe.SinkChannel sink = p.sink();
860                      Pipe.SourceChannel source = p.source()) {
861 
862                     // close sink when current thread blocks in write
863                     runAfterParkedAsync(sink::close);
864                     try {
865                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
866                         for (;;) {
867                             int n = sink.write(bb);
868                             assertTrue(n > 0);
869                             bb.clear();
870                         }
871                     } catch (AsynchronousCloseException e) {
872                         // closed when blocked in write
873                         retry = false;
874                     } catch (ClosedChannelException e) {
875                         // closed when not blocked in write, need to retry test
876                     }
877                 }
878             }
879         });
880     }
881 
882     /**
883      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
884      */
885     @ParameterizedTest
886     @MethodSource("threadBuilders")
887     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
888         VThreadRunner.run(builder, () -> {
889             boolean retry = true;
890             while (retry) {
891                 Pipe p = Pipe.open();
892                 try (Pipe.SinkChannel sink = p.sink();
893                      Pipe.SourceChannel source = p.source()) {
894 
895                     // interrupt current thread when it blocks in write
896                     Thread thisThread = Thread.currentThread();
897                     runAfterParkedAsync(thisThread::interrupt);
898 
899                     try {
900                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
901                         for (;;) {
902                             int n = sink.write(bb);
903                             assertTrue(n > 0);
904                             bb.clear();
905                         }
906                     } catch (ClosedByInterruptException expected) {
907                         // closed when blocked in write
908                         assertTrue(Thread.interrupted());
909                         retry = false;
910                     } catch (ClosedChannelException e) {
911                         // closed when not blocked in write, need to retry test
912                     }
913                 }
914             }
915         });
916     }
917 
918     /**
919      * Creates a loopback connection
920      */
921     static class Connection implements Closeable {
922         private final SocketChannel sc1;
923         private final SocketChannel sc2;
924         Connection() throws IOException {
925             var lh = InetAddress.getLoopbackAddress();
926             try (var listener = ServerSocketChannel.open()) {
927                 listener.bind(new InetSocketAddress(lh, 0));
928                 SocketChannel sc1 = SocketChannel.open();
929                 SocketChannel sc2 = null;
930                 try {
931                     sc1.socket().connect(listener.getLocalAddress());
932                     sc2 = listener.accept();
933                 } catch (IOException ioe) {
934                     sc1.close();
935                     throw ioe;
936                 }
937                 this.sc1 = sc1;
938                 this.sc2 = sc2;
939             }
940         }
941         SocketChannel channel1() {
942             return sc1;
943         }
944         SocketChannel channel2() {
945             return sc2;
946         }
947         @Override
948         public void close() throws IOException {
949             sc1.close();
950             sc2.close();
951         }
952     }
953 
954     /**
955      * Read from a channel until all bytes have been read or an I/O error occurs.
956      */
957     static void readToEOF(ReadableByteChannel rbc) throws IOException {
958         ByteBuffer bb = ByteBuffer.allocate(16*1024);
959         int n;
960         while ((n = rbc.read(bb)) > 0) {
961             bb.clear();
962         }
963     }
964 
    /**
     * A task that returns no result and is allowed to throw a checked exception.
     */
    @FunctionalInterface
    interface ThrowingRunnable {
        /**
         * Runs the task.
         *
         * @throws Exception if the task fails
         */
        void run() throws Exception;
    }
969 
970     /**
971      * Runs the given task asynchronously after the current virtual thread has parked.
972      * @return the thread started to run the task
973      */
974     static Thread runAfterParkedAsync(ThrowingRunnable task) {
975         Thread target = Thread.currentThread();
976         if (!target.isVirtual())
977             throw new WrongThreadException();
978         return Thread.ofPlatform().daemon().start(() -> {
979             try {
980                 Thread.State state = target.getState();
981                 while (state != Thread.State.WAITING
982                         && state != Thread.State.TIMED_WAITING) {
983                     Thread.sleep(20);
984                     state = target.getState();
985                 }
986                 Thread.sleep(20);  // give a bit more time to release carrier
987                 task.run();
988             } catch (Exception e) {
989                 e.printStackTrace();
990             }
991         });
992     }
993 }