1 /*
  2  * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /**
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on NIO channels
 28  * @library /test/lib
 29  * @run junit BlockingChannelOps
 30  */
 31 
 32 /**
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps
 37  * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps
 38  */
 39 
 40 /**
 41  * @test id=no-vmcontinuations
 42  * @requires vm.continuations
 43  * @library /test/lib
 44  * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
 45  */
 46 
 47 import java.io.Closeable;
 48 import java.io.IOException;
 49 import java.net.DatagramPacket;
 50 import java.net.InetAddress;
 51 import java.net.InetSocketAddress;
 52 import java.net.Socket;
 53 import java.net.SocketAddress;
 54 import java.net.SocketException;
 55 import java.nio.ByteBuffer;
 56 import java.nio.channels.AsynchronousCloseException;
 57 import java.nio.channels.ClosedByInterruptException;
 58 import java.nio.channels.ClosedChannelException;
 59 import java.nio.channels.DatagramChannel;
 60 import java.nio.channels.Pipe;
 61 import java.nio.channels.ReadableByteChannel;
 62 import java.nio.channels.ServerSocketChannel;
 63 import java.nio.channels.SocketChannel;
 64 import java.nio.channels.WritableByteChannel;
 65 
 66 import jdk.test.lib.thread.VThreadRunner;
 67 import org.junit.jupiter.api.Test;
 68 import static org.junit.jupiter.api.Assertions.*;
 69 
 70 class BlockingChannelOps {
 71 
 72     /**
 73      * SocketChannel read/write, no blocking.
 74      */
 75     @Test
 76     void testSocketChannelReadWrite1() throws Exception {
 77         VThreadRunner.run(() -> {
 78             try (var connection = new Connection()) {
 79                 SocketChannel sc1 = connection.channel1();
 80                 SocketChannel sc2 = connection.channel2();
 81 
 82                 // write to sc1
 83                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 84                 int n = sc1.write(bb);
 85                 assertTrue(n > 0);
 86 
 87                 // read from sc2 should not block
 88                 bb = ByteBuffer.allocate(10);
 89                 n = sc2.read(bb);
 90                 assertTrue(n > 0);
 91                 assertTrue(bb.get(0) == 'X');
 92             }
 93         });
 94     }
 95 
 96     /**
 97      * Virtual thread blocks in SocketChannel read.
 98      */
 99     @Test
100     void testSocketChannelRead() throws Exception {
101         VThreadRunner.run(() -> {
102             try (var connection = new Connection()) {
103                 SocketChannel sc1 = connection.channel1();
104                 SocketChannel sc2 = connection.channel2();
105 
106                 // write to sc1 when current thread blocks in sc2.read
107                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
108                 runAfterParkedAsync(() -> sc1.write(bb1));
109 
110                 // read from sc2 should block
111                 ByteBuffer bb2 = ByteBuffer.allocate(10);
112                 int n = sc2.read(bb2);
113                 assertTrue(n > 0);
114                 assertTrue(bb2.get(0) == 'X');
115             }
116         });
117     }
118 
119     /**
120      * Virtual thread blocks in SocketChannel write.
121      */
122     @Test
123     void testSocketChannelWrite() throws Exception {
124         VThreadRunner.run(() -> {
125             try (var connection = new Connection()) {
126                 SocketChannel sc1 = connection.channel1();
127                 SocketChannel sc2 = connection.channel2();
128 
129                 // read from sc2 to EOF when current thread blocks in sc1.write
130                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
131 
132                 // write to sc1 should block
133                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
134                 for (int i=0; i<1000; i++) {
135                     int n = sc1.write(bb);
136                     assertTrue(n > 0);
137                     bb.clear();
138                 }
139                 sc1.close();
140 
141                 // wait for reader to finish
142                 reader.join();
143             }
144         });
145     }
146 
147     /**
148      * SocketChannel close while virtual thread blocked in read.
149      */
150     @Test
151     void testSocketChannelReadAsyncClose() throws Exception {
152         VThreadRunner.run(() -> {
153             try (var connection = new Connection()) {
154                 SocketChannel sc = connection.channel1();
155                 runAfterParkedAsync(sc::close);
156                 try {
157                     int n = sc.read(ByteBuffer.allocate(100));
158                     fail("read returned " + n);
159                 } catch (AsynchronousCloseException expected) { }
160             }
161         });
162     }
163 
164     /**
165      * Virtual thread interrupted while blocked in SocketChannel read.
166      */
167     @Test
168     void testSocketChannelReadInterrupt() throws Exception {
169         VThreadRunner.run(() -> {
170             try (var connection = new Connection()) {
171                 SocketChannel sc = connection.channel1();
172 
173                 // interrupt current thread when it blocks in read
174                 Thread thisThread = Thread.currentThread();
175                 runAfterParkedAsync(thisThread::interrupt);
176 
177                 try {
178                     int n = sc.read(ByteBuffer.allocate(100));
179                     fail("read returned " + n);
180                 } catch (ClosedByInterruptException expected) {
181                     assertTrue(Thread.interrupted());
182                 }
183             }
184         });
185     }
186 
187     /**
188      * SocketChannel close while virtual thread blocked in write.
189      */
190     @Test
191     void testSocketChannelWriteAsyncClose() throws Exception {
192         VThreadRunner.run(() -> {
193             boolean retry = true;
194             while (retry) {
195                 try (var connection = new Connection()) {
196                     SocketChannel sc = connection.channel1();
197 
198                     // close sc when current thread blocks in write
199                     runAfterParkedAsync(sc::close);
200                     try {
201                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
202                         for (;;) {
203                             int n = sc.write(bb);
204                             assertTrue(n > 0);
205                             bb.clear();
206                         }
207                     } catch (AsynchronousCloseException expected) {
208                         // closed when blocked in write
209                         retry = false;
210                     } catch (ClosedChannelException e) {
211                         // closed when not blocked in write, need to retry test
212                     }
213                 }
214             }
215         });
216     }
217 
218     /**
219      * Virtual thread interrupted while blocked in SocketChannel write.
220      */
221     @Test
222     void testSocketChannelWriteInterrupt() throws Exception {
223         VThreadRunner.run(() -> {
224             boolean retry = true;
225             while (retry) {
226                 try (var connection = new Connection()) {
227                     SocketChannel sc = connection.channel1();
228 
229                     // interrupt current thread when it blocks in write
230                     Thread thisThread = Thread.currentThread();
231                     runAfterParkedAsync(thisThread::interrupt);
232 
233                     try {
234                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
235                         for (;;) {
236                             int n = sc.write(bb);
237                             assertTrue(n > 0);
238                             bb.clear();
239                         }
240                     } catch (ClosedByInterruptException e) {
241                         // closed when blocked in write
242                         assertTrue(Thread.interrupted());
243                         retry = false;
244                     } catch (ClosedChannelException e) {
245                         // closed when not blocked in write, need to retry test
246                     }
247                 }
248             }
249         });
250     }
251 
252     /**
253      * Virtual thread blocks in SocketChannel adaptor read.
254      */
255     @Test
256     void testSocketAdaptorRead1() throws Exception {
257         testSocketAdaptorRead(0);
258     }
259 
260     /**
261      * Virtual thread blocks in SocketChannel adaptor read with timeout.
262      */
263     @Test
264     void testSocketAdaptorRead2() throws Exception {
265         testSocketAdaptorRead(60_000);
266     }
267 
268     private void testSocketAdaptorRead(int timeout) throws Exception {
269         VThreadRunner.run(() -> {
270             try (var connection = new Connection()) {
271                 SocketChannel sc1 = connection.channel1();
272                 SocketChannel sc2 = connection.channel2();
273 
274                 // write to sc1 when currnet thread blocks reading from sc2
275                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
276                 runAfterParkedAsync(() -> sc1.write(bb));
277 
278                 // read from sc2 should block
279                 byte[] array = new byte[100];
280                 if (timeout > 0)
281                     sc2.socket().setSoTimeout(timeout);
282                 int n = sc2.socket().getInputStream().read(array);
283                 assertTrue(n > 0);
284                 assertTrue(array[0] == 'X');
285             }
286         });
287     }
288 
289     /**
290      * ServerSocketChannel accept, no blocking.
291      */
292     @Test
293     void testServerSocketChannelAccept1() throws Exception {
294         VThreadRunner.run(() -> {
295             try (var ssc = ServerSocketChannel.open()) {
296                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
297                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
298                 // accept should not block
299                 var sc2 = ssc.accept();
300                 sc1.close();
301                 sc2.close();
302             }
303         });
304     }
305 
306     /**
307      * Virtual thread blocks in ServerSocketChannel accept.
308      */
309     @Test
310     void testServerSocketChannelAccept2() throws Exception {
311         VThreadRunner.run(() -> {
312             try (var ssc = ServerSocketChannel.open()) {
313                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
314                 var sc1 = SocketChannel.open();
315 
316                 // connect when current thread when it blocks in accept
317                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
318 
319                 // accept should block
320                 var sc2 = ssc.accept();
321                 sc1.close();
322                 sc2.close();
323             }
324         });
325     }
326 
327     /**
328      * SeverSocketChannel close while virtual thread blocked in accept.
329      */
330     @Test
331     void testServerSocketChannelAcceptAsyncClose() throws Exception {
332         VThreadRunner.run(() -> {
333             try (var ssc = ServerSocketChannel.open()) {
334                 InetAddress lh = InetAddress.getLoopbackAddress();
335                 ssc.bind(new InetSocketAddress(lh, 0));
336                 runAfterParkedAsync(ssc::close);
337                 try {
338                     SocketChannel sc = ssc.accept();
339                     sc.close();
340                     fail("connection accepted???");
341                 } catch (AsynchronousCloseException expected) { }
342             }
343         });
344     }
345 
346     /**
347      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
348      */
349     @Test
350     void testServerSocketChannelAcceptInterrupt() throws Exception {
351         VThreadRunner.run(() -> {
352             try (var ssc = ServerSocketChannel.open()) {
353                 InetAddress lh = InetAddress.getLoopbackAddress();
354                 ssc.bind(new InetSocketAddress(lh, 0));
355 
356                 // interrupt current thread when it blocks in accept
357                 Thread thisThread = Thread.currentThread();
358                 runAfterParkedAsync(thisThread::interrupt);
359 
360                 try {
361                     SocketChannel sc = ssc.accept();
362                     sc.close();
363                     fail("connection accepted???");
364                 } catch (ClosedByInterruptException expected) {
365                     assertTrue(Thread.interrupted());
366                 }
367             }
368         });
369     }
370 
371     /**
372      * Virtual thread blocks in ServerSocketChannel adaptor accept.
373      */
374     @Test
375     void testSocketChannelAdaptorAccept1() throws Exception {
376         testSocketChannelAdaptorAccept(0);
377     }
378 
379     /**
380      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
381      */
382     @Test
383     void testSocketChannelAdaptorAccept2() throws Exception {
384         testSocketChannelAdaptorAccept(60_000);
385     }
386 
387     private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
388         VThreadRunner.run(() -> {
389             try (var ssc = ServerSocketChannel.open()) {
390                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
391                 var sc = SocketChannel.open();
392 
393                 // interrupt current thread when it blocks in accept
394                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
395 
396                 // accept should block
397                 if (timeout > 0)
398                     ssc.socket().setSoTimeout(timeout);
399                 Socket s = ssc.socket().accept();
400                 sc.close();
401                 s.close();
402             }
403         });
404     }
405 
406     /**
407      * DatagramChannel receive/send, no blocking.
408      */
409     @Test
410     void testDatagramChannelSendReceive1() throws Exception {
411         VThreadRunner.run(() -> {
412             try (DatagramChannel dc1 = DatagramChannel.open();
413                  DatagramChannel dc2 = DatagramChannel.open()) {
414 
415                 InetAddress lh = InetAddress.getLoopbackAddress();
416                 dc2.bind(new InetSocketAddress(lh, 0));
417 
418                 // send should not block
419                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
420                 int n = dc1.send(bb, dc2.getLocalAddress());
421                 assertTrue(n > 0);
422 
423                 // receive should not block
424                 bb = ByteBuffer.allocate(10);
425                 dc2.receive(bb);
426                 assertTrue(bb.get(0) == 'X');
427             }
428         });
429     }
430 
431     /**
432      * Virtual thread blocks in DatagramChannel receive.
433      */
434     @Test
435     void testDatagramChannelSendReceive2() throws Exception {
436         VThreadRunner.run(() -> {
437             try (DatagramChannel dc1 = DatagramChannel.open();
438                  DatagramChannel dc2 = DatagramChannel.open()) {
439 
440                 InetAddress lh = InetAddress.getLoopbackAddress();
441                 dc2.bind(new InetSocketAddress(lh, 0));
442 
443                 // send from dc1 when current thread blocked in dc2.receive
444                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
445                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
446 
447                 // read from dc2 should block
448                 ByteBuffer bb2 = ByteBuffer.allocate(10);
449                 dc2.receive(bb2);
450                 assertTrue(bb2.get(0) == 'X');
451             }
452         });
453     }
454 
455     /**
456      * DatagramChannel close while virtual thread blocked in receive.
457      */
458     @Test
459     void testDatagramChannelReceiveAsyncClose() throws Exception {
460         VThreadRunner.run(() -> {
461             try (DatagramChannel dc = DatagramChannel.open()) {
462                 InetAddress lh = InetAddress.getLoopbackAddress();
463                 dc.bind(new InetSocketAddress(lh, 0));
464                 runAfterParkedAsync(dc::close);
465                 try {
466                     dc.receive(ByteBuffer.allocate(100));
467                     fail("receive returned");
468                 } catch (AsynchronousCloseException expected) { }
469             }
470         });
471     }
472 
473     /**
474      * Virtual thread interrupted while blocked in DatagramChannel receive.
475      */
476     @Test
477     void testDatagramChannelReceiveInterrupt() throws Exception {
478         VThreadRunner.run(() -> {
479             try (DatagramChannel dc = DatagramChannel.open()) {
480                 InetAddress lh = InetAddress.getLoopbackAddress();
481                 dc.bind(new InetSocketAddress(lh, 0));
482 
483                 // interrupt current thread when it blocks in receive
484                 Thread thisThread = Thread.currentThread();
485                 runAfterParkedAsync(thisThread::interrupt);
486 
487                 try {
488                     dc.receive(ByteBuffer.allocate(100));
489                     fail("receive returned");
490                 } catch (ClosedByInterruptException expected) {
491                     assertTrue(Thread.interrupted());
492                 }
493             }
494         });
495     }
496 
497     /**
498      * Virtual thread blocks in DatagramSocket adaptor receive.
499      */
500     @Test
501     void testDatagramSocketAdaptorReceive1() throws Exception {
502         testDatagramSocketAdaptorReceive(0);
503     }
504 
505     /**
506      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
507      */
508     @Test
509     void testDatagramSocketAdaptorReceive2() throws Exception {
510         testDatagramSocketAdaptorReceive(60_000);
511     }
512 
513     private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
514         VThreadRunner.run(() -> {
515             try (DatagramChannel dc1 = DatagramChannel.open();
516                  DatagramChannel dc2 = DatagramChannel.open()) {
517 
518                 InetAddress lh = InetAddress.getLoopbackAddress();
519                 dc2.bind(new InetSocketAddress(lh, 0));
520 
521                 // send from dc1 when current thread blocks in dc2 receive
522                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
523                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
524 
525                 // receive should block
526                 byte[] array = new byte[100];
527                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
528                 if (timeout > 0)
529                     dc2.socket().setSoTimeout(timeout);
530                 dc2.socket().receive(p);
531                 assertTrue(p.getLength() == 3 && array[0] == 'X');
532             }
533         });
534     }
535 
536     /**
537      * DatagramChannel close while virtual thread blocked in adaptor receive.
538      */
539     @Test
540     void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
541         testDatagramSocketAdaptorReceiveAsyncClose(0);
542     }
543 
544     /**
545      * DatagramChannel close while virtual thread blocked in adaptor receive
546      * with timeout.
547      */
548     @Test
549     void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
550         testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
551     }
552 
553     private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
554         VThreadRunner.run(() -> {
555             try (DatagramChannel dc = DatagramChannel.open()) {
556                 InetAddress lh = InetAddress.getLoopbackAddress();
557                 dc.bind(new InetSocketAddress(lh, 0));
558 
559                 byte[] array = new byte[100];
560                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
561                 if (timeout > 0)
562                     dc.socket().setSoTimeout(timeout);
563 
564                 // close channel/socket when current thread blocks in receive
565                 runAfterParkedAsync(dc::close);
566 
567                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
568             }
569         });
570     }
571 
572     /**
573      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
574      */
575     @Test
576     void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
577         testDatagramSocketAdaptorReceiveInterrupt(0);
578     }
579 
580     /**
581      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
582      * with timeout.
583      */
584     @Test
585     void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
586         testDatagramSocketAdaptorReceiveInterrupt(60_1000);
587     }
588 
589     private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
590         VThreadRunner.run(() -> {
591             try (DatagramChannel dc = DatagramChannel.open()) {
592                 InetAddress lh = InetAddress.getLoopbackAddress();
593                 dc.bind(new InetSocketAddress(lh, 0));
594 
595                 byte[] array = new byte[100];
596                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
597                 if (timeout > 0)
598                     dc.socket().setSoTimeout(timeout);
599 
600                 // interrupt current thread when it blocks in receive
601                 Thread thisThread = Thread.currentThread();
602                 runAfterParkedAsync(thisThread::interrupt);
603 
604                 try {
605                     dc.socket().receive(p);
606                     fail();
607                 } catch (ClosedByInterruptException expected) {
608                     assertTrue(Thread.interrupted());
609                 }
610             }
611         });
612     }
613 
614     /**
615      * Pipe read/write, no blocking.
616      */
617     @Test
618     void testPipeReadWrite1() throws Exception {
619         VThreadRunner.run(() -> {
620             Pipe p = Pipe.open();
621             try (Pipe.SinkChannel sink = p.sink();
622                  Pipe.SourceChannel source = p.source()) {
623 
624                 // write should not block
625                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
626                 int n = sink.write(bb);
627                 assertTrue(n > 0);
628 
629                 // read should not block
630                 bb = ByteBuffer.allocate(10);
631                 n = source.read(bb);
632                 assertTrue(n > 0);
633                 assertTrue(bb.get(0) == 'X');
634             }
635         });
636     }
637 
638     /**
639      * Virtual thread blocks in Pipe.SourceChannel read.
640      */
641     @Test
642     void testPipeReadWrite2() throws Exception {
643         VThreadRunner.run(() -> {
644             Pipe p = Pipe.open();
645             try (Pipe.SinkChannel sink = p.sink();
646                  Pipe.SourceChannel source = p.source()) {
647 
648                 // write from sink when current thread blocks reading from source
649                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
650                 runAfterParkedAsync(() -> sink.write(bb1));
651 
652                 // read should block
653                 ByteBuffer bb2 = ByteBuffer.allocate(10);
654                 int n = source.read(bb2);
655                 assertTrue(n > 0);
656                 assertTrue(bb2.get(0) == 'X');
657             }
658         });
659     }
660 
661     /**
662      * Virtual thread blocks in Pipe.SinkChannel write.
663      */
664     @Test
665     void testPipeReadWrite3() throws Exception {
666         VThreadRunner.run(() -> {
667             Pipe p = Pipe.open();
668             try (Pipe.SinkChannel sink = p.sink();
669                  Pipe.SourceChannel source = p.source()) {
670 
671                 // read from source to EOF when current thread blocking in write
672                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
673 
674                 // write to sink should block
675                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
676                 for (int i=0; i<1000; i++) {
677                     int n = sink.write(bb);
678                     assertTrue(n > 0);
679                     bb.clear();
680                 }
681                 sink.close();
682 
683                 // wait for reader to finish
684                 reader.join();
685             }
686         });
687     }
688 
689     /**
690      * Pipe.SourceChannel close while virtual thread blocked in read.
691      */
692     @Test
693     void testPipeReadAsyncClose() throws Exception {
694         VThreadRunner.run(() -> {
695             Pipe p = Pipe.open();
696             try (Pipe.SinkChannel sink = p.sink();
697                  Pipe.SourceChannel source = p.source()) {
698                 runAfterParkedAsync(source::close);
699                 try {
700                     int n = source.read(ByteBuffer.allocate(100));
701                     fail("read returned " + n);
702                 } catch (AsynchronousCloseException expected) { }
703             }
704         });
705     }
706 
707     /**
708      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
709      */
710     @Test
711     void testPipeReadInterrupt() throws Exception {
712         VThreadRunner.run(() -> {
713             Pipe p = Pipe.open();
714             try (Pipe.SinkChannel sink = p.sink();
715                  Pipe.SourceChannel source = p.source()) {
716 
717                 // interrupt current thread when it blocks reading from source
718                 Thread thisThread = Thread.currentThread();
719                 runAfterParkedAsync(thisThread::interrupt);
720 
721                 try {
722                     int n = source.read(ByteBuffer.allocate(100));
723                     fail("read returned " + n);
724                 } catch (ClosedByInterruptException expected) {
725                     assertTrue(Thread.interrupted());
726                 }
727             }
728         });
729     }
730 
731     /**
732      * Pipe.SinkChannel close while virtual thread blocked in write.
733      */
734     @Test
735     void testPipeWriteAsyncClose() throws Exception {
736         VThreadRunner.run(() -> {
737             boolean retry = true;
738             while (retry) {
739                 Pipe p = Pipe.open();
740                 try (Pipe.SinkChannel sink = p.sink();
741                      Pipe.SourceChannel source = p.source()) {
742 
743                     // close sink when current thread blocks in write
744                     runAfterParkedAsync(sink::close);
745                     try {
746                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
747                         for (;;) {
748                             int n = sink.write(bb);
749                             assertTrue(n > 0);
750                             bb.clear();
751                         }
752                     } catch (AsynchronousCloseException e) {
753                         // closed when blocked in write
754                         retry = false;
755                     } catch (ClosedChannelException e) {
756                         // closed when not blocked in write, need to retry test
757                     }
758                 }
759             }
760         });
761     }
762 
763     /**
764      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
765      */
766     @Test
767     void testPipeWriteInterrupt() throws Exception {
768         VThreadRunner.run(() -> {
769             boolean retry = true;
770             while (retry) {
771                 Pipe p = Pipe.open();
772                 try (Pipe.SinkChannel sink = p.sink();
773                      Pipe.SourceChannel source = p.source()) {
774 
775                     // interrupt current thread when it blocks in write
776                     Thread thisThread = Thread.currentThread();
777                     runAfterParkedAsync(thisThread::interrupt);
778 
779                     try {
780                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
781                         for (;;) {
782                             int n = sink.write(bb);
783                             assertTrue(n > 0);
784                             bb.clear();
785                         }
786                     } catch (ClosedByInterruptException expected) {
787                         // closed when blocked in write
788                         assertTrue(Thread.interrupted());
789                         retry = false;
790                     } catch (ClosedChannelException e) {
791                         // closed when not blocked in write, need to retry test
792                     }
793                 }
794             }
795         });
796     }
797 
798     /**
799      * Creates a loopback connection
800      */
801     static class Connection implements Closeable {
802         private final SocketChannel sc1;
803         private final SocketChannel sc2;
804         Connection() throws IOException {
805             var lh = InetAddress.getLoopbackAddress();
806             try (var listener = ServerSocketChannel.open()) {
807                 listener.bind(new InetSocketAddress(lh, 0));
808                 SocketChannel sc1 = SocketChannel.open();
809                 SocketChannel sc2 = null;
810                 try {
811                     sc1.socket().connect(listener.getLocalAddress());
812                     sc2 = listener.accept();
813                 } catch (IOException ioe) {
814                     sc1.close();
815                     throw ioe;
816                 }
817                 this.sc1 = sc1;
818                 this.sc2 = sc2;
819             }
820         }
821         SocketChannel channel1() {
822             return sc1;
823         }
824         SocketChannel channel2() {
825             return sc2;
826         }
827         @Override
828         public void close() throws IOException {
829             sc1.close();
830             sc2.close();
831         }
832     }
833 
834     /**
835      * Read from a channel until all bytes have been read or an I/O error occurs.
836      */
837     static void readToEOF(ReadableByteChannel rbc) throws IOException {
838         ByteBuffer bb = ByteBuffer.allocate(16*1024);
839         int n;
840         while ((n = rbc.read(bb)) > 0) {
841             bb.clear();
842         }
843     }
844 
845     @FunctionalInterface
846     interface ThrowingRunnable {
847         void run() throws Exception;
848     }
849 
850     /**
851      * Runs the given task asynchronously after the current virtual thread has parked.
852      * @return the thread started to run the task
853      */
854     static Thread runAfterParkedAsync(ThrowingRunnable task) {
855         Thread target = Thread.currentThread();
856         if (!target.isVirtual())
857             throw new WrongThreadException();
858         return Thread.ofPlatform().daemon().start(() -> {
859             try {
860                 Thread.State state = target.getState();
861                 while (state != Thread.State.WAITING
862                         && state != Thread.State.TIMED_WAITING) {
863                     Thread.sleep(20);
864                     state = target.getState();
865                 }
866                 Thread.sleep(20);  // give a bit more time to release carrier
867                 task.run();
868             } catch (Exception e) {
869                 e.printStackTrace();
870             }
871         });
872     }
873 }