< prev index next >

test/jdk/java/nio/channels/vthread/BlockingChannelOps.java

Print this page

  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /**
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on NIO channels
 28  * @library /test/lib
 29  * @run junit BlockingChannelOps
 30  */
 31 
 32 /**
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps
 37  * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps

 38  */
 39 
 40 /**









 41  * @test id=no-vmcontinuations
 42  * @requires vm.continuations
 43  * @library /test/lib
 44  * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
 45  */
 46 
 47 import java.io.Closeable;
 48 import java.io.IOException;
 49 import java.net.DatagramPacket;
 50 import java.net.InetAddress;
 51 import java.net.InetSocketAddress;
 52 import java.net.Socket;
 53 import java.net.SocketAddress;
 54 import java.net.SocketException;
 55 import java.nio.ByteBuffer;
 56 import java.nio.channels.AsynchronousCloseException;
 57 import java.nio.channels.ClosedByInterruptException;
 58 import java.nio.channels.ClosedChannelException;
 59 import java.nio.channels.DatagramChannel;
 60 import java.nio.channels.Pipe;
 61 import java.nio.channels.ReadableByteChannel;
 62 import java.nio.channels.ServerSocketChannel;
 63 import java.nio.channels.SocketChannel;
 64 import java.nio.channels.WritableByteChannel;




 65 
 66 import jdk.test.lib.thread.VThreadRunner;
 67 import org.junit.jupiter.api.Test;




 68 import static org.junit.jupiter.api.Assertions.*;
 69 
 70 class BlockingChannelOps {




















 71 
 72     /**
 73      * SocketChannel read/write, no blocking.
 74      */
 75     @Test
 76     void testSocketChannelReadWrite1() throws Exception {
 77         VThreadRunner.run(() -> {

 78             try (var connection = new Connection()) {
 79                 SocketChannel sc1 = connection.channel1();
 80                 SocketChannel sc2 = connection.channel2();
 81 
 82                 // write to sc1
 83                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 84                 int n = sc1.write(bb);
 85                 assertTrue(n > 0);
 86 
 87                 // read from sc2 should not block
 88                 bb = ByteBuffer.allocate(10);
 89                 n = sc2.read(bb);
 90                 assertTrue(n > 0);
 91                 assertTrue(bb.get(0) == 'X');
 92             }
 93         });
 94     }
 95 
 96     /**
 97      * Virtual thread blocks in SocketChannel read.
 98      */
 99     @Test
100     void testSocketChannelRead() throws Exception {
101         VThreadRunner.run(() -> {

102             try (var connection = new Connection()) {
103                 SocketChannel sc1 = connection.channel1();
104                 SocketChannel sc2 = connection.channel2();
105 
106                 // write to sc1 when current thread blocks in sc2.read
107                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
108                 runAfterParkedAsync(() -> sc1.write(bb1));
109 
110                 // read from sc2 should block
111                 ByteBuffer bb2 = ByteBuffer.allocate(10);
112                 int n = sc2.read(bb2);
113                 assertTrue(n > 0);
114                 assertTrue(bb2.get(0) == 'X');
115             }
116         });
117     }
118 
119     /**
120      * Virtual thread blocks in SocketChannel write.
121      */
122     @Test
123     void testSocketChannelWrite() throws Exception {
124         VThreadRunner.run(() -> {

125             try (var connection = new Connection()) {
126                 SocketChannel sc1 = connection.channel1();
127                 SocketChannel sc2 = connection.channel2();
128 
129                 // read from sc2 to EOF when current thread blocks in sc1.write
130                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
131 
132                 // write to sc1 should block
133                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
134                 for (int i=0; i<1000; i++) {
135                     int n = sc1.write(bb);
136                     assertTrue(n > 0);
137                     bb.clear();
138                 }
139                 sc1.close();
140 
141                 // wait for reader to finish
142                 reader.join();
143             }
144         });
145     }
146 
147     /**
148      * SocketChannel close while virtual thread blocked in read.
149      */
150     @Test
151     void testSocketChannelReadAsyncClose() throws Exception {
152         VThreadRunner.run(() -> {

153             try (var connection = new Connection()) {
154                 SocketChannel sc = connection.channel1();
155                 runAfterParkedAsync(sc::close);
156                 try {
157                     int n = sc.read(ByteBuffer.allocate(100));
158                     fail("read returned " + n);
159                 } catch (AsynchronousCloseException expected) { }
160             }
161         });
162     }
163 

















164     /**
165      * Virtual thread interrupted while blocked in SocketChannel read.
166      */
167     @Test
168     void testSocketChannelReadInterrupt() throws Exception {
169         VThreadRunner.run(() -> {

170             try (var connection = new Connection()) {
171                 SocketChannel sc = connection.channel1();
172 
173                 // interrupt current thread when it blocks in read
174                 Thread thisThread = Thread.currentThread();
175                 runAfterParkedAsync(thisThread::interrupt);
176 
177                 try {
178                     int n = sc.read(ByteBuffer.allocate(100));
179                     fail("read returned " + n);
180                 } catch (ClosedByInterruptException expected) {
181                     assertTrue(Thread.interrupted());
182                 }
183             }
184         });
185     }
186 
187     /**
188      * SocketChannel close while virtual thread blocked in write.
189      */
190     @Test
191     void testSocketChannelWriteAsyncClose() throws Exception {
192         VThreadRunner.run(() -> {

193             boolean retry = true;
194             while (retry) {
195                 try (var connection = new Connection()) {
196                     SocketChannel sc = connection.channel1();
197 
198                     // close sc when current thread blocks in write
199                     runAfterParkedAsync(sc::close);
200                     try {
201                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
202                         for (;;) {
203                             int n = sc.write(bb);
204                             assertTrue(n > 0);
205                             bb.clear();
206                         }
207                     } catch (AsynchronousCloseException expected) {
208                         // closed when blocked in write
209                         retry = false;
210                     } catch (ClosedChannelException e) {
211                         // closed when not blocked in write, need to retry test
212                     }
213                 }
214             }
215         });
216     }
217 




























218     /**
219      * Virtual thread interrupted while blocked in SocketChannel write.
220      */
221     @Test
222     void testSocketChannelWriteInterrupt() throws Exception {
223         VThreadRunner.run(() -> {

224             boolean retry = true;
225             while (retry) {
226                 try (var connection = new Connection()) {
227                     SocketChannel sc = connection.channel1();
228 
229                     // interrupt current thread when it blocks in write
230                     Thread thisThread = Thread.currentThread();
231                     runAfterParkedAsync(thisThread::interrupt);
232 
233                     try {
234                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
235                         for (;;) {
236                             int n = sc.write(bb);
237                             assertTrue(n > 0);
238                             bb.clear();
239                         }
240                     } catch (ClosedByInterruptException e) {
241                         // closed when blocked in write
242                         assertTrue(Thread.interrupted());
243                         retry = false;
244                     } catch (ClosedChannelException e) {
245                         // closed when not blocked in write, need to retry test
246                     }
247                 }
248             }
249         });
250     }
251 
252     /**
253      * Virtual thread blocks in SocketChannel adaptor read.
254      */
255     @Test
256     void testSocketAdaptorRead1() throws Exception {
257         testSocketAdaptorRead(0);

258     }
259 
260     /**
261      * Virtual thread blocks in SocketChannel adaptor read with timeout.
262      */
263     @Test
264     void testSocketAdaptorRead2() throws Exception {
265         testSocketAdaptorRead(60_000);

266     }
267 
268     private void testSocketAdaptorRead(int timeout) throws Exception {
269         VThreadRunner.run(() -> {

270             try (var connection = new Connection()) {
271                 SocketChannel sc1 = connection.channel1();
272                 SocketChannel sc2 = connection.channel2();
273 
274                 // write to sc1 when current thread blocks reading from sc2
275                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
276                 runAfterParkedAsync(() -> sc1.write(bb));
277 
278                 // read from sc2 should block
279                 byte[] array = new byte[100];
280                 if (timeout > 0)
281                     sc2.socket().setSoTimeout(timeout);
282                 int n = sc2.socket().getInputStream().read(array);
283                 assertTrue(n > 0);
284                 assertTrue(array[0] == 'X');
285             }
286         });
287     }
288 
289     /**
290      * ServerSocketChannel accept, no blocking.
291      */
292     @Test
293     void testServerSocketChannelAccept1() throws Exception {
294         VThreadRunner.run(() -> {

295             try (var ssc = ServerSocketChannel.open()) {
296                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
297                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
298                 // accept should not block
299                 var sc2 = ssc.accept();
300                 sc1.close();
301                 sc2.close();
302             }
303         });
304     }
305 
306     /**
307      * Virtual thread blocks in ServerSocketChannel accept.
308      */
309     @Test
310     void testServerSocketChannelAccept2() throws Exception {
311         VThreadRunner.run(() -> {

312             try (var ssc = ServerSocketChannel.open()) {
313                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
314                 var sc1 = SocketChannel.open();
315 
316                 // connect sc1 when current thread blocks in accept
317                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
318 
319                 // accept should block
320                 var sc2 = ssc.accept();
321                 sc1.close();
322                 sc2.close();
323             }
324         });
325     }
326 
327     /**
328      * ServerSocketChannel close while virtual thread blocked in accept.
329      */
330     @Test
331     void testServerSocketChannelAcceptAsyncClose() throws Exception {
332         VThreadRunner.run(() -> {

333             try (var ssc = ServerSocketChannel.open()) {
334                 InetAddress lh = InetAddress.getLoopbackAddress();
335                 ssc.bind(new InetSocketAddress(lh, 0));
336                 runAfterParkedAsync(ssc::close);
337                 try {
338                     SocketChannel sc = ssc.accept();
339                     sc.close();
340                     fail("connection accepted???");
341                 } catch (AsynchronousCloseException expected) { }
342             }
343         });
344     }
345 
346     /**
347      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
348      */
349     @Test
350     void testServerSocketChannelAcceptInterrupt() throws Exception {
351         VThreadRunner.run(() -> {

352             try (var ssc = ServerSocketChannel.open()) {
353                 InetAddress lh = InetAddress.getLoopbackAddress();
354                 ssc.bind(new InetSocketAddress(lh, 0));
355 
356                 // interrupt current thread when it blocks in accept
357                 Thread thisThread = Thread.currentThread();
358                 runAfterParkedAsync(thisThread::interrupt);
359 
360                 try {
361                     SocketChannel sc = ssc.accept();
362                     sc.close();
363                     fail("connection accepted???");
364                 } catch (ClosedByInterruptException expected) {
365                     assertTrue(Thread.interrupted());
366                 }
367             }
368         });
369     }
370 
371     /**
372      * Virtual thread blocks in ServerSocketChannel adaptor accept.
373      */
374     @Test
375     void testSocketChannelAdaptorAccept1() throws Exception {
376         testSocketChannelAdaptorAccept(0);

377     }
378 
379     /**
380      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
381      */
382     @Test
383     void testSocketChannelAdaptorAccept2() throws Exception {
384         testSocketChannelAdaptorAccept(60_000);

385     }
386 
387     private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
388         VThreadRunner.run(() -> {

389             try (var ssc = ServerSocketChannel.open()) {
390                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
391                 var sc = SocketChannel.open();
392 
393                 // connect sc when current thread blocks in accept
394                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
395 
396                 // accept should block
397                 if (timeout > 0)
398                     ssc.socket().setSoTimeout(timeout);
399                 Socket s = ssc.socket().accept();
400                 sc.close();
401                 s.close();
402             }
403         });
404     }
405 
406     /**
407      * DatagramChannel receive/send, no blocking.
408      */
409     @Test
410     void testDatagramChannelSendReceive1() throws Exception {
411         VThreadRunner.run(() -> {

412             try (DatagramChannel dc1 = DatagramChannel.open();
413                  DatagramChannel dc2 = DatagramChannel.open()) {
414 
415                 InetAddress lh = InetAddress.getLoopbackAddress();
416                 dc2.bind(new InetSocketAddress(lh, 0));
417 
418                 // send should not block
419                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
420                 int n = dc1.send(bb, dc2.getLocalAddress());
421                 assertTrue(n > 0);
422 
423                 // receive should not block
424                 bb = ByteBuffer.allocate(10);
425                 dc2.receive(bb);
426                 assertTrue(bb.get(0) == 'X');
427             }
428         });
429     }
430 
431     /**
432      * Virtual thread blocks in DatagramChannel receive.
433      */
434     @Test
435     void testDatagramChannelSendReceive2() throws Exception {
436         VThreadRunner.run(() -> {

437             try (DatagramChannel dc1 = DatagramChannel.open();
438                  DatagramChannel dc2 = DatagramChannel.open()) {
439 
440                 InetAddress lh = InetAddress.getLoopbackAddress();
441                 dc2.bind(new InetSocketAddress(lh, 0));
442 
443                 // send from dc1 when current thread blocked in dc2.receive
444                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
445                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
446 
447                 // receive on dc2 should block
448                 ByteBuffer bb2 = ByteBuffer.allocate(10);
449                 dc2.receive(bb2);
450                 assertTrue(bb2.get(0) == 'X');
451             }
452         });
453     }
454 
455     /**
456      * DatagramChannel close while virtual thread blocked in receive.
457      */
458     @Test
459     void testDatagramChannelReceiveAsyncClose() throws Exception {
460         VThreadRunner.run(() -> {

461             try (DatagramChannel dc = DatagramChannel.open()) {
462                 InetAddress lh = InetAddress.getLoopbackAddress();
463                 dc.bind(new InetSocketAddress(lh, 0));
464                 runAfterParkedAsync(dc::close);
465                 try {
466                     dc.receive(ByteBuffer.allocate(100));
467                     fail("receive returned");
468                 } catch (AsynchronousCloseException expected) { }
469             }
470         });
471     }
472 
473     /**
474      * Virtual thread interrupted while blocked in DatagramChannel receive.
475      */
476     @Test
477     void testDatagramChannelReceiveInterrupt() throws Exception {
478         VThreadRunner.run(() -> {

479             try (DatagramChannel dc = DatagramChannel.open()) {
480                 InetAddress lh = InetAddress.getLoopbackAddress();
481                 dc.bind(new InetSocketAddress(lh, 0));
482 
483                 // interrupt current thread when it blocks in receive
484                 Thread thisThread = Thread.currentThread();
485                 runAfterParkedAsync(thisThread::interrupt);
486 
487                 try {
488                     dc.receive(ByteBuffer.allocate(100));
489                     fail("receive returned");
490                 } catch (ClosedByInterruptException expected) {
491                     assertTrue(Thread.interrupted());
492                 }
493             }
494         });
495     }
496 
497     /**
498      * Virtual thread blocks in DatagramSocket adaptor receive.
499      */
500     @Test
501     void testDatagramSocketAdaptorReceive1() throws Exception {
502         testDatagramSocketAdaptorReceive(0);

503     }
504 
505     /**
506      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
507      */
508     @Test
509     void testDatagramSocketAdaptorReceive2() throws Exception {
510         testDatagramSocketAdaptorReceive(60_000);

511     }
512 
513     private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
514         VThreadRunner.run(() -> {

515             try (DatagramChannel dc1 = DatagramChannel.open();
516                  DatagramChannel dc2 = DatagramChannel.open()) {
517 
518                 InetAddress lh = InetAddress.getLoopbackAddress();
519                 dc2.bind(new InetSocketAddress(lh, 0));
520 
521                 // send from dc1 when current thread blocks in dc2 receive
522                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
523                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
524 
525                 // receive should block
526                 byte[] array = new byte[100];
527                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
528                 if (timeout > 0)
529                     dc2.socket().setSoTimeout(timeout);
530                 dc2.socket().receive(p);
531                 assertTrue(p.getLength() == 3 && array[0] == 'X');
532             }
533         });
534     }
535 
536     /**
537      * DatagramChannel close while virtual thread blocked in adaptor receive.
538      */
539     @Test
540     void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
541         testDatagramSocketAdaptorReceiveAsyncClose(0);

542     }
543 
544     /**
545      * DatagramChannel close while virtual thread blocked in adaptor receive
546      * with a 60 second timeout.
547      */
548     @Test
549     void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
550         testDatagramSocketAdaptorReceiveAsyncClose(60_000);

551     }
552 
553     private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
554         VThreadRunner.run(() -> {

555             try (DatagramChannel dc = DatagramChannel.open()) {
556                 InetAddress lh = InetAddress.getLoopbackAddress();
557                 dc.bind(new InetSocketAddress(lh, 0));
558 
559                 byte[] array = new byte[100];
560                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
561                 if (timeout > 0)
562                     dc.socket().setSoTimeout(timeout);
563 
564                 // close channel/socket when current thread blocks in receive
565                 runAfterParkedAsync(dc::close);
566 
567                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
568             }
569         });
570     }
571 
572     /**
573      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
574      */
575     @Test
576     void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
577         testDatagramSocketAdaptorReceiveInterrupt(0);

578     }
579 
580     /**
581      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
582      * with a 60 second timeout.
583      */
584     @Test
585     void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
586         testDatagramSocketAdaptorReceiveInterrupt(60_000);

587     }
588 
589     private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
590         VThreadRunner.run(() -> {

591             try (DatagramChannel dc = DatagramChannel.open()) {
592                 InetAddress lh = InetAddress.getLoopbackAddress();
593                 dc.bind(new InetSocketAddress(lh, 0));
594 
595                 byte[] array = new byte[100];
596                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
597                 if (timeout > 0)
598                     dc.socket().setSoTimeout(timeout);
599 
600                 // interrupt current thread when it blocks in receive
601                 Thread thisThread = Thread.currentThread();
602                 runAfterParkedAsync(thisThread::interrupt);
603 
604                 try {
605                     dc.socket().receive(p);
606                     fail();
607                 } catch (ClosedByInterruptException expected) {
608                     assertTrue(Thread.interrupted());
609                 }
610             }
611         });
612     }
613 
614     /**
615      * Pipe read/write, no blocking.
616      */
617     @Test
618     void testPipeReadWrite1() throws Exception {
619         VThreadRunner.run(() -> {

620             Pipe p = Pipe.open();
621             try (Pipe.SinkChannel sink = p.sink();
622                  Pipe.SourceChannel source = p.source()) {
623 
624                 // write should not block
625                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
626                 int n = sink.write(bb);
627                 assertTrue(n > 0);
628 
629                 // read should not block
630                 bb = ByteBuffer.allocate(10);
631                 n = source.read(bb);
632                 assertTrue(n > 0);
633                 assertTrue(bb.get(0) == 'X');
634             }
635         });
636     }
637 
638     /**
639      * Virtual thread blocks in Pipe.SourceChannel read.
640      */
641     @Test
642     void testPipeReadWrite2() throws Exception {
643         VThreadRunner.run(() -> {

644             Pipe p = Pipe.open();
645             try (Pipe.SinkChannel sink = p.sink();
646                  Pipe.SourceChannel source = p.source()) {
647 
648                 // write from sink when current thread blocks reading from source
649                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
650                 runAfterParkedAsync(() -> sink.write(bb1));
651 
652                 // read should block
653                 ByteBuffer bb2 = ByteBuffer.allocate(10);
654                 int n = source.read(bb2);
655                 assertTrue(n > 0);
656                 assertTrue(bb2.get(0) == 'X');
657             }
658         });
659     }
660 
661     /**
662      * Virtual thread blocks in Pipe.SinkChannel write.
663      */
664     @Test
665     void testPipeReadWrite3() throws Exception {
666         VThreadRunner.run(() -> {

667             Pipe p = Pipe.open();
668             try (Pipe.SinkChannel sink = p.sink();
669                  Pipe.SourceChannel source = p.source()) {
670 
671                 // read from source to EOF when current thread blocking in write
672                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
673 
674                 // write to sink should block
675                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
676                 for (int i=0; i<1000; i++) {
677                     int n = sink.write(bb);
678                     assertTrue(n > 0);
679                     bb.clear();
680                 }
681                 sink.close();
682 
683                 // wait for reader to finish
684                 reader.join();
685             }
686         });
687     }
688 
689     /**
690      * Pipe.SourceChannel close while virtual thread blocked in read.
691      */
692     @Test
693     void testPipeReadAsyncClose() throws Exception {
694         VThreadRunner.run(() -> {

695             Pipe p = Pipe.open();
696             try (Pipe.SinkChannel sink = p.sink();
697                  Pipe.SourceChannel source = p.source()) {
698                 runAfterParkedAsync(source::close);
699                 try {
700                     int n = source.read(ByteBuffer.allocate(100));
701                     fail("read returned " + n);
702                 } catch (AsynchronousCloseException expected) { }
703             }
704         });
705     }
706 
707     /**
708      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
709      */
710     @Test
711     void testPipeReadInterrupt() throws Exception {
712         VThreadRunner.run(() -> {

713             Pipe p = Pipe.open();
714             try (Pipe.SinkChannel sink = p.sink();
715                  Pipe.SourceChannel source = p.source()) {
716 
717                 // interrupt current thread when it blocks reading from source
718                 Thread thisThread = Thread.currentThread();
719                 runAfterParkedAsync(thisThread::interrupt);
720 
721                 try {
722                     int n = source.read(ByteBuffer.allocate(100));
723                     fail("read returned " + n);
724                 } catch (ClosedByInterruptException expected) {
725                     assertTrue(Thread.interrupted());
726                 }
727             }
728         });
729     }
730 
731     /**
732      * Pipe.SinkChannel close while virtual thread blocked in write.
733      */
734     @Test
735     void testPipeWriteAsyncClose() throws Exception {
736         VThreadRunner.run(() -> {

737             boolean retry = true;
738             while (retry) {
739                 Pipe p = Pipe.open();
740                 try (Pipe.SinkChannel sink = p.sink();
741                      Pipe.SourceChannel source = p.source()) {
742 
743                     // close sink when current thread blocks in write
744                     runAfterParkedAsync(sink::close);
745                     try {
746                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
747                         for (;;) {
748                             int n = sink.write(bb);
749                             assertTrue(n > 0);
750                             bb.clear();
751                         }
752                     } catch (AsynchronousCloseException e) {
753                         // closed when blocked in write
754                         retry = false;
755                     } catch (ClosedChannelException e) {
756                         // closed when not blocked in write, need to retry test
757                     }
758                 }
759             }
760         });
761     }
762 
763     /**
764      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
765      */
766     @Test
767     void testPipeWriteInterrupt() throws Exception {
768         VThreadRunner.run(() -> {

769             boolean retry = true;
770             while (retry) {
771                 Pipe p = Pipe.open();
772                 try (Pipe.SinkChannel sink = p.sink();
773                      Pipe.SourceChannel source = p.source()) {
774 
775                     // interrupt current thread when it blocks in write
776                     Thread thisThread = Thread.currentThread();
777                     runAfterParkedAsync(thisThread::interrupt);
778 
779                     try {
780                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
781                         for (;;) {
782                             int n = sink.write(bb);
783                             assertTrue(n > 0);
784                             bb.clear();
785                         }
786                     } catch (ClosedByInterruptException expected) {
787                         // closed when blocked in write
788                         assertTrue(Thread.interrupted());

  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test id=default
 26  * @bug 8284161
 27  * @summary Test virtual threads doing blocking I/O on NIO channels
 28  * @library /test/lib
 29  * @run junit/othervm BlockingChannelOps
 30  */
 31 
 32 /*
 33  * @test id=poller-modes
 34  * @requires (os.family == "linux") | (os.family == "mac")
 35  * @library /test/lib
 36  * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps
 37  * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps
 38  * @run junit/othervm -Djdk.pollerMode=3 BlockingChannelOps
 39  */
 40 
 41 /*
 42  * @test id=io_uring
 43  * @requires os.family == "linux"
 44  * @library /test/lib
 45  * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
 46  * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
 47  * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
 48  */
 49 
 50 /*
 51  * @test id=no-vmcontinuations
 52  * @requires vm.continuations
 53  * @library /test/lib
 54  * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
 55  */
 56 
 57 import java.io.Closeable;
 58 import java.io.IOException;
 59 import java.net.DatagramPacket;
 60 import java.net.InetAddress;
 61 import java.net.InetSocketAddress;
 62 import java.net.Socket;
 63 import java.net.SocketAddress;
 64 import java.net.SocketException;
 65 import java.nio.ByteBuffer;
 66 import java.nio.channels.AsynchronousCloseException;
 67 import java.nio.channels.ClosedByInterruptException;
 68 import java.nio.channels.ClosedChannelException;
 69 import java.nio.channels.DatagramChannel;
 70 import java.nio.channels.Pipe;
 71 import java.nio.channels.ReadableByteChannel;
 72 import java.nio.channels.ServerSocketChannel;
 73 import java.nio.channels.SocketChannel;
 74 import java.nio.channels.WritableByteChannel;
 75 import java.util.concurrent.ExecutorService;
 76 import java.util.concurrent.Executors;
 77 import java.util.concurrent.ThreadFactory;
 78 import java.util.stream.Stream;
 79 
 80 import jdk.test.lib.thread.VThreadRunner;
 81 import jdk.test.lib.thread.VThreadScheduler;
 82 
 83 import org.junit.jupiter.api.BeforeAll;
 84 import org.junit.jupiter.params.ParameterizedTest;
 85 import org.junit.jupiter.params.provider.MethodSource;
 86 import static org.junit.jupiter.api.Assertions.*;
 87 
 88 class BlockingChannelOps {
 89     private static ExecutorService threadPool;
 90     private static Thread.VirtualThreadScheduler customScheduler;
 91 
 92     @BeforeAll
 93     static void setup() {
 94         ThreadFactory factory = Thread.ofPlatform().daemon().factory(); // daemon platform threads back the pool
 95         threadPool = Executors.newCachedThreadPool(factory);
 96         customScheduler = (_, task) -> threadPool.execute(task); // custom scheduler hands each task to threadPool
 97     }
 98 
 99     /**
100      * Returns the builders used to create the virtual threads: the default builder and, when supported, one using customScheduler.
101      */
102     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
103         if (VThreadScheduler.supportsCustomScheduler()) {
104             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
105         } else {
106             return Stream.of(Thread.ofVirtual());
107         }
108     }
109 
110     /**
111      * SocketChannel write then read on the two channels of a Connection, no blocking.
112      */
113     @ParameterizedTest
114     @MethodSource("threadBuilders")
115     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
116         VThreadRunner.run(builder, () -> {
117             try (var connection = new Connection()) {
118                 SocketChannel sc1 = connection.channel1();
119                 SocketChannel sc2 = connection.channel2();
120 
121                 // write to sc1
122                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
123                 int n = sc1.write(bb);
124                 assertTrue(n > 0);
125 
126                 // read from sc2 should not block, the bytes were written to sc1 beforehand
127                 bb = ByteBuffer.allocate(10);
128                 n = sc2.read(bb);
129                 assertTrue(n > 0);
130                 assertTrue(bb.get(0) == 'X'); // first byte read matches the first byte written
131             }
132         });
133     }
134 
135     /**
136      * Virtual thread blocks in SocketChannel read until bytes are written to the other channel.
137      */
138     @ParameterizedTest
139     @MethodSource("threadBuilders")
140     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
141         VThreadRunner.run(builder, () -> {
142             try (var connection = new Connection()) {
143                 SocketChannel sc1 = connection.channel1();
144                 SocketChannel sc2 = connection.channel2();
145 
146                 // write to sc1 when current thread blocks in sc2.read
147                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
148                 runAfterParkedAsync(() -> sc1.write(bb1));
149 
150                 // read from sc2 should block until the async write to sc1 happens
151                 ByteBuffer bb2 = ByteBuffer.allocate(10);
152                 int n = sc2.read(bb2);
153                 assertTrue(n > 0);
154                 assertTrue(bb2.get(0) == 'X');
155             }
156         });
157     }
158 
159     /**
160      * Virtual thread blocks in SocketChannel write while another thread reads the other channel to EOF.
161      */
162     @ParameterizedTest
163     @MethodSource("threadBuilders")
164     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
165         VThreadRunner.run(builder, () -> {
166             try (var connection = new Connection()) {
167                 SocketChannel sc1 = connection.channel1();
168                 SocketChannel sc2 = connection.channel2();
169 
170                 // read from sc2 to EOF when current thread blocks in sc1.write
171                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
172 
173                 // write to sc1 should block at some point; 1000 writes of a 100KB buffer
174                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
175                 for (int i=0; i<1000; i++) {
176                     int n = sc1.write(bb);
177                     assertTrue(n > 0);
178                     bb.clear(); // reset position so the same buffer is written again
179                 }
180                 sc1.close(); // closed explicitly so that the reader can reach EOF
181 
182                 // wait for reader to finish
183                 reader.join();
184             }
185         });
186     }
187 
188     /**
189      * SocketChannel close while virtual thread blocked in read; the read must fail with AsynchronousCloseException.
190      */
191     @ParameterizedTest
192     @MethodSource("threadBuilders")
193     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
194         VThreadRunner.run(builder, () -> {
195             try (var connection = new Connection()) {
196                 SocketChannel sc = connection.channel1();
197                 runAfterParkedAsync(sc::close); // close sc when current thread blocks in read
198                 try {
199                     int n = sc.read(ByteBuffer.allocate(100));
200                     fail("read returned " + n);
201                 } catch (AsynchronousCloseException expected) { }
202             }
203         });
204     }
205 
206     /**
207      * SocketChannel shutdownInput while virtual thread blocked in read; read returns -1 and the channel stays open.
208      */
209     @ParameterizedTest
210     @MethodSource("threadBuilders")
211     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
212         VThreadRunner.run(builder, () -> {
213             try (var connection = new Connection()) {
214                 SocketChannel sc = connection.channel1();
215                 runAfterParkedAsync(sc::shutdownInput); // shutdown input when current thread blocks in read
216                 int n = sc.read(ByteBuffer.allocate(100));
217                 assertEquals(-1, n);
218                 assertTrue(sc.isOpen());
219             }
220         });
221     }
222 
223     /**
224      * Virtual thread interrupted while blocked in SocketChannel read; expects ClosedByInterruptException.
225      */
226     @ParameterizedTest
227     @MethodSource("threadBuilders")
228     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
229         VThreadRunner.run(builder, () -> {
230             try (var connection = new Connection()) {
231                 SocketChannel sc = connection.channel1();
232 
233                 // interrupt current thread when it blocks in read
234                 Thread thisThread = Thread.currentThread();
235                 runAfterParkedAsync(thisThread::interrupt);
236 
237                 try {
238                     int n = sc.read(ByteBuffer.allocate(100));
239                     fail("read returned " + n);
240                 } catch (ClosedByInterruptException expected) {
241                     assertTrue(Thread.interrupted()); // interrupt status must be set; this call also clears it
242                 }
243             }
244         });
245     }
246 
247     /**
248      * SocketChannel close while virtual thread blocked in write; retried until the close lands during a blocked write.
249      */
250     @ParameterizedTest
251     @MethodSource("threadBuilders")
252     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
253         VThreadRunner.run(builder, () -> {
254             boolean retry = true;
255             while (retry) {
256                 try (var connection = new Connection()) {
257                     SocketChannel sc = connection.channel1();
258 
259                     // close sc when current thread blocks in write
260                     runAfterParkedAsync(sc::close);
261                     try {
262                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
263                         for (;;) { // keep writing until the write fails because sc was closed
264                             int n = sc.write(bb);
265                             assertTrue(n > 0);
266                             bb.clear();
267                         }
268                     } catch (AsynchronousCloseException expected) {
269                         // closed when blocked in write
270                         retry = false;
271                     } catch (ClosedChannelException e) {
272                         // closed when not blocked in write, need to retry test with a new Connection
273                     }
274                 }
275             }
276         });
277     }
278 
279 
280     /**
281      * SocketChannel shutdownOutput while virtual thread blocked in write; the channel must remain open.
282      */
283     @ParameterizedTest
284     @MethodSource("threadBuilders")
285     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
286         VThreadRunner.run(builder, () -> {
287             try (var connection = new Connection()) {
288                 SocketChannel sc = connection.channel1();
289 
290                 // shutdown output when current thread blocks in write
291                 runAfterParkedAsync(sc::shutdownOutput);
292                 try {
293                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
294                     for (;;) { // keep writing until the write fails
295                         int n = sc.write(bb);
296                         assertTrue(n > 0);
297                         bb.clear();
298                     }
299                 } catch (ClosedChannelException e) {
300                     // expected, the write fails once output has been shut down
301                 }
302                 assertTrue(sc.isOpen());
303             }
304         });
305     }
306 
307     /**
308      * Virtual thread interrupted while blocked in SocketChannel write; retried until the interrupt lands during a blocked write.
309      */
310     @ParameterizedTest
311     @MethodSource("threadBuilders")
312     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
313         VThreadRunner.run(builder, () -> {
314             boolean retry = true;
315             while (retry) {
316                 try (var connection = new Connection()) {
317                     SocketChannel sc = connection.channel1();
318 
319                     // interrupt current thread when it blocks in write
320                     Thread thisThread = Thread.currentThread();
321                     runAfterParkedAsync(thisThread::interrupt);
322 
323                     try {
324                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
325                         for (;;) { // keep writing until the write fails
326                             int n = sc.write(bb);
327                             assertTrue(n > 0);
328                             bb.clear();
329                         }
330                     } catch (ClosedByInterruptException e) {
331                         // closed when blocked in write
332                         assertTrue(Thread.interrupted()); // interrupt status must be set; this call also clears it
333                         retry = false;
334                     } catch (ClosedChannelException e) {
335                         // closed when not blocked in write, need to retry test with a new Connection
336                     }
337                 }
338             }
339         });
340     }
341 
342     /**
343      * Virtual thread blocks in SocketChannel adaptor read (timeout argument 0, so no SO_TIMEOUT is set).
344      */
345     @ParameterizedTest
346     @MethodSource("threadBuilders")
347     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
348         testSocketAdaptorRead(builder, 0);
349     }
350 
351     /**
352      * Virtual thread blocks in SocketChannel adaptor read with a 60000 ms timeout.
353      */
354     @ParameterizedTest
355     @MethodSource("threadBuilders")
356     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
357         testSocketAdaptorRead(builder, 60_000);
358     }
359 
360     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
361                                        int timeout) throws Exception {
362         VThreadRunner.run(builder, () -> {
363             try (var connection = new Connection()) {
364                 SocketChannel sc1 = connection.channel1();
365                 SocketChannel sc2 = connection.channel2();
366 
367                 // write to sc1 when current thread blocks reading from sc2
368                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
369                 runAfterParkedAsync(() -> sc1.write(bb));
370 
371                 // read from sc2 should block; the timeout is only set on the socket adaptor when positive
372                 byte[] array = new byte[100];
373                 if (timeout > 0)
374                     sc2.socket().setSoTimeout(timeout);
375                 int n = sc2.socket().getInputStream().read(array);
376                 assertTrue(n > 0);
377                 assertTrue(array[0] == 'X');
378             }
379         });
380     }
381 
382     /**
383      * ServerSocketChannel accept, no blocking: the client connects before accept is called.
384      */
385     @ParameterizedTest
386     @MethodSource("threadBuilders")
387     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
388         VThreadRunner.run(builder, () -> {
389             try (var ssc = ServerSocketChannel.open()) {
390                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // loopback, ephemeral port
391                 var sc1 = SocketChannel.open(ssc.getLocalAddress()); // opens and connects to ssc
392                 // accept should not block
393                 var sc2 = ssc.accept();
394                 sc1.close();
395                 sc2.close();
396             }
397         });
398     }
399 
400     /**
401      * Virtual thread blocks in ServerSocketChannel accept until a connection is made asynchronously.
402      */
403     @ParameterizedTest
404     @MethodSource("threadBuilders")
405     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
406         VThreadRunner.run(builder, () -> {
407             try (var ssc = ServerSocketChannel.open()) {
408                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // loopback, ephemeral port
409                 var sc1 = SocketChannel.open(); // not yet connected
410 
411                 // connect sc1 when current thread blocks in accept
412                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
413 
414                 // accept should block
415                 var sc2 = ssc.accept();
416                 sc1.close();
417                 sc2.close();
418             }
419         });
420     }
421 
422     /**
423      * ServerSocketChannel close while virtual thread blocked in accept; expects AsynchronousCloseException.
424      */
425     @ParameterizedTest
426     @MethodSource("threadBuilders")
427     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
428         VThreadRunner.run(builder, () -> {
429             try (var ssc = ServerSocketChannel.open()) {
430                 InetAddress lh = InetAddress.getLoopbackAddress();
431                 ssc.bind(new InetSocketAddress(lh, 0));
432                 runAfterParkedAsync(ssc::close); // close ssc when current thread blocks in accept
433                 try {
434                     SocketChannel sc = ssc.accept();
435                     sc.close();
436                     fail("connection accepted???");
437                 } catch (AsynchronousCloseException expected) { }
438             }
439         });
440     }
441 
442     /**
443      * Virtual thread interrupted while blocked in ServerSocketChannel accept; expects ClosedByInterruptException.
444      */
445     @ParameterizedTest
446     @MethodSource("threadBuilders")
447     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
448         VThreadRunner.run(builder, () -> {
449             try (var ssc = ServerSocketChannel.open()) {
450                 InetAddress lh = InetAddress.getLoopbackAddress();
451                 ssc.bind(new InetSocketAddress(lh, 0));
452 
453                 // interrupt current thread when it blocks in accept
454                 Thread thisThread = Thread.currentThread();
455                 runAfterParkedAsync(thisThread::interrupt);
456 
457                 try {
458                     SocketChannel sc = ssc.accept();
459                     sc.close();
460                     fail("connection accepted???");
461                 } catch (ClosedByInterruptException expected) {
462                     assertTrue(Thread.interrupted()); // interrupt status must be set; this call also clears it
463                 }
464             }
465         });
466     }
467 
468     /**
469      * Virtual thread blocks in ServerSocketChannel adaptor accept (timeout argument 0, so no SO_TIMEOUT is set).
470      */
471     @ParameterizedTest
472     @MethodSource("threadBuilders")
473     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
474         testSocketChannelAdaptorAccept(builder, 0);
475     }
476 
477     /**
478      * Virtual thread blocks in ServerSocketChannel adaptor accept with a 60000 ms timeout.
479      */
480     @ParameterizedTest
481     @MethodSource("threadBuilders")
482     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
483         testSocketChannelAdaptorAccept(builder, 60_000);
484     }
485 
486     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
487                                                 int timeout) throws Exception {
488         VThreadRunner.run(builder, () -> {
489             try (var ssc = ServerSocketChannel.open()) {
490                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
491                 var sc = SocketChannel.open();
492 
493                 // connect sc when current thread blocks in accept
494                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
495 
496                 // accept should block; the timeout is only set on the socket adaptor when positive
497                 if (timeout > 0)
498                     ssc.socket().setSoTimeout(timeout);
499                 Socket s = ssc.socket().accept();
500                 sc.close();
501                 s.close();
502             }
503         });
504     }
505 
506     /**
507      * DatagramChannel send then receive over loopback, no blocking.
508      */
509     @ParameterizedTest
510     @MethodSource("threadBuilders")
511     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
512         VThreadRunner.run(builder, () -> {
513             try (DatagramChannel dc1 = DatagramChannel.open();
514                  DatagramChannel dc2 = DatagramChannel.open()) {
515 
516                 InetAddress lh = InetAddress.getLoopbackAddress();
517                 dc2.bind(new InetSocketAddress(lh, 0)); // receiver bound to loopback, ephemeral port
518 
519                 // send should not block
520                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
521                 int n = dc1.send(bb, dc2.getLocalAddress());
522                 assertTrue(n > 0);
523 
524                 // receive should not block, the datagram was sent beforehand
525                 bb = ByteBuffer.allocate(10);
526                 dc2.receive(bb);
527                 assertTrue(bb.get(0) == 'X');
528             }
529         });
530     }
531 
532     /**
533      * Virtual thread blocks in DatagramChannel receive until a datagram is sent asynchronously.
534      */
535     @ParameterizedTest
536     @MethodSource("threadBuilders")
537     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
538         VThreadRunner.run(builder, () -> {
539             try (DatagramChannel dc1 = DatagramChannel.open();
540                  DatagramChannel dc2 = DatagramChannel.open()) {
541 
542                 InetAddress lh = InetAddress.getLoopbackAddress();
543                 dc2.bind(new InetSocketAddress(lh, 0)); // receiver bound to loopback, ephemeral port
544 
545                 // send from dc1 when current thread blocked in dc2.receive
546                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
547                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
548 
549                 // receive on dc2 should block
550                 ByteBuffer bb2 = ByteBuffer.allocate(10);
551                 dc2.receive(bb2);
552                 assertTrue(bb2.get(0) == 'X');
553             }
554         });
555     }
556 
557     /**
558      * DatagramChannel close while virtual thread blocked in receive; expects AsynchronousCloseException.
559      */
560     @ParameterizedTest
561     @MethodSource("threadBuilders")
562     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
563         VThreadRunner.run(builder, () -> {
564             try (DatagramChannel dc = DatagramChannel.open()) {
565                 InetAddress lh = InetAddress.getLoopbackAddress();
566                 dc.bind(new InetSocketAddress(lh, 0));
567                 runAfterParkedAsync(dc::close); // close dc when current thread blocks in receive
568                 try {
569                     dc.receive(ByteBuffer.allocate(100));
570                     fail("receive returned");
571                 } catch (AsynchronousCloseException expected) { }
572             }
573         });
574     }
575 
576     /**
577      * Virtual thread interrupted while blocked in DatagramChannel receive; expects ClosedByInterruptException.
578      */
579     @ParameterizedTest
580     @MethodSource("threadBuilders")
581     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
582         VThreadRunner.run(builder, () -> {
583             try (DatagramChannel dc = DatagramChannel.open()) {
584                 InetAddress lh = InetAddress.getLoopbackAddress();
585                 dc.bind(new InetSocketAddress(lh, 0));
586 
587                 // interrupt current thread when it blocks in receive
588                 Thread thisThread = Thread.currentThread();
589                 runAfterParkedAsync(thisThread::interrupt);
590 
591                 try {
592                     dc.receive(ByteBuffer.allocate(100));
593                     fail("receive returned");
594                 } catch (ClosedByInterruptException expected) {
595                     assertTrue(Thread.interrupted()); // interrupt status must be set; this call also clears it
596                 }
597             }
598         });
599     }
600 
601     /**
602      * Virtual thread blocks in DatagramSocket adaptor receive (timeout argument 0, so no SO_TIMEOUT is set).
603      */
604     @ParameterizedTest
605     @MethodSource("threadBuilders")
606     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
607         testDatagramSocketAdaptorReceive(builder, 0);
608     }
609 
610     /**
611      * Virtual thread blocks in DatagramSocket adaptor receive with a 60000 ms timeout.
612      */
613     @ParameterizedTest
614     @MethodSource("threadBuilders")
615     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
616         testDatagramSocketAdaptorReceive(builder, 60_000);
617     }
618 
619     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
620                                                   int timeout) throws Exception {
621         VThreadRunner.run(builder, () -> {
622             try (DatagramChannel dc1 = DatagramChannel.open();
623                  DatagramChannel dc2 = DatagramChannel.open()) {
624 
625                 InetAddress lh = InetAddress.getLoopbackAddress();
626                 dc2.bind(new InetSocketAddress(lh, 0));
627 
628                 // send from dc1 when current thread blocks in dc2 receive
629                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
630                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
631 
632                 // receive should block; the timeout is only set on the socket adaptor when positive
633                 byte[] array = new byte[100];
634                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
635                 if (timeout > 0)
636                     dc2.socket().setSoTimeout(timeout);
637                 dc2.socket().receive(p);
638                 assertTrue(p.getLength() == 3 && array[0] == 'X'); // the 3-byte "XXX" datagram arrived
639             }
640         });
641     }
642 
643     /**
644      * DatagramChannel close while virtual thread blocked in adaptor receive (timeout argument 0, so no SO_TIMEOUT is set).
645      */
646     @ParameterizedTest
647     @MethodSource("threadBuilders")
648     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
649         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
650     }
651 
652     /**
653      * DatagramChannel close while virtual thread blocked in adaptor receive
654      * with timeout.
655      */
656     @ParameterizedTest
657     @MethodSource("threadBuilders")
658     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
659         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_000); // 60s timeout, as in testDatagramSocketAdaptorReceive2
660     }
661 
662     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
663                                                             int timeout) throws Exception {
664         VThreadRunner.run(builder, () -> {
665             try (DatagramChannel dc = DatagramChannel.open()) {
666                 InetAddress lh = InetAddress.getLoopbackAddress();
667                 dc.bind(new InetSocketAddress(lh, 0));
668 
669                 byte[] array = new byte[100];
670                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
671                 if (timeout > 0) // the timeout is only set on the socket adaptor when positive
672                     dc.socket().setSoTimeout(timeout);
673 
674                 // close channel/socket when current thread blocks in receive
675                 runAfterParkedAsync(dc::close);
676 
677                 assertThrows(SocketException.class, () -> dc.socket().receive(p)); // receive must fail once dc is closed
678             }
679         });
680     }
681 
682     /**
683      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive (timeout argument 0, so no SO_TIMEOUT is set).
684      */
685     @ParameterizedTest
686     @MethodSource("threadBuilders")
687     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
688         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
689     }
690 
691     /**
692      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
693      * with timeout.
694      */
695     @ParameterizedTest
696     @MethodSource("threadBuilders")
697     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
698         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_000); // 60s timeout, as in testDatagramSocketAdaptorReceive2
699     }
700 
701     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
702                                                            int timeout) throws Exception {
703         VThreadRunner.run(builder, () -> {
704             try (DatagramChannel dc = DatagramChannel.open()) {
705                 InetAddress lh = InetAddress.getLoopbackAddress();
706                 dc.bind(new InetSocketAddress(lh, 0));
707 
708                 byte[] array = new byte[100];
709                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
710                 if (timeout > 0) // the timeout is only set on the socket adaptor when positive
711                     dc.socket().setSoTimeout(timeout);
712 
713                 // interrupt current thread when it blocks in receive
714                 Thread thisThread = Thread.currentThread();
715                 runAfterParkedAsync(thisThread::interrupt);
716 
717                 try {
718                     dc.socket().receive(p);
719                     fail();
720                 } catch (ClosedByInterruptException expected) {
721                     assertTrue(Thread.interrupted()); // interrupt status must be set; this call also clears it
722                 }
723             }
724         });
725     }
726 
727     /**
728      * Pipe write to the sink then read from the source, no blocking.
729      */
730     @ParameterizedTest
731     @MethodSource("threadBuilders")
732     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
733         VThreadRunner.run(builder, () -> {
734             Pipe p = Pipe.open();
735             try (Pipe.SinkChannel sink = p.sink();
736                  Pipe.SourceChannel source = p.source()) {
737 
738                 // write should not block
739                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
740                 int n = sink.write(bb);
741                 assertTrue(n > 0);
742 
743                 // read should not block, the bytes were written to the sink beforehand
744                 bb = ByteBuffer.allocate(10);
745                 n = source.read(bb);
746                 assertTrue(n > 0);
747                 assertTrue(bb.get(0) == 'X');
748             }
749         });
750     }
751 
752     /**
753      * Virtual thread blocks in Pipe.SourceChannel read until bytes are written to the sink asynchronously.
754      */
755     @ParameterizedTest
756     @MethodSource("threadBuilders")
757     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
758         VThreadRunner.run(builder, () -> {
759             Pipe p = Pipe.open();
760             try (Pipe.SinkChannel sink = p.sink();
761                  Pipe.SourceChannel source = p.source()) {
762 
763                 // write to sink when current thread blocks reading from source
764                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
765                 runAfterParkedAsync(() -> sink.write(bb1));
766 
767                 // read should block
768                 ByteBuffer bb2 = ByteBuffer.allocate(10);
769                 int n = source.read(bb2);
770                 assertTrue(n > 0);
771                 assertTrue(bb2.get(0) == 'X');
772             }
773         });
774     }
775 
776     /**
777      * Virtual thread blocks in Pipe.SinkChannel write while another thread reads the source to EOF.
778      */
779     @ParameterizedTest
780     @MethodSource("threadBuilders")
781     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
782         VThreadRunner.run(builder, () -> {
783             Pipe p = Pipe.open();
784             try (Pipe.SinkChannel sink = p.sink();
785                  Pipe.SourceChannel source = p.source()) {
786 
787                 // read from source to EOF when current thread blocks in write
788                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
789 
790                 // write to sink should block at some point; 1000 writes of a 100KB buffer
791                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
792                 for (int i=0; i<1000; i++) {
793                     int n = sink.write(bb);
794                     assertTrue(n > 0);
795                     bb.clear(); // reset position so the same buffer is written again
796                 }
797                 sink.close(); // closed explicitly so that the reader can reach EOF
798 
799                 // wait for reader to finish
800                 reader.join();
801             }
802         });
803     }
804 
805     /**
806      * Pipe.SourceChannel close while virtual thread blocked in read; expects AsynchronousCloseException.
807      */
808     @ParameterizedTest
809     @MethodSource("threadBuilders")
810     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
811         VThreadRunner.run(builder, () -> {
812             Pipe p = Pipe.open();
813             try (Pipe.SinkChannel sink = p.sink();
814                  Pipe.SourceChannel source = p.source()) {
815                 runAfterParkedAsync(source::close); // close source when current thread blocks in read
816                 try {
817                     int n = source.read(ByteBuffer.allocate(100));
818                     fail("read returned " + n);
819                 } catch (AsynchronousCloseException expected) { }
820             }
821         });
822     }
823 
824     /**
825      * Interrupting a virtual thread while it is blocked in Pipe.SourceChannel read.
826      */
827     @ParameterizedTest
828     @MethodSource("threadBuilders")
829     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
830         VThreadRunner.run(builder, () -> {
831             Pipe pipe = Pipe.open();
832             try (Pipe.SinkChannel out = pipe.sink();
833                  Pipe.SourceChannel in = pipe.source()) {
834 
835                 // a helper interrupts this thread when it blocks in the read below
836                 Thread self = Thread.currentThread();
837                 runAfterParkedAsync(self::interrupt);
838 
839                 ByteBuffer dst = ByteBuffer.allocate(100);
840                 try {
841                     fail("read returned " + in.read(dst));
842                 } catch (ClosedByInterruptException expected) {
843                     assertTrue(Thread.interrupted());  // status must be set; this clears it
844                 }
845             }
846         });
847     }
848 
849     /**
850      * Closing a Pipe.SinkChannel while a virtual thread is blocked writing to it.
851      */
852     @ParameterizedTest
853     @MethodSource("threadBuilders")
854     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
855         VThreadRunner.run(builder, () -> {
856             boolean closedWhileBlocked = false;
857             while (!closedWhileBlocked) {
858                 Pipe pipe = Pipe.open();
859                 try (Pipe.SinkChannel out = pipe.sink();
860                      Pipe.SourceChannel in = pipe.source()) {
861 
862                     // a helper closes the sink when this thread blocks in write
863                     runAfterParkedAsync(out::close);
864                     ByteBuffer chunk = ByteBuffer.allocate(100*1024);
865                     try {
866                         // the source is never read here; keep writing until the close lands
867                         while (true) {
868                             assertTrue(out.write(chunk) > 0);
869                             chunk.clear();
870                         }
871                     } catch (AsynchronousCloseException e) {
872                         // the close happened during a blocked write, nothing more to do
873                         closedWhileBlocked = true;
874                     } catch (ClosedChannelException e) {
875                         // the close happened between writes, go around with a new pipe
876                     }
877                 }
878             }
879         });
880     }
881 
882     /**
883      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
884      */
885     @ParameterizedTest
886     @MethodSource("threadBuilders")
887     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
888         VThreadRunner.run(builder, () -> {
889             boolean retry = true;
890             while (retry) {
891                 Pipe p = Pipe.open();
892                 try (Pipe.SinkChannel sink = p.sink();
893                      Pipe.SourceChannel source = p.source()) {
894 
895                     // interrupt current thread when it blocks in write
896                     Thread thisThread = Thread.currentThread();
897                     runAfterParkedAsync(thisThread::interrupt);
898 
899                     try {
900                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
901                         for (;;) {
902                             int n = sink.write(bb);
903                             assertTrue(n > 0);
904                             bb.clear();
905                         }
906                     } catch (ClosedByInterruptException expected) {
907                         // closed when blocked in write
908                         assertTrue(Thread.interrupted());
< prev index next >