1 /*
  2  * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on NIO channels
 28  * @library /test/lib
 29  * @run junit/timeout=480 BlockingChannelOps
 30  */
 31 
 32 /*
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
 37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
 38  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
 39  */
 40 
 41 /*
 42  * @test id=no-vmcontinuations
 43  * @requires vm.continuations
 44  * @library /test/lib
 45  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
 46  */
 47 
 48 import java.io.Closeable;
 49 import java.io.IOException;
 50 import java.net.DatagramPacket;
 51 import java.net.InetAddress;
 52 import java.net.InetSocketAddress;
 53 import java.net.Socket;
 54 import java.net.SocketAddress;
 55 import java.net.SocketException;
 56 import java.nio.ByteBuffer;
 57 import java.nio.channels.AsynchronousCloseException;
 58 import java.nio.channels.ClosedByInterruptException;
 59 import java.nio.channels.ClosedChannelException;
 60 import java.nio.channels.DatagramChannel;
 61 import java.nio.channels.Pipe;
 62 import java.nio.channels.ReadableByteChannel;
 63 import java.nio.channels.ServerSocketChannel;
 64 import java.nio.channels.SocketChannel;
 65 import java.nio.channels.WritableByteChannel;
 66 import java.util.concurrent.locks.LockSupport;
 67 
 68 import jdk.test.lib.thread.VThreadRunner;
 69 import org.junit.jupiter.api.Test;
 70 import org.junit.jupiter.params.ParameterizedTest;
 71 import org.junit.jupiter.params.provider.ValueSource;
 72 import static org.junit.jupiter.api.Assertions.*;
 73 
 74 class BlockingChannelOps {
 75 
 76     /**
 77      * SocketChannel read/write, no blocking.
 78      */
 79     @Test
 80     void testSocketChannelReadWrite1() throws Exception {
 81         VThreadRunner.run(() -> {
 82             try (var connection = new Connection()) {
 83                 SocketChannel sc1 = connection.channel1();
 84                 SocketChannel sc2 = connection.channel2();
 85 
 86                 // write to sc1
 87                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 88                 int n = sc1.write(bb);
 89                 assertTrue(n > 0);
 90 
 91                 // read from sc2 should not block
 92                 bb = ByteBuffer.allocate(10);
 93                 n = sc2.read(bb);
 94                 assertTrue(n > 0);
 95                 assertTrue(bb.get(0) == 'X');
 96             }
 97         });
 98     }
 99 
100     /**
101      * Virtual thread blocks in SocketChannel read.
102      */
103     @Test
104     void testSocketChannelRead() throws Exception {
105         VThreadRunner.run(() -> {
106             try (var connection = new Connection()) {
107                 SocketChannel sc1 = connection.channel1();
108                 SocketChannel sc2 = connection.channel2();
109 
110                 // write to sc1 when current thread blocks in sc2.read
111                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
112                 runAfterParkedAsync(() -> sc1.write(bb1));
113 
114                 // read from sc2 should block
115                 ByteBuffer bb2 = ByteBuffer.allocate(10);
116                 int n = sc2.read(bb2);
117                 assertTrue(n > 0);
118                 assertTrue(bb2.get(0) == 'X');
119             }
120         });
121     }
122 
123     /**
124      * Virtual thread blocks in SocketChannel write.
125      */
126     @Test
127     void testSocketChannelWrite() throws Exception {
128         VThreadRunner.run(() -> {
129             try (var connection = new Connection()) {
130                 SocketChannel sc1 = connection.channel1();
131                 SocketChannel sc2 = connection.channel2();
132 
133                 // read from sc2 to EOF when current thread blocks in sc1.write
134                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
135 
136                 // write to sc1 should block
137                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
138                 for (int i=0; i<1000; i++) {
139                     int n = sc1.write(bb);
140                     assertTrue(n > 0);
141                     bb.clear();
142                 }
143                 sc1.close();
144 
145                 // wait for reader to finish
146                 reader.join();
147             }
148         });
149     }
150 
151     /**
152      * SocketChannel close while virtual thread blocked in read.
153      */
154     @Test
155     void testSocketChannelReadAsyncClose() throws Exception {
156         VThreadRunner.run(() -> {
157             try (var connection = new Connection()) {
158                 SocketChannel sc = connection.channel1();
159                 runAfterParkedAsync(sc::close);
160                 try {
161                     int n = sc.read(ByteBuffer.allocate(100));
162                     fail("read returned " + n);
163                 } catch (AsynchronousCloseException expected) { }
164             }
165         });
166     }
167 
168     /**
169      * SocketChannel shutdownInput while virtual thread blocked in read.
170      */
171     @Test
172     void testSocketChannelReadAsyncShutdownInput() throws Exception {
173         VThreadRunner.run(() -> {
174             try (var connection = new Connection()) {
175                 SocketChannel sc = connection.channel1();
176                 runAfterParkedAsync(sc::shutdownInput);
177                 int n = sc.read(ByteBuffer.allocate(100));
178                 assertEquals(-1, n);
179                 assertTrue(sc.isOpen());
180             }
181         });
182     }
183 
184     /**
185      * Virtual thread interrupted while blocked in SocketChannel read.
186      */
187     @Test
188     void testSocketChannelReadInterrupt() throws Exception {
189         VThreadRunner.run(() -> {
190             try (var connection = new Connection()) {
191                 SocketChannel sc = connection.channel1();
192 
193                 // interrupt current thread when it blocks in read
194                 Thread thisThread = Thread.currentThread();
195                 runAfterParkedAsync(thisThread::interrupt);
196 
197                 try {
198                     int n = sc.read(ByteBuffer.allocate(100));
199                     fail("read returned " + n);
200                 } catch (ClosedByInterruptException expected) {
201                     assertTrue(Thread.interrupted());
202                 }
203             }
204         });
205     }
206 
207     /**
208      * SocketChannel close while virtual thread blocked in write.
209      */
210     @Test
211     void testSocketChannelWriteAsyncClose() throws Exception {
212         VThreadRunner.run(() -> {
213             boolean done = false;
214             while (!done) {
215                 try (var connection = new Connection()) {
216                     SocketChannel sc = connection.channel1();
217 
218                     // close sc when current thread blocks in write
219                     runAfterParkedAsync(sc::close, true);
220 
221                     // write until channel is closed
222                     try {
223                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
224                         for (;;) {
225                             int n = sc.write(bb);
226                             assertTrue(n > 0);
227                             bb.clear();
228                         }
229                     } catch (AsynchronousCloseException expected) {
230                         // closed when blocked in write
231                         done = true;
232                     } catch (ClosedChannelException e) {
233                         // closed but not blocked in write, need to retry test
234                         System.err.format("%s, need to retry!%n", e);
235                     }
236                 }
237             }
238         });
239     }
240 
241 
242     /**
243      * SocketChannel shutdownOutput while virtual thread blocked in write.
244      */
245     @Test
246     void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
247         VThreadRunner.run(() -> {
248             try (var connection = new Connection()) {
249                 SocketChannel sc = connection.channel1();
250 
251                 // shutdown output when current thread blocks in write
252                 runAfterParkedAsync(sc::shutdownOutput);
253                 try {
254                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
255                     for (;;) {
256                         int n = sc.write(bb);
257                         assertTrue(n > 0);
258                         bb.clear();
259                     }
260                 } catch (ClosedChannelException e) {
261                     // expected
262                 }
263                 assertTrue(sc.isOpen());
264             }
265         });
266     }
267 
268     /**
269      * Virtual thread interrupted while blocked in SocketChannel write.
270      */
271     @Test
272     void testSocketChannelWriteInterrupt() throws Exception {
273         VThreadRunner.run(() -> {
274             boolean done = false;
275             while (!done) {
276                 try (var connection = new Connection()) {
277                     SocketChannel sc = connection.channel1();
278 
279                     // interrupt current thread when it blocks in write
280                     Thread thisThread = Thread.currentThread();
281                     runAfterParkedAsync(thisThread::interrupt, true);
282 
283                     // write until channel is closed
284                     try {
285                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
286                         for (;;) {
287                             int n = sc.write(bb);
288                             assertTrue(n > 0);
289                             bb.clear();
290                         }
291                     } catch (ClosedByInterruptException e) {
292                         // closed when blocked in write
293                         assertTrue(Thread.interrupted());
294                         done = true;
295                     } catch (ClosedChannelException e) {
296                         // closed but not blocked in write, need to retry test
297                         System.err.format("%s, need to retry!%n", e);
298                     }
299                 }
300             }
301         });
302     }
303 
304     /**
305      * Virtual thread blocks in SocketChannel adaptor read.
306      */
307     @ParameterizedTest
308     @ValueSource(ints = { 0, 60_000 })
309     void testSocketAdaptorRead(int timeout) throws Exception {
310         VThreadRunner.run(() -> {
311             try (var connection = new Connection()) {
312                 SocketChannel sc1 = connection.channel1();
313                 SocketChannel sc2 = connection.channel2();
314 
315                 // write to sc1 when currnet thread blocks reading from sc2
316                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
317                 runAfterParkedAsync(() -> sc1.write(bb));
318 
319                 // read from sc2 should block
320                 byte[] array = new byte[100];
321                 if (timeout > 0)
322                     sc2.socket().setSoTimeout(timeout);
323                 int n = sc2.socket().getInputStream().read(array);
324                 assertTrue(n > 0);
325                 assertTrue(array[0] == 'X');
326             }
327         });
328     }
329 
330     /**
331      * ServerSocketChannel accept, no blocking.
332      */
333     @Test
334     void testServerSocketChannelAccept1() throws Exception {
335         VThreadRunner.run(() -> {
336             try (var ssc = ServerSocketChannel.open()) {
337                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
338                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
339                 // accept should not block
340                 var sc2 = ssc.accept();
341                 sc1.close();
342                 sc2.close();
343             }
344         });
345     }
346 
347     /**
348      * Virtual thread blocks in ServerSocketChannel accept.
349      */
350     @Test
351     void testServerSocketChannelAccept2() throws Exception {
352         VThreadRunner.run(() -> {
353             try (var ssc = ServerSocketChannel.open()) {
354                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
355                 var sc1 = SocketChannel.open();
356 
357                 // connect when current thread when it blocks in accept
358                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
359 
360                 // accept should block
361                 var sc2 = ssc.accept();
362                 sc1.close();
363                 sc2.close();
364             }
365         });
366     }
367 
368     /**
369      * SeverSocketChannel close while virtual thread blocked in accept.
370      */
371     @Test
372     void testServerSocketChannelAcceptAsyncClose() throws Exception {
373         VThreadRunner.run(() -> {
374             try (var ssc = ServerSocketChannel.open()) {
375                 InetAddress lh = InetAddress.getLoopbackAddress();
376                 ssc.bind(new InetSocketAddress(lh, 0));
377                 runAfterParkedAsync(ssc::close);
378                 try {
379                     SocketChannel sc = ssc.accept();
380                     sc.close();
381                     fail("connection accepted???");
382                 } catch (AsynchronousCloseException expected) { }
383             }
384         });
385     }
386 
387     /**
388      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
389      */
390     @Test
391     void testServerSocketChannelAcceptInterrupt() throws Exception {
392         VThreadRunner.run(() -> {
393             try (var ssc = ServerSocketChannel.open()) {
394                 InetAddress lh = InetAddress.getLoopbackAddress();
395                 ssc.bind(new InetSocketAddress(lh, 0));
396 
397                 // interrupt current thread when it blocks in accept
398                 Thread thisThread = Thread.currentThread();
399                 runAfterParkedAsync(thisThread::interrupt);
400 
401                 try {
402                     SocketChannel sc = ssc.accept();
403                     sc.close();
404                     fail("connection accepted???");
405                 } catch (ClosedByInterruptException expected) {
406                     assertTrue(Thread.interrupted());
407                 }
408             }
409         });
410     }
411 
412     /**
413      * Virtual thread blocks in ServerSocketChannel adaptor accept.
414      */
415     @ParameterizedTest
416     @ValueSource(ints = { 0, 60_000 })
417     void testSocketChannelAdaptorAccept(int timeout) throws Exception {
418         VThreadRunner.run(() -> {
419             try (var ssc = ServerSocketChannel.open()) {
420                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
421                 var sc = SocketChannel.open();
422 
423                 // interrupt current thread when it blocks in accept
424                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
425 
426                 // accept should block
427                 if (timeout > 0)
428                     ssc.socket().setSoTimeout(timeout);
429                 Socket s = ssc.socket().accept();
430                 sc.close();
431                 s.close();
432             }
433         });
434     }
435 
436     /**
437      * DatagramChannel receive/send, no blocking.
438      */
439     @Test
440     void testDatagramChannelSendReceive1() throws Exception {
441         VThreadRunner.run(() -> {
442             try (DatagramChannel dc1 = DatagramChannel.open();
443                  DatagramChannel dc2 = DatagramChannel.open()) {
444 
445                 InetAddress lh = InetAddress.getLoopbackAddress();
446                 dc2.bind(new InetSocketAddress(lh, 0));
447 
448                 // send should not block
449                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
450                 int n = dc1.send(bb, dc2.getLocalAddress());
451                 assertTrue(n > 0);
452 
453                 // receive should not block
454                 bb = ByteBuffer.allocate(10);
455                 dc2.receive(bb);
456                 assertTrue(bb.get(0) == 'X');
457             }
458         });
459     }
460 
461     /**
462      * Virtual thread blocks in DatagramChannel receive.
463      */
464     @Test
465     void testDatagramChannelSendReceive2() throws Exception {
466         VThreadRunner.run(() -> {
467             try (DatagramChannel dc1 = DatagramChannel.open();
468                  DatagramChannel dc2 = DatagramChannel.open()) {
469 
470                 InetAddress lh = InetAddress.getLoopbackAddress();
471                 dc2.bind(new InetSocketAddress(lh, 0));
472 
473                 // send from dc1 when current thread blocked in dc2.receive
474                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
475                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
476 
477                 // read from dc2 should block
478                 ByteBuffer bb2 = ByteBuffer.allocate(10);
479                 dc2.receive(bb2);
480                 assertTrue(bb2.get(0) == 'X');
481             }
482         });
483     }
484 
485     /**
486      * DatagramChannel close while virtual thread blocked in receive.
487      */
488     @Test
489     void testDatagramChannelReceiveAsyncClose() throws Exception {
490         VThreadRunner.run(() -> {
491             try (DatagramChannel dc = DatagramChannel.open()) {
492                 InetAddress lh = InetAddress.getLoopbackAddress();
493                 dc.bind(new InetSocketAddress(lh, 0));
494                 runAfterParkedAsync(dc::close);
495                 try {
496                     dc.receive(ByteBuffer.allocate(100));
497                     fail("receive returned");
498                 } catch (AsynchronousCloseException expected) { }
499             }
500         });
501     }
502 
503     /**
504      * Virtual thread interrupted while blocked in DatagramChannel receive.
505      */
506     @Test
507     void testDatagramChannelReceiveInterrupt() throws Exception {
508         VThreadRunner.run(() -> {
509             try (DatagramChannel dc = DatagramChannel.open()) {
510                 InetAddress lh = InetAddress.getLoopbackAddress();
511                 dc.bind(new InetSocketAddress(lh, 0));
512 
513                 // interrupt current thread when it blocks in receive
514                 Thread thisThread = Thread.currentThread();
515                 runAfterParkedAsync(thisThread::interrupt);
516 
517                 try {
518                     dc.receive(ByteBuffer.allocate(100));
519                     fail("receive returned");
520                 } catch (ClosedByInterruptException expected) {
521                     assertTrue(Thread.interrupted());
522                 }
523             }
524         });
525     }
526 
527     /**
528      * Virtual thread blocks in DatagramSocket adaptor receive.
529      */
530     @ParameterizedTest
531     @ValueSource(ints = { 0, 60_000 })
532     void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
533         VThreadRunner.run(() -> {
534             try (DatagramChannel dc1 = DatagramChannel.open();
535                  DatagramChannel dc2 = DatagramChannel.open()) {
536 
537                 InetAddress lh = InetAddress.getLoopbackAddress();
538                 dc2.bind(new InetSocketAddress(lh, 0));
539 
540                 // send from dc1 when current thread blocks in dc2 receive
541                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
542                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
543 
544                 // receive should block
545                 byte[] array = new byte[100];
546                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
547                 if (timeout > 0)
548                     dc2.socket().setSoTimeout(timeout);
549                 dc2.socket().receive(p);
550                 assertTrue(p.getLength() == 3 && array[0] == 'X');
551             }
552         });
553     }
554 
555     /**
556      * DatagramChannel close while virtual thread blocked in adaptor receive.
557      */
558     @ParameterizedTest
559     @ValueSource(ints = { 0, 60_000 })
560     void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
561         VThreadRunner.run(() -> {
562             try (DatagramChannel dc = DatagramChannel.open()) {
563                 InetAddress lh = InetAddress.getLoopbackAddress();
564                 dc.bind(new InetSocketAddress(lh, 0));
565 
566                 byte[] array = new byte[100];
567                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
568                 if (timeout > 0)
569                     dc.socket().setSoTimeout(timeout);
570 
571                 // close channel/socket when current thread blocks in receive
572                 runAfterParkedAsync(dc::close);
573 
574                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
575             }
576         });
577     }
578 
579     /**
580      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
581      */
582     @ParameterizedTest
583     @ValueSource(ints = { 0, 60_000 })
584     void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
585         VThreadRunner.run(() -> {
586             try (DatagramChannel dc = DatagramChannel.open()) {
587                 InetAddress lh = InetAddress.getLoopbackAddress();
588                 dc.bind(new InetSocketAddress(lh, 0));
589 
590                 byte[] array = new byte[100];
591                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
592                 if (timeout > 0)
593                     dc.socket().setSoTimeout(timeout);
594 
595                 // interrupt current thread when it blocks in receive
596                 Thread thisThread = Thread.currentThread();
597                 runAfterParkedAsync(thisThread::interrupt);
598 
599                 try {
600                     dc.socket().receive(p);
601                     fail();
602                 } catch (ClosedByInterruptException expected) {
603                     assertTrue(Thread.interrupted());
604                 }
605             }
606         });
607     }
608 
609     /**
610      * Pipe read/write, no blocking.
611      */
612     @Test
613     void testPipeReadWrite1() throws Exception {
614         VThreadRunner.run(() -> {
615             Pipe p = Pipe.open();
616             try (Pipe.SinkChannel sink = p.sink();
617                  Pipe.SourceChannel source = p.source()) {
618 
619                 // write should not block
620                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
621                 int n = sink.write(bb);
622                 assertTrue(n > 0);
623 
624                 // read should not block
625                 bb = ByteBuffer.allocate(10);
626                 n = source.read(bb);
627                 assertTrue(n > 0);
628                 assertTrue(bb.get(0) == 'X');
629             }
630         });
631     }
632 
633     /**
634      * Virtual thread blocks in Pipe.SourceChannel read.
635      */
636     @Test
637     void testPipeReadWrite2() throws Exception {
638         VThreadRunner.run(() -> {
639             Pipe p = Pipe.open();
640             try (Pipe.SinkChannel sink = p.sink();
641                  Pipe.SourceChannel source = p.source()) {
642 
643                 // write from sink when current thread blocks reading from source
644                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
645                 runAfterParkedAsync(() -> sink.write(bb1));
646 
647                 // read should block
648                 ByteBuffer bb2 = ByteBuffer.allocate(10);
649                 int n = source.read(bb2);
650                 assertTrue(n > 0);
651                 assertTrue(bb2.get(0) == 'X');
652             }
653         });
654     }
655 
656     /**
657      * Virtual thread blocks in Pipe.SinkChannel write.
658      */
659     @Test
660     void testPipeReadWrite3() throws Exception {
661         VThreadRunner.run(() -> {
662             Pipe p = Pipe.open();
663             try (Pipe.SinkChannel sink = p.sink();
664                  Pipe.SourceChannel source = p.source()) {
665 
666                 // read from source to EOF when current thread blocking in write
667                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
668 
669                 // write to sink should block
670                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
671                 for (int i=0; i<1000; i++) {
672                     int n = sink.write(bb);
673                     assertTrue(n > 0);
674                     bb.clear();
675                 }
676                 sink.close();
677 
678                 // wait for reader to finish
679                 reader.join();
680             }
681         });
682     }
683 
684     /**
685      * Pipe.SourceChannel close while virtual thread blocked in read.
686      */
687     @Test
688     void testPipeReadAsyncClose() throws Exception {
689         VThreadRunner.run(() -> {
690             Pipe p = Pipe.open();
691             try (Pipe.SinkChannel sink = p.sink();
692                  Pipe.SourceChannel source = p.source()) {
693                 runAfterParkedAsync(source::close);
694                 try {
695                     int n = source.read(ByteBuffer.allocate(100));
696                     fail("read returned " + n);
697                 } catch (AsynchronousCloseException expected) { }
698             }
699         });
700     }
701 
702     /**
703      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
704      */
705     @Test
706     void testPipeReadInterrupt() throws Exception {
707         VThreadRunner.run(() -> {
708             Pipe p = Pipe.open();
709             try (Pipe.SinkChannel sink = p.sink();
710                  Pipe.SourceChannel source = p.source()) {
711 
712                 // interrupt current thread when it blocks reading from source
713                 Thread thisThread = Thread.currentThread();
714                 runAfterParkedAsync(thisThread::interrupt);
715 
716                 try {
717                     int n = source.read(ByteBuffer.allocate(100));
718                     fail("read returned " + n);
719                 } catch (ClosedByInterruptException expected) {
720                     assertTrue(Thread.interrupted());
721                 }
722             }
723         });
724     }
725 
726     /**
727      * Pipe.SinkChannel close while virtual thread blocked in write.
728      */
729     @Test
730     void testPipeWriteAsyncClose() throws Exception {
731         VThreadRunner.run(() -> {
732             boolean done = false;
733             while (!done) {
734                 Pipe p = Pipe.open();
735                 try (Pipe.SinkChannel sink = p.sink();
736                      Pipe.SourceChannel source = p.source()) {
737 
738                     // close sink when current thread blocks in write
739                     runAfterParkedAsync(sink::close, true);
740 
741                     // write until channel is closed
742                     try {
743                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
744                         for (;;) {
745                             int n = sink.write(bb);
746                             assertTrue(n > 0);
747                             bb.clear();
748                         }
749                     } catch (AsynchronousCloseException e) {
750                         // closed when blocked in write
751                         done = true;
752                     } catch (ClosedChannelException e) {
753                         // closed but not blocked in write, need to retry test
754                         System.err.format("%s, need to retry!%n", e);
755                     }
756                 }
757             }
758         });
759     }
760 
761     /**
762      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
763      */
764     @Test
765     void testPipeWriteInterrupt() throws Exception {
766         VThreadRunner.run(() -> {
767             boolean done = false;
768             while (!done) {
769                 Pipe p = Pipe.open();
770                 try (Pipe.SinkChannel sink = p.sink();
771                      Pipe.SourceChannel source = p.source()) {
772 
773                     // interrupt current thread when it blocks in write
774                     Thread thisThread = Thread.currentThread();
775                     runAfterParkedAsync(thisThread::interrupt, true);
776 
777                     // write until channel is closed
778                     try {
779                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
780                         for (;;) {
781                             int n = sink.write(bb);
782                             assertTrue(n > 0);
783                             bb.clear();
784                         }
785                     } catch (ClosedByInterruptException expected) {
786                         // closed when blocked in write
787                         assertTrue(Thread.interrupted());
788                         done = true;
789                     } catch (ClosedChannelException e) {
790                         // closed but not blocked in write, need to retry test
791                         System.err.format("%s, need to retry!%n", e);
792                     }
793                 }
794             }
795         });
796     }
797 
798     /**
799      * Creates a loopback connection
800      */
801     static class Connection implements Closeable {
802         private final SocketChannel sc1;
803         private final SocketChannel sc2;
804         Connection() throws IOException {
805             var lh = InetAddress.getLoopbackAddress();
806             try (var listener = ServerSocketChannel.open()) {
807                 listener.bind(new InetSocketAddress(lh, 0));
808                 SocketChannel sc1 = SocketChannel.open();
809                 SocketChannel sc2 = null;
810                 try {
811                     sc1.socket().connect(listener.getLocalAddress());
812                     sc2 = listener.accept();
813                 } catch (IOException ioe) {
814                     sc1.close();
815                     throw ioe;
816                 }
817                 this.sc1 = sc1;
818                 this.sc2 = sc2;
819             }
820         }
821         SocketChannel channel1() {
822             return sc1;
823         }
824         SocketChannel channel2() {
825             return sc2;
826         }
827         @Override
828         public void close() throws IOException {
829             sc1.close();
830             sc2.close();
831         }
832     }
833 
834     /**
835      * Read from a channel until all bytes have been read or an I/O error occurs.
836      */
837     static void readToEOF(ReadableByteChannel rbc) throws IOException {
838         ByteBuffer bb = ByteBuffer.allocate(16*1024);
839         int n;
840         while ((n = rbc.read(bb)) > 0) {
841             bb.clear();
842         }
843     }
844 
845     @FunctionalInterface
846     interface ThrowingRunnable {
847         void run() throws Exception;
848     }
849 
850     /**
851      * Runs the given task asynchronously after the current virtual thread parks.
852      * @param writing if the thread will block in write
853      * @return the thread started to run the task
854      */
855     private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
856         Thread target = Thread.currentThread();
857         if (!target.isVirtual())
858             throw new WrongThreadException();
859         return Thread.ofPlatform().daemon().start(() -> {
860             try {
861                 // wait for target thread to park
862                 while (!isWaiting(target)) {
863                     Thread.sleep(20);
864                 }
865 
866                 // if the target thread is parked in write then we nudge it a few times
867                 // to avoid wakeup with some bytes written
868                 if (writing) {
869                     for (int i = 0; i < 3; i++) {
870                         LockSupport.unpark(target);
871                         while (!isWaiting(target)) {
872                             Thread.sleep(20);
873                         }
874                     }
875                 }
876 
877                 task.run();
878 
879             } catch (Exception e) {
880                 e.printStackTrace();
881             }
882         });
883     }
884 
885     private static Thread runAfterParkedAsync(ThrowingRunnable task) {
886         return runAfterParkedAsync(task, false);
887     }
888 
889     /**
890      * Return true if the given Thread is parked.
891      */
892     private static boolean isWaiting(Thread target) {
893         Thread.State state = target.getState();
894         assertNotEquals(Thread.State.TERMINATED, state);
895         return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
896     }
897 }