1 /*
  2  * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /**
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on NIO channels
 28  * @library /test/lib
 29  * @run junit BlockingChannelOps
 30  */
 31 
 32 /**
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps
 37  * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps
 38  * @run junit/othervm -Djdk.pollerMode=3 BlockingChannelOps
 39  */
 40 
 41 /**
 42  * @test id=no-vmcontinuations
 43  * @requires vm.continuations
 44  * @library /test/lib
 45  * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
 46  */
 47 
 48 import java.io.Closeable;
 49 import java.io.IOException;
 50 import java.net.DatagramPacket;
 51 import java.net.InetAddress;
 52 import java.net.InetSocketAddress;
 53 import java.net.Socket;
 54 import java.net.SocketAddress;
 55 import java.net.SocketException;
 56 import java.nio.ByteBuffer;
 57 import java.nio.channels.AsynchronousCloseException;
 58 import java.nio.channels.ClosedByInterruptException;
 59 import java.nio.channels.ClosedChannelException;
 60 import java.nio.channels.DatagramChannel;
 61 import java.nio.channels.Pipe;
 62 import java.nio.channels.ReadableByteChannel;
 63 import java.nio.channels.ServerSocketChannel;
 64 import java.nio.channels.SocketChannel;
 65 import java.nio.channels.WritableByteChannel;
 66 import java.util.concurrent.ExecutorService;
 67 import java.util.concurrent.Executors;
 68 import java.util.stream.Stream;
 69 
 70 import jdk.test.lib.thread.VThreadRunner;
 71 import jdk.test.lib.thread.VThreadScheduler;
 72 
 73 import org.junit.jupiter.api.BeforeAll;
 74 import org.junit.jupiter.api.AfterAll;
 75 import org.junit.jupiter.params.ParameterizedTest;
 76 import org.junit.jupiter.params.provider.MethodSource;
 77 import static org.junit.jupiter.api.Assertions.*;
 78 
 79 class BlockingChannelOps {
    // thread pool assigned in setup(); the custom scheduler executes its tasks on this pool
    private static ExecutorService threadPool;
    // scheduler assigned in setup(); used by threadBuilders() when custom schedulers are supported
    private static Thread.VirtualThreadScheduler customScheduler;
 82 
 83     @BeforeAll
 84     static void setup() {
 85         threadPool = Executors.newCachedThreadPool();
 86         customScheduler = (_, task) -> threadPool.execute(task);
 87     }
 88 
 89     @AfterAll
 90     static void finish() {
 91         threadPool.shutdown();
 92     }
 93 
 94     /**
 95      * Returns the Thread.Builder to create the virtual thread.
 96      */
 97     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
 98         if (VThreadScheduler.supportsCustomScheduler()) {
 99             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
100         } else {
101             return Stream.of(Thread.ofVirtual());
102         }
103     }
104 
105     /**
106      * SocketChannel read/write, no blocking.
107      */
108     @ParameterizedTest
109     @MethodSource("threadBuilders")
110     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
111         VThreadRunner.run(builder, () -> {
112             try (var connection = new Connection()) {
113                 SocketChannel sc1 = connection.channel1();
114                 SocketChannel sc2 = connection.channel2();
115 
116                 // write to sc1
117                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
118                 int n = sc1.write(bb);
119                 assertTrue(n > 0);
120 
121                 // read from sc2 should not block
122                 bb = ByteBuffer.allocate(10);
123                 n = sc2.read(bb);
124                 assertTrue(n > 0);
125                 assertTrue(bb.get(0) == 'X');
126             }
127         });
128     }
129 
130     /**
131      * Virtual thread blocks in SocketChannel read.
132      */
133     @ParameterizedTest
134     @MethodSource("threadBuilders")
135     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
136         VThreadRunner.run(builder, () -> {
137             try (var connection = new Connection()) {
138                 SocketChannel sc1 = connection.channel1();
139                 SocketChannel sc2 = connection.channel2();
140 
141                 // write to sc1 when current thread blocks in sc2.read
142                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
143                 runAfterParkedAsync(() -> sc1.write(bb1));
144 
145                 // read from sc2 should block
146                 ByteBuffer bb2 = ByteBuffer.allocate(10);
147                 int n = sc2.read(bb2);
148                 assertTrue(n > 0);
149                 assertTrue(bb2.get(0) == 'X');
150             }
151         });
152     }
153 
154     /**
155      * Virtual thread blocks in SocketChannel write.
156      */
157     @ParameterizedTest
158     @MethodSource("threadBuilders")
159     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
160         VThreadRunner.run(builder, () -> {
161             try (var connection = new Connection()) {
162                 SocketChannel sc1 = connection.channel1();
163                 SocketChannel sc2 = connection.channel2();
164 
165                 // read from sc2 to EOF when current thread blocks in sc1.write
166                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
167 
168                 // write to sc1 should block
169                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
170                 for (int i=0; i<1000; i++) {
171                     int n = sc1.write(bb);
172                     assertTrue(n > 0);
173                     bb.clear();
174                 }
175                 sc1.close();
176 
177                 // wait for reader to finish
178                 reader.join();
179             }
180         });
181     }
182 
183     /**
184      * SocketChannel close while virtual thread blocked in read.
185      */
186     @ParameterizedTest
187     @MethodSource("threadBuilders")
188     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
189         VThreadRunner.run(builder, () -> {
190             try (var connection = new Connection()) {
191                 SocketChannel sc = connection.channel1();
192                 runAfterParkedAsync(sc::close);
193                 try {
194                     int n = sc.read(ByteBuffer.allocate(100));
195                     fail("read returned " + n);
196                 } catch (AsynchronousCloseException expected) { }
197             }
198         });
199     }
200 
201     /**
202      * SocketChannel shutdownInput while virtual thread blocked in read.
203      */
204     @ParameterizedTest
205     @MethodSource("threadBuilders")
206     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
207         VThreadRunner.run(builder, () -> {
208             try (var connection = new Connection()) {
209                 SocketChannel sc = connection.channel1();
210                 runAfterParkedAsync(sc::shutdownInput);
211                 int n = sc.read(ByteBuffer.allocate(100));
212                 assertEquals(-1, n);
213                 assertTrue(sc.isOpen());
214             }
215         });
216     }
217 
218     /**
219      * Virtual thread interrupted while blocked in SocketChannel read.
220      */
221     @ParameterizedTest
222     @MethodSource("threadBuilders")
223     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
224         VThreadRunner.run(builder, () -> {
225             try (var connection = new Connection()) {
226                 SocketChannel sc = connection.channel1();
227 
228                 // interrupt current thread when it blocks in read
229                 Thread thisThread = Thread.currentThread();
230                 runAfterParkedAsync(thisThread::interrupt);
231 
232                 try {
233                     int n = sc.read(ByteBuffer.allocate(100));
234                     fail("read returned " + n);
235                 } catch (ClosedByInterruptException expected) {
236                     assertTrue(Thread.interrupted());
237                 }
238             }
239         });
240     }
241 
242     /**
243      * SocketChannel close while virtual thread blocked in write.
244      */
245     @ParameterizedTest
246     @MethodSource("threadBuilders")
247     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
248         VThreadRunner.run(builder, () -> {
249             boolean retry = true;
250             while (retry) {
251                 try (var connection = new Connection()) {
252                     SocketChannel sc = connection.channel1();
253 
254                     // close sc when current thread blocks in write
255                     runAfterParkedAsync(sc::close);
256                     try {
257                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
258                         for (;;) {
259                             int n = sc.write(bb);
260                             assertTrue(n > 0);
261                             bb.clear();
262                         }
263                     } catch (AsynchronousCloseException expected) {
264                         // closed when blocked in write
265                         retry = false;
266                     } catch (ClosedChannelException e) {
267                         // closed when not blocked in write, need to retry test
268                     }
269                 }
270             }
271         });
272     }
273 
274 
275     /**
276      * SocketChannel shutdownOutput while virtual thread blocked in write.
277      */
278     @ParameterizedTest
279     @MethodSource("threadBuilders")
280     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
281         VThreadRunner.run(builder, () -> {
282             try (var connection = new Connection()) {
283                 SocketChannel sc = connection.channel1();
284 
285                 // shutdown output when current thread blocks in write
286                 runAfterParkedAsync(sc::shutdownOutput);
287                 try {
288                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
289                     for (;;) {
290                         int n = sc.write(bb);
291                         assertTrue(n > 0);
292                         bb.clear();
293                     }
294                 } catch (ClosedChannelException e) {
295                     // expected
296                 }
297                 assertTrue(sc.isOpen());
298             }
299         });
300     }
301 
302     /**
303      * Virtual thread interrupted while blocked in SocketChannel write.
304      */
305     @ParameterizedTest
306     @MethodSource("threadBuilders")
307     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
308         VThreadRunner.run(builder, () -> {
309             boolean retry = true;
310             while (retry) {
311                 try (var connection = new Connection()) {
312                     SocketChannel sc = connection.channel1();
313 
314                     // interrupt current thread when it blocks in write
315                     Thread thisThread = Thread.currentThread();
316                     runAfterParkedAsync(thisThread::interrupt);
317 
318                     try {
319                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
320                         for (;;) {
321                             int n = sc.write(bb);
322                             assertTrue(n > 0);
323                             bb.clear();
324                         }
325                     } catch (ClosedByInterruptException e) {
326                         // closed when blocked in write
327                         assertTrue(Thread.interrupted());
328                         retry = false;
329                     } catch (ClosedChannelException e) {
330                         // closed when not blocked in write, need to retry test
331                     }
332                 }
333             }
334         });
335     }
336 
337     /**
338      * Virtual thread blocks in SocketChannel adaptor read.
339      */
340     @ParameterizedTest
341     @MethodSource("threadBuilders")
342     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
343         testSocketAdaptorRead(builder, 0);
344     }
345 
346     /**
347      * Virtual thread blocks in SocketChannel adaptor read with timeout.
348      */
349     @ParameterizedTest
350     @MethodSource("threadBuilders")
351     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
352         testSocketAdaptorRead(builder, 60_000);
353     }
354 
355     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
356                                        int timeout) throws Exception {
357         VThreadRunner.run(builder, () -> {
358             try (var connection = new Connection()) {
359                 SocketChannel sc1 = connection.channel1();
360                 SocketChannel sc2 = connection.channel2();
361 
362                 // write to sc1 when currnet thread blocks reading from sc2
363                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
364                 runAfterParkedAsync(() -> sc1.write(bb));
365 
366                 // read from sc2 should block
367                 byte[] array = new byte[100];
368                 if (timeout > 0)
369                     sc2.socket().setSoTimeout(timeout);
370                 int n = sc2.socket().getInputStream().read(array);
371                 assertTrue(n > 0);
372                 assertTrue(array[0] == 'X');
373             }
374         });
375     }
376 
377     /**
378      * ServerSocketChannel accept, no blocking.
379      */
380     @ParameterizedTest
381     @MethodSource("threadBuilders")
382     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
383         VThreadRunner.run(builder, () -> {
384             try (var ssc = ServerSocketChannel.open()) {
385                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
386                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
387                 // accept should not block
388                 var sc2 = ssc.accept();
389                 sc1.close();
390                 sc2.close();
391             }
392         });
393     }
394 
395     /**
396      * Virtual thread blocks in ServerSocketChannel accept.
397      */
398     @ParameterizedTest
399     @MethodSource("threadBuilders")
400     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
401         VThreadRunner.run(builder, () -> {
402             try (var ssc = ServerSocketChannel.open()) {
403                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
404                 var sc1 = SocketChannel.open();
405 
406                 // connect when current thread when it blocks in accept
407                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
408 
409                 // accept should block
410                 var sc2 = ssc.accept();
411                 sc1.close();
412                 sc2.close();
413             }
414         });
415     }
416 
417     /**
418      * SeverSocketChannel close while virtual thread blocked in accept.
419      */
420     @ParameterizedTest
421     @MethodSource("threadBuilders")
422     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
423         VThreadRunner.run(builder, () -> {
424             try (var ssc = ServerSocketChannel.open()) {
425                 InetAddress lh = InetAddress.getLoopbackAddress();
426                 ssc.bind(new InetSocketAddress(lh, 0));
427                 runAfterParkedAsync(ssc::close);
428                 try {
429                     SocketChannel sc = ssc.accept();
430                     sc.close();
431                     fail("connection accepted???");
432                 } catch (AsynchronousCloseException expected) { }
433             }
434         });
435     }
436 
437     /**
438      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
439      */
440     @ParameterizedTest
441     @MethodSource("threadBuilders")
442     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
443         VThreadRunner.run(builder, () -> {
444             try (var ssc = ServerSocketChannel.open()) {
445                 InetAddress lh = InetAddress.getLoopbackAddress();
446                 ssc.bind(new InetSocketAddress(lh, 0));
447 
448                 // interrupt current thread when it blocks in accept
449                 Thread thisThread = Thread.currentThread();
450                 runAfterParkedAsync(thisThread::interrupt);
451 
452                 try {
453                     SocketChannel sc = ssc.accept();
454                     sc.close();
455                     fail("connection accepted???");
456                 } catch (ClosedByInterruptException expected) {
457                     assertTrue(Thread.interrupted());
458                 }
459             }
460         });
461     }
462 
463     /**
464      * Virtual thread blocks in ServerSocketChannel adaptor accept.
465      */
466     @ParameterizedTest
467     @MethodSource("threadBuilders")
468     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
469         testSocketChannelAdaptorAccept(builder, 0);
470     }
471 
472     /**
473      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
474      */
475     @ParameterizedTest
476     @MethodSource("threadBuilders")
477     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
478         testSocketChannelAdaptorAccept(builder, 60_000);
479     }
480 
481     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
482                                                 int timeout) throws Exception {
483         VThreadRunner.run(builder, () -> {
484             try (var ssc = ServerSocketChannel.open()) {
485                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
486                 var sc = SocketChannel.open();
487 
488                 // interrupt current thread when it blocks in accept
489                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
490 
491                 // accept should block
492                 if (timeout > 0)
493                     ssc.socket().setSoTimeout(timeout);
494                 Socket s = ssc.socket().accept();
495                 sc.close();
496                 s.close();
497             }
498         });
499     }
500 
501     /**
502      * DatagramChannel receive/send, no blocking.
503      */
504     @ParameterizedTest
505     @MethodSource("threadBuilders")
506     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
507         VThreadRunner.run(builder, () -> {
508             try (DatagramChannel dc1 = DatagramChannel.open();
509                  DatagramChannel dc2 = DatagramChannel.open()) {
510 
511                 InetAddress lh = InetAddress.getLoopbackAddress();
512                 dc2.bind(new InetSocketAddress(lh, 0));
513 
514                 // send should not block
515                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
516                 int n = dc1.send(bb, dc2.getLocalAddress());
517                 assertTrue(n > 0);
518 
519                 // receive should not block
520                 bb = ByteBuffer.allocate(10);
521                 dc2.receive(bb);
522                 assertTrue(bb.get(0) == 'X');
523             }
524         });
525     }
526 
527     /**
528      * Virtual thread blocks in DatagramChannel receive.
529      */
530     @ParameterizedTest
531     @MethodSource("threadBuilders")
532     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
533         VThreadRunner.run(builder, () -> {
534             try (DatagramChannel dc1 = DatagramChannel.open();
535                  DatagramChannel dc2 = DatagramChannel.open()) {
536 
537                 InetAddress lh = InetAddress.getLoopbackAddress();
538                 dc2.bind(new InetSocketAddress(lh, 0));
539 
540                 // send from dc1 when current thread blocked in dc2.receive
541                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
542                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
543 
544                 // read from dc2 should block
545                 ByteBuffer bb2 = ByteBuffer.allocate(10);
546                 dc2.receive(bb2);
547                 assertTrue(bb2.get(0) == 'X');
548             }
549         });
550     }
551 
552     /**
553      * DatagramChannel close while virtual thread blocked in receive.
554      */
555     @ParameterizedTest
556     @MethodSource("threadBuilders")
557     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
558         VThreadRunner.run(builder, () -> {
559             try (DatagramChannel dc = DatagramChannel.open()) {
560                 InetAddress lh = InetAddress.getLoopbackAddress();
561                 dc.bind(new InetSocketAddress(lh, 0));
562                 runAfterParkedAsync(dc::close);
563                 try {
564                     dc.receive(ByteBuffer.allocate(100));
565                     fail("receive returned");
566                 } catch (AsynchronousCloseException expected) { }
567             }
568         });
569     }
570 
571     /**
572      * Virtual thread interrupted while blocked in DatagramChannel receive.
573      */
574     @ParameterizedTest
575     @MethodSource("threadBuilders")
576     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
577         VThreadRunner.run(builder, () -> {
578             try (DatagramChannel dc = DatagramChannel.open()) {
579                 InetAddress lh = InetAddress.getLoopbackAddress();
580                 dc.bind(new InetSocketAddress(lh, 0));
581 
582                 // interrupt current thread when it blocks in receive
583                 Thread thisThread = Thread.currentThread();
584                 runAfterParkedAsync(thisThread::interrupt);
585 
586                 try {
587                     dc.receive(ByteBuffer.allocate(100));
588                     fail("receive returned");
589                 } catch (ClosedByInterruptException expected) {
590                     assertTrue(Thread.interrupted());
591                 }
592             }
593         });
594     }
595 
596     /**
597      * Virtual thread blocks in DatagramSocket adaptor receive.
598      */
599     @ParameterizedTest
600     @MethodSource("threadBuilders")
601     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
602         testDatagramSocketAdaptorReceive(builder, 0);
603     }
604 
605     /**
606      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
607      */
608     @ParameterizedTest
609     @MethodSource("threadBuilders")
610     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
611         testDatagramSocketAdaptorReceive(builder, 60_000);
612     }
613 
614     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
615                                                   int timeout) throws Exception {
616         VThreadRunner.run(builder, () -> {
617             try (DatagramChannel dc1 = DatagramChannel.open();
618                  DatagramChannel dc2 = DatagramChannel.open()) {
619 
620                 InetAddress lh = InetAddress.getLoopbackAddress();
621                 dc2.bind(new InetSocketAddress(lh, 0));
622 
623                 // send from dc1 when current thread blocks in dc2 receive
624                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
625                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
626 
627                 // receive should block
628                 byte[] array = new byte[100];
629                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
630                 if (timeout > 0)
631                     dc2.socket().setSoTimeout(timeout);
632                 dc2.socket().receive(p);
633                 assertTrue(p.getLength() == 3 && array[0] == 'X');
634             }
635         });
636     }
637 
638     /**
639      * DatagramChannel close while virtual thread blocked in adaptor receive.
640      */
641     @ParameterizedTest
642     @MethodSource("threadBuilders")
643     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
644         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
645     }
646 
647     /**
648      * DatagramChannel close while virtual thread blocked in adaptor receive
649      * with timeout.
650      */
651     @ParameterizedTest
652     @MethodSource("threadBuilders")
653     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
654         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
655     }
656 
657     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
658                                                             int timeout) throws Exception {
659         VThreadRunner.run(builder, () -> {
660             try (DatagramChannel dc = DatagramChannel.open()) {
661                 InetAddress lh = InetAddress.getLoopbackAddress();
662                 dc.bind(new InetSocketAddress(lh, 0));
663 
664                 byte[] array = new byte[100];
665                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
666                 if (timeout > 0)
667                     dc.socket().setSoTimeout(timeout);
668 
669                 // close channel/socket when current thread blocks in receive
670                 runAfterParkedAsync(dc::close);
671 
672                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
673             }
674         });
675     }
676 
677     /**
678      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
679      */
680     @ParameterizedTest
681     @MethodSource("threadBuilders")
682     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
683         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
684     }
685 
686     /**
687      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
688      * with timeout.
689      */
690     @ParameterizedTest
691     @MethodSource("threadBuilders")
692     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
693         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
694     }
695 
696     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
697                                                            int timeout) throws Exception {
698         VThreadRunner.run(builder, () -> {
699             try (DatagramChannel dc = DatagramChannel.open()) {
700                 InetAddress lh = InetAddress.getLoopbackAddress();
701                 dc.bind(new InetSocketAddress(lh, 0));
702 
703                 byte[] array = new byte[100];
704                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
705                 if (timeout > 0)
706                     dc.socket().setSoTimeout(timeout);
707 
708                 // interrupt current thread when it blocks in receive
709                 Thread thisThread = Thread.currentThread();
710                 runAfterParkedAsync(thisThread::interrupt);
711 
712                 try {
713                     dc.socket().receive(p);
714                     fail();
715                 } catch (ClosedByInterruptException expected) {
716                     assertTrue(Thread.interrupted());
717                 }
718             }
719         });
720     }
721 
722     /**
723      * Pipe read/write, no blocking.
724      */
725     @ParameterizedTest
726     @MethodSource("threadBuilders")
727     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
728         VThreadRunner.run(builder, () -> {
729             Pipe p = Pipe.open();
730             try (Pipe.SinkChannel sink = p.sink();
731                  Pipe.SourceChannel source = p.source()) {
732 
733                 // write should not block
734                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
735                 int n = sink.write(bb);
736                 assertTrue(n > 0);
737 
738                 // read should not block
739                 bb = ByteBuffer.allocate(10);
740                 n = source.read(bb);
741                 assertTrue(n > 0);
742                 assertTrue(bb.get(0) == 'X');
743             }
744         });
745     }
746 
747     /**
748      * Virtual thread blocks in Pipe.SourceChannel read.
749      */
750     @ParameterizedTest
751     @MethodSource("threadBuilders")
752     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
753         VThreadRunner.run(builder, () -> {
754             Pipe p = Pipe.open();
755             try (Pipe.SinkChannel sink = p.sink();
756                  Pipe.SourceChannel source = p.source()) {
757 
758                 // write from sink when current thread blocks reading from source
759                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
760                 runAfterParkedAsync(() -> sink.write(bb1));
761 
762                 // read should block
763                 ByteBuffer bb2 = ByteBuffer.allocate(10);
764                 int n = source.read(bb2);
765                 assertTrue(n > 0);
766                 assertTrue(bb2.get(0) == 'X');
767             }
768         });
769     }
770 
771     /**
772      * Virtual thread blocks in Pipe.SinkChannel write.
773      */
774     @ParameterizedTest
775     @MethodSource("threadBuilders")
776     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
777         VThreadRunner.run(builder, () -> {
778             Pipe p = Pipe.open();
779             try (Pipe.SinkChannel sink = p.sink();
780                  Pipe.SourceChannel source = p.source()) {
781 
782                 // read from source to EOF when current thread blocking in write
783                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
784 
785                 // write to sink should block
786                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
787                 for (int i=0; i<1000; i++) {
788                     int n = sink.write(bb);
789                     assertTrue(n > 0);
790                     bb.clear();
791                 }
792                 sink.close();
793 
794                 // wait for reader to finish
795                 reader.join();
796             }
797         });
798     }
799 
800     /**
801      * Pipe.SourceChannel close while virtual thread blocked in read.
802      */
803     @ParameterizedTest
804     @MethodSource("threadBuilders")
805     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
806         VThreadRunner.run(builder, () -> {
807             Pipe p = Pipe.open();
808             try (Pipe.SinkChannel sink = p.sink();
809                  Pipe.SourceChannel source = p.source()) {
810                 runAfterParkedAsync(source::close);
811                 try {
812                     int n = source.read(ByteBuffer.allocate(100));
813                     fail("read returned " + n);
814                 } catch (AsynchronousCloseException expected) { }
815             }
816         });
817     }
818 
819     /**
820      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
821      */
822     @ParameterizedTest
823     @MethodSource("threadBuilders")
824     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
825         VThreadRunner.run(builder, () -> {
826             Pipe p = Pipe.open();
827             try (Pipe.SinkChannel sink = p.sink();
828                  Pipe.SourceChannel source = p.source()) {
829 
830                 // interrupt current thread when it blocks reading from source
831                 Thread thisThread = Thread.currentThread();
832                 runAfterParkedAsync(thisThread::interrupt);
833 
834                 try {
835                     int n = source.read(ByteBuffer.allocate(100));
836                     fail("read returned " + n);
837                 } catch (ClosedByInterruptException expected) {
838                     assertTrue(Thread.interrupted());
839                 }
840             }
841         });
842     }
843 
844     /**
845      * Pipe.SinkChannel close while virtual thread blocked in write.
846      */
847     @ParameterizedTest
848     @MethodSource("threadBuilders")
849     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
850         VThreadRunner.run(builder, () -> {
851             boolean retry = true;
852             while (retry) {
853                 Pipe p = Pipe.open();
854                 try (Pipe.SinkChannel sink = p.sink();
855                      Pipe.SourceChannel source = p.source()) {
856 
857                     // close sink when current thread blocks in write
858                     runAfterParkedAsync(sink::close);
859                     try {
860                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
861                         for (;;) {
862                             int n = sink.write(bb);
863                             assertTrue(n > 0);
864                             bb.clear();
865                         }
866                     } catch (AsynchronousCloseException e) {
867                         // closed when blocked in write
868                         retry = false;
869                     } catch (ClosedChannelException e) {
870                         // closed when not blocked in write, need to retry test
871                     }
872                 }
873             }
874         });
875     }
876 
877     /**
878      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
879      */
880     @ParameterizedTest
881     @MethodSource("threadBuilders")
882     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
883         VThreadRunner.run(builder, () -> {
884             boolean retry = true;
885             while (retry) {
886                 Pipe p = Pipe.open();
887                 try (Pipe.SinkChannel sink = p.sink();
888                      Pipe.SourceChannel source = p.source()) {
889 
890                     // interrupt current thread when it blocks in write
891                     Thread thisThread = Thread.currentThread();
892                     runAfterParkedAsync(thisThread::interrupt);
893 
894                     try {
895                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
896                         for (;;) {
897                             int n = sink.write(bb);
898                             assertTrue(n > 0);
899                             bb.clear();
900                         }
901                     } catch (ClosedByInterruptException expected) {
902                         // closed when blocked in write
903                         assertTrue(Thread.interrupted());
904                         retry = false;
905                     } catch (ClosedChannelException e) {
906                         // closed when not blocked in write, need to retry test
907                     }
908                 }
909             }
910         });
911     }
912 
913     /**
914      * Creates a loopback connection
915      */
916     static class Connection implements Closeable {
917         private final SocketChannel sc1;
918         private final SocketChannel sc2;
919         Connection() throws IOException {
920             var lh = InetAddress.getLoopbackAddress();
921             try (var listener = ServerSocketChannel.open()) {
922                 listener.bind(new InetSocketAddress(lh, 0));
923                 SocketChannel sc1 = SocketChannel.open();
924                 SocketChannel sc2 = null;
925                 try {
926                     sc1.socket().connect(listener.getLocalAddress());
927                     sc2 = listener.accept();
928                 } catch (IOException ioe) {
929                     sc1.close();
930                     throw ioe;
931                 }
932                 this.sc1 = sc1;
933                 this.sc2 = sc2;
934             }
935         }
936         SocketChannel channel1() {
937             return sc1;
938         }
939         SocketChannel channel2() {
940             return sc2;
941         }
942         @Override
943         public void close() throws IOException {
944             sc1.close();
945             sc2.close();
946         }
947     }
948 
949     /**
950      * Read from a channel until all bytes have been read or an I/O error occurs.
951      */
952     static void readToEOF(ReadableByteChannel rbc) throws IOException {
953         ByteBuffer bb = ByteBuffer.allocate(16*1024);
954         int n;
955         while ((n = rbc.read(bb)) > 0) {
956             bb.clear();
957         }
958     }
959 
960     @FunctionalInterface
961     interface ThrowingRunnable {
962         void run() throws Exception;
963     }
964 
965     /**
966      * Runs the given task asynchronously after the current virtual thread has parked.
967      * @return the thread started to run the task
968      */
969     static Thread runAfterParkedAsync(ThrowingRunnable task) {
970         Thread target = Thread.currentThread();
971         if (!target.isVirtual())
972             throw new WrongThreadException();
973         return Thread.ofPlatform().daemon().start(() -> {
974             try {
975                 Thread.State state = target.getState();
976                 while (state != Thread.State.WAITING
977                         && state != Thread.State.TIMED_WAITING) {
978                     Thread.sleep(20);
979                     state = target.getState();
980                 }
981                 Thread.sleep(20);  // give a bit more time to release carrier
982                 task.run();
983             } catch (Exception e) {
984                 e.printStackTrace();
985             }
986         });
987     }
988 }
--- EOF ---