1 /*
  2  * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /**
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on NIO channels
 28  * @library /test/lib
 29  * @run junit BlockingChannelOps
 30  */
 31 
 32 /**
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps
 37  * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps
 38  */
 39 
 40 /**
 41  * @test id=no-vmcontinuations
 42  * @requires vm.continuations
 43  * @library /test/lib
 44  * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
 45  */
 46 
 47 import java.io.Closeable;
 48 import java.io.IOException;
 49 import java.net.DatagramPacket;
 50 import java.net.InetAddress;
 51 import java.net.InetSocketAddress;
 52 import java.net.Socket;
 53 import java.net.SocketAddress;
 54 import java.net.SocketException;
 55 import java.nio.ByteBuffer;
 56 import java.nio.channels.AsynchronousCloseException;
 57 import java.nio.channels.ClosedByInterruptException;
 58 import java.nio.channels.ClosedChannelException;
 59 import java.nio.channels.DatagramChannel;
 60 import java.nio.channels.Pipe;
 61 import java.nio.channels.ReadableByteChannel;
 62 import java.nio.channels.ServerSocketChannel;
 63 import java.nio.channels.SocketChannel;
 64 import java.nio.channels.WritableByteChannel;
 65 import java.util.concurrent.ExecutorService;
 66 import java.util.concurrent.Executors;
 67 import java.util.stream.Stream;
 68 
 69 import jdk.test.lib.thread.VThreadRunner;
 70 import jdk.test.lib.thread.VThreadScheduler;
 71 
 72 import org.junit.jupiter.api.BeforeAll;
 73 import org.junit.jupiter.api.AfterAll;
 74 import org.junit.jupiter.params.ParameterizedTest;
 75 import org.junit.jupiter.params.provider.MethodSource;
 76 import static org.junit.jupiter.api.Assertions.*;
 77 
 78 class BlockingChannelOps {
 79     private static ExecutorService threadPool;
 80     private static Thread.VirtualThreadScheduler customScheduler;
 81 
 82     @BeforeAll
 83     static void setup() {
 84         threadPool = Executors.newCachedThreadPool();
 85         customScheduler = (_, task) -> threadPool.execute(task);
 86     }
 87 
 88     @AfterAll
 89     static void finish() {
 90         threadPool.shutdown();
 91     }
 92 
 93     /**
 94      * Returns the Thread.Builder to create the virtual thread.
 95      */
 96     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
 97         if (VThreadScheduler.supportsCustomScheduler()) {
 98             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
 99         } else {
100             return Stream.of(Thread.ofVirtual());
101         }
102     }
103 
104     /**
105      * SocketChannel read/write, no blocking.
106      */
107     @ParameterizedTest
108     @MethodSource("threadBuilders")
109     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
110         VThreadRunner.run(builder, () -> {
111             try (var connection = new Connection()) {
112                 SocketChannel sc1 = connection.channel1();
113                 SocketChannel sc2 = connection.channel2();
114 
115                 // write to sc1
116                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
117                 int n = sc1.write(bb);
118                 assertTrue(n > 0);
119 
120                 // read from sc2 should not block
121                 bb = ByteBuffer.allocate(10);
122                 n = sc2.read(bb);
123                 assertTrue(n > 0);
124                 assertTrue(bb.get(0) == 'X');
125             }
126         });
127     }
128 
129     /**
130      * Virtual thread blocks in SocketChannel read.
131      */
132     @ParameterizedTest
133     @MethodSource("threadBuilders")
134     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
135         VThreadRunner.run(builder, () -> {
136             try (var connection = new Connection()) {
137                 SocketChannel sc1 = connection.channel1();
138                 SocketChannel sc2 = connection.channel2();
139 
140                 // write to sc1 when current thread blocks in sc2.read
141                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
142                 runAfterParkedAsync(() -> sc1.write(bb1));
143 
144                 // read from sc2 should block
145                 ByteBuffer bb2 = ByteBuffer.allocate(10);
146                 int n = sc2.read(bb2);
147                 assertTrue(n > 0);
148                 assertTrue(bb2.get(0) == 'X');
149             }
150         });
151     }
152 
153     /**
154      * Virtual thread blocks in SocketChannel write.
155      */
156     @ParameterizedTest
157     @MethodSource("threadBuilders")
158     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
159         VThreadRunner.run(builder, () -> {
160             try (var connection = new Connection()) {
161                 SocketChannel sc1 = connection.channel1();
162                 SocketChannel sc2 = connection.channel2();
163 
164                 // read from sc2 to EOF when current thread blocks in sc1.write
165                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
166 
167                 // write to sc1 should block
168                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
169                 for (int i=0; i<1000; i++) {
170                     int n = sc1.write(bb);
171                     assertTrue(n > 0);
172                     bb.clear();
173                 }
174                 sc1.close();
175 
176                 // wait for reader to finish
177                 reader.join();
178             }
179         });
180     }
181 
182     /**
183      * SocketChannel close while virtual thread blocked in read.
184      */
185     @ParameterizedTest
186     @MethodSource("threadBuilders")
187     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
188         VThreadRunner.run(builder, () -> {
189             try (var connection = new Connection()) {
190                 SocketChannel sc = connection.channel1();
191                 runAfterParkedAsync(sc::close);
192                 try {
193                     int n = sc.read(ByteBuffer.allocate(100));
194                     fail("read returned " + n);
195                 } catch (AsynchronousCloseException expected) { }
196             }
197         });
198     }
199 
200     /**
201      * SocketChannel shutdownInput while virtual thread blocked in read.
202      */
203     @ParameterizedTest
204     @MethodSource("threadBuilders")
205     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
206         VThreadRunner.run(builder, () -> {
207             try (var connection = new Connection()) {
208                 SocketChannel sc = connection.channel1();
209                 runAfterParkedAsync(sc::shutdownInput);
210                 int n = sc.read(ByteBuffer.allocate(100));
211                 assertEquals(-1, n);
212                 assertTrue(sc.isOpen());
213             }
214         });
215     }
216 
217     /**
218      * Virtual thread interrupted while blocked in SocketChannel read.
219      */
220     @ParameterizedTest
221     @MethodSource("threadBuilders")
222     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
223         VThreadRunner.run(builder, () -> {
224             try (var connection = new Connection()) {
225                 SocketChannel sc = connection.channel1();
226 
227                 // interrupt current thread when it blocks in read
228                 Thread thisThread = Thread.currentThread();
229                 runAfterParkedAsync(thisThread::interrupt);
230 
231                 try {
232                     int n = sc.read(ByteBuffer.allocate(100));
233                     fail("read returned " + n);
234                 } catch (ClosedByInterruptException expected) {
235                     assertTrue(Thread.interrupted());
236                 }
237             }
238         });
239     }
240 
241     /**
242      * SocketChannel close while virtual thread blocked in write.
243      */
244     @ParameterizedTest
245     @MethodSource("threadBuilders")
246     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
247         VThreadRunner.run(builder, () -> {
248             boolean retry = true;
249             while (retry) {
250                 try (var connection = new Connection()) {
251                     SocketChannel sc = connection.channel1();
252 
253                     // close sc when current thread blocks in write
254                     runAfterParkedAsync(sc::close);
255                     try {
256                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
257                         for (;;) {
258                             int n = sc.write(bb);
259                             assertTrue(n > 0);
260                             bb.clear();
261                         }
262                     } catch (AsynchronousCloseException expected) {
263                         // closed when blocked in write
264                         retry = false;
265                     } catch (ClosedChannelException e) {
266                         // closed when not blocked in write, need to retry test
267                     }
268                 }
269             }
270         });
271     }
272 
273 
274     /**
275      * SocketChannel shutdownOutput while virtual thread blocked in write.
276      */
277     @ParameterizedTest
278     @MethodSource("threadBuilders")
279     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
280         VThreadRunner.run(builder, () -> {
281             try (var connection = new Connection()) {
282                 SocketChannel sc = connection.channel1();
283 
284                 // shutdown output when current thread blocks in write
285                 runAfterParkedAsync(sc::shutdownOutput);
286                 try {
287                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
288                     for (;;) {
289                         int n = sc.write(bb);
290                         assertTrue(n > 0);
291                         bb.clear();
292                     }
293                 } catch (ClosedChannelException e) {
294                     // expected
295                 }
296                 assertTrue(sc.isOpen());
297             }
298         });
299     }
300 
301     /**
302      * Virtual thread interrupted while blocked in SocketChannel write.
303      */
304     @ParameterizedTest
305     @MethodSource("threadBuilders")
306     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
307         VThreadRunner.run(builder, () -> {
308             boolean retry = true;
309             while (retry) {
310                 try (var connection = new Connection()) {
311                     SocketChannel sc = connection.channel1();
312 
313                     // interrupt current thread when it blocks in write
314                     Thread thisThread = Thread.currentThread();
315                     runAfterParkedAsync(thisThread::interrupt);
316 
317                     try {
318                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
319                         for (;;) {
320                             int n = sc.write(bb);
321                             assertTrue(n > 0);
322                             bb.clear();
323                         }
324                     } catch (ClosedByInterruptException e) {
325                         // closed when blocked in write
326                         assertTrue(Thread.interrupted());
327                         retry = false;
328                     } catch (ClosedChannelException e) {
329                         // closed when not blocked in write, need to retry test
330                     }
331                 }
332             }
333         });
334     }
335 
336     /**
337      * Virtual thread blocks in SocketChannel adaptor read.
338      */
339     @ParameterizedTest
340     @MethodSource("threadBuilders")
341     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
342         testSocketAdaptorRead(builder, 0);
343     }
344 
345     /**
346      * Virtual thread blocks in SocketChannel adaptor read with timeout.
347      */
348     @ParameterizedTest
349     @MethodSource("threadBuilders")
350     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
351         testSocketAdaptorRead(builder, 60_000);
352     }
353 
354     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
355                                        int timeout) throws Exception {
356         VThreadRunner.run(builder, () -> {
357             try (var connection = new Connection()) {
358                 SocketChannel sc1 = connection.channel1();
359                 SocketChannel sc2 = connection.channel2();
360 
361                 // write to sc1 when currnet thread blocks reading from sc2
362                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
363                 runAfterParkedAsync(() -> sc1.write(bb));
364 
365                 // read from sc2 should block
366                 byte[] array = new byte[100];
367                 if (timeout > 0)
368                     sc2.socket().setSoTimeout(timeout);
369                 int n = sc2.socket().getInputStream().read(array);
370                 assertTrue(n > 0);
371                 assertTrue(array[0] == 'X');
372             }
373         });
374     }
375 
376     /**
377      * ServerSocketChannel accept, no blocking.
378      */
379     @ParameterizedTest
380     @MethodSource("threadBuilders")
381     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
382         VThreadRunner.run(builder, () -> {
383             try (var ssc = ServerSocketChannel.open()) {
384                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
385                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
386                 // accept should not block
387                 var sc2 = ssc.accept();
388                 sc1.close();
389                 sc2.close();
390             }
391         });
392     }
393 
394     /**
395      * Virtual thread blocks in ServerSocketChannel accept.
396      */
397     @ParameterizedTest
398     @MethodSource("threadBuilders")
399     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
400         VThreadRunner.run(builder, () -> {
401             try (var ssc = ServerSocketChannel.open()) {
402                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
403                 var sc1 = SocketChannel.open();
404 
405                 // connect when current thread when it blocks in accept
406                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
407 
408                 // accept should block
409                 var sc2 = ssc.accept();
410                 sc1.close();
411                 sc2.close();
412             }
413         });
414     }
415 
416     /**
417      * SeverSocketChannel close while virtual thread blocked in accept.
418      */
419     @ParameterizedTest
420     @MethodSource("threadBuilders")
421     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
422         VThreadRunner.run(builder, () -> {
423             try (var ssc = ServerSocketChannel.open()) {
424                 InetAddress lh = InetAddress.getLoopbackAddress();
425                 ssc.bind(new InetSocketAddress(lh, 0));
426                 runAfterParkedAsync(ssc::close);
427                 try {
428                     SocketChannel sc = ssc.accept();
429                     sc.close();
430                     fail("connection accepted???");
431                 } catch (AsynchronousCloseException expected) { }
432             }
433         });
434     }
435 
436     /**
437      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
438      */
439     @ParameterizedTest
440     @MethodSource("threadBuilders")
441     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
442         VThreadRunner.run(builder, () -> {
443             try (var ssc = ServerSocketChannel.open()) {
444                 InetAddress lh = InetAddress.getLoopbackAddress();
445                 ssc.bind(new InetSocketAddress(lh, 0));
446 
447                 // interrupt current thread when it blocks in accept
448                 Thread thisThread = Thread.currentThread();
449                 runAfterParkedAsync(thisThread::interrupt);
450 
451                 try {
452                     SocketChannel sc = ssc.accept();
453                     sc.close();
454                     fail("connection accepted???");
455                 } catch (ClosedByInterruptException expected) {
456                     assertTrue(Thread.interrupted());
457                 }
458             }
459         });
460     }
461 
462     /**
463      * Virtual thread blocks in ServerSocketChannel adaptor accept.
464      */
465     @ParameterizedTest
466     @MethodSource("threadBuilders")
467     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
468         testSocketChannelAdaptorAccept(builder, 0);
469     }
470 
471     /**
472      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
473      */
474     @ParameterizedTest
475     @MethodSource("threadBuilders")
476     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
477         testSocketChannelAdaptorAccept(builder, 60_000);
478     }
479 
480     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
481                                                 int timeout) throws Exception {
482         VThreadRunner.run(builder, () -> {
483             try (var ssc = ServerSocketChannel.open()) {
484                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
485                 var sc = SocketChannel.open();
486 
487                 // interrupt current thread when it blocks in accept
488                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
489 
490                 // accept should block
491                 if (timeout > 0)
492                     ssc.socket().setSoTimeout(timeout);
493                 Socket s = ssc.socket().accept();
494                 sc.close();
495                 s.close();
496             }
497         });
498     }
499 
500     /**
501      * DatagramChannel receive/send, no blocking.
502      */
503     @ParameterizedTest
504     @MethodSource("threadBuilders")
505     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
506         VThreadRunner.run(builder, () -> {
507             try (DatagramChannel dc1 = DatagramChannel.open();
508                  DatagramChannel dc2 = DatagramChannel.open()) {
509 
510                 InetAddress lh = InetAddress.getLoopbackAddress();
511                 dc2.bind(new InetSocketAddress(lh, 0));
512 
513                 // send should not block
514                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
515                 int n = dc1.send(bb, dc2.getLocalAddress());
516                 assertTrue(n > 0);
517 
518                 // receive should not block
519                 bb = ByteBuffer.allocate(10);
520                 dc2.receive(bb);
521                 assertTrue(bb.get(0) == 'X');
522             }
523         });
524     }
525 
526     /**
527      * Virtual thread blocks in DatagramChannel receive.
528      */
529     @ParameterizedTest
530     @MethodSource("threadBuilders")
531     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
532         VThreadRunner.run(builder, () -> {
533             try (DatagramChannel dc1 = DatagramChannel.open();
534                  DatagramChannel dc2 = DatagramChannel.open()) {
535 
536                 InetAddress lh = InetAddress.getLoopbackAddress();
537                 dc2.bind(new InetSocketAddress(lh, 0));
538 
539                 // send from dc1 when current thread blocked in dc2.receive
540                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
541                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
542 
543                 // read from dc2 should block
544                 ByteBuffer bb2 = ByteBuffer.allocate(10);
545                 dc2.receive(bb2);
546                 assertTrue(bb2.get(0) == 'X');
547             }
548         });
549     }
550 
551     /**
552      * DatagramChannel close while virtual thread blocked in receive.
553      */
554     @ParameterizedTest
555     @MethodSource("threadBuilders")
556     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
557         VThreadRunner.run(builder, () -> {
558             try (DatagramChannel dc = DatagramChannel.open()) {
559                 InetAddress lh = InetAddress.getLoopbackAddress();
560                 dc.bind(new InetSocketAddress(lh, 0));
561                 runAfterParkedAsync(dc::close);
562                 try {
563                     dc.receive(ByteBuffer.allocate(100));
564                     fail("receive returned");
565                 } catch (AsynchronousCloseException expected) { }
566             }
567         });
568     }
569 
570     /**
571      * Virtual thread interrupted while blocked in DatagramChannel receive.
572      */
573     @ParameterizedTest
574     @MethodSource("threadBuilders")
575     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
576         VThreadRunner.run(builder, () -> {
577             try (DatagramChannel dc = DatagramChannel.open()) {
578                 InetAddress lh = InetAddress.getLoopbackAddress();
579                 dc.bind(new InetSocketAddress(lh, 0));
580 
581                 // interrupt current thread when it blocks in receive
582                 Thread thisThread = Thread.currentThread();
583                 runAfterParkedAsync(thisThread::interrupt);
584 
585                 try {
586                     dc.receive(ByteBuffer.allocate(100));
587                     fail("receive returned");
588                 } catch (ClosedByInterruptException expected) {
589                     assertTrue(Thread.interrupted());
590                 }
591             }
592         });
593     }
594 
595     /**
596      * Virtual thread blocks in DatagramSocket adaptor receive.
597      */
598     @ParameterizedTest
599     @MethodSource("threadBuilders")
600     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
601         testDatagramSocketAdaptorReceive(builder, 0);
602     }
603 
604     /**
605      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
606      */
607     @ParameterizedTest
608     @MethodSource("threadBuilders")
609     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
610         testDatagramSocketAdaptorReceive(builder, 60_000);
611     }
612 
613     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
614                                                   int timeout) throws Exception {
615         VThreadRunner.run(builder, () -> {
616             try (DatagramChannel dc1 = DatagramChannel.open();
617                  DatagramChannel dc2 = DatagramChannel.open()) {
618 
619                 InetAddress lh = InetAddress.getLoopbackAddress();
620                 dc2.bind(new InetSocketAddress(lh, 0));
621 
622                 // send from dc1 when current thread blocks in dc2 receive
623                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
624                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
625 
626                 // receive should block
627                 byte[] array = new byte[100];
628                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
629                 if (timeout > 0)
630                     dc2.socket().setSoTimeout(timeout);
631                 dc2.socket().receive(p);
632                 assertTrue(p.getLength() == 3 && array[0] == 'X');
633             }
634         });
635     }
636 
637     /**
638      * DatagramChannel close while virtual thread blocked in adaptor receive.
639      */
640     @ParameterizedTest
641     @MethodSource("threadBuilders")
642     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
643         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
644     }
645 
646     /**
647      * DatagramChannel close while virtual thread blocked in adaptor receive
648      * with timeout.
649      */
650     @ParameterizedTest
651     @MethodSource("threadBuilders")
652     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
653         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
654     }
655 
656     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
657                                                             int timeout) throws Exception {
658         VThreadRunner.run(builder, () -> {
659             try (DatagramChannel dc = DatagramChannel.open()) {
660                 InetAddress lh = InetAddress.getLoopbackAddress();
661                 dc.bind(new InetSocketAddress(lh, 0));
662 
663                 byte[] array = new byte[100];
664                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
665                 if (timeout > 0)
666                     dc.socket().setSoTimeout(timeout);
667 
668                 // close channel/socket when current thread blocks in receive
669                 runAfterParkedAsync(dc::close);
670 
671                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
672             }
673         });
674     }
675 
676     /**
677      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
678      */
679     @ParameterizedTest
680     @MethodSource("threadBuilders")
681     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
682         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
683     }
684 
685     /**
686      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
687      * with timeout.
688      */
689     @ParameterizedTest
690     @MethodSource("threadBuilders")
691     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
692         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
693     }
694 
695     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
696                                                            int timeout) throws Exception {
697         VThreadRunner.run(builder, () -> {
698             try (DatagramChannel dc = DatagramChannel.open()) {
699                 InetAddress lh = InetAddress.getLoopbackAddress();
700                 dc.bind(new InetSocketAddress(lh, 0));
701 
702                 byte[] array = new byte[100];
703                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
704                 if (timeout > 0)
705                     dc.socket().setSoTimeout(timeout);
706 
707                 // interrupt current thread when it blocks in receive
708                 Thread thisThread = Thread.currentThread();
709                 runAfterParkedAsync(thisThread::interrupt);
710 
711                 try {
712                     dc.socket().receive(p);
713                     fail();
714                 } catch (ClosedByInterruptException expected) {
715                     assertTrue(Thread.interrupted());
716                 }
717             }
718         });
719     }
720 
721     /**
722      * Pipe read/write, no blocking.
723      */
724     @ParameterizedTest
725     @MethodSource("threadBuilders")
726     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
727         VThreadRunner.run(builder, () -> {
728             Pipe p = Pipe.open();
729             try (Pipe.SinkChannel sink = p.sink();
730                  Pipe.SourceChannel source = p.source()) {
731 
732                 // write should not block
733                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
734                 int n = sink.write(bb);
735                 assertTrue(n > 0);
736 
737                 // read should not block
738                 bb = ByteBuffer.allocate(10);
739                 n = source.read(bb);
740                 assertTrue(n > 0);
741                 assertTrue(bb.get(0) == 'X');
742             }
743         });
744     }
745 
746     /**
747      * Virtual thread blocks in Pipe.SourceChannel read.
748      */
749     @ParameterizedTest
750     @MethodSource("threadBuilders")
751     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
752         VThreadRunner.run(builder, () -> {
753             Pipe p = Pipe.open();
754             try (Pipe.SinkChannel sink = p.sink();
755                  Pipe.SourceChannel source = p.source()) {
756 
757                 // write from sink when current thread blocks reading from source
758                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
759                 runAfterParkedAsync(() -> sink.write(bb1));
760 
761                 // read should block
762                 ByteBuffer bb2 = ByteBuffer.allocate(10);
763                 int n = source.read(bb2);
764                 assertTrue(n > 0);
765                 assertTrue(bb2.get(0) == 'X');
766             }
767         });
768     }
769 
770     /**
771      * Virtual thread blocks in Pipe.SinkChannel write.
772      */
773     @ParameterizedTest
774     @MethodSource("threadBuilders")
775     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
776         VThreadRunner.run(builder, () -> {
777             Pipe p = Pipe.open();
778             try (Pipe.SinkChannel sink = p.sink();
779                  Pipe.SourceChannel source = p.source()) {
780 
781                 // read from source to EOF when current thread blocking in write
782                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
783 
784                 // write to sink should block
785                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
786                 for (int i=0; i<1000; i++) {
787                     int n = sink.write(bb);
788                     assertTrue(n > 0);
789                     bb.clear();
790                 }
791                 sink.close();
792 
793                 // wait for reader to finish
794                 reader.join();
795             }
796         });
797     }
798 
799     /**
800      * Pipe.SourceChannel close while virtual thread blocked in read.
801      */
802     @ParameterizedTest
803     @MethodSource("threadBuilders")
804     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
805         VThreadRunner.run(builder, () -> {
806             Pipe p = Pipe.open();
807             try (Pipe.SinkChannel sink = p.sink();
808                  Pipe.SourceChannel source = p.source()) {
809                 runAfterParkedAsync(source::close);
810                 try {
811                     int n = source.read(ByteBuffer.allocate(100));
812                     fail("read returned " + n);
813                 } catch (AsynchronousCloseException expected) { }
814             }
815         });
816     }
817 
818     /**
819      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
820      */
821     @ParameterizedTest
822     @MethodSource("threadBuilders")
823     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
824         VThreadRunner.run(builder, () -> {
825             Pipe p = Pipe.open();
826             try (Pipe.SinkChannel sink = p.sink();
827                  Pipe.SourceChannel source = p.source()) {
828 
829                 // interrupt current thread when it blocks reading from source
830                 Thread thisThread = Thread.currentThread();
831                 runAfterParkedAsync(thisThread::interrupt);
832 
833                 try {
834                     int n = source.read(ByteBuffer.allocate(100));
835                     fail("read returned " + n);
836                 } catch (ClosedByInterruptException expected) {
837                     assertTrue(Thread.interrupted());
838                 }
839             }
840         });
841     }
842 
843     /**
844      * Pipe.SinkChannel close while virtual thread blocked in write.
845      */
846     @ParameterizedTest
847     @MethodSource("threadBuilders")
848     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
849         VThreadRunner.run(builder, () -> {
850             boolean retry = true;
851             while (retry) {
852                 Pipe p = Pipe.open();
853                 try (Pipe.SinkChannel sink = p.sink();
854                      Pipe.SourceChannel source = p.source()) {
855 
856                     // close sink when current thread blocks in write
857                     runAfterParkedAsync(sink::close);
858                     try {
859                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
860                         for (;;) {
861                             int n = sink.write(bb);
862                             assertTrue(n > 0);
863                             bb.clear();
864                         }
865                     } catch (AsynchronousCloseException e) {
866                         // closed when blocked in write
867                         retry = false;
868                     } catch (ClosedChannelException e) {
869                         // closed when not blocked in write, need to retry test
870                     }
871                 }
872             }
873         });
874     }
875 
876     /**
877      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
878      */
879     @ParameterizedTest
880     @MethodSource("threadBuilders")
881     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
882         VThreadRunner.run(builder, () -> {
883             boolean retry = true;
884             while (retry) {
885                 Pipe p = Pipe.open();
886                 try (Pipe.SinkChannel sink = p.sink();
887                      Pipe.SourceChannel source = p.source()) {
888 
889                     // interrupt current thread when it blocks in write
890                     Thread thisThread = Thread.currentThread();
891                     runAfterParkedAsync(thisThread::interrupt);
892 
893                     try {
894                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
895                         for (;;) {
896                             int n = sink.write(bb);
897                             assertTrue(n > 0);
898                             bb.clear();
899                         }
900                     } catch (ClosedByInterruptException expected) {
901                         // closed when blocked in write
902                         assertTrue(Thread.interrupted());
903                         retry = false;
904                     } catch (ClosedChannelException e) {
905                         // closed when not blocked in write, need to retry test
906                     }
907                 }
908             }
909         });
910     }
911 
912     /**
913      * Creates a loopback connection
914      */
915     static class Connection implements Closeable {
916         private final SocketChannel sc1;
917         private final SocketChannel sc2;
918         Connection() throws IOException {
919             var lh = InetAddress.getLoopbackAddress();
920             try (var listener = ServerSocketChannel.open()) {
921                 listener.bind(new InetSocketAddress(lh, 0));
922                 SocketChannel sc1 = SocketChannel.open();
923                 SocketChannel sc2 = null;
924                 try {
925                     sc1.socket().connect(listener.getLocalAddress());
926                     sc2 = listener.accept();
927                 } catch (IOException ioe) {
928                     sc1.close();
929                     throw ioe;
930                 }
931                 this.sc1 = sc1;
932                 this.sc2 = sc2;
933             }
934         }
935         SocketChannel channel1() {
936             return sc1;
937         }
938         SocketChannel channel2() {
939             return sc2;
940         }
941         @Override
942         public void close() throws IOException {
943             sc1.close();
944             sc2.close();
945         }
946     }
947 
948     /**
949      * Read from a channel until all bytes have been read or an I/O error occurs.
950      */
951     static void readToEOF(ReadableByteChannel rbc) throws IOException {
952         ByteBuffer bb = ByteBuffer.allocate(16*1024);
953         int n;
954         while ((n = rbc.read(bb)) > 0) {
955             bb.clear();
956         }
957     }
958 
959     @FunctionalInterface
960     interface ThrowingRunnable {
961         void run() throws Exception;
962     }
963 
964     /**
965      * Runs the given task asynchronously after the current virtual thread has parked.
966      * @return the thread started to run the task
967      */
968     static Thread runAfterParkedAsync(ThrowingRunnable task) {
969         Thread target = Thread.currentThread();
970         if (!target.isVirtual())
971             throw new WrongThreadException();
972         return Thread.ofPlatform().daemon().start(() -> {
973             try {
974                 Thread.State state = target.getState();
975                 while (state != Thread.State.WAITING
976                         && state != Thread.State.TIMED_WAITING) {
977                     Thread.sleep(20);
978                     state = target.getState();
979                 }
980                 Thread.sleep(20);  // give a bit more time to release carrier
981                 task.run();
982             } catch (Exception e) {
983                 e.printStackTrace();
984             }
985         });
986     }
987 }