1 /*
  2  * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on NIO channels
 28  * @library /test/lib
 29  * @run junit/timeout=480 BlockingChannelOps
 30  */
 31 
 32 /*
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
 37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
 38  */
 39 
 40 /*
 41  * @test id=no-vmcontinuations
 42  * @requires vm.continuations
 43  * @library /test/lib
 44  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
 45  */
 46 
 47 import java.io.Closeable;
 48 import java.io.IOException;
 49 import java.net.DatagramPacket;
 50 import java.net.InetAddress;
 51 import java.net.InetSocketAddress;
 52 import java.net.Socket;
 53 import java.net.SocketAddress;
 54 import java.net.SocketException;
 55 import java.nio.ByteBuffer;
 56 import java.nio.channels.AsynchronousCloseException;
 57 import java.nio.channels.ClosedByInterruptException;
 58 import java.nio.channels.ClosedChannelException;
 59 import java.nio.channels.DatagramChannel;
 60 import java.nio.channels.Pipe;
 61 import java.nio.channels.ReadableByteChannel;
 62 import java.nio.channels.ServerSocketChannel;
 63 import java.nio.channels.SocketChannel;
 64 import java.nio.channels.WritableByteChannel;
 65 import java.util.concurrent.locks.LockSupport;
 66 
 67 import jdk.test.lib.thread.VThreadRunner;
 68 import org.junit.jupiter.api.Test;
 69 import org.junit.jupiter.params.ParameterizedTest;
 70 import org.junit.jupiter.params.provider.ValueSource;
 71 import static org.junit.jupiter.api.Assertions.*;
 72 
 73 class BlockingChannelOps {
 74 
 75     /**
 76      * SocketChannel read/write, no blocking.
 77      */
 78     @Test
 79     void testSocketChannelReadWrite1() throws Exception {
 80         VThreadRunner.run(() -> {
 81             try (var connection = new Connection()) {
 82                 SocketChannel sc1 = connection.channel1();
 83                 SocketChannel sc2 = connection.channel2();
 84 
 85                 // write to sc1
 86                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 87                 int n = sc1.write(bb);
 88                 assertTrue(n > 0);
 89 
 90                 // read from sc2 should not block
 91                 bb = ByteBuffer.allocate(10);
 92                 n = sc2.read(bb);
 93                 assertTrue(n > 0);
 94                 assertTrue(bb.get(0) == 'X');
 95             }
 96         });
 97     }
 98 
 99     /**
100      * Virtual thread blocks in SocketChannel read.
101      */
102     @Test
103     void testSocketChannelRead() throws Exception {
104         VThreadRunner.run(() -> {
105             try (var connection = new Connection()) {
106                 SocketChannel sc1 = connection.channel1();
107                 SocketChannel sc2 = connection.channel2();
108 
109                 // write to sc1 when current thread blocks in sc2.read
110                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
111                 runAfterParkedAsync(() -> sc1.write(bb1));
112 
113                 // read from sc2 should block
114                 ByteBuffer bb2 = ByteBuffer.allocate(10);
115                 int n = sc2.read(bb2);
116                 assertTrue(n > 0);
117                 assertTrue(bb2.get(0) == 'X');
118             }
119         });
120     }
121 
122     /**
123      * Virtual thread blocks in SocketChannel write.
124      */
125     @Test
126     void testSocketChannelWrite() throws Exception {
127         VThreadRunner.run(() -> {
128             try (var connection = new Connection()) {
129                 SocketChannel sc1 = connection.channel1();
130                 SocketChannel sc2 = connection.channel2();
131 
132                 // read from sc2 to EOF when current thread blocks in sc1.write
133                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
134 
135                 // write to sc1 should block
136                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
137                 for (int i=0; i<1000; i++) {
138                     int n = sc1.write(bb);
139                     assertTrue(n > 0);
140                     bb.clear();
141                 }
142                 sc1.close();
143 
144                 // wait for reader to finish
145                 reader.join();
146             }
147         });
148     }
149 
150     /**
151      * SocketChannel close while virtual thread blocked in read.
152      */
153     @Test
154     void testSocketChannelReadAsyncClose() throws Exception {
155         VThreadRunner.run(() -> {
156             try (var connection = new Connection()) {
157                 SocketChannel sc = connection.channel1();
158                 runAfterParkedAsync(sc::close);
159                 try {
160                     int n = sc.read(ByteBuffer.allocate(100));
161                     fail("read returned " + n);
162                 } catch (AsynchronousCloseException expected) { }
163             }
164         });
165     }
166 
167     /**
168      * SocketChannel shutdownInput while virtual thread blocked in read.
169      */
170     @Test
171     void testSocketChannelReadAsyncShutdownInput() throws Exception {
172         VThreadRunner.run(() -> {
173             try (var connection = new Connection()) {
174                 SocketChannel sc = connection.channel1();
175                 runAfterParkedAsync(sc::shutdownInput);
176                 int n = sc.read(ByteBuffer.allocate(100));
177                 assertEquals(-1, n);
178                 assertTrue(sc.isOpen());
179             }
180         });
181     }
182 
183     /**
184      * Virtual thread interrupted while blocked in SocketChannel read.
185      */
186     @Test
187     void testSocketChannelReadInterrupt() throws Exception {
188         VThreadRunner.run(() -> {
189             try (var connection = new Connection()) {
190                 SocketChannel sc = connection.channel1();
191 
192                 // interrupt current thread when it blocks in read
193                 Thread thisThread = Thread.currentThread();
194                 runAfterParkedAsync(thisThread::interrupt);
195 
196                 try {
197                     int n = sc.read(ByteBuffer.allocate(100));
198                     fail("read returned " + n);
199                 } catch (ClosedByInterruptException expected) {
200                     assertTrue(Thread.interrupted());
201                 }
202             }
203         });
204     }
205 
206     /**
207      * SocketChannel close while virtual thread blocked in write.
208      */
209     @Test
210     void testSocketChannelWriteAsyncClose() throws Exception {
211         VThreadRunner.run(() -> {
212             boolean done = false;
213             while (!done) {
214                 try (var connection = new Connection()) {
215                     SocketChannel sc = connection.channel1();
216 
217                     // close sc when current thread blocks in write
218                     runAfterParkedAsync(sc::close, true);
219 
220                     // write until channel is closed
221                     try {
222                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
223                         for (;;) {
224                             int n = sc.write(bb);
225                             assertTrue(n > 0);
226                             bb.clear();
227                         }
228                     } catch (AsynchronousCloseException expected) {
229                         // closed when blocked in write
230                         done = true;
231                     } catch (ClosedChannelException e) {
232                         // closed but not blocked in write, need to retry test
233                         System.err.format("%s, need to retry!%n", e);
234                     }
235                 }
236             }
237         });
238     }
239 
240 
241     /**
242      * SocketChannel shutdownOutput while virtual thread blocked in write.
243      */
244     @Test
245     void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
246         VThreadRunner.run(() -> {
247             try (var connection = new Connection()) {
248                 SocketChannel sc = connection.channel1();
249 
250                 // shutdown output when current thread blocks in write
251                 runAfterParkedAsync(sc::shutdownOutput);
252                 try {
253                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
254                     for (;;) {
255                         int n = sc.write(bb);
256                         assertTrue(n > 0);
257                         bb.clear();
258                     }
259                 } catch (ClosedChannelException e) {
260                     // expected
261                 }
262                 assertTrue(sc.isOpen());
263             }
264         });
265     }
266 
267     /**
268      * Virtual thread interrupted while blocked in SocketChannel write.
269      */
270     @Test
271     void testSocketChannelWriteInterrupt() throws Exception {
272         VThreadRunner.run(() -> {
273             boolean done = false;
274             while (!done) {
275                 try (var connection = new Connection()) {
276                     SocketChannel sc = connection.channel1();
277 
278                     // interrupt current thread when it blocks in write
279                     Thread thisThread = Thread.currentThread();
280                     runAfterParkedAsync(thisThread::interrupt, true);
281 
282                     // write until channel is closed
283                     try {
284                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
285                         for (;;) {
286                             int n = sc.write(bb);
287                             assertTrue(n > 0);
288                             bb.clear();
289                         }
290                     } catch (ClosedByInterruptException e) {
291                         // closed when blocked in write
292                         assertTrue(Thread.interrupted());
293                         done = true;
294                     } catch (ClosedChannelException e) {
295                         // closed but not blocked in write, need to retry test
296                         System.err.format("%s, need to retry!%n", e);
297                     }
298                 }
299             }
300         });
301     }
302 
303     /**
304      * Virtual thread blocks in SocketChannel adaptor read.
305      */
306     @ParameterizedTest
307     @ValueSource(ints = { 0, 60_000 })
308     void testSocketAdaptorRead(int timeout) throws Exception {
309         VThreadRunner.run(() -> {
310             try (var connection = new Connection()) {
311                 SocketChannel sc1 = connection.channel1();
312                 SocketChannel sc2 = connection.channel2();
313 
314                 // write to sc1 when currnet thread blocks reading from sc2
315                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
316                 runAfterParkedAsync(() -> sc1.write(bb));
317 
318                 // read from sc2 should block
319                 byte[] array = new byte[100];
320                 if (timeout > 0)
321                     sc2.socket().setSoTimeout(timeout);
322                 int n = sc2.socket().getInputStream().read(array);
323                 assertTrue(n > 0);
324                 assertTrue(array[0] == 'X');
325             }
326         });
327     }
328 
329     /**
330      * ServerSocketChannel accept, no blocking.
331      */
332     @Test
333     void testServerSocketChannelAccept1() throws Exception {
334         VThreadRunner.run(() -> {
335             try (var ssc = ServerSocketChannel.open()) {
336                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
337                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
338                 // accept should not block
339                 var sc2 = ssc.accept();
340                 sc1.close();
341                 sc2.close();
342             }
343         });
344     }
345 
346     /**
347      * Virtual thread blocks in ServerSocketChannel accept.
348      */
349     @Test
350     void testServerSocketChannelAccept2() throws Exception {
351         VThreadRunner.run(() -> {
352             try (var ssc = ServerSocketChannel.open()) {
353                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
354                 var sc1 = SocketChannel.open();
355 
356                 // connect when current thread when it blocks in accept
357                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
358 
359                 // accept should block
360                 var sc2 = ssc.accept();
361                 sc1.close();
362                 sc2.close();
363             }
364         });
365     }
366 
367     /**
368      * SeverSocketChannel close while virtual thread blocked in accept.
369      */
370     @Test
371     void testServerSocketChannelAcceptAsyncClose() throws Exception {
372         VThreadRunner.run(() -> {
373             try (var ssc = ServerSocketChannel.open()) {
374                 InetAddress lh = InetAddress.getLoopbackAddress();
375                 ssc.bind(new InetSocketAddress(lh, 0));
376                 runAfterParkedAsync(ssc::close);
377                 try {
378                     SocketChannel sc = ssc.accept();
379                     sc.close();
380                     fail("connection accepted???");
381                 } catch (AsynchronousCloseException expected) { }
382             }
383         });
384     }
385 
386     /**
387      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
388      */
389     @Test
390     void testServerSocketChannelAcceptInterrupt() throws Exception {
391         VThreadRunner.run(() -> {
392             try (var ssc = ServerSocketChannel.open()) {
393                 InetAddress lh = InetAddress.getLoopbackAddress();
394                 ssc.bind(new InetSocketAddress(lh, 0));
395 
396                 // interrupt current thread when it blocks in accept
397                 Thread thisThread = Thread.currentThread();
398                 runAfterParkedAsync(thisThread::interrupt);
399 
400                 try {
401                     SocketChannel sc = ssc.accept();
402                     sc.close();
403                     fail("connection accepted???");
404                 } catch (ClosedByInterruptException expected) {
405                     assertTrue(Thread.interrupted());
406                 }
407             }
408         });
409     }
410 
411     /**
412      * Virtual thread blocks in ServerSocketChannel adaptor accept.
413      */
414     @ParameterizedTest
415     @ValueSource(ints = { 0, 60_000 })
416     void testSocketChannelAdaptorAccept(int timeout) throws Exception {
417         VThreadRunner.run(() -> {
418             try (var ssc = ServerSocketChannel.open()) {
419                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
420                 var sc = SocketChannel.open();
421 
422                 // interrupt current thread when it blocks in accept
423                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
424 
425                 // accept should block
426                 if (timeout > 0)
427                     ssc.socket().setSoTimeout(timeout);
428                 Socket s = ssc.socket().accept();
429                 sc.close();
430                 s.close();
431             }
432         });
433     }
434 
435     /**
436      * DatagramChannel receive/send, no blocking.
437      */
438     @Test
439     void testDatagramChannelSendReceive1() throws Exception {
440         VThreadRunner.run(() -> {
441             try (DatagramChannel dc1 = DatagramChannel.open();
442                  DatagramChannel dc2 = DatagramChannel.open()) {
443 
444                 InetAddress lh = InetAddress.getLoopbackAddress();
445                 dc2.bind(new InetSocketAddress(lh, 0));
446 
447                 // send should not block
448                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
449                 int n = dc1.send(bb, dc2.getLocalAddress());
450                 assertTrue(n > 0);
451 
452                 // receive should not block
453                 bb = ByteBuffer.allocate(10);
454                 dc2.receive(bb);
455                 assertTrue(bb.get(0) == 'X');
456             }
457         });
458     }
459 
460     /**
461      * Virtual thread blocks in DatagramChannel receive.
462      */
463     @Test
464     void testDatagramChannelSendReceive2() throws Exception {
465         VThreadRunner.run(() -> {
466             try (DatagramChannel dc1 = DatagramChannel.open();
467                  DatagramChannel dc2 = DatagramChannel.open()) {
468 
469                 InetAddress lh = InetAddress.getLoopbackAddress();
470                 dc2.bind(new InetSocketAddress(lh, 0));
471 
472                 // send from dc1 when current thread blocked in dc2.receive
473                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
474                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
475 
476                 // read from dc2 should block
477                 ByteBuffer bb2 = ByteBuffer.allocate(10);
478                 dc2.receive(bb2);
479                 assertTrue(bb2.get(0) == 'X');
480             }
481         });
482     }
483 
484     /**
485      * DatagramChannel close while virtual thread blocked in receive.
486      */
487     @Test
488     void testDatagramChannelReceiveAsyncClose() throws Exception {
489         VThreadRunner.run(() -> {
490             try (DatagramChannel dc = DatagramChannel.open()) {
491                 InetAddress lh = InetAddress.getLoopbackAddress();
492                 dc.bind(new InetSocketAddress(lh, 0));
493                 runAfterParkedAsync(dc::close);
494                 try {
495                     dc.receive(ByteBuffer.allocate(100));
496                     fail("receive returned");
497                 } catch (AsynchronousCloseException expected) { }
498             }
499         });
500     }
501 
502     /**
503      * Virtual thread interrupted while blocked in DatagramChannel receive.
504      */
505     @Test
506     void testDatagramChannelReceiveInterrupt() throws Exception {
507         VThreadRunner.run(() -> {
508             try (DatagramChannel dc = DatagramChannel.open()) {
509                 InetAddress lh = InetAddress.getLoopbackAddress();
510                 dc.bind(new InetSocketAddress(lh, 0));
511 
512                 // interrupt current thread when it blocks in receive
513                 Thread thisThread = Thread.currentThread();
514                 runAfterParkedAsync(thisThread::interrupt);
515 
516                 try {
517                     dc.receive(ByteBuffer.allocate(100));
518                     fail("receive returned");
519                 } catch (ClosedByInterruptException expected) {
520                     assertTrue(Thread.interrupted());
521                 }
522             }
523         });
524     }
525 
526     /**
527      * Virtual thread blocks in DatagramSocket adaptor receive.
528      */
529     @ParameterizedTest
530     @ValueSource(ints = { 0, 60_000 })
531     void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
532         VThreadRunner.run(() -> {
533             try (DatagramChannel dc1 = DatagramChannel.open();
534                  DatagramChannel dc2 = DatagramChannel.open()) {
535 
536                 InetAddress lh = InetAddress.getLoopbackAddress();
537                 dc2.bind(new InetSocketAddress(lh, 0));
538 
539                 // send from dc1 when current thread blocks in dc2 receive
540                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
541                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
542 
543                 // receive should block
544                 byte[] array = new byte[100];
545                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
546                 if (timeout > 0)
547                     dc2.socket().setSoTimeout(timeout);
548                 dc2.socket().receive(p);
549                 assertTrue(p.getLength() == 3 && array[0] == 'X');
550             }
551         });
552     }
553 
554     /**
555      * DatagramChannel close while virtual thread blocked in adaptor receive.
556      */
557     @ParameterizedTest
558     @ValueSource(ints = { 0, 60_000 })
559     void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
560         VThreadRunner.run(() -> {
561             try (DatagramChannel dc = DatagramChannel.open()) {
562                 InetAddress lh = InetAddress.getLoopbackAddress();
563                 dc.bind(new InetSocketAddress(lh, 0));
564 
565                 byte[] array = new byte[100];
566                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
567                 if (timeout > 0)
568                     dc.socket().setSoTimeout(timeout);
569 
570                 // close channel/socket when current thread blocks in receive
571                 runAfterParkedAsync(dc::close);
572 
573                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
574             }
575         });
576     }
577 
578     /**
579      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
580      */
581     @ParameterizedTest
582     @ValueSource(ints = { 0, 60_000 })
583     void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
584         VThreadRunner.run(() -> {
585             try (DatagramChannel dc = DatagramChannel.open()) {
586                 InetAddress lh = InetAddress.getLoopbackAddress();
587                 dc.bind(new InetSocketAddress(lh, 0));
588 
589                 byte[] array = new byte[100];
590                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
591                 if (timeout > 0)
592                     dc.socket().setSoTimeout(timeout);
593 
594                 // interrupt current thread when it blocks in receive
595                 Thread thisThread = Thread.currentThread();
596                 runAfterParkedAsync(thisThread::interrupt);
597 
598                 try {
599                     dc.socket().receive(p);
600                     fail();
601                 } catch (ClosedByInterruptException expected) {
602                     assertTrue(Thread.interrupted());
603                 }
604             }
605         });
606     }
607 
608     /**
609      * Pipe read/write, no blocking.
610      */
611     @Test
612     void testPipeReadWrite1() throws Exception {
613         VThreadRunner.run(() -> {
614             Pipe p = Pipe.open();
615             try (Pipe.SinkChannel sink = p.sink();
616                  Pipe.SourceChannel source = p.source()) {
617 
618                 // write should not block
619                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
620                 int n = sink.write(bb);
621                 assertTrue(n > 0);
622 
623                 // read should not block
624                 bb = ByteBuffer.allocate(10);
625                 n = source.read(bb);
626                 assertTrue(n > 0);
627                 assertTrue(bb.get(0) == 'X');
628             }
629         });
630     }
631 
632     /**
633      * Virtual thread blocks in Pipe.SourceChannel read.
634      */
635     @Test
636     void testPipeReadWrite2() throws Exception {
637         VThreadRunner.run(() -> {
638             Pipe p = Pipe.open();
639             try (Pipe.SinkChannel sink = p.sink();
640                  Pipe.SourceChannel source = p.source()) {
641 
642                 // write from sink when current thread blocks reading from source
643                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
644                 runAfterParkedAsync(() -> sink.write(bb1));
645 
646                 // read should block
647                 ByteBuffer bb2 = ByteBuffer.allocate(10);
648                 int n = source.read(bb2);
649                 assertTrue(n > 0);
650                 assertTrue(bb2.get(0) == 'X');
651             }
652         });
653     }
654 
655     /**
656      * Virtual thread blocks in Pipe.SinkChannel write.
657      */
658     @Test
659     void testPipeReadWrite3() throws Exception {
660         VThreadRunner.run(() -> {
661             Pipe p = Pipe.open();
662             try (Pipe.SinkChannel sink = p.sink();
663                  Pipe.SourceChannel source = p.source()) {
664 
665                 // read from source to EOF when current thread blocking in write
666                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
667 
668                 // write to sink should block
669                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
670                 for (int i=0; i<1000; i++) {
671                     int n = sink.write(bb);
672                     assertTrue(n > 0);
673                     bb.clear();
674                 }
675                 sink.close();
676 
677                 // wait for reader to finish
678                 reader.join();
679             }
680         });
681     }
682 
683     /**
684      * Pipe.SourceChannel close while virtual thread blocked in read.
685      */
686     @Test
687     void testPipeReadAsyncClose() throws Exception {
688         VThreadRunner.run(() -> {
689             Pipe p = Pipe.open();
690             try (Pipe.SinkChannel sink = p.sink();
691                  Pipe.SourceChannel source = p.source()) {
692                 runAfterParkedAsync(source::close);
693                 try {
694                     int n = source.read(ByteBuffer.allocate(100));
695                     fail("read returned " + n);
696                 } catch (AsynchronousCloseException expected) { }
697             }
698         });
699     }
700 
701     /**
702      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
703      */
704     @Test
705     void testPipeReadInterrupt() throws Exception {
706         VThreadRunner.run(() -> {
707             Pipe p = Pipe.open();
708             try (Pipe.SinkChannel sink = p.sink();
709                  Pipe.SourceChannel source = p.source()) {
710 
711                 // interrupt current thread when it blocks reading from source
712                 Thread thisThread = Thread.currentThread();
713                 runAfterParkedAsync(thisThread::interrupt);
714 
715                 try {
716                     int n = source.read(ByteBuffer.allocate(100));
717                     fail("read returned " + n);
718                 } catch (ClosedByInterruptException expected) {
719                     assertTrue(Thread.interrupted());
720                 }
721             }
722         });
723     }
724 
725     /**
726      * Pipe.SinkChannel close while virtual thread blocked in write.
727      */
728     @Test
729     void testPipeWriteAsyncClose() throws Exception {
730         VThreadRunner.run(() -> {
731             boolean done = false;
732             while (!done) {
733                 Pipe p = Pipe.open();
734                 try (Pipe.SinkChannel sink = p.sink();
735                      Pipe.SourceChannel source = p.source()) {
736 
737                     // close sink when current thread blocks in write
738                     runAfterParkedAsync(sink::close, true);
739 
740                     // write until channel is closed
741                     try {
742                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
743                         for (;;) {
744                             int n = sink.write(bb);
745                             assertTrue(n > 0);
746                             bb.clear();
747                         }
748                     } catch (AsynchronousCloseException e) {
749                         // closed when blocked in write
750                         done = true;
751                     } catch (ClosedChannelException e) {
752                         // closed but not blocked in write, need to retry test
753                         System.err.format("%s, need to retry!%n", e);
754                     }
755                 }
756             }
757         });
758     }
759 
760     /**
761      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
762      */
763     @Test
764     void testPipeWriteInterrupt() throws Exception {
765         VThreadRunner.run(() -> {
766             boolean done = false;
767             while (!done) {
768                 Pipe p = Pipe.open();
769                 try (Pipe.SinkChannel sink = p.sink();
770                      Pipe.SourceChannel source = p.source()) {
771 
772                     // interrupt current thread when it blocks in write
773                     Thread thisThread = Thread.currentThread();
774                     runAfterParkedAsync(thisThread::interrupt, true);
775 
776                     // write until channel is closed
777                     try {
778                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
779                         for (;;) {
780                             int n = sink.write(bb);
781                             assertTrue(n > 0);
782                             bb.clear();
783                         }
784                     } catch (ClosedByInterruptException expected) {
785                         // closed when blocked in write
786                         assertTrue(Thread.interrupted());
787                         done = true;
788                     } catch (ClosedChannelException e) {
789                         // closed but not blocked in write, need to retry test
790                         System.err.format("%s, need to retry!%n", e);
791                     }
792                 }
793             }
794         });
795     }
796 
797     /**
798      * Creates a loopback connection
799      */
800     static class Connection implements Closeable {
801         private final SocketChannel sc1;
802         private final SocketChannel sc2;
803         Connection() throws IOException {
804             var lh = InetAddress.getLoopbackAddress();
805             try (var listener = ServerSocketChannel.open()) {
806                 listener.bind(new InetSocketAddress(lh, 0));
807                 SocketChannel sc1 = SocketChannel.open();
808                 SocketChannel sc2 = null;
809                 try {
810                     sc1.socket().connect(listener.getLocalAddress());
811                     sc2 = listener.accept();
812                 } catch (IOException ioe) {
813                     sc1.close();
814                     throw ioe;
815                 }
816                 this.sc1 = sc1;
817                 this.sc2 = sc2;
818             }
819         }
820         SocketChannel channel1() {
821             return sc1;
822         }
823         SocketChannel channel2() {
824             return sc2;
825         }
826         @Override
827         public void close() throws IOException {
828             sc1.close();
829             sc2.close();
830         }
831     }
832 
833     /**
834      * Read from a channel until all bytes have been read or an I/O error occurs.
835      */
836     static void readToEOF(ReadableByteChannel rbc) throws IOException {
837         ByteBuffer bb = ByteBuffer.allocate(16*1024);
838         int n;
839         while ((n = rbc.read(bb)) > 0) {
840             bb.clear();
841         }
842     }
843 
    /**
     * A task that returns no result and is allowed to throw any exception,
     * checked or unchecked. It is the type of the tasks accepted by
     * runAfterParkedAsync.
     */
    @FunctionalInterface
    interface ThrowingRunnable {
        /**
         * Runs the task.
         *
         * @throws Exception if the task fails
         */
        void run() throws Exception;
    }
848 
849     /**
850      * Runs the given task asynchronously after the current virtual thread parks.
851      * @param writing if the thread will block in write
852      * @return the thread started to run the task
853      */
854     private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
855         Thread target = Thread.currentThread();
856         if (!target.isVirtual())
857             throw new WrongThreadException();
858         return Thread.ofPlatform().daemon().start(() -> {
859             try {
860                 // wait for target thread to park
861                 while (!isWaiting(target)) {
862                     Thread.sleep(20);
863                 }
864 
865                 // if the target thread is parked in write then we nudge it a few times
866                 // to avoid wakeup with some bytes written
867                 if (writing) {
868                     for (int i = 0; i < 3; i++) {
869                         LockSupport.unpark(target);
870                         while (!isWaiting(target)) {
871                             Thread.sleep(20);
872                         }
873                     }
874                 }
875 
876                 task.run();
877 
878             } catch (Exception e) {
879                 e.printStackTrace();
880             }
881         });
882     }
883 
884     private static Thread runAfterParkedAsync(ThrowingRunnable task) {
885         return runAfterParkedAsync(task, false);
886     }
887 
888     /**
889      * Return true if the given Thread is parked.
890      */
891     private static boolean isWaiting(Thread target) {
892         Thread.State state = target.getState();
893         assertNotEquals(Thread.State.TERMINATED, state);
894         return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
895     }
896 }