1 /*
  2  * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on NIO channels
 28  * @library /test/lib
 29  * @run junit/timeout=480 BlockingChannelOps
 30  */
 31 
 32 /*
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
 37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
 38  */
 39 
 40 /*
 41  * @test id=no-vmcontinuations
 42  * @requires vm.continuations
 43  * @library /test/lib
 44  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
 45  */
 46 
 47 import java.io.Closeable;
 48 import java.io.IOException;
 49 import java.net.DatagramPacket;
 50 import java.net.InetAddress;
 51 import java.net.InetSocketAddress;
 52 import java.net.Socket;
 53 import java.net.SocketAddress;
 54 import java.net.SocketException;
 55 import java.nio.ByteBuffer;
 56 import java.nio.channels.AsynchronousCloseException;
 57 import java.nio.channels.ClosedByInterruptException;
 58 import java.nio.channels.ClosedChannelException;
 59 import java.nio.channels.DatagramChannel;
 60 import java.nio.channels.Pipe;
 61 import java.nio.channels.ReadableByteChannel;
 62 import java.nio.channels.ServerSocketChannel;
 63 import java.nio.channels.SocketChannel;
 64 import java.nio.channels.WritableByteChannel;
 65 import java.util.concurrent.locks.LockSupport;
 66 
 67 import jdk.test.lib.thread.VThreadRunner;
 68 import org.junit.jupiter.api.Test;
 69 import static org.junit.jupiter.api.Assertions.*;
 70 
 71 class BlockingChannelOps {
 72 
 73     /**
 74      * SocketChannel read/write, no blocking.
 75      */
 76     @Test
 77     void testSocketChannelReadWrite1() throws Exception {
 78         VThreadRunner.run(() -> {
 79             try (var connection = new Connection()) {
 80                 SocketChannel sc1 = connection.channel1();
 81                 SocketChannel sc2 = connection.channel2();
 82 
 83                 // write to sc1
 84                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 85                 int n = sc1.write(bb);
 86                 assertTrue(n > 0);
 87 
 88                 // read from sc2 should not block
 89                 bb = ByteBuffer.allocate(10);
 90                 n = sc2.read(bb);
 91                 assertTrue(n > 0);
 92                 assertTrue(bb.get(0) == 'X');
 93             }
 94         });
 95     }
 96 
 97     /**
 98      * Virtual thread blocks in SocketChannel read.
 99      */
100     @Test
101     void testSocketChannelRead() throws Exception {
102         VThreadRunner.run(() -> {
103             try (var connection = new Connection()) {
104                 SocketChannel sc1 = connection.channel1();
105                 SocketChannel sc2 = connection.channel2();
106 
107                 // write to sc1 when current thread blocks in sc2.read
108                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
109                 runAfterParkedAsync(() -> sc1.write(bb1));
110 
111                 // read from sc2 should block
112                 ByteBuffer bb2 = ByteBuffer.allocate(10);
113                 int n = sc2.read(bb2);
114                 assertTrue(n > 0);
115                 assertTrue(bb2.get(0) == 'X');
116             }
117         });
118     }
119 
120     /**
121      * Virtual thread blocks in SocketChannel write.
122      */
123     @Test
124     void testSocketChannelWrite() throws Exception {
125         VThreadRunner.run(() -> {
126             try (var connection = new Connection()) {
127                 SocketChannel sc1 = connection.channel1();
128                 SocketChannel sc2 = connection.channel2();
129 
130                 // read from sc2 to EOF when current thread blocks in sc1.write
131                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
132 
133                 // write to sc1 should block
134                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
135                 for (int i=0; i<1000; i++) {
136                     int n = sc1.write(bb);
137                     assertTrue(n > 0);
138                     bb.clear();
139                 }
140                 sc1.close();
141 
142                 // wait for reader to finish
143                 reader.join();
144             }
145         });
146     }
147 
148     /**
149      * SocketChannel close while virtual thread blocked in read.
150      */
151     @Test
152     void testSocketChannelReadAsyncClose() throws Exception {
153         VThreadRunner.run(() -> {
154             try (var connection = new Connection()) {
155                 SocketChannel sc = connection.channel1();
156                 runAfterParkedAsync(sc::close);
157                 try {
158                     int n = sc.read(ByteBuffer.allocate(100));
159                     fail("read returned " + n);
160                 } catch (AsynchronousCloseException expected) { }
161             }
162         });
163     }
164 
165     /**
166      * SocketChannel shutdownInput while virtual thread blocked in read.
167      */
168     @Test
169     void testSocketChannelReadAsyncShutdownInput() throws Exception {
170         VThreadRunner.run(() -> {
171             try (var connection = new Connection()) {
172                 SocketChannel sc = connection.channel1();
173                 runAfterParkedAsync(sc::shutdownInput);
174                 int n = sc.read(ByteBuffer.allocate(100));
175                 assertEquals(-1, n);
176                 assertTrue(sc.isOpen());
177             }
178         });
179     }
180 
181     /**
182      * Virtual thread interrupted while blocked in SocketChannel read.
183      */
184     @Test
185     void testSocketChannelReadInterrupt() throws Exception {
186         VThreadRunner.run(() -> {
187             try (var connection = new Connection()) {
188                 SocketChannel sc = connection.channel1();
189 
190                 // interrupt current thread when it blocks in read
191                 Thread thisThread = Thread.currentThread();
192                 runAfterParkedAsync(thisThread::interrupt);
193 
194                 try {
195                     int n = sc.read(ByteBuffer.allocate(100));
196                     fail("read returned " + n);
197                 } catch (ClosedByInterruptException expected) {
198                     assertTrue(Thread.interrupted());
199                 }
200             }
201         });
202     }
203 
204     /**
205      * SocketChannel close while virtual thread blocked in write.
206      */
207     @Test
208     void testSocketChannelWriteAsyncClose() throws Exception {
209         VThreadRunner.run(() -> {
210             boolean done = false;
211             while (!done) {
212                 try (var connection = new Connection()) {
213                     SocketChannel sc = connection.channel1();
214 
215                     // close sc when current thread blocks in write
216                     runAfterParkedAsync(sc::close, true);
217 
218                     // write until channel is closed
219                     try {
220                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
221                         for (;;) {
222                             int n = sc.write(bb);
223                             assertTrue(n > 0);
224                             bb.clear();
225                         }
226                     } catch (AsynchronousCloseException expected) {
227                         // closed when blocked in write
228                         done = true;
229                     } catch (ClosedChannelException e) {
230                         // closed but not blocked in write, need to retry test
231                         System.err.format("%s, need to retry!%n", e);
232                     }
233                 }
234             }
235         });
236     }
237 
238 
239     /**
240      * SocketChannel shutdownOutput while virtual thread blocked in write.
241      */
242     @Test
243     void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
244         VThreadRunner.run(() -> {
245             try (var connection = new Connection()) {
246                 SocketChannel sc = connection.channel1();
247 
248                 // shutdown output when current thread blocks in write
249                 runAfterParkedAsync(sc::shutdownOutput);
250                 try {
251                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
252                     for (;;) {
253                         int n = sc.write(bb);
254                         assertTrue(n > 0);
255                         bb.clear();
256                     }
257                 } catch (ClosedChannelException e) {
258                     // expected
259                 }
260                 assertTrue(sc.isOpen());
261             }
262         });
263     }
264 
265     /**
266      * Virtual thread interrupted while blocked in SocketChannel write.
267      */
268     @Test
269     void testSocketChannelWriteInterrupt() throws Exception {
270         VThreadRunner.run(() -> {
271             boolean done = false;
272             while (!done) {
273                 try (var connection = new Connection()) {
274                     SocketChannel sc = connection.channel1();
275 
276                     // interrupt current thread when it blocks in write
277                     Thread thisThread = Thread.currentThread();
278                     runAfterParkedAsync(thisThread::interrupt, true);
279 
280                     // write until channel is closed
281                     try {
282                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
283                         for (;;) {
284                             int n = sc.write(bb);
285                             assertTrue(n > 0);
286                             bb.clear();
287                         }
288                     } catch (ClosedByInterruptException e) {
289                         // closed when blocked in write
290                         assertTrue(Thread.interrupted());
291                         done = true;
292                     } catch (ClosedChannelException e) {
293                         // closed but not blocked in write, need to retry test
294                         System.err.format("%s, need to retry!%n", e);
295                     }
296                 }
297             }
298         });
299     }
300 
301     /**
302      * Virtual thread blocks in SocketChannel adaptor read.
303      */
304     @Test
305     void testSocketAdaptorRead1() throws Exception {
306         testSocketAdaptorRead(0);
307     }
308 
309     /**
310      * Virtual thread blocks in SocketChannel adaptor read with timeout.
311      */
312     @Test
313     void testSocketAdaptorRead2() throws Exception {
314         testSocketAdaptorRead(60_000);
315     }
316 
317     private void testSocketAdaptorRead(int timeout) throws Exception {
318         VThreadRunner.run(() -> {
319             try (var connection = new Connection()) {
320                 SocketChannel sc1 = connection.channel1();
321                 SocketChannel sc2 = connection.channel2();
322 
323                 // write to sc1 when currnet thread blocks reading from sc2
324                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
325                 runAfterParkedAsync(() -> sc1.write(bb));
326 
327                 // read from sc2 should block
328                 byte[] array = new byte[100];
329                 if (timeout > 0)
330                     sc2.socket().setSoTimeout(timeout);
331                 int n = sc2.socket().getInputStream().read(array);
332                 assertTrue(n > 0);
333                 assertTrue(array[0] == 'X');
334             }
335         });
336     }
337 
338     /**
339      * ServerSocketChannel accept, no blocking.
340      */
341     @Test
342     void testServerSocketChannelAccept1() throws Exception {
343         VThreadRunner.run(() -> {
344             try (var ssc = ServerSocketChannel.open()) {
345                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
346                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
347                 // accept should not block
348                 var sc2 = ssc.accept();
349                 sc1.close();
350                 sc2.close();
351             }
352         });
353     }
354 
355     /**
356      * Virtual thread blocks in ServerSocketChannel accept.
357      */
358     @Test
359     void testServerSocketChannelAccept2() throws Exception {
360         VThreadRunner.run(() -> {
361             try (var ssc = ServerSocketChannel.open()) {
362                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
363                 var sc1 = SocketChannel.open();
364 
365                 // connect when current thread when it blocks in accept
366                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
367 
368                 // accept should block
369                 var sc2 = ssc.accept();
370                 sc1.close();
371                 sc2.close();
372             }
373         });
374     }
375 
376     /**
377      * SeverSocketChannel close while virtual thread blocked in accept.
378      */
379     @Test
380     void testServerSocketChannelAcceptAsyncClose() throws Exception {
381         VThreadRunner.run(() -> {
382             try (var ssc = ServerSocketChannel.open()) {
383                 InetAddress lh = InetAddress.getLoopbackAddress();
384                 ssc.bind(new InetSocketAddress(lh, 0));
385                 runAfterParkedAsync(ssc::close);
386                 try {
387                     SocketChannel sc = ssc.accept();
388                     sc.close();
389                     fail("connection accepted???");
390                 } catch (AsynchronousCloseException expected) { }
391             }
392         });
393     }
394 
395     /**
396      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
397      */
398     @Test
399     void testServerSocketChannelAcceptInterrupt() throws Exception {
400         VThreadRunner.run(() -> {
401             try (var ssc = ServerSocketChannel.open()) {
402                 InetAddress lh = InetAddress.getLoopbackAddress();
403                 ssc.bind(new InetSocketAddress(lh, 0));
404 
405                 // interrupt current thread when it blocks in accept
406                 Thread thisThread = Thread.currentThread();
407                 runAfterParkedAsync(thisThread::interrupt);
408 
409                 try {
410                     SocketChannel sc = ssc.accept();
411                     sc.close();
412                     fail("connection accepted???");
413                 } catch (ClosedByInterruptException expected) {
414                     assertTrue(Thread.interrupted());
415                 }
416             }
417         });
418     }
419 
420     /**
421      * Virtual thread blocks in ServerSocketChannel adaptor accept.
422      */
423     @Test
424     void testSocketChannelAdaptorAccept1() throws Exception {
425         testSocketChannelAdaptorAccept(0);
426     }
427 
428     /**
429      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
430      */
431     @Test
432     void testSocketChannelAdaptorAccept2() throws Exception {
433         testSocketChannelAdaptorAccept(60_000);
434     }
435 
436     private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
437         VThreadRunner.run(() -> {
438             try (var ssc = ServerSocketChannel.open()) {
439                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
440                 var sc = SocketChannel.open();
441 
442                 // interrupt current thread when it blocks in accept
443                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
444 
445                 // accept should block
446                 if (timeout > 0)
447                     ssc.socket().setSoTimeout(timeout);
448                 Socket s = ssc.socket().accept();
449                 sc.close();
450                 s.close();
451             }
452         });
453     }
454 
455     /**
456      * DatagramChannel receive/send, no blocking.
457      */
458     @Test
459     void testDatagramChannelSendReceive1() throws Exception {
460         VThreadRunner.run(() -> {
461             try (DatagramChannel dc1 = DatagramChannel.open();
462                  DatagramChannel dc2 = DatagramChannel.open()) {
463 
464                 InetAddress lh = InetAddress.getLoopbackAddress();
465                 dc2.bind(new InetSocketAddress(lh, 0));
466 
467                 // send should not block
468                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
469                 int n = dc1.send(bb, dc2.getLocalAddress());
470                 assertTrue(n > 0);
471 
472                 // receive should not block
473                 bb = ByteBuffer.allocate(10);
474                 dc2.receive(bb);
475                 assertTrue(bb.get(0) == 'X');
476             }
477         });
478     }
479 
480     /**
481      * Virtual thread blocks in DatagramChannel receive.
482      */
483     @Test
484     void testDatagramChannelSendReceive2() throws Exception {
485         VThreadRunner.run(() -> {
486             try (DatagramChannel dc1 = DatagramChannel.open();
487                  DatagramChannel dc2 = DatagramChannel.open()) {
488 
489                 InetAddress lh = InetAddress.getLoopbackAddress();
490                 dc2.bind(new InetSocketAddress(lh, 0));
491 
492                 // send from dc1 when current thread blocked in dc2.receive
493                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
494                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
495 
496                 // read from dc2 should block
497                 ByteBuffer bb2 = ByteBuffer.allocate(10);
498                 dc2.receive(bb2);
499                 assertTrue(bb2.get(0) == 'X');
500             }
501         });
502     }
503 
504     /**
505      * DatagramChannel close while virtual thread blocked in receive.
506      */
507     @Test
508     void testDatagramChannelReceiveAsyncClose() throws Exception {
509         VThreadRunner.run(() -> {
510             try (DatagramChannel dc = DatagramChannel.open()) {
511                 InetAddress lh = InetAddress.getLoopbackAddress();
512                 dc.bind(new InetSocketAddress(lh, 0));
513                 runAfterParkedAsync(dc::close);
514                 try {
515                     dc.receive(ByteBuffer.allocate(100));
516                     fail("receive returned");
517                 } catch (AsynchronousCloseException expected) { }
518             }
519         });
520     }
521 
522     /**
523      * Virtual thread interrupted while blocked in DatagramChannel receive.
524      */
525     @Test
526     void testDatagramChannelReceiveInterrupt() throws Exception {
527         VThreadRunner.run(() -> {
528             try (DatagramChannel dc = DatagramChannel.open()) {
529                 InetAddress lh = InetAddress.getLoopbackAddress();
530                 dc.bind(new InetSocketAddress(lh, 0));
531 
532                 // interrupt current thread when it blocks in receive
533                 Thread thisThread = Thread.currentThread();
534                 runAfterParkedAsync(thisThread::interrupt);
535 
536                 try {
537                     dc.receive(ByteBuffer.allocate(100));
538                     fail("receive returned");
539                 } catch (ClosedByInterruptException expected) {
540                     assertTrue(Thread.interrupted());
541                 }
542             }
543         });
544     }
545 
546     /**
547      * Virtual thread blocks in DatagramSocket adaptor receive.
548      */
549     @Test
550     void testDatagramSocketAdaptorReceive1() throws Exception {
551         testDatagramSocketAdaptorReceive(0);
552     }
553 
554     /**
555      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
556      */
557     @Test
558     void testDatagramSocketAdaptorReceive2() throws Exception {
559         testDatagramSocketAdaptorReceive(60_000);
560     }
561 
562     private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
563         VThreadRunner.run(() -> {
564             try (DatagramChannel dc1 = DatagramChannel.open();
565                  DatagramChannel dc2 = DatagramChannel.open()) {
566 
567                 InetAddress lh = InetAddress.getLoopbackAddress();
568                 dc2.bind(new InetSocketAddress(lh, 0));
569 
570                 // send from dc1 when current thread blocks in dc2 receive
571                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
572                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
573 
574                 // receive should block
575                 byte[] array = new byte[100];
576                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
577                 if (timeout > 0)
578                     dc2.socket().setSoTimeout(timeout);
579                 dc2.socket().receive(p);
580                 assertTrue(p.getLength() == 3 && array[0] == 'X');
581             }
582         });
583     }
584 
585     /**
586      * DatagramChannel close while virtual thread blocked in adaptor receive.
587      */
588     @Test
589     void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
590         testDatagramSocketAdaptorReceiveAsyncClose(0);
591     }
592 
593     /**
594      * DatagramChannel close while virtual thread blocked in adaptor receive
595      * with timeout.
596      */
597     @Test
598     void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
599         testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
600     }
601 
602     private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
603         VThreadRunner.run(() -> {
604             try (DatagramChannel dc = DatagramChannel.open()) {
605                 InetAddress lh = InetAddress.getLoopbackAddress();
606                 dc.bind(new InetSocketAddress(lh, 0));
607 
608                 byte[] array = new byte[100];
609                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
610                 if (timeout > 0)
611                     dc.socket().setSoTimeout(timeout);
612 
613                 // close channel/socket when current thread blocks in receive
614                 runAfterParkedAsync(dc::close);
615 
616                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
617             }
618         });
619     }
620 
621     /**
622      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
623      */
624     @Test
625     void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
626         testDatagramSocketAdaptorReceiveInterrupt(0);
627     }
628 
629     /**
630      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
631      * with timeout.
632      */
633     @Test
634     void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
635         testDatagramSocketAdaptorReceiveInterrupt(60_1000);
636     }
637 
638     private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
639         VThreadRunner.run(() -> {
640             try (DatagramChannel dc = DatagramChannel.open()) {
641                 InetAddress lh = InetAddress.getLoopbackAddress();
642                 dc.bind(new InetSocketAddress(lh, 0));
643 
644                 byte[] array = new byte[100];
645                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
646                 if (timeout > 0)
647                     dc.socket().setSoTimeout(timeout);
648 
649                 // interrupt current thread when it blocks in receive
650                 Thread thisThread = Thread.currentThread();
651                 runAfterParkedAsync(thisThread::interrupt);
652 
653                 try {
654                     dc.socket().receive(p);
655                     fail();
656                 } catch (ClosedByInterruptException expected) {
657                     assertTrue(Thread.interrupted());
658                 }
659             }
660         });
661     }
662 
663     /**
664      * Pipe read/write, no blocking.
665      */
666     @Test
667     void testPipeReadWrite1() throws Exception {
668         VThreadRunner.run(() -> {
669             Pipe p = Pipe.open();
670             try (Pipe.SinkChannel sink = p.sink();
671                  Pipe.SourceChannel source = p.source()) {
672 
673                 // write should not block
674                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
675                 int n = sink.write(bb);
676                 assertTrue(n > 0);
677 
678                 // read should not block
679                 bb = ByteBuffer.allocate(10);
680                 n = source.read(bb);
681                 assertTrue(n > 0);
682                 assertTrue(bb.get(0) == 'X');
683             }
684         });
685     }
686 
687     /**
688      * Virtual thread blocks in Pipe.SourceChannel read.
689      */
690     @Test
691     void testPipeReadWrite2() throws Exception {
692         VThreadRunner.run(() -> {
693             Pipe p = Pipe.open();
694             try (Pipe.SinkChannel sink = p.sink();
695                  Pipe.SourceChannel source = p.source()) {
696 
697                 // write from sink when current thread blocks reading from source
698                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
699                 runAfterParkedAsync(() -> sink.write(bb1));
700 
701                 // read should block
702                 ByteBuffer bb2 = ByteBuffer.allocate(10);
703                 int n = source.read(bb2);
704                 assertTrue(n > 0);
705                 assertTrue(bb2.get(0) == 'X');
706             }
707         });
708     }
709 
710     /**
711      * Virtual thread blocks in Pipe.SinkChannel write.
712      */
713     @Test
714     void testPipeReadWrite3() throws Exception {
715         VThreadRunner.run(() -> {
716             Pipe p = Pipe.open();
717             try (Pipe.SinkChannel sink = p.sink();
718                  Pipe.SourceChannel source = p.source()) {
719 
720                 // read from source to EOF when current thread blocking in write
721                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
722 
723                 // write to sink should block
724                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
725                 for (int i=0; i<1000; i++) {
726                     int n = sink.write(bb);
727                     assertTrue(n > 0);
728                     bb.clear();
729                 }
730                 sink.close();
731 
732                 // wait for reader to finish
733                 reader.join();
734             }
735         });
736     }
737 
738     /**
739      * Pipe.SourceChannel close while virtual thread blocked in read.
740      */
741     @Test
742     void testPipeReadAsyncClose() throws Exception {
743         VThreadRunner.run(() -> {
744             Pipe p = Pipe.open();
745             try (Pipe.SinkChannel sink = p.sink();
746                  Pipe.SourceChannel source = p.source()) {
747                 runAfterParkedAsync(source::close);
748                 try {
749                     int n = source.read(ByteBuffer.allocate(100));
750                     fail("read returned " + n);
751                 } catch (AsynchronousCloseException expected) { }
752             }
753         });
754     }
755 
756     /**
757      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
758      */
759     @Test
760     void testPipeReadInterrupt() throws Exception {
761         VThreadRunner.run(() -> {
762             Pipe p = Pipe.open();
763             try (Pipe.SinkChannel sink = p.sink();
764                  Pipe.SourceChannel source = p.source()) {
765 
766                 // interrupt current thread when it blocks reading from source
767                 Thread thisThread = Thread.currentThread();
768                 runAfterParkedAsync(thisThread::interrupt);
769 
770                 try {
771                     int n = source.read(ByteBuffer.allocate(100));
772                     fail("read returned " + n);
773                 } catch (ClosedByInterruptException expected) {
774                     assertTrue(Thread.interrupted());
775                 }
776             }
777         });
778     }
779 
780     /**
781      * Pipe.SinkChannel close while virtual thread blocked in write.
782      */
783     @Test
784     void testPipeWriteAsyncClose() throws Exception {
785         VThreadRunner.run(() -> {
786             boolean done = false;
787             while (!done) {
788                 Pipe p = Pipe.open();
789                 try (Pipe.SinkChannel sink = p.sink();
790                      Pipe.SourceChannel source = p.source()) {
791 
792                     // close sink when current thread blocks in write
793                     runAfterParkedAsync(sink::close, true);
794 
795                     // write until channel is closed
796                     try {
797                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
798                         for (;;) {
799                             int n = sink.write(bb);
800                             assertTrue(n > 0);
801                             bb.clear();
802                         }
803                     } catch (AsynchronousCloseException e) {
804                         // closed when blocked in write
805                         done = true;
806                     } catch (ClosedChannelException e) {
807                         // closed but not blocked in write, need to retry test
808                         System.err.format("%s, need to retry!%n", e);
809                     }
810                 }
811             }
812         });
813     }
814 
815     /**
816      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
817      */
818     @Test
819     void testPipeWriteInterrupt() throws Exception {
820         VThreadRunner.run(() -> {
821             boolean done = false;
822             while (!done) {
823                 Pipe p = Pipe.open();
824                 try (Pipe.SinkChannel sink = p.sink();
825                      Pipe.SourceChannel source = p.source()) {
826 
827                     // interrupt current thread when it blocks in write
828                     Thread thisThread = Thread.currentThread();
829                     runAfterParkedAsync(thisThread::interrupt, true);
830 
831                     // write until channel is closed
832                     try {
833                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
834                         for (;;) {
835                             int n = sink.write(bb);
836                             assertTrue(n > 0);
837                             bb.clear();
838                         }
839                     } catch (ClosedByInterruptException expected) {
840                         // closed when blocked in write
841                         assertTrue(Thread.interrupted());
842                         done = true;
843                     } catch (ClosedChannelException e) {
844                         // closed but not blocked in write, need to retry test
845                         System.err.format("%s, need to retry!%n", e);
846                     }
847                 }
848             }
849         });
850     }
851 
852     /**
853      * Creates a loopback connection
854      */
855     static class Connection implements Closeable {
856         private final SocketChannel sc1;
857         private final SocketChannel sc2;
858         Connection() throws IOException {
859             var lh = InetAddress.getLoopbackAddress();
860             try (var listener = ServerSocketChannel.open()) {
861                 listener.bind(new InetSocketAddress(lh, 0));
862                 SocketChannel sc1 = SocketChannel.open();
863                 SocketChannel sc2 = null;
864                 try {
865                     sc1.socket().connect(listener.getLocalAddress());
866                     sc2 = listener.accept();
867                 } catch (IOException ioe) {
868                     sc1.close();
869                     throw ioe;
870                 }
871                 this.sc1 = sc1;
872                 this.sc2 = sc2;
873             }
874         }
875         SocketChannel channel1() {
876             return sc1;
877         }
878         SocketChannel channel2() {
879             return sc2;
880         }
881         @Override
882         public void close() throws IOException {
883             sc1.close();
884             sc2.close();
885         }
886     }
887 
888     /**
889      * Read from a channel until all bytes have been read or an I/O error occurs.
890      */
891     static void readToEOF(ReadableByteChannel rbc) throws IOException {
892         ByteBuffer bb = ByteBuffer.allocate(16*1024);
893         int n;
894         while ((n = rbc.read(bb)) > 0) {
895             bb.clear();
896         }
897     }
898 
899     @FunctionalInterface
900     interface ThrowingRunnable {
901         void run() throws Exception;
902     }
903 
904     /**
905      * Runs the given task asynchronously after the current virtual thread parks.
906      * @param writing if the thread will block in write
907      * @return the thread started to run the task
908      */
909     private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
910         Thread target = Thread.currentThread();
911         if (!target.isVirtual())
912             throw new WrongThreadException();
913         return Thread.ofPlatform().daemon().start(() -> {
914             try {
915                 // wait for target thread to park
916                 while (!isWaiting(target)) {
917                     Thread.sleep(20);
918                 }
919 
920                 // if the target thread is parked in write then we nudge it a few times
921                 // to avoid wakeup with some bytes written
922                 if (writing) {
923                     for (int i = 0; i < 3; i++) {
924                         LockSupport.unpark(target);
925                         while (!isWaiting(target)) {
926                             Thread.sleep(20);
927                         }
928                     }
929                 }
930 
931                 task.run();
932 
933             } catch (Exception e) {
934                 e.printStackTrace();
935             }
936         });
937     }
938 
939     private static Thread runAfterParkedAsync(ThrowingRunnable task) {
940         return runAfterParkedAsync(task, false);
941     }
942 
943     /**
944      * Return true if the given Thread is parked.
945      */
946     private static boolean isWaiting(Thread target) {
947         Thread.State state = target.getState();
948         assertNotEquals(Thread.State.TERMINATED, state);
949         return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
950     }
951 }