1 /*
  2  * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on NIO channels
 28  * @library /test/lib
 29  * @run junit/othervm/timeout=480 BlockingChannelOps
 30  */
 31 
 32 /*
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
 37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
 38  */
 39 
 40 /*
 41  * @test id=io_uring
 42  * @requires os.family == "linux"
 43  * @library /test/lib
 44  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
 45  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
 46  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
 47  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
 48  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
 49  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
 50  */
 51 
 52 /*
 53  * @test id=no-vmcontinuations
 54  * @requires vm.continuations
 55  * @library /test/lib
 56  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
 57  */
 58 
import java.io.Closeable;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.LockSupport;

import jdk.test.lib.thread.VThreadRunner;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.*;
 84 
 85 class BlockingChannelOps {
 86 
 87     /**
 88      * SocketChannel read/write, no blocking.
 89      */
 90     @Test
 91     void testSocketChannelReadWrite1() throws Exception {
 92         VThreadRunner.run(() -> {
 93             try (var connection = new Connection()) {
 94                 SocketChannel sc1 = connection.channel1();
 95                 SocketChannel sc2 = connection.channel2();
 96 
 97                 // write to sc1
 98                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 99                 int n = sc1.write(bb);
100                 assertTrue(n > 0);
101 
102                 // read from sc2 should not block
103                 bb = ByteBuffer.allocate(10);
104                 n = sc2.read(bb);
105                 assertTrue(n > 0);
106                 assertTrue(bb.get(0) == 'X');
107             }
108         });
109     }
110 
111     /**
112      * Virtual thread blocks in SocketChannel read.
113      */
114     @Test
115     void testSocketChannelRead() throws Exception {
116         VThreadRunner.run(() -> {
117             try (var connection = new Connection()) {
118                 SocketChannel sc1 = connection.channel1();
119                 SocketChannel sc2 = connection.channel2();
120 
121                 // write to sc1 when current thread blocks in sc2.read
122                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
123                 runAfterParkedAsync(() -> sc1.write(bb1));
124 
125                 // read from sc2 should block
126                 ByteBuffer bb2 = ByteBuffer.allocate(10);
127                 int n = sc2.read(bb2);
128                 assertTrue(n > 0);
129                 assertTrue(bb2.get(0) == 'X');
130             }
131         });
132     }
133 
134     /**
135      * Virtual thread blocks in SocketChannel write.
136      */
137     @Test
138     void testSocketChannelWrite() throws Exception {
139         VThreadRunner.run(() -> {
140             try (var connection = new Connection()) {
141                 SocketChannel sc1 = connection.channel1();
142                 SocketChannel sc2 = connection.channel2();
143 
144                 // read from sc2 to EOF when current thread blocks in sc1.write
145                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
146 
147                 // write to sc1 should block
148                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
149                 for (int i=0; i<1000; i++) {
150                     int n = sc1.write(bb);
151                     assertTrue(n > 0);
152                     bb.clear();
153                 }
154                 sc1.close();
155 
156                 // wait for reader to finish
157                 reader.join();
158             }
159         });
160     }
161 
162     /**
163      * SocketChannel close while virtual thread blocked in read.
164      */
165     @Test
166     void testSocketChannelReadAsyncClose() throws Exception {
167         VThreadRunner.run(() -> {
168             try (var connection = new Connection()) {
169                 SocketChannel sc = connection.channel1();
170                 runAfterParkedAsync(sc::close);
171                 try {
172                     int n = sc.read(ByteBuffer.allocate(100));
173                     fail("read returned " + n);
174                 } catch (AsynchronousCloseException expected) { }
175             }
176         });
177     }
178 
179     /**
180      * SocketChannel shutdownInput while virtual thread blocked in read.
181      */
182     @Test
183     void testSocketChannelReadAsyncShutdownInput() throws Exception {
184         VThreadRunner.run(() -> {
185             try (var connection = new Connection()) {
186                 SocketChannel sc = connection.channel1();
187                 runAfterParkedAsync(sc::shutdownInput);
188                 int n = sc.read(ByteBuffer.allocate(100));
189                 assertEquals(-1, n);
190                 assertTrue(sc.isOpen());
191             }
192         });
193     }
194 
195     /**
196      * Virtual thread interrupted while blocked in SocketChannel read.
197      */
198     @Test
199     void testSocketChannelReadInterrupt() throws Exception {
200         VThreadRunner.run(() -> {
201             try (var connection = new Connection()) {
202                 SocketChannel sc = connection.channel1();
203 
204                 // interrupt current thread when it blocks in read
205                 Thread thisThread = Thread.currentThread();
206                 runAfterParkedAsync(thisThread::interrupt);
207 
208                 try {
209                     int n = sc.read(ByteBuffer.allocate(100));
210                     fail("read returned " + n);
211                 } catch (ClosedByInterruptException expected) {
212                     assertTrue(Thread.interrupted());
213                 }
214             }
215         });
216     }
217 
218     /**
219      * SocketChannel close while virtual thread blocked in write.
220      */
221     @Test
222     void testSocketChannelWriteAsyncClose() throws Exception {
223         VThreadRunner.run(() -> {
224             boolean done = false;
225             while (!done) {
226                 try (var connection = new Connection()) {
227                     SocketChannel sc = connection.channel1();
228 
229                     // close sc when current thread blocks in write
230                     runAfterParkedAsync(sc::close, true);
231 
232                     // write until channel is closed
233                     try {
234                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
235                         for (;;) {
236                             int n = sc.write(bb);
237                             assertTrue(n > 0);
238                             bb.clear();
239                         }
240                     } catch (AsynchronousCloseException expected) {
241                         // closed when blocked in write
242                         done = true;
243                     } catch (ClosedChannelException e) {
244                         // closed but not blocked in write, need to retry test
245                         System.err.format("%s, need to retry!%n", e);
246                     }
247                 }
248             }
249         });
250     }
251 
252 
253     /**
254      * SocketChannel shutdownOutput while virtual thread blocked in write.
255      */
256     @Test
257     void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
258         VThreadRunner.run(() -> {
259             try (var connection = new Connection()) {
260                 SocketChannel sc = connection.channel1();
261 
262                 // shutdown output when current thread blocks in write
263                 runAfterParkedAsync(sc::shutdownOutput);
264                 try {
265                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
266                     for (;;) {
267                         int n = sc.write(bb);
268                         assertTrue(n > 0);
269                         bb.clear();
270                     }
271                 } catch (ClosedChannelException e) {
272                     // expected
273                 }
274                 assertTrue(sc.isOpen());
275             }
276         });
277     }
278 
279     /**
280      * Virtual thread interrupted while blocked in SocketChannel write.
281      */
282     @Test
283     void testSocketChannelWriteInterrupt() throws Exception {
284         VThreadRunner.run(() -> {
285             boolean done = false;
286             while (!done) {
287                 try (var connection = new Connection()) {
288                     SocketChannel sc = connection.channel1();
289 
290                     // interrupt current thread when it blocks in write
291                     Thread thisThread = Thread.currentThread();
292                     runAfterParkedAsync(thisThread::interrupt, true);
293 
294                     // write until channel is closed
295                     try {
296                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
297                         for (;;) {
298                             int n = sc.write(bb);
299                             assertTrue(n > 0);
300                             bb.clear();
301                         }
302                     } catch (ClosedByInterruptException e) {
303                         // closed when blocked in write
304                         assertTrue(Thread.interrupted());
305                         done = true;
306                     } catch (ClosedChannelException e) {
307                         // closed but not blocked in write, need to retry test
308                         System.err.format("%s, need to retry!%n", e);
309                     }
310                 }
311             }
312         });
313     }
314 
315     /**
316      * Virtual thread blocks in SocketChannel adaptor read.
317      */
318     @ParameterizedTest
319     @ValueSource(ints = { 0, 60_000 })
320     void testSocketAdaptorRead(int timeout) throws Exception {
321         VThreadRunner.run(() -> {
322             try (var connection = new Connection()) {
323                 SocketChannel sc1 = connection.channel1();
324                 SocketChannel sc2 = connection.channel2();
325 
326                 // write to sc1 when currnet thread blocks reading from sc2
327                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
328                 runAfterParkedAsync(() -> sc1.write(bb));
329 
330                 // read from sc2 should block
331                 byte[] array = new byte[100];
332                 if (timeout > 0)
333                     sc2.socket().setSoTimeout(timeout);
334                 int n = sc2.socket().getInputStream().read(array);
335                 assertTrue(n > 0);
336                 assertTrue(array[0] == 'X');
337             }
338         });
339     }
340 
341     /**
342      * ServerSocketChannel accept, no blocking.
343      */
344     @Test
345     void testServerSocketChannelAccept1() throws Exception {
346         VThreadRunner.run(() -> {
347             try (var ssc = ServerSocketChannel.open()) {
348                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
349                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
350                 // accept should not block
351                 var sc2 = ssc.accept();
352                 sc1.close();
353                 sc2.close();
354             }
355         });
356     }
357 
358     /**
359      * Virtual thread blocks in ServerSocketChannel accept.
360      */
361     @Test
362     void testServerSocketChannelAccept2() throws Exception {
363         VThreadRunner.run(() -> {
364             try (var ssc = ServerSocketChannel.open()) {
365                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
366                 var sc1 = SocketChannel.open();
367 
368                 // connect when current thread when it blocks in accept
369                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
370 
371                 // accept should block
372                 var sc2 = ssc.accept();
373                 sc1.close();
374                 sc2.close();
375             }
376         });
377     }
378 
379     /**
380      * SeverSocketChannel close while virtual thread blocked in accept.
381      */
382     @Test
383     void testServerSocketChannelAcceptAsyncClose() throws Exception {
384         VThreadRunner.run(() -> {
385             try (var ssc = ServerSocketChannel.open()) {
386                 InetAddress lh = InetAddress.getLoopbackAddress();
387                 ssc.bind(new InetSocketAddress(lh, 0));
388                 runAfterParkedAsync(ssc::close);
389                 try {
390                     SocketChannel sc = ssc.accept();
391                     sc.close();
392                     fail("connection accepted???");
393                 } catch (AsynchronousCloseException expected) { }
394             }
395         });
396     }
397 
398     /**
399      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
400      */
401     @Test
402     void testServerSocketChannelAcceptInterrupt() throws Exception {
403         VThreadRunner.run(() -> {
404             try (var ssc = ServerSocketChannel.open()) {
405                 InetAddress lh = InetAddress.getLoopbackAddress();
406                 ssc.bind(new InetSocketAddress(lh, 0));
407 
408                 // interrupt current thread when it blocks in accept
409                 Thread thisThread = Thread.currentThread();
410                 runAfterParkedAsync(thisThread::interrupt);
411 
412                 try {
413                     SocketChannel sc = ssc.accept();
414                     sc.close();
415                     fail("connection accepted???");
416                 } catch (ClosedByInterruptException expected) {
417                     assertTrue(Thread.interrupted());
418                 }
419             }
420         });
421     }
422 
423     /**
424      * Virtual thread blocks in ServerSocketChannel adaptor accept.
425      */
426     @ParameterizedTest
427     @ValueSource(ints = { 0, 60_000 })
428     void testSocketChannelAdaptorAccept(int timeout) throws Exception {
429         VThreadRunner.run(() -> {
430             try (var ssc = ServerSocketChannel.open()) {
431                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
432                 var sc = SocketChannel.open();
433 
434                 // interrupt current thread when it blocks in accept
435                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
436 
437                 // accept should block
438                 if (timeout > 0)
439                     ssc.socket().setSoTimeout(timeout);
440                 Socket s = ssc.socket().accept();
441                 sc.close();
442                 s.close();
443             }
444         });
445     }
446 
447     /**
448      * DatagramChannel receive/send, no blocking.
449      */
450     @Test
451     void testDatagramChannelSendReceive1() throws Exception {
452         VThreadRunner.run(() -> {
453             try (DatagramChannel dc1 = DatagramChannel.open();
454                  DatagramChannel dc2 = DatagramChannel.open()) {
455 
456                 InetAddress lh = InetAddress.getLoopbackAddress();
457                 dc2.bind(new InetSocketAddress(lh, 0));
458 
459                 // send should not block
460                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
461                 int n = dc1.send(bb, dc2.getLocalAddress());
462                 assertTrue(n > 0);
463 
464                 // receive should not block
465                 bb = ByteBuffer.allocate(10);
466                 dc2.receive(bb);
467                 assertTrue(bb.get(0) == 'X');
468             }
469         });
470     }
471 
472     /**
473      * Virtual thread blocks in DatagramChannel receive.
474      */
475     @Test
476     void testDatagramChannelSendReceive2() throws Exception {
477         VThreadRunner.run(() -> {
478             try (DatagramChannel dc1 = DatagramChannel.open();
479                  DatagramChannel dc2 = DatagramChannel.open()) {
480 
481                 InetAddress lh = InetAddress.getLoopbackAddress();
482                 dc2.bind(new InetSocketAddress(lh, 0));
483 
484                 // send from dc1 when current thread blocked in dc2.receive
485                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
486                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
487 
488                 // read from dc2 should block
489                 ByteBuffer bb2 = ByteBuffer.allocate(10);
490                 dc2.receive(bb2);
491                 assertTrue(bb2.get(0) == 'X');
492             }
493         });
494     }
495 
496     /**
497      * DatagramChannel close while virtual thread blocked in receive.
498      */
499     @Test
500     void testDatagramChannelReceiveAsyncClose() throws Exception {
501         VThreadRunner.run(() -> {
502             try (DatagramChannel dc = DatagramChannel.open()) {
503                 InetAddress lh = InetAddress.getLoopbackAddress();
504                 dc.bind(new InetSocketAddress(lh, 0));
505                 runAfterParkedAsync(dc::close);
506                 try {
507                     dc.receive(ByteBuffer.allocate(100));
508                     fail("receive returned");
509                 } catch (AsynchronousCloseException expected) { }
510             }
511         });
512     }
513 
514     /**
515      * Virtual thread interrupted while blocked in DatagramChannel receive.
516      */
517     @Test
518     void testDatagramChannelReceiveInterrupt() throws Exception {
519         VThreadRunner.run(() -> {
520             try (DatagramChannel dc = DatagramChannel.open()) {
521                 InetAddress lh = InetAddress.getLoopbackAddress();
522                 dc.bind(new InetSocketAddress(lh, 0));
523 
524                 // interrupt current thread when it blocks in receive
525                 Thread thisThread = Thread.currentThread();
526                 runAfterParkedAsync(thisThread::interrupt);
527 
528                 try {
529                     dc.receive(ByteBuffer.allocate(100));
530                     fail("receive returned");
531                 } catch (ClosedByInterruptException expected) {
532                     assertTrue(Thread.interrupted());
533                 }
534             }
535         });
536     }
537 
538     /**
539      * Virtual thread blocks in DatagramSocket adaptor receive.
540      */
541     @ParameterizedTest
542     @ValueSource(ints = { 0, 60_000 })
543     void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
544         VThreadRunner.run(() -> {
545             try (DatagramChannel dc1 = DatagramChannel.open();
546                  DatagramChannel dc2 = DatagramChannel.open()) {
547 
548                 InetAddress lh = InetAddress.getLoopbackAddress();
549                 dc2.bind(new InetSocketAddress(lh, 0));
550 
551                 // send from dc1 when current thread blocks in dc2 receive
552                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
553                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
554 
555                 // receive should block
556                 byte[] array = new byte[100];
557                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
558                 if (timeout > 0)
559                     dc2.socket().setSoTimeout(timeout);
560                 dc2.socket().receive(p);
561                 assertTrue(p.getLength() == 3 && array[0] == 'X');
562             }
563         });
564     }
565 
566     /**
567      * DatagramChannel close while virtual thread blocked in adaptor receive.
568      */
569     @ParameterizedTest
570     @ValueSource(ints = { 0, 60_000 })
571     void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
572         VThreadRunner.run(() -> {
573             try (DatagramChannel dc = DatagramChannel.open()) {
574                 InetAddress lh = InetAddress.getLoopbackAddress();
575                 dc.bind(new InetSocketAddress(lh, 0));
576 
577                 byte[] array = new byte[100];
578                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
579                 if (timeout > 0)
580                     dc.socket().setSoTimeout(timeout);
581 
582                 // close channel/socket when current thread blocks in receive
583                 runAfterParkedAsync(dc::close);
584 
585                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
586             }
587         });
588     }
589 
590     /**
591      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
592      */
593     @ParameterizedTest
594     @ValueSource(ints = { 0, 60_000 })
595     void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
596         VThreadRunner.run(() -> {
597             try (DatagramChannel dc = DatagramChannel.open()) {
598                 InetAddress lh = InetAddress.getLoopbackAddress();
599                 dc.bind(new InetSocketAddress(lh, 0));
600 
601                 byte[] array = new byte[100];
602                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
603                 if (timeout > 0)
604                     dc.socket().setSoTimeout(timeout);
605 
606                 // interrupt current thread when it blocks in receive
607                 Thread thisThread = Thread.currentThread();
608                 runAfterParkedAsync(thisThread::interrupt);
609 
610                 try {
611                     dc.socket().receive(p);
612                     fail();
613                 } catch (ClosedByInterruptException expected) {
614                     assertTrue(Thread.interrupted());
615                 }
616             }
617         });
618     }
619 
620     /**
621      * Pipe read/write, no blocking.
622      */
623     @Test
624     void testPipeReadWrite1() throws Exception {
625         VThreadRunner.run(() -> {
626             Pipe p = Pipe.open();
627             try (Pipe.SinkChannel sink = p.sink();
628                  Pipe.SourceChannel source = p.source()) {
629 
630                 // write should not block
631                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
632                 int n = sink.write(bb);
633                 assertTrue(n > 0);
634 
635                 // read should not block
636                 bb = ByteBuffer.allocate(10);
637                 n = source.read(bb);
638                 assertTrue(n > 0);
639                 assertTrue(bb.get(0) == 'X');
640             }
641         });
642     }
643 
644     /**
645      * Virtual thread blocks in Pipe.SourceChannel read.
646      */
647     @Test
648     void testPipeReadWrite2() throws Exception {
649         VThreadRunner.run(() -> {
650             Pipe p = Pipe.open();
651             try (Pipe.SinkChannel sink = p.sink();
652                  Pipe.SourceChannel source = p.source()) {
653 
654                 // write from sink when current thread blocks reading from source
655                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
656                 runAfterParkedAsync(() -> sink.write(bb1));
657 
658                 // read should block
659                 ByteBuffer bb2 = ByteBuffer.allocate(10);
660                 int n = source.read(bb2);
661                 assertTrue(n > 0);
662                 assertTrue(bb2.get(0) == 'X');
663             }
664         });
665     }
666 
667     /**
668      * Virtual thread blocks in Pipe.SinkChannel write.
669      */
670     @Test
671     void testPipeReadWrite3() throws Exception {
672         VThreadRunner.run(() -> {
673             Pipe p = Pipe.open();
674             try (Pipe.SinkChannel sink = p.sink();
675                  Pipe.SourceChannel source = p.source()) {
676 
677                 // read from source to EOF when current thread blocking in write
678                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
679 
680                 // write to sink should block
681                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
682                 for (int i=0; i<1000; i++) {
683                     int n = sink.write(bb);
684                     assertTrue(n > 0);
685                     bb.clear();
686                 }
687                 sink.close();
688 
689                 // wait for reader to finish
690                 reader.join();
691             }
692         });
693     }
694 
695     /**
696      * Pipe.SourceChannel close while virtual thread blocked in read.
697      */
698     @Test
699     void testPipeReadAsyncClose() throws Exception {
700         VThreadRunner.run(() -> {
701             Pipe p = Pipe.open();
702             try (Pipe.SinkChannel sink = p.sink();
703                  Pipe.SourceChannel source = p.source()) {
704                 runAfterParkedAsync(source::close);
705                 try {
706                     int n = source.read(ByteBuffer.allocate(100));
707                     fail("read returned " + n);
708                 } catch (AsynchronousCloseException expected) { }
709             }
710         });
711     }
712 
713     /**
714      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
715      */
716     @Test
717     void testPipeReadInterrupt() throws Exception {
718         VThreadRunner.run(() -> {
719             Pipe p = Pipe.open();
720             try (Pipe.SinkChannel sink = p.sink();
721                  Pipe.SourceChannel source = p.source()) {
722 
723                 // interrupt current thread when it blocks reading from source
724                 Thread thisThread = Thread.currentThread();
725                 runAfterParkedAsync(thisThread::interrupt);
726 
727                 try {
728                     int n = source.read(ByteBuffer.allocate(100));
729                     fail("read returned " + n);
730                 } catch (ClosedByInterruptException expected) {
731                     assertTrue(Thread.interrupted());
732                 }
733             }
734         });
735     }
736 
737     /**
738      * Pipe.SinkChannel close while virtual thread blocked in write.
739      */
740     @Test
741     void testPipeWriteAsyncClose() throws Exception {
742         VThreadRunner.run(() -> {
743             boolean done = false;
744             while (!done) {
745                 Pipe p = Pipe.open();
746                 try (Pipe.SinkChannel sink = p.sink();
747                      Pipe.SourceChannel source = p.source()) {
748 
749                     // close sink when current thread blocks in write
750                     runAfterParkedAsync(sink::close, true);
751 
752                     // write until channel is closed
753                     try {
754                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
755                         for (;;) {
756                             int n = sink.write(bb);
757                             assertTrue(n > 0);
758                             bb.clear();
759                         }
760                     } catch (AsynchronousCloseException e) {
761                         // closed when blocked in write
762                         done = true;
763                     } catch (ClosedChannelException e) {
764                         // closed but not blocked in write, need to retry test
765                         System.err.format("%s, need to retry!%n", e);
766                     }
767                 }
768             }
769         });
770     }
771 
772     /**
773      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
774      */
775     @Test
776     void testPipeWriteInterrupt() throws Exception {
777         VThreadRunner.run(() -> {
778             boolean done = false;
779             while (!done) {
780                 Pipe p = Pipe.open();
781                 try (Pipe.SinkChannel sink = p.sink();
782                      Pipe.SourceChannel source = p.source()) {
783 
784                     // interrupt current thread when it blocks in write
785                     Thread thisThread = Thread.currentThread();
786                     runAfterParkedAsync(thisThread::interrupt, true);
787 
788                     // write until channel is closed
789                     try {
790                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
791                         for (;;) {
792                             int n = sink.write(bb);
793                             assertTrue(n > 0);
794                             bb.clear();
795                         }
796                     } catch (ClosedByInterruptException expected) {
797                         // closed when blocked in write
798                         assertTrue(Thread.interrupted());
799                         done = true;
800                     } catch (ClosedChannelException e) {
801                         // closed but not blocked in write, need to retry test
802                         System.err.format("%s, need to retry!%n", e);
803                     }
804                 }
805             }
806         });
807     }
808 
809     /**
810      * Creates a loopback connection
811      */
812     static class Connection implements Closeable {
813         private final SocketChannel sc1;
814         private final SocketChannel sc2;
815         Connection() throws IOException {
816             var lh = InetAddress.getLoopbackAddress();
817             try (var listener = ServerSocketChannel.open()) {
818                 listener.bind(new InetSocketAddress(lh, 0));
819                 SocketChannel sc1 = SocketChannel.open();
820                 SocketChannel sc2 = null;
821                 try {
822                     sc1.socket().connect(listener.getLocalAddress());
823                     sc2 = listener.accept();
824                 } catch (IOException ioe) {
825                     sc1.close();
826                     throw ioe;
827                 }
828                 this.sc1 = sc1;
829                 this.sc2 = sc2;
830             }
831         }
832         SocketChannel channel1() {
833             return sc1;
834         }
835         SocketChannel channel2() {
836             return sc2;
837         }
838         @Override
839         public void close() throws IOException {
840             sc1.close();
841             sc2.close();
842         }
843     }
844 
845     /**
846      * Read from a channel until all bytes have been read or an I/O error occurs.
847      */
848     static void readToEOF(ReadableByteChannel rbc) throws IOException {
849         ByteBuffer bb = ByteBuffer.allocate(16*1024);
850         int n;
851         while ((n = rbc.read(bb)) > 0) {
852             bb.clear();
853         }
854     }
855 
    /**
     * A task that returns no result and is allowed to throw a checked exception.
     */
    @FunctionalInterface
    interface ThrowingRunnable {
        /**
         * Runs the task.
         * @throws Exception if the task fails
         */
        void run() throws Exception;
    }
860 
861     /**
862      * Runs the given task asynchronously after the current virtual thread parks.
863      * @param writing if the thread will block in write
864      * @return the thread started to run the task
865      */
866     private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
867         Thread target = Thread.currentThread();
868         if (!target.isVirtual())
869             throw new WrongThreadException();
870         return Thread.ofPlatform().daemon().start(() -> {
871             try {
872                 // wait for target thread to park
873                 while (!isWaiting(target)) {
874                     Thread.sleep(20);
875                 }
876 
877                 // if the target thread is parked in write then we nudge it a few times
878                 // to avoid wakeup with some bytes written
879                 if (writing) {
880                     for (int i = 0; i < 3; i++) {
881                         LockSupport.unpark(target);
882                         while (!isWaiting(target)) {
883                             Thread.sleep(20);
884                         }
885                     }
886                 }
887 
888                 task.run();
889 
890             } catch (Exception e) {
891                 e.printStackTrace();
892             }
893         });
894     }
895 
    /**
     * Runs the given task asynchronously after the current virtual thread parks.
     * Equivalent to {@code runAfterParkedAsync(task, false)}.
     * @param task the task to run
     * @return the thread started to run the task
     */
    private static Thread runAfterParkedAsync(ThrowingRunnable task) {
        return runAfterParkedAsync(task, false);
    }
899 
900     /**
901      * Return true if the given Thread is parked.
902      */
903     private static boolean isWaiting(Thread target) {
904         Thread.State state = target.getState();
905         assertNotEquals(Thread.State.TERMINATED, state);
906         return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
907     }
908 }