1 /*
  2  * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on NIO channels
 28  * @library /test/lib
 29  * @run junit/othervm/timeout=480 BlockingChannelOps
 30  */
 31 
 32 /*
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
 37  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
 38  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
 39  */
 40 
 41 /*
 42  * @test id=io_uring
 43  * @requires os.family == "linux"
 44  * @library /test/lib
 45  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
 46  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
 47  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
 48  * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
 49  * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
 50  * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
 51  */
 52 
 53 /*
 54  * @test id=no-vmcontinuations
 55  * @requires vm.continuations
 56  * @library /test/lib
 57  * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
 58  */
 59 
 60 import java.io.Closeable;
 61 import java.io.IOException;
 62 import java.net.DatagramPacket;
 63 import java.net.InetAddress;
 64 import java.net.InetSocketAddress;
 65 import java.net.Socket;
 66 import java.net.SocketAddress;
 67 import java.net.SocketException;
 68 import java.nio.ByteBuffer;
 69 import java.nio.channels.AsynchronousCloseException;
 70 import java.nio.channels.ClosedByInterruptException;
 71 import java.nio.channels.ClosedChannelException;
 72 import java.nio.channels.DatagramChannel;
 73 import java.nio.channels.Pipe;
 74 import java.nio.channels.ReadableByteChannel;
 75 import java.nio.channels.ServerSocketChannel;
 76 import java.nio.channels.SocketChannel;
 77 import java.nio.channels.WritableByteChannel;
 78 import java.util.concurrent.locks.LockSupport;
 79 
 80 import jdk.test.lib.thread.VThreadRunner;
 81 import org.junit.jupiter.api.Test;
 82 import org.junit.jupiter.params.ParameterizedTest;
 83 import org.junit.jupiter.params.provider.ValueSource;
 84 import static org.junit.jupiter.api.Assertions.*;
 85 
 86 class BlockingChannelOps {
 87 
 88     /**
 89      * SocketChannel read/write, no blocking.
 90      */
 91     @Test
 92     void testSocketChannelReadWrite1() throws Exception {
 93         VThreadRunner.run(() -> {
 94             try (var connection = new Connection()) {
 95                 SocketChannel sc1 = connection.channel1();
 96                 SocketChannel sc2 = connection.channel2();
 97 
 98                 // write to sc1
 99                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
100                 int n = sc1.write(bb);
101                 assertTrue(n > 0);
102 
103                 // read from sc2 should not block
104                 bb = ByteBuffer.allocate(10);
105                 n = sc2.read(bb);
106                 assertTrue(n > 0);
107                 assertTrue(bb.get(0) == 'X');
108             }
109         });
110     }
111 
112     /**
113      * Virtual thread blocks in SocketChannel read.
114      */
115     @Test
116     void testSocketChannelRead() throws Exception {
117         VThreadRunner.run(() -> {
118             try (var connection = new Connection()) {
119                 SocketChannel sc1 = connection.channel1();
120                 SocketChannel sc2 = connection.channel2();
121 
122                 // write to sc1 when current thread blocks in sc2.read
123                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
124                 runAfterParkedAsync(() -> sc1.write(bb1));
125 
126                 // read from sc2 should block
127                 ByteBuffer bb2 = ByteBuffer.allocate(10);
128                 int n = sc2.read(bb2);
129                 assertTrue(n > 0);
130                 assertTrue(bb2.get(0) == 'X');
131             }
132         });
133     }
134 
135     /**
136      * Virtual thread blocks in SocketChannel write.
137      */
138     @Test
139     void testSocketChannelWrite() throws Exception {
140         VThreadRunner.run(() -> {
141             try (var connection = new Connection()) {
142                 SocketChannel sc1 = connection.channel1();
143                 SocketChannel sc2 = connection.channel2();
144 
145                 // read from sc2 to EOF when current thread blocks in sc1.write
146                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
147 
148                 // write to sc1 should block
149                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
150                 for (int i=0; i<1000; i++) {
151                     int n = sc1.write(bb);
152                     assertTrue(n > 0);
153                     bb.clear();
154                 }
155                 sc1.close();
156 
157                 // wait for reader to finish
158                 reader.join();
159             }
160         });
161     }
162 
163     /**
164      * SocketChannel close while virtual thread blocked in read.
165      */
166     @Test
167     void testSocketChannelReadAsyncClose() throws Exception {
168         VThreadRunner.run(() -> {
169             try (var connection = new Connection()) {
170                 SocketChannel sc = connection.channel1();
171                 runAfterParkedAsync(sc::close);
172                 try {
173                     int n = sc.read(ByteBuffer.allocate(100));
174                     fail("read returned " + n);
175                 } catch (AsynchronousCloseException expected) { }
176             }
177         });
178     }
179 
180     /**
181      * SocketChannel shutdownInput while virtual thread blocked in read.
182      */
183     @Test
184     void testSocketChannelReadAsyncShutdownInput() throws Exception {
185         VThreadRunner.run(() -> {
186             try (var connection = new Connection()) {
187                 SocketChannel sc = connection.channel1();
188                 runAfterParkedAsync(sc::shutdownInput);
189                 int n = sc.read(ByteBuffer.allocate(100));
190                 assertEquals(-1, n);
191                 assertTrue(sc.isOpen());
192             }
193         });
194     }
195 
196     /**
197      * Virtual thread interrupted while blocked in SocketChannel read.
198      */
199     @Test
200     void testSocketChannelReadInterrupt() throws Exception {
201         VThreadRunner.run(() -> {
202             try (var connection = new Connection()) {
203                 SocketChannel sc = connection.channel1();
204 
205                 // interrupt current thread when it blocks in read
206                 Thread thisThread = Thread.currentThread();
207                 runAfterParkedAsync(thisThread::interrupt);
208 
209                 try {
210                     int n = sc.read(ByteBuffer.allocate(100));
211                     fail("read returned " + n);
212                 } catch (ClosedByInterruptException expected) {
213                     assertTrue(Thread.interrupted());
214                 }
215             }
216         });
217     }
218 
219     /**
220      * SocketChannel close while virtual thread blocked in write.
221      */
222     @Test
223     void testSocketChannelWriteAsyncClose() throws Exception {
224         VThreadRunner.run(() -> {
225             boolean done = false;
226             while (!done) {
227                 try (var connection = new Connection()) {
228                     SocketChannel sc = connection.channel1();
229 
230                     // close sc when current thread blocks in write
231                     runAfterParkedAsync(sc::close, true);
232 
233                     // write until channel is closed
234                     try {
235                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
236                         for (;;) {
237                             int n = sc.write(bb);
238                             assertTrue(n > 0);
239                             bb.clear();
240                         }
241                     } catch (AsynchronousCloseException expected) {
242                         // closed when blocked in write
243                         done = true;
244                     } catch (ClosedChannelException e) {
245                         // closed but not blocked in write, need to retry test
246                         System.err.format("%s, need to retry!%n", e);
247                     }
248                 }
249             }
250         });
251     }
252 
253 
254     /**
255      * SocketChannel shutdownOutput while virtual thread blocked in write.
256      */
257     @Test
258     void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
259         VThreadRunner.run(() -> {
260             try (var connection = new Connection()) {
261                 SocketChannel sc = connection.channel1();
262 
263                 // shutdown output when current thread blocks in write
264                 runAfterParkedAsync(sc::shutdownOutput);
265                 try {
266                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
267                     for (;;) {
268                         int n = sc.write(bb);
269                         assertTrue(n > 0);
270                         bb.clear();
271                     }
272                 } catch (ClosedChannelException e) {
273                     // expected
274                 }
275                 assertTrue(sc.isOpen());
276             }
277         });
278     }
279 
280     /**
281      * Virtual thread interrupted while blocked in SocketChannel write.
282      */
283     @Test
284     void testSocketChannelWriteInterrupt() throws Exception {
285         VThreadRunner.run(() -> {
286             boolean done = false;
287             while (!done) {
288                 try (var connection = new Connection()) {
289                     SocketChannel sc = connection.channel1();
290 
291                     // interrupt current thread when it blocks in write
292                     Thread thisThread = Thread.currentThread();
293                     runAfterParkedAsync(thisThread::interrupt, true);
294 
295                     // write until channel is closed
296                     try {
297                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
298                         for (;;) {
299                             int n = sc.write(bb);
300                             assertTrue(n > 0);
301                             bb.clear();
302                         }
303                     } catch (ClosedByInterruptException e) {
304                         // closed when blocked in write
305                         assertTrue(Thread.interrupted());
306                         done = true;
307                     } catch (ClosedChannelException e) {
308                         // closed but not blocked in write, need to retry test
309                         System.err.format("%s, need to retry!%n", e);
310                     }
311                 }
312             }
313         });
314     }
315 
316     /**
317      * Virtual thread blocks in SocketChannel adaptor read.
318      */
319     @ParameterizedTest
320     @ValueSource(ints = { 0, 60_000 })
321     void testSocketAdaptorRead(int timeout) throws Exception {
322         VThreadRunner.run(() -> {
323             try (var connection = new Connection()) {
324                 SocketChannel sc1 = connection.channel1();
325                 SocketChannel sc2 = connection.channel2();
326 
327                 // write to sc1 when currnet thread blocks reading from sc2
328                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
329                 runAfterParkedAsync(() -> sc1.write(bb));
330 
331                 // read from sc2 should block
332                 byte[] array = new byte[100];
333                 if (timeout > 0)
334                     sc2.socket().setSoTimeout(timeout);
335                 int n = sc2.socket().getInputStream().read(array);
336                 assertTrue(n > 0);
337                 assertTrue(array[0] == 'X');
338             }
339         });
340     }
341 
342     /**
343      * ServerSocketChannel accept, no blocking.
344      */
345     @Test
346     void testServerSocketChannelAccept1() throws Exception {
347         VThreadRunner.run(() -> {
348             try (var ssc = ServerSocketChannel.open()) {
349                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
350                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
351                 // accept should not block
352                 var sc2 = ssc.accept();
353                 sc1.close();
354                 sc2.close();
355             }
356         });
357     }
358 
359     /**
360      * Virtual thread blocks in ServerSocketChannel accept.
361      */
362     @Test
363     void testServerSocketChannelAccept2() throws Exception {
364         VThreadRunner.run(() -> {
365             try (var ssc = ServerSocketChannel.open()) {
366                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
367                 var sc1 = SocketChannel.open();
368 
369                 // connect when current thread when it blocks in accept
370                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
371 
372                 // accept should block
373                 var sc2 = ssc.accept();
374                 sc1.close();
375                 sc2.close();
376             }
377         });
378     }
379 
380     /**
381      * SeverSocketChannel close while virtual thread blocked in accept.
382      */
383     @Test
384     void testServerSocketChannelAcceptAsyncClose() throws Exception {
385         VThreadRunner.run(() -> {
386             try (var ssc = ServerSocketChannel.open()) {
387                 InetAddress lh = InetAddress.getLoopbackAddress();
388                 ssc.bind(new InetSocketAddress(lh, 0));
389                 runAfterParkedAsync(ssc::close);
390                 try {
391                     SocketChannel sc = ssc.accept();
392                     sc.close();
393                     fail("connection accepted???");
394                 } catch (AsynchronousCloseException expected) { }
395             }
396         });
397     }
398 
399     /**
400      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
401      */
402     @Test
403     void testServerSocketChannelAcceptInterrupt() throws Exception {
404         VThreadRunner.run(() -> {
405             try (var ssc = ServerSocketChannel.open()) {
406                 InetAddress lh = InetAddress.getLoopbackAddress();
407                 ssc.bind(new InetSocketAddress(lh, 0));
408 
409                 // interrupt current thread when it blocks in accept
410                 Thread thisThread = Thread.currentThread();
411                 runAfterParkedAsync(thisThread::interrupt);
412 
413                 try {
414                     SocketChannel sc = ssc.accept();
415                     sc.close();
416                     fail("connection accepted???");
417                 } catch (ClosedByInterruptException expected) {
418                     assertTrue(Thread.interrupted());
419                 }
420             }
421         });
422     }
423 
424     /**
425      * Virtual thread blocks in ServerSocketChannel adaptor accept.
426      */
427     @ParameterizedTest
428     @ValueSource(ints = { 0, 60_000 })
429     void testSocketChannelAdaptorAccept(int timeout) throws Exception {
430         VThreadRunner.run(() -> {
431             try (var ssc = ServerSocketChannel.open()) {
432                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
433                 var sc = SocketChannel.open();
434 
435                 // interrupt current thread when it blocks in accept
436                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
437 
438                 // accept should block
439                 if (timeout > 0)
440                     ssc.socket().setSoTimeout(timeout);
441                 Socket s = ssc.socket().accept();
442                 sc.close();
443                 s.close();
444             }
445         });
446     }
447 
448     /**
449      * DatagramChannel receive/send, no blocking.
450      */
451     @Test
452     void testDatagramChannelSendReceive1() throws Exception {
453         VThreadRunner.run(() -> {
454             try (DatagramChannel dc1 = DatagramChannel.open();
455                  DatagramChannel dc2 = DatagramChannel.open()) {
456 
457                 InetAddress lh = InetAddress.getLoopbackAddress();
458                 dc2.bind(new InetSocketAddress(lh, 0));
459 
460                 // send should not block
461                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
462                 int n = dc1.send(bb, dc2.getLocalAddress());
463                 assertTrue(n > 0);
464 
465                 // receive should not block
466                 bb = ByteBuffer.allocate(10);
467                 dc2.receive(bb);
468                 assertTrue(bb.get(0) == 'X');
469             }
470         });
471     }
472 
473     /**
474      * Virtual thread blocks in DatagramChannel receive.
475      */
476     @Test
477     void testDatagramChannelSendReceive2() throws Exception {
478         VThreadRunner.run(() -> {
479             try (DatagramChannel dc1 = DatagramChannel.open();
480                  DatagramChannel dc2 = DatagramChannel.open()) {
481 
482                 InetAddress lh = InetAddress.getLoopbackAddress();
483                 dc2.bind(new InetSocketAddress(lh, 0));
484 
485                 // send from dc1 when current thread blocked in dc2.receive
486                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
487                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
488 
489                 // read from dc2 should block
490                 ByteBuffer bb2 = ByteBuffer.allocate(10);
491                 dc2.receive(bb2);
492                 assertTrue(bb2.get(0) == 'X');
493             }
494         });
495     }
496 
497     /**
498      * DatagramChannel close while virtual thread blocked in receive.
499      */
500     @Test
501     void testDatagramChannelReceiveAsyncClose() throws Exception {
502         VThreadRunner.run(() -> {
503             try (DatagramChannel dc = DatagramChannel.open()) {
504                 InetAddress lh = InetAddress.getLoopbackAddress();
505                 dc.bind(new InetSocketAddress(lh, 0));
506                 runAfterParkedAsync(dc::close);
507                 try {
508                     dc.receive(ByteBuffer.allocate(100));
509                     fail("receive returned");
510                 } catch (AsynchronousCloseException expected) { }
511             }
512         });
513     }
514 
515     /**
516      * Virtual thread interrupted while blocked in DatagramChannel receive.
517      */
518     @Test
519     void testDatagramChannelReceiveInterrupt() throws Exception {
520         VThreadRunner.run(() -> {
521             try (DatagramChannel dc = DatagramChannel.open()) {
522                 InetAddress lh = InetAddress.getLoopbackAddress();
523                 dc.bind(new InetSocketAddress(lh, 0));
524 
525                 // interrupt current thread when it blocks in receive
526                 Thread thisThread = Thread.currentThread();
527                 runAfterParkedAsync(thisThread::interrupt);
528 
529                 try {
530                     dc.receive(ByteBuffer.allocate(100));
531                     fail("receive returned");
532                 } catch (ClosedByInterruptException expected) {
533                     assertTrue(Thread.interrupted());
534                 }
535             }
536         });
537     }
538 
539     /**
540      * Virtual thread blocks in DatagramSocket adaptor receive.
541      */
542     @ParameterizedTest
543     @ValueSource(ints = { 0, 60_000 })
544     void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
545         VThreadRunner.run(() -> {
546             try (DatagramChannel dc1 = DatagramChannel.open();
547                  DatagramChannel dc2 = DatagramChannel.open()) {
548 
549                 InetAddress lh = InetAddress.getLoopbackAddress();
550                 dc2.bind(new InetSocketAddress(lh, 0));
551 
552                 // send from dc1 when current thread blocks in dc2 receive
553                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
554                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
555 
556                 // receive should block
557                 byte[] array = new byte[100];
558                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
559                 if (timeout > 0)
560                     dc2.socket().setSoTimeout(timeout);
561                 dc2.socket().receive(p);
562                 assertTrue(p.getLength() == 3 && array[0] == 'X');
563             }
564         });
565     }
566 
567     /**
568      * DatagramChannel close while virtual thread blocked in adaptor receive.
569      */
570     @ParameterizedTest
571     @ValueSource(ints = { 0, 60_000 })
572     void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
573         VThreadRunner.run(() -> {
574             try (DatagramChannel dc = DatagramChannel.open()) {
575                 InetAddress lh = InetAddress.getLoopbackAddress();
576                 dc.bind(new InetSocketAddress(lh, 0));
577 
578                 byte[] array = new byte[100];
579                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
580                 if (timeout > 0)
581                     dc.socket().setSoTimeout(timeout);
582 
583                 // close channel/socket when current thread blocks in receive
584                 runAfterParkedAsync(dc::close);
585 
586                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
587             }
588         });
589     }
590 
591     /**
592      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
593      */
594     @ParameterizedTest
595     @ValueSource(ints = { 0, 60_000 })
596     void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
597         VThreadRunner.run(() -> {
598             try (DatagramChannel dc = DatagramChannel.open()) {
599                 InetAddress lh = InetAddress.getLoopbackAddress();
600                 dc.bind(new InetSocketAddress(lh, 0));
601 
602                 byte[] array = new byte[100];
603                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
604                 if (timeout > 0)
605                     dc.socket().setSoTimeout(timeout);
606 
607                 // interrupt current thread when it blocks in receive
608                 Thread thisThread = Thread.currentThread();
609                 runAfterParkedAsync(thisThread::interrupt);
610 
611                 try {
612                     dc.socket().receive(p);
613                     fail();
614                 } catch (ClosedByInterruptException expected) {
615                     assertTrue(Thread.interrupted());
616                 }
617             }
618         });
619     }
620 
621     /**
622      * Pipe read/write, no blocking.
623      */
624     @Test
625     void testPipeReadWrite1() throws Exception {
626         VThreadRunner.run(() -> {
627             Pipe p = Pipe.open();
628             try (Pipe.SinkChannel sink = p.sink();
629                  Pipe.SourceChannel source = p.source()) {
630 
631                 // write should not block
632                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
633                 int n = sink.write(bb);
634                 assertTrue(n > 0);
635 
636                 // read should not block
637                 bb = ByteBuffer.allocate(10);
638                 n = source.read(bb);
639                 assertTrue(n > 0);
640                 assertTrue(bb.get(0) == 'X');
641             }
642         });
643     }
644 
645     /**
646      * Virtual thread blocks in Pipe.SourceChannel read.
647      */
648     @Test
649     void testPipeReadWrite2() throws Exception {
650         VThreadRunner.run(() -> {
651             Pipe p = Pipe.open();
652             try (Pipe.SinkChannel sink = p.sink();
653                  Pipe.SourceChannel source = p.source()) {
654 
655                 // write from sink when current thread blocks reading from source
656                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
657                 runAfterParkedAsync(() -> sink.write(bb1));
658 
659                 // read should block
660                 ByteBuffer bb2 = ByteBuffer.allocate(10);
661                 int n = source.read(bb2);
662                 assertTrue(n > 0);
663                 assertTrue(bb2.get(0) == 'X');
664             }
665         });
666     }
667 
668     /**
669      * Virtual thread blocks in Pipe.SinkChannel write.
670      */
671     @Test
672     void testPipeReadWrite3() throws Exception {
673         VThreadRunner.run(() -> {
674             Pipe p = Pipe.open();
675             try (Pipe.SinkChannel sink = p.sink();
676                  Pipe.SourceChannel source = p.source()) {
677 
678                 // read from source to EOF when current thread blocking in write
679                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
680 
681                 // write to sink should block
682                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
683                 for (int i=0; i<1000; i++) {
684                     int n = sink.write(bb);
685                     assertTrue(n > 0);
686                     bb.clear();
687                 }
688                 sink.close();
689 
690                 // wait for reader to finish
691                 reader.join();
692             }
693         });
694     }
695 
696     /**
697      * Pipe.SourceChannel close while virtual thread blocked in read.
698      */
699     @Test
700     void testPipeReadAsyncClose() throws Exception {
701         VThreadRunner.run(() -> {
702             Pipe p = Pipe.open();
703             try (Pipe.SinkChannel sink = p.sink();
704                  Pipe.SourceChannel source = p.source()) {
705                 runAfterParkedAsync(source::close);
706                 try {
707                     int n = source.read(ByteBuffer.allocate(100));
708                     fail("read returned " + n);
709                 } catch (AsynchronousCloseException expected) { }
710             }
711         });
712     }
713 
714     /**
715      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
716      */
717     @Test
718     void testPipeReadInterrupt() throws Exception {
719         VThreadRunner.run(() -> {
720             Pipe p = Pipe.open();
721             try (Pipe.SinkChannel sink = p.sink();
722                  Pipe.SourceChannel source = p.source()) {
723 
724                 // interrupt current thread when it blocks reading from source
725                 Thread thisThread = Thread.currentThread();
726                 runAfterParkedAsync(thisThread::interrupt);
727 
728                 try {
729                     int n = source.read(ByteBuffer.allocate(100));
730                     fail("read returned " + n);
731                 } catch (ClosedByInterruptException expected) {
732                     assertTrue(Thread.interrupted());
733                 }
734             }
735         });
736     }
737 
738     /**
739      * Pipe.SinkChannel close while virtual thread blocked in write.
740      */
741     @Test
742     void testPipeWriteAsyncClose() throws Exception {
743         VThreadRunner.run(() -> {
744             boolean done = false;
745             while (!done) {
746                 Pipe p = Pipe.open();
747                 try (Pipe.SinkChannel sink = p.sink();
748                      Pipe.SourceChannel source = p.source()) {
749 
750                     // close sink when current thread blocks in write
751                     runAfterParkedAsync(sink::close, true);
752 
753                     // write until channel is closed
754                     try {
755                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
756                         for (;;) {
757                             int n = sink.write(bb);
758                             assertTrue(n > 0);
759                             bb.clear();
760                         }
761                     } catch (AsynchronousCloseException e) {
762                         // closed when blocked in write
763                         done = true;
764                     } catch (ClosedChannelException e) {
765                         // closed but not blocked in write, need to retry test
766                         System.err.format("%s, need to retry!%n", e);
767                     }
768                 }
769             }
770         });
771     }
772 
773     /**
774      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
775      */
776     @Test
777     void testPipeWriteInterrupt() throws Exception {
778         VThreadRunner.run(() -> {
779             boolean done = false;
780             while (!done) {
781                 Pipe p = Pipe.open();
782                 try (Pipe.SinkChannel sink = p.sink();
783                      Pipe.SourceChannel source = p.source()) {
784 
785                     // interrupt current thread when it blocks in write
786                     Thread thisThread = Thread.currentThread();
787                     runAfterParkedAsync(thisThread::interrupt, true);
788 
789                     // write until channel is closed
790                     try {
791                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
792                         for (;;) {
793                             int n = sink.write(bb);
794                             assertTrue(n > 0);
795                             bb.clear();
796                         }
797                     } catch (ClosedByInterruptException expected) {
798                         // closed when blocked in write
799                         assertTrue(Thread.interrupted());
800                         done = true;
801                     } catch (ClosedChannelException e) {
802                         // closed but not blocked in write, need to retry test
803                         System.err.format("%s, need to retry!%n", e);
804                     }
805                 }
806             }
807         });
808     }
809 
810     /**
811      * Creates a loopback connection
812      */
813     static class Connection implements Closeable {
814         private final SocketChannel sc1;
815         private final SocketChannel sc2;
816         Connection() throws IOException {
817             var lh = InetAddress.getLoopbackAddress();
818             try (var listener = ServerSocketChannel.open()) {
819                 listener.bind(new InetSocketAddress(lh, 0));
820                 SocketChannel sc1 = SocketChannel.open();
821                 SocketChannel sc2 = null;
822                 try {
823                     sc1.socket().connect(listener.getLocalAddress());
824                     sc2 = listener.accept();
825                 } catch (IOException ioe) {
826                     sc1.close();
827                     throw ioe;
828                 }
829                 this.sc1 = sc1;
830                 this.sc2 = sc2;
831             }
832         }
833         SocketChannel channel1() {
834             return sc1;
835         }
836         SocketChannel channel2() {
837             return sc2;
838         }
839         @Override
840         public void close() throws IOException {
841             sc1.close();
842             sc2.close();
843         }
844     }
845 
846     /**
847      * Read from a channel until all bytes have been read or an I/O error occurs.
848      */
849     static void readToEOF(ReadableByteChannel rbc) throws IOException {
850         ByteBuffer bb = ByteBuffer.allocate(16*1024);
851         int n;
852         while ((n = rbc.read(bb)) > 0) {
853             bb.clear();
854         }
855     }
856 
857     @FunctionalInterface
858     interface ThrowingRunnable {
859         void run() throws Exception;
860     }
861 
862     /**
863      * Runs the given task asynchronously after the current virtual thread parks.
864      * @param writing if the thread will block in write
865      * @return the thread started to run the task
866      */
867     private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
868         Thread target = Thread.currentThread();
869         if (!target.isVirtual())
870             throw new WrongThreadException();
871         return Thread.ofPlatform().daemon().start(() -> {
872             try {
873                 // wait for target thread to park
874                 while (!isWaiting(target)) {
875                     Thread.sleep(20);
876                 }
877 
878                 // if the target thread is parked in write then we nudge it a few times
879                 // to avoid wakeup with some bytes written
880                 if (writing) {
881                     for (int i = 0; i < 3; i++) {
882                         LockSupport.unpark(target);
883                         while (!isWaiting(target)) {
884                             Thread.sleep(20);
885                         }
886                     }
887                 }
888 
889                 task.run();
890 
891             } catch (Exception e) {
892                 e.printStackTrace();
893             }
894         });
895     }
896 
897     private static Thread runAfterParkedAsync(ThrowingRunnable task) {
898         return runAfterParkedAsync(task, false);
899     }
900 
901     /**
902      * Return true if the given Thread is parked.
903      */
904     private static boolean isWaiting(Thread target) {
905         Thread.State state = target.getState();
906         assertNotEquals(Thread.State.TERMINATED, state);
907         return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
908     }
909 }