< prev index next >

test/jdk/java/nio/channels/vthread/BlockingChannelOps.java

Print this page

 45  */
 46 
 47 import java.io.Closeable;
 48 import java.io.IOException;
 49 import java.net.DatagramPacket;
 50 import java.net.InetAddress;
 51 import java.net.InetSocketAddress;
 52 import java.net.Socket;
 53 import java.net.SocketAddress;
 54 import java.net.SocketException;
 55 import java.nio.ByteBuffer;
 56 import java.nio.channels.AsynchronousCloseException;
 57 import java.nio.channels.ClosedByInterruptException;
 58 import java.nio.channels.ClosedChannelException;
 59 import java.nio.channels.DatagramChannel;
 60 import java.nio.channels.Pipe;
 61 import java.nio.channels.ReadableByteChannel;
 62 import java.nio.channels.ServerSocketChannel;
 63 import java.nio.channels.SocketChannel;
 64 import java.nio.channels.WritableByteChannel;



 65 
 66 import jdk.test.lib.thread.VThreadRunner;
 67 import org.junit.jupiter.api.Test;





 68 import static org.junit.jupiter.api.Assertions.*;
 69 
 70 class BlockingChannelOps {
























 71 
 72     /**
 73      * SocketChannel read/write, no blocking.
 74      */
 75     @Test
 76     void testSocketChannelReadWrite1() throws Exception {
 77         VThreadRunner.run(() -> {

 78             try (var connection = new Connection()) {
 79                 SocketChannel sc1 = connection.channel1();
 80                 SocketChannel sc2 = connection.channel2();
 81 
 82                 // write to sc1
 83                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
 84                 int n = sc1.write(bb);
 85                 assertTrue(n > 0);
 86 
 87                 // read from sc2 should not block
 88                 bb = ByteBuffer.allocate(10);
 89                 n = sc2.read(bb);
 90                 assertTrue(n > 0);
 91                 assertTrue(bb.get(0) == 'X');
 92             }
 93         });
 94     }
 95 
 96     /**
 97      * Virtual thread blocks in SocketChannel read.
 98      */
 99     @Test
100     void testSocketChannelRead() throws Exception {
101         VThreadRunner.run(() -> {

102             try (var connection = new Connection()) {
103                 SocketChannel sc1 = connection.channel1();
104                 SocketChannel sc2 = connection.channel2();
105 
106                 // write to sc1 when current thread blocks in sc2.read
107                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
108                 runAfterParkedAsync(() -> sc1.write(bb1));
109 
110                 // read from sc2 should block
111                 ByteBuffer bb2 = ByteBuffer.allocate(10);
112                 int n = sc2.read(bb2);
113                 assertTrue(n > 0);
114                 assertTrue(bb2.get(0) == 'X');
115             }
116         });
117     }
118 
119     /**
120      * Virtual thread blocks in SocketChannel write.
121      */
122     @Test
123     void testSocketChannelWrite() throws Exception {
124         VThreadRunner.run(() -> {

125             try (var connection = new Connection()) {
126                 SocketChannel sc1 = connection.channel1();
127                 SocketChannel sc2 = connection.channel2();
128 
129                 // read from sc2 to EOF when current thread blocks in sc1.write
130                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
131 
132                 // write to sc1 should block
133                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
134                 for (int i=0; i<1000; i++) {
135                     int n = sc1.write(bb);
136                     assertTrue(n > 0);
137                     bb.clear();
138                 }
139                 sc1.close();
140 
141                 // wait for reader to finish
142                 reader.join();
143             }
144         });
145     }
146 
147     /**
148      * SocketChannel close while virtual thread blocked in read.
149      */
150     @Test
151     void testSocketChannelReadAsyncClose() throws Exception {
152         VThreadRunner.run(() -> {

153             try (var connection = new Connection()) {
154                 SocketChannel sc = connection.channel1();
155                 runAfterParkedAsync(sc::close);
156                 try {
157                     int n = sc.read(ByteBuffer.allocate(100));
158                     fail("read returned " + n);
159                 } catch (AsynchronousCloseException expected) { }
160             }
161         });
162     }
163 

















164     /**
165      * Virtual thread interrupted while blocked in SocketChannel read.
166      */
167     @Test
168     void testSocketChannelReadInterrupt() throws Exception {
169         VThreadRunner.run(() -> {

170             try (var connection = new Connection()) {
171                 SocketChannel sc = connection.channel1();
172 
173                 // interrupt current thread when it blocks in read
174                 Thread thisThread = Thread.currentThread();
175                 runAfterParkedAsync(thisThread::interrupt);
176 
177                 try {
178                     int n = sc.read(ByteBuffer.allocate(100));
179                     fail("read returned " + n);
180                 } catch (ClosedByInterruptException expected) {
181                     assertTrue(Thread.interrupted());
182                 }
183             }
184         });
185     }
186 
187     /**
188      * SocketChannel close while virtual thread blocked in write.
189      */
190     @Test
191     void testSocketChannelWriteAsyncClose() throws Exception {
192         VThreadRunner.run(() -> {

193             boolean retry = true;
194             while (retry) {
195                 try (var connection = new Connection()) {
196                     SocketChannel sc = connection.channel1();
197 
198                     // close sc when current thread blocks in write
199                     runAfterParkedAsync(sc::close);
200                     try {
201                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
202                         for (;;) {
203                             int n = sc.write(bb);
204                             assertTrue(n > 0);
205                             bb.clear();
206                         }
207                     } catch (AsynchronousCloseException expected) {
208                         // closed when blocked in write
209                         retry = false;
210                     } catch (ClosedChannelException e) {
211                         // closed when not blocked in write, need to retry test
212                     }
213                 }
214             }
215         });
216     }
217 




























218     /**
219      * Virtual thread interrupted while blocked in SocketChannel write.
220      */
221     @Test
222     void testSocketChannelWriteInterrupt() throws Exception {
223         VThreadRunner.run(() -> {

224             boolean retry = true;
225             while (retry) {
226                 try (var connection = new Connection()) {
227                     SocketChannel sc = connection.channel1();
228 
229                     // interrupt current thread when it blocks in write
230                     Thread thisThread = Thread.currentThread();
231                     runAfterParkedAsync(thisThread::interrupt);
232 
233                     try {
234                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
235                         for (;;) {
236                             int n = sc.write(bb);
237                             assertTrue(n > 0);
238                             bb.clear();
239                         }
240                     } catch (ClosedByInterruptException e) {
241                         // closed when blocked in write
242                         assertTrue(Thread.interrupted());
243                         retry = false;
244                     } catch (ClosedChannelException e) {
245                         // closed when not blocked in write, need to retry test
246                     }
247                 }
248             }
249         });
250     }
251 
252     /**
253      * Virtual thread blocks in SocketChannel adaptor read.
254      */
255     @Test
256     void testSocketAdaptorRead1() throws Exception {
257         testSocketAdaptorRead(0);

258     }
259 
260     /**
261      * Virtual thread blocks in SocketChannel adaptor read with timeout.
262      */
263     @Test
264     void testSocketAdaptorRead2() throws Exception {
265         testSocketAdaptorRead(60_000);

266     }
267 
268     private void testSocketAdaptorRead(int timeout) throws Exception {
269         VThreadRunner.run(() -> {

270             try (var connection = new Connection()) {
271                 SocketChannel sc1 = connection.channel1();
272                 SocketChannel sc2 = connection.channel2();
273 
274                 // write to sc1 when current thread blocks reading from sc2
275                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
276                 runAfterParkedAsync(() -> sc1.write(bb));
277 
278                 // read from sc2 should block
279                 byte[] array = new byte[100];
280                 if (timeout > 0)
281                     sc2.socket().setSoTimeout(timeout);
282                 int n = sc2.socket().getInputStream().read(array);
283                 assertTrue(n > 0);
284                 assertTrue(array[0] == 'X');
285             }
286         });
287     }
288 
289     /**
290      * ServerSocketChannel accept, no blocking.
291      */
292     @Test
293     void testServerSocketChannelAccept1() throws Exception {
294         VThreadRunner.run(() -> {

295             try (var ssc = ServerSocketChannel.open()) {
296                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
297                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
298                 // accept should not block
299                 var sc2 = ssc.accept();
300                 sc1.close();
301                 sc2.close();
302             }
303         });
304     }
305 
306     /**
307      * Virtual thread blocks in ServerSocketChannel accept.
308      */
309     @Test
310     void testServerSocketChannelAccept2() throws Exception {
311         VThreadRunner.run(() -> {

312             try (var ssc = ServerSocketChannel.open()) {
313                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
314                 var sc1 = SocketChannel.open();
315 
316                 // connect sc1 when current thread blocks in accept
317                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
318 
319                 // accept should block
320                 var sc2 = ssc.accept();
321                 sc1.close();
322                 sc2.close();
323             }
324         });
325     }
326 
327     /**
328      * ServerSocketChannel close while virtual thread blocked in accept.
329      */
330     @Test
331     void testServerSocketChannelAcceptAsyncClose() throws Exception {
332         VThreadRunner.run(() -> {

333             try (var ssc = ServerSocketChannel.open()) {
334                 InetAddress lh = InetAddress.getLoopbackAddress();
335                 ssc.bind(new InetSocketAddress(lh, 0));
336                 runAfterParkedAsync(ssc::close);
337                 try {
338                     SocketChannel sc = ssc.accept();
339                     sc.close();
340                     fail("connection accepted???");
341                 } catch (AsynchronousCloseException expected) { }
342             }
343         });
344     }
345 
346     /**
347      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
348      */
349     @Test
350     void testServerSocketChannelAcceptInterrupt() throws Exception {
351         VThreadRunner.run(() -> {

352             try (var ssc = ServerSocketChannel.open()) {
353                 InetAddress lh = InetAddress.getLoopbackAddress();
354                 ssc.bind(new InetSocketAddress(lh, 0));
355 
356                 // interrupt current thread when it blocks in accept
357                 Thread thisThread = Thread.currentThread();
358                 runAfterParkedAsync(thisThread::interrupt);
359 
360                 try {
361                     SocketChannel sc = ssc.accept();
362                     sc.close();
363                     fail("connection accepted???");
364                 } catch (ClosedByInterruptException expected) {
365                     assertTrue(Thread.interrupted());
366                 }
367             }
368         });
369     }
370 
371     /**
372      * Virtual thread blocks in ServerSocketChannel adaptor accept.
373      */
374     @Test
375     void testSocketChannelAdaptorAccept1() throws Exception {
376         testSocketChannelAdaptorAccept(0);

377     }
378 
379     /**
380      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
381      */
382     @Test
383     void testSocketChannelAdaptorAccept2() throws Exception {
384         testSocketChannelAdaptorAccept(60_000);

385     }
386 
387     private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
388         VThreadRunner.run(() -> {

389             try (var ssc = ServerSocketChannel.open()) {
390                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
391                 var sc = SocketChannel.open();
392 
393                 // connect sc to the listener when current thread blocks in accept
394                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
395 
396                 // accept should block
397                 if (timeout > 0)
398                     ssc.socket().setSoTimeout(timeout);
399                 Socket s = ssc.socket().accept();
400                 sc.close();
401                 s.close();
402             }
403         });
404     }
405 
406     /**
407      * DatagramChannel receive/send, no blocking.
408      */
409     @Test
410     void testDatagramChannelSendReceive1() throws Exception {
411         VThreadRunner.run(() -> {

412             try (DatagramChannel dc1 = DatagramChannel.open();
413                  DatagramChannel dc2 = DatagramChannel.open()) {
414 
415                 InetAddress lh = InetAddress.getLoopbackAddress();
416                 dc2.bind(new InetSocketAddress(lh, 0));
417 
418                 // send should not block
419                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
420                 int n = dc1.send(bb, dc2.getLocalAddress());
421                 assertTrue(n > 0);
422 
423                 // receive should not block
424                 bb = ByteBuffer.allocate(10);
425                 dc2.receive(bb);
426                 assertTrue(bb.get(0) == 'X');
427             }
428         });
429     }
430 
431     /**
432      * Virtual thread blocks in DatagramChannel receive.
433      */
434     @Test
435     void testDatagramChannelSendReceive2() throws Exception {
436         VThreadRunner.run(() -> {

437             try (DatagramChannel dc1 = DatagramChannel.open();
438                  DatagramChannel dc2 = DatagramChannel.open()) {
439 
440                 InetAddress lh = InetAddress.getLoopbackAddress();
441                 dc2.bind(new InetSocketAddress(lh, 0));
442 
443                 // send from dc1 when current thread blocked in dc2.receive
444                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
445                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
446 
447                 // receive on dc2 should block
448                 ByteBuffer bb2 = ByteBuffer.allocate(10);
449                 dc2.receive(bb2);
450                 assertTrue(bb2.get(0) == 'X');
451             }
452         });
453     }
454 
455     /**
456      * DatagramChannel close while virtual thread blocked in receive.
457      */
458     @Test
459     void testDatagramChannelReceiveAsyncClose() throws Exception {
460         VThreadRunner.run(() -> {

461             try (DatagramChannel dc = DatagramChannel.open()) {
462                 InetAddress lh = InetAddress.getLoopbackAddress();
463                 dc.bind(new InetSocketAddress(lh, 0));
464                 runAfterParkedAsync(dc::close);
465                 try {
466                     dc.receive(ByteBuffer.allocate(100));
467                     fail("receive returned");
468                 } catch (AsynchronousCloseException expected) { }
469             }
470         });
471     }
472 
473     /**
474      * Virtual thread interrupted while blocked in DatagramChannel receive.
475      */
476     @Test
477     void testDatagramChannelReceiveInterrupt() throws Exception {
478         VThreadRunner.run(() -> {

479             try (DatagramChannel dc = DatagramChannel.open()) {
480                 InetAddress lh = InetAddress.getLoopbackAddress();
481                 dc.bind(new InetSocketAddress(lh, 0));
482 
483                 // interrupt current thread when it blocks in receive
484                 Thread thisThread = Thread.currentThread();
485                 runAfterParkedAsync(thisThread::interrupt);
486 
487                 try {
488                     dc.receive(ByteBuffer.allocate(100));
489                     fail("receive returned");
490                 } catch (ClosedByInterruptException expected) {
491                     assertTrue(Thread.interrupted());
492                 }
493             }
494         });
495     }
496 
497     /**
498      * Virtual thread blocks in DatagramSocket adaptor receive.
499      */
500     @Test
501     void testDatagramSocketAdaptorReceive1() throws Exception {
502         testDatagramSocketAdaptorReceive(0);

503     }
504 
505     /**
506      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
507      */
508     @Test
509     void testDatagramSocketAdaptorReceive2() throws Exception {
510         testDatagramSocketAdaptorReceive(60_000);

511     }
512 
513     private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
514         VThreadRunner.run(() -> {

515             try (DatagramChannel dc1 = DatagramChannel.open();
516                  DatagramChannel dc2 = DatagramChannel.open()) {
517 
518                 InetAddress lh = InetAddress.getLoopbackAddress();
519                 dc2.bind(new InetSocketAddress(lh, 0));
520 
521                 // send from dc1 when current thread blocks in dc2 receive
522                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
523                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
524 
525                 // receive should block
526                 byte[] array = new byte[100];
527                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
528                 if (timeout > 0)
529                     dc2.socket().setSoTimeout(timeout);
530                 dc2.socket().receive(p);
531                 assertTrue(p.getLength() == 3 && array[0] == 'X');
532             }
533         });
534     }
535 
536     /**
537      * DatagramChannel close while virtual thread blocked in adaptor receive.
538      */
539     @Test
540     void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
541         testDatagramSocketAdaptorReceiveAsyncClose(0);

542     }
543 
544     /**
545      * DatagramChannel close while virtual thread blocked in adaptor receive
546      * with timeout.
547      */
548     @Test
549     void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
550         testDatagramSocketAdaptorReceiveAsyncClose(60_000);  // 60s, as in the other "with timeout" variants

551     }
552 
553     private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
554         VThreadRunner.run(() -> {

555             try (DatagramChannel dc = DatagramChannel.open()) {
556                 InetAddress lh = InetAddress.getLoopbackAddress();
557                 dc.bind(new InetSocketAddress(lh, 0));
558 
559                 byte[] array = new byte[100];
560                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
561                 if (timeout > 0)
562                     dc.socket().setSoTimeout(timeout);
563 
564                 // close channel/socket when current thread blocks in receive
565                 runAfterParkedAsync(dc::close);
566 
567                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
568             }
569         });
570     }
571 
572     /**
573      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
574      */
575     @Test
576     void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
577         testDatagramSocketAdaptorReceiveInterrupt(0);

578     }
579 
580     /**
581      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
582      * with timeout.
583      */
584     @Test
585     void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
586         testDatagramSocketAdaptorReceiveInterrupt(60_000);  // 60s, as in the other "with timeout" variants

587     }
588 
589     private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
590         VThreadRunner.run(() -> {

591             try (DatagramChannel dc = DatagramChannel.open()) {
592                 InetAddress lh = InetAddress.getLoopbackAddress();
593                 dc.bind(new InetSocketAddress(lh, 0));
594 
595                 byte[] array = new byte[100];
596                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
597                 if (timeout > 0)
598                     dc.socket().setSoTimeout(timeout);
599 
600                 // interrupt current thread when it blocks in receive
601                 Thread thisThread = Thread.currentThread();
602                 runAfterParkedAsync(thisThread::interrupt);
603 
604                 try {
605                     dc.socket().receive(p);
606                     fail("receive returned");
607                 } catch (ClosedByInterruptException expected) {
608                     assertTrue(Thread.interrupted());
609                 }
610             }
611         });
612     }
613 
614     /**
615      * Pipe read/write, no blocking.
616      */
617     @Test
618     void testPipeReadWrite1() throws Exception {
619         VThreadRunner.run(() -> {

620             Pipe p = Pipe.open();
621             try (Pipe.SinkChannel sink = p.sink();
622                  Pipe.SourceChannel source = p.source()) {
623 
624                 // write should not block
625                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
626                 int n = sink.write(bb);
627                 assertTrue(n > 0);
628 
629                 // read should not block
630                 bb = ByteBuffer.allocate(10);
631                 n = source.read(bb);
632                 assertTrue(n > 0);
633                 assertTrue(bb.get(0) == 'X');
634             }
635         });
636     }
637 
638     /**
639      * Virtual thread blocks in Pipe.SourceChannel read.
640      */
641     @Test
642     void testPipeReadWrite2() throws Exception {
643         VThreadRunner.run(() -> {

644             Pipe p = Pipe.open();
645             try (Pipe.SinkChannel sink = p.sink();
646                  Pipe.SourceChannel source = p.source()) {
647 
648                 // write from sink when current thread blocks reading from source
649                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
650                 runAfterParkedAsync(() -> sink.write(bb1));
651 
652                 // read should block
653                 ByteBuffer bb2 = ByteBuffer.allocate(10);
654                 int n = source.read(bb2);
655                 assertTrue(n > 0);
656                 assertTrue(bb2.get(0) == 'X');
657             }
658         });
659     }
660 
661     /**
662      * Virtual thread blocks in Pipe.SinkChannel write.
663      */
664     @Test
665     void testPipeReadWrite3() throws Exception {
666         VThreadRunner.run(() -> {

667             Pipe p = Pipe.open();
668             try (Pipe.SinkChannel sink = p.sink();
669                  Pipe.SourceChannel source = p.source()) {
670 
671                 // read from source to EOF when current thread blocks in write
672                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
673 
674                 // write to sink should block
675                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
676                 for (int i=0; i<1000; i++) {
677                     int n = sink.write(bb);
678                     assertTrue(n > 0);
679                     bb.clear();
680                 }
681                 sink.close();
682 
683                 // wait for reader to finish
684                 reader.join();
685             }
686         });
687     }
688 
689     /**
690      * Pipe.SourceChannel close while virtual thread blocked in read.
691      */
692     @Test
693     void testPipeReadAsyncClose() throws Exception {
694         VThreadRunner.run(() -> {

695             Pipe p = Pipe.open();
696             try (Pipe.SinkChannel sink = p.sink();
697                  Pipe.SourceChannel source = p.source()) {
698                 runAfterParkedAsync(source::close);
699                 try {
700                     int n = source.read(ByteBuffer.allocate(100));
701                     fail("read returned " + n);
702                 } catch (AsynchronousCloseException expected) { }
703             }
704         });
705     }
706 
707     /**
708      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
709      */
710     @Test
711     void testPipeReadInterrupt() throws Exception {
712         VThreadRunner.run(() -> {

713             Pipe p = Pipe.open();
714             try (Pipe.SinkChannel sink = p.sink();
715                  Pipe.SourceChannel source = p.source()) {
716 
717                 // interrupt current thread when it blocks reading from source
718                 Thread thisThread = Thread.currentThread();
719                 runAfterParkedAsync(thisThread::interrupt);
720 
721                 try {
722                     int n = source.read(ByteBuffer.allocate(100));
723                     fail("read returned " + n);
724                 } catch (ClosedByInterruptException expected) {
725                     assertTrue(Thread.interrupted());
726                 }
727             }
728         });
729     }
730 
731     /**
732      * Pipe.SinkChannel close while virtual thread blocked in write.
733      */
734     @Test
735     void testPipeWriteAsyncClose() throws Exception {
736         VThreadRunner.run(() -> {

737             boolean retry = true;
738             while (retry) {
739                 Pipe p = Pipe.open();
740                 try (Pipe.SinkChannel sink = p.sink();
741                      Pipe.SourceChannel source = p.source()) {
742 
743                     // close sink when current thread blocks in write
744                     runAfterParkedAsync(sink::close);
745                     try {
746                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
747                         for (;;) {
748                             int n = sink.write(bb);
749                             assertTrue(n > 0);
750                             bb.clear();
751                         }
752                     } catch (AsynchronousCloseException e) {
753                         // closed when blocked in write
754                         retry = false;
755                     } catch (ClosedChannelException e) {
756                         // closed when not blocked in write, need to retry test
757                     }
758                 }
759             }
760         });
761     }
762 
763     /**
764      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
765      */
766     @Test
767     void testPipeWriteInterrupt() throws Exception {
768         VThreadRunner.run(() -> {

769             boolean retry = true;
770             while (retry) {
771                 Pipe p = Pipe.open();
772                 try (Pipe.SinkChannel sink = p.sink();
773                      Pipe.SourceChannel source = p.source()) {
774 
775                     // interrupt current thread when it blocks in write
776                     Thread thisThread = Thread.currentThread();
777                     runAfterParkedAsync(thisThread::interrupt);
778 
779                     try {
780                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
781                         for (;;) {
782                             int n = sink.write(bb);
783                             assertTrue(n > 0);
784                             bb.clear();
785                         }
786                     } catch (ClosedByInterruptException expected) {
787                         // closed when blocked in write
788                         assertTrue(Thread.interrupted());

 45  */
 46 
 47 import java.io.Closeable;
 48 import java.io.IOException;
 49 import java.net.DatagramPacket;
 50 import java.net.InetAddress;
 51 import java.net.InetSocketAddress;
 52 import java.net.Socket;
 53 import java.net.SocketAddress;
 54 import java.net.SocketException;
 55 import java.nio.ByteBuffer;
 56 import java.nio.channels.AsynchronousCloseException;
 57 import java.nio.channels.ClosedByInterruptException;
 58 import java.nio.channels.ClosedChannelException;
 59 import java.nio.channels.DatagramChannel;
 60 import java.nio.channels.Pipe;
 61 import java.nio.channels.ReadableByteChannel;
 62 import java.nio.channels.ServerSocketChannel;
 63 import java.nio.channels.SocketChannel;
 64 import java.nio.channels.WritableByteChannel;
 65 import java.util.concurrent.ExecutorService;
 66 import java.util.concurrent.Executors;
 67 import java.util.stream.Stream;
 68 
 69 import jdk.test.lib.thread.VThreadRunner;
 70 import jdk.test.lib.thread.VThreadScheduler;
 71 
 72 import org.junit.jupiter.api.BeforeAll;
 73 import org.junit.jupiter.api.AfterAll;
 74 import org.junit.jupiter.params.ParameterizedTest;
 75 import org.junit.jupiter.params.provider.MethodSource;
 76 import static org.junit.jupiter.api.Assertions.*;
 77 
 78 class BlockingChannelOps {
 79     private static ExecutorService threadPool;
 80     private static Thread.VirtualThreadScheduler customScheduler;
 81 
 82     @BeforeAll
 83     static void setup() {
 84         threadPool = Executors.newCachedThreadPool();
 85         customScheduler = (_, task) -> threadPool.execute(task);
 86     }
 87 
 88     @AfterAll
 89     static void finish() {
 90         threadPool.shutdown();
 91     }
 92 
 93     /**
 94      * Returns the virtual thread builders to test with: the default one plus, if supported, one using the custom scheduler.
 95      */
 96     static Stream<Thread.Builder.OfVirtual> threadBuilders() {
 97         if (VThreadScheduler.supportsCustomScheduler()) {
 98             return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
 99         } else {
100             return Stream.of(Thread.ofVirtual());
101         }
102     }
103 
104     /**
105      * SocketChannel read/write, no blocking.
106      */
107     @ParameterizedTest
108     @MethodSource("threadBuilders")
109     void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
110         VThreadRunner.run(builder, () -> {
111             try (var connection = new Connection()) {
112                 SocketChannel sc1 = connection.channel1();
113                 SocketChannel sc2 = connection.channel2();
114 
115                 // write to sc1
116                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
117                 int n = sc1.write(bb);
118                 assertTrue(n > 0);
119 
120                 // read from sc2 should not block
121                 bb = ByteBuffer.allocate(10);
122                 n = sc2.read(bb);
123                 assertTrue(n > 0);
124                 assertTrue(bb.get(0) == 'X');
125             }
126         });
127     }
128 
129     /**
130      * Virtual thread blocks in SocketChannel read.
131      */
132     @ParameterizedTest
133     @MethodSource("threadBuilders")
134     void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
135         VThreadRunner.run(builder, () -> {
136             try (var connection = new Connection()) {
137                 SocketChannel sc1 = connection.channel1();
138                 SocketChannel sc2 = connection.channel2();
139 
140                 // write to sc1 when current thread blocks in sc2.read
141                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
142                 runAfterParkedAsync(() -> sc1.write(bb1));
143 
144                 // read from sc2 should block
145                 ByteBuffer bb2 = ByteBuffer.allocate(10);
146                 int n = sc2.read(bb2);
147                 assertTrue(n > 0);
148                 assertTrue(bb2.get(0) == 'X');
149             }
150         });
151     }
152 
153     /**
154      * Virtual thread blocks in SocketChannel write.
155      */
156     @ParameterizedTest
157     @MethodSource("threadBuilders")
158     void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
159         VThreadRunner.run(builder, () -> {
160             try (var connection = new Connection()) {
161                 SocketChannel sc1 = connection.channel1();
162                 SocketChannel sc2 = connection.channel2();
163 
164                 // read from sc2 to EOF when current thread blocks in sc1.write
165                 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
166 
167                 // write to sc1 should block
168                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
169                 for (int i=0; i<1000; i++) {
170                     int n = sc1.write(bb);
171                     assertTrue(n > 0);
172                     bb.clear();
173                 }
174                 sc1.close();
175 
176                 // wait for reader to finish
177                 reader.join();
178             }
179         });
180     }
181 
182     /**
183      * SocketChannel close while virtual thread blocked in read.
184      */
185     @ParameterizedTest
186     @MethodSource("threadBuilders")
187     void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
188         VThreadRunner.run(builder, () -> {
189             try (var connection = new Connection()) {
190                 SocketChannel sc = connection.channel1();
191                 runAfterParkedAsync(sc::close);
192                 try {
193                     int n = sc.read(ByteBuffer.allocate(100));
194                     fail("read returned " + n);
195                 } catch (AsynchronousCloseException expected) { }
196             }
197         });
198     }
199 
200     /**
201      * SocketChannel shutdownInput while virtual thread blocked in read.
202      */
203     @ParameterizedTest
204     @MethodSource("threadBuilders")
205     void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
206         VThreadRunner.run(builder, () -> {
207             try (var connection = new Connection()) {
208                 SocketChannel sc = connection.channel1();
209                 runAfterParkedAsync(sc::shutdownInput);
210                 int n = sc.read(ByteBuffer.allocate(100));
211                 assertEquals(-1, n);
212                 assertTrue(sc.isOpen());
213             }
214         });
215     }
216 
217     /**
218      * Virtual thread interrupted while blocked in SocketChannel read.
219      */
220     @ParameterizedTest
221     @MethodSource("threadBuilders")
222     void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
223         VThreadRunner.run(builder, () -> {
224             try (var connection = new Connection()) {
225                 SocketChannel sc = connection.channel1();
226 
227                 // interrupt current thread when it blocks in read
228                 Thread thisThread = Thread.currentThread();
229                 runAfterParkedAsync(thisThread::interrupt);
230 
231                 try {
232                     int n = sc.read(ByteBuffer.allocate(100));
233                     fail("read returned " + n);
234                 } catch (ClosedByInterruptException expected) {
235                     assertTrue(Thread.interrupted());
236                 }
237             }
238         });
239     }
240 
241     /**
242      * SocketChannel close while virtual thread blocked in write.
243      */
244     @ParameterizedTest
245     @MethodSource("threadBuilders")
246     void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
247         VThreadRunner.run(builder, () -> {
248             boolean retry = true;
249             while (retry) {
250                 try (var connection = new Connection()) {
251                     SocketChannel sc = connection.channel1();
252 
253                     // close sc when current thread blocks in write
254                     runAfterParkedAsync(sc::close);
255                     try {
256                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
257                         for (;;) {
258                             int n = sc.write(bb);
259                             assertTrue(n > 0);
260                             bb.clear();
261                         }
262                     } catch (AsynchronousCloseException expected) {
263                         // closed when blocked in write
264                         retry = false;
265                     } catch (ClosedChannelException e) {
266                         // closed when not blocked in write, need to retry test
267                     }
268                 }
269             }
270         });
271     }
272 
273 
274     /**
275      * SocketChannel shutdownOutput while virtual thread blocked in write.
276      */
277     @ParameterizedTest
278     @MethodSource("threadBuilders")
279     void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
280         VThreadRunner.run(builder, () -> {
281             try (var connection = new Connection()) {
282                 SocketChannel sc = connection.channel1();
283 
284                 // shutdown output when current thread blocks in write
285                 runAfterParkedAsync(sc::shutdownOutput);
286                 try {
287                     ByteBuffer bb = ByteBuffer.allocate(100*1024);
288                     for (;;) {
289                         int n = sc.write(bb);
290                         assertTrue(n > 0);
291                         bb.clear();
292                     }
293                 } catch (ClosedChannelException e) {
294                     // expected
295                 }
296                 assertTrue(sc.isOpen());
297             }
298         });
299     }
300 
301     /**
302      * Virtual thread interrupted while blocked in SocketChannel write.
303      */
304     @ParameterizedTest
305     @MethodSource("threadBuilders")
306     void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
307         VThreadRunner.run(builder, () -> {
308             boolean retry = true;
309             while (retry) {
310                 try (var connection = new Connection()) {
311                     SocketChannel sc = connection.channel1();
312 
313                     // interrupt current thread when it blocks in write
314                     Thread thisThread = Thread.currentThread();
315                     runAfterParkedAsync(thisThread::interrupt);
316 
317                     try {
318                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
319                         for (;;) {
320                             int n = sc.write(bb);
321                             assertTrue(n > 0);
322                             bb.clear();
323                         }
324                     } catch (ClosedByInterruptException e) {
325                         // closed when blocked in write
326                         assertTrue(Thread.interrupted());
327                         retry = false;
328                     } catch (ClosedChannelException e) {
329                         // closed when not blocked in write, need to retry test
330                     }
331                 }
332             }
333         });
334     }
335 
336     /**
337      * Virtual thread blocks in SocketChannel adaptor read.
338      */
339     @ParameterizedTest
340     @MethodSource("threadBuilders")
341     void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
342         testSocketAdaptorRead(builder, 0);
343     }
344 
345     /**
346      * Virtual thread blocks in SocketChannel adaptor read with timeout.
347      */
348     @ParameterizedTest
349     @MethodSource("threadBuilders")
350     void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
351         testSocketAdaptorRead(builder, 60_000);
352     }
353 
354     private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
355                                        int timeout) throws Exception {
356         VThreadRunner.run(builder, () -> {
357             try (var connection = new Connection()) {
358                 SocketChannel sc1 = connection.channel1();
359                 SocketChannel sc2 = connection.channel2();
360 
361                 // write to sc1 when currnet thread blocks reading from sc2
362                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
363                 runAfterParkedAsync(() -> sc1.write(bb));
364 
365                 // read from sc2 should block
366                 byte[] array = new byte[100];
367                 if (timeout > 0)
368                     sc2.socket().setSoTimeout(timeout);
369                 int n = sc2.socket().getInputStream().read(array);
370                 assertTrue(n > 0);
371                 assertTrue(array[0] == 'X');
372             }
373         });
374     }
375 
376     /**
377      * ServerSocketChannel accept, no blocking.
378      */
379     @ParameterizedTest
380     @MethodSource("threadBuilders")
381     void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
382         VThreadRunner.run(builder, () -> {
383             try (var ssc = ServerSocketChannel.open()) {
384                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
385                 var sc1 = SocketChannel.open(ssc.getLocalAddress());
386                 // accept should not block
387                 var sc2 = ssc.accept();
388                 sc1.close();
389                 sc2.close();
390             }
391         });
392     }
393 
394     /**
395      * Virtual thread blocks in ServerSocketChannel accept.
396      */
397     @ParameterizedTest
398     @MethodSource("threadBuilders")
399     void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
400         VThreadRunner.run(builder, () -> {
401             try (var ssc = ServerSocketChannel.open()) {
402                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
403                 var sc1 = SocketChannel.open();
404 
405                 // connect when current thread when it blocks in accept
406                 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
407 
408                 // accept should block
409                 var sc2 = ssc.accept();
410                 sc1.close();
411                 sc2.close();
412             }
413         });
414     }
415 
416     /**
417      * SeverSocketChannel close while virtual thread blocked in accept.
418      */
419     @ParameterizedTest
420     @MethodSource("threadBuilders")
421     void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
422         VThreadRunner.run(builder, () -> {
423             try (var ssc = ServerSocketChannel.open()) {
424                 InetAddress lh = InetAddress.getLoopbackAddress();
425                 ssc.bind(new InetSocketAddress(lh, 0));
426                 runAfterParkedAsync(ssc::close);
427                 try {
428                     SocketChannel sc = ssc.accept();
429                     sc.close();
430                     fail("connection accepted???");
431                 } catch (AsynchronousCloseException expected) { }
432             }
433         });
434     }
435 
436     /**
437      * Virtual thread interrupted while blocked in ServerSocketChannel accept.
438      */
439     @ParameterizedTest
440     @MethodSource("threadBuilders")
441     void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
442         VThreadRunner.run(builder, () -> {
443             try (var ssc = ServerSocketChannel.open()) {
444                 InetAddress lh = InetAddress.getLoopbackAddress();
445                 ssc.bind(new InetSocketAddress(lh, 0));
446 
447                 // interrupt current thread when it blocks in accept
448                 Thread thisThread = Thread.currentThread();
449                 runAfterParkedAsync(thisThread::interrupt);
450 
451                 try {
452                     SocketChannel sc = ssc.accept();
453                     sc.close();
454                     fail("connection accepted???");
455                 } catch (ClosedByInterruptException expected) {
456                     assertTrue(Thread.interrupted());
457                 }
458             }
459         });
460     }
461 
462     /**
463      * Virtual thread blocks in ServerSocketChannel adaptor accept.
464      */
465     @ParameterizedTest
466     @MethodSource("threadBuilders")
467     void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
468         testSocketChannelAdaptorAccept(builder, 0);
469     }
470 
471     /**
472      * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
473      */
474     @ParameterizedTest
475     @MethodSource("threadBuilders")
476     void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
477         testSocketChannelAdaptorAccept(builder, 60_000);
478     }
479 
480     private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
481                                                 int timeout) throws Exception {
482         VThreadRunner.run(builder, () -> {
483             try (var ssc = ServerSocketChannel.open()) {
484                 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
485                 var sc = SocketChannel.open();
486 
487                 // interrupt current thread when it blocks in accept
488                 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
489 
490                 // accept should block
491                 if (timeout > 0)
492                     ssc.socket().setSoTimeout(timeout);
493                 Socket s = ssc.socket().accept();
494                 sc.close();
495                 s.close();
496             }
497         });
498     }
499 
500     /**
501      * DatagramChannel receive/send, no blocking.
502      */
503     @ParameterizedTest
504     @MethodSource("threadBuilders")
505     void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
506         VThreadRunner.run(builder, () -> {
507             try (DatagramChannel dc1 = DatagramChannel.open();
508                  DatagramChannel dc2 = DatagramChannel.open()) {
509 
510                 InetAddress lh = InetAddress.getLoopbackAddress();
511                 dc2.bind(new InetSocketAddress(lh, 0));
512 
513                 // send should not block
514                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
515                 int n = dc1.send(bb, dc2.getLocalAddress());
516                 assertTrue(n > 0);
517 
518                 // receive should not block
519                 bb = ByteBuffer.allocate(10);
520                 dc2.receive(bb);
521                 assertTrue(bb.get(0) == 'X');
522             }
523         });
524     }
525 
526     /**
527      * Virtual thread blocks in DatagramChannel receive.
528      */
529     @ParameterizedTest
530     @MethodSource("threadBuilders")
531     void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
532         VThreadRunner.run(builder, () -> {
533             try (DatagramChannel dc1 = DatagramChannel.open();
534                  DatagramChannel dc2 = DatagramChannel.open()) {
535 
536                 InetAddress lh = InetAddress.getLoopbackAddress();
537                 dc2.bind(new InetSocketAddress(lh, 0));
538 
539                 // send from dc1 when current thread blocked in dc2.receive
540                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
541                 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
542 
543                 // read from dc2 should block
544                 ByteBuffer bb2 = ByteBuffer.allocate(10);
545                 dc2.receive(bb2);
546                 assertTrue(bb2.get(0) == 'X');
547             }
548         });
549     }
550 
551     /**
552      * DatagramChannel close while virtual thread blocked in receive.
553      */
554     @ParameterizedTest
555     @MethodSource("threadBuilders")
556     void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
557         VThreadRunner.run(builder, () -> {
558             try (DatagramChannel dc = DatagramChannel.open()) {
559                 InetAddress lh = InetAddress.getLoopbackAddress();
560                 dc.bind(new InetSocketAddress(lh, 0));
561                 runAfterParkedAsync(dc::close);
562                 try {
563                     dc.receive(ByteBuffer.allocate(100));
564                     fail("receive returned");
565                 } catch (AsynchronousCloseException expected) { }
566             }
567         });
568     }
569 
570     /**
571      * Virtual thread interrupted while blocked in DatagramChannel receive.
572      */
573     @ParameterizedTest
574     @MethodSource("threadBuilders")
575     void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
576         VThreadRunner.run(builder, () -> {
577             try (DatagramChannel dc = DatagramChannel.open()) {
578                 InetAddress lh = InetAddress.getLoopbackAddress();
579                 dc.bind(new InetSocketAddress(lh, 0));
580 
581                 // interrupt current thread when it blocks in receive
582                 Thread thisThread = Thread.currentThread();
583                 runAfterParkedAsync(thisThread::interrupt);
584 
585                 try {
586                     dc.receive(ByteBuffer.allocate(100));
587                     fail("receive returned");
588                 } catch (ClosedByInterruptException expected) {
589                     assertTrue(Thread.interrupted());
590                 }
591             }
592         });
593     }
594 
595     /**
596      * Virtual thread blocks in DatagramSocket adaptor receive.
597      */
598     @ParameterizedTest
599     @MethodSource("threadBuilders")
600     void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
601         testDatagramSocketAdaptorReceive(builder, 0);
602     }
603 
604     /**
605      * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
606      */
607     @ParameterizedTest
608     @MethodSource("threadBuilders")
609     void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
610         testDatagramSocketAdaptorReceive(builder, 60_000);
611     }
612 
613     private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
614                                                   int timeout) throws Exception {
615         VThreadRunner.run(builder, () -> {
616             try (DatagramChannel dc1 = DatagramChannel.open();
617                  DatagramChannel dc2 = DatagramChannel.open()) {
618 
619                 InetAddress lh = InetAddress.getLoopbackAddress();
620                 dc2.bind(new InetSocketAddress(lh, 0));
621 
622                 // send from dc1 when current thread blocks in dc2 receive
623                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
624                 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
625 
626                 // receive should block
627                 byte[] array = new byte[100];
628                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
629                 if (timeout > 0)
630                     dc2.socket().setSoTimeout(timeout);
631                 dc2.socket().receive(p);
632                 assertTrue(p.getLength() == 3 && array[0] == 'X');
633             }
634         });
635     }
636 
637     /**
638      * DatagramChannel close while virtual thread blocked in adaptor receive.
639      */
640     @ParameterizedTest
641     @MethodSource("threadBuilders")
642     void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
643         testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
644     }
645 
646     /**
647      * DatagramChannel close while virtual thread blocked in adaptor receive
648      * with timeout.
649      */
650     @ParameterizedTest
651     @MethodSource("threadBuilders")
652     void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
653         testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
654     }
655 
656     private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
657                                                             int timeout) throws Exception {
658         VThreadRunner.run(builder, () -> {
659             try (DatagramChannel dc = DatagramChannel.open()) {
660                 InetAddress lh = InetAddress.getLoopbackAddress();
661                 dc.bind(new InetSocketAddress(lh, 0));
662 
663                 byte[] array = new byte[100];
664                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
665                 if (timeout > 0)
666                     dc.socket().setSoTimeout(timeout);
667 
668                 // close channel/socket when current thread blocks in receive
669                 runAfterParkedAsync(dc::close);
670 
671                 assertThrows(SocketException.class, () -> dc.socket().receive(p));
672             }
673         });
674     }
675 
676     /**
677      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
678      */
679     @ParameterizedTest
680     @MethodSource("threadBuilders")
681     void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
682         testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
683     }
684 
685     /**
686      * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
687      * with timeout.
688      */
689     @ParameterizedTest
690     @MethodSource("threadBuilders")
691     void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
692         testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
693     }
694 
695     private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
696                                                            int timeout) throws Exception {
697         VThreadRunner.run(builder, () -> {
698             try (DatagramChannel dc = DatagramChannel.open()) {
699                 InetAddress lh = InetAddress.getLoopbackAddress();
700                 dc.bind(new InetSocketAddress(lh, 0));
701 
702                 byte[] array = new byte[100];
703                 DatagramPacket p = new DatagramPacket(array, 0, array.length);
704                 if (timeout > 0)
705                     dc.socket().setSoTimeout(timeout);
706 
707                 // interrupt current thread when it blocks in receive
708                 Thread thisThread = Thread.currentThread();
709                 runAfterParkedAsync(thisThread::interrupt);
710 
711                 try {
712                     dc.socket().receive(p);
713                     fail();
714                 } catch (ClosedByInterruptException expected) {
715                     assertTrue(Thread.interrupted());
716                 }
717             }
718         });
719     }
720 
721     /**
722      * Pipe read/write, no blocking.
723      */
724     @ParameterizedTest
725     @MethodSource("threadBuilders")
726     void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
727         VThreadRunner.run(builder, () -> {
728             Pipe p = Pipe.open();
729             try (Pipe.SinkChannel sink = p.sink();
730                  Pipe.SourceChannel source = p.source()) {
731 
732                 // write should not block
733                 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
734                 int n = sink.write(bb);
735                 assertTrue(n > 0);
736 
737                 // read should not block
738                 bb = ByteBuffer.allocate(10);
739                 n = source.read(bb);
740                 assertTrue(n > 0);
741                 assertTrue(bb.get(0) == 'X');
742             }
743         });
744     }
745 
746     /**
747      * Virtual thread blocks in Pipe.SourceChannel read.
748      */
749     @ParameterizedTest
750     @MethodSource("threadBuilders")
751     void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
752         VThreadRunner.run(builder, () -> {
753             Pipe p = Pipe.open();
754             try (Pipe.SinkChannel sink = p.sink();
755                  Pipe.SourceChannel source = p.source()) {
756 
757                 // write from sink when current thread blocks reading from source
758                 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
759                 runAfterParkedAsync(() -> sink.write(bb1));
760 
761                 // read should block
762                 ByteBuffer bb2 = ByteBuffer.allocate(10);
763                 int n = source.read(bb2);
764                 assertTrue(n > 0);
765                 assertTrue(bb2.get(0) == 'X');
766             }
767         });
768     }
769 
770     /**
771      * Virtual thread blocks in Pipe.SinkChannel write.
772      */
773     @ParameterizedTest
774     @MethodSource("threadBuilders")
775     void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
776         VThreadRunner.run(builder, () -> {
777             Pipe p = Pipe.open();
778             try (Pipe.SinkChannel sink = p.sink();
779                  Pipe.SourceChannel source = p.source()) {
780 
781                 // read from source to EOF when current thread blocking in write
782                 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
783 
784                 // write to sink should block
785                 ByteBuffer bb = ByteBuffer.allocate(100*1024);
786                 for (int i=0; i<1000; i++) {
787                     int n = sink.write(bb);
788                     assertTrue(n > 0);
789                     bb.clear();
790                 }
791                 sink.close();
792 
793                 // wait for reader to finish
794                 reader.join();
795             }
796         });
797     }
798 
799     /**
800      * Pipe.SourceChannel close while virtual thread blocked in read.
801      */
802     @ParameterizedTest
803     @MethodSource("threadBuilders")
804     void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
805         VThreadRunner.run(builder, () -> {
806             Pipe p = Pipe.open();
807             try (Pipe.SinkChannel sink = p.sink();
808                  Pipe.SourceChannel source = p.source()) {
809                 runAfterParkedAsync(source::close);
810                 try {
811                     int n = source.read(ByteBuffer.allocate(100));
812                     fail("read returned " + n);
813                 } catch (AsynchronousCloseException expected) { }
814             }
815         });
816     }
817 
818     /**
819      * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
820      */
821     @ParameterizedTest
822     @MethodSource("threadBuilders")
823     void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
824         VThreadRunner.run(builder, () -> {
825             Pipe p = Pipe.open();
826             try (Pipe.SinkChannel sink = p.sink();
827                  Pipe.SourceChannel source = p.source()) {
828 
829                 // interrupt current thread when it blocks reading from source
830                 Thread thisThread = Thread.currentThread();
831                 runAfterParkedAsync(thisThread::interrupt);
832 
833                 try {
834                     int n = source.read(ByteBuffer.allocate(100));
835                     fail("read returned " + n);
836                 } catch (ClosedByInterruptException expected) {
837                     assertTrue(Thread.interrupted());
838                 }
839             }
840         });
841     }
842 
843     /**
844      * Pipe.SinkChannel close while virtual thread blocked in write.
845      */
846     @ParameterizedTest
847     @MethodSource("threadBuilders")
848     void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
849         VThreadRunner.run(builder, () -> {
850             boolean retry = true;
851             while (retry) {
852                 Pipe p = Pipe.open();
853                 try (Pipe.SinkChannel sink = p.sink();
854                      Pipe.SourceChannel source = p.source()) {
855 
856                     // close sink when current thread blocks in write
857                     runAfterParkedAsync(sink::close);
858                     try {
859                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
860                         for (;;) {
861                             int n = sink.write(bb);
862                             assertTrue(n > 0);
863                             bb.clear();
864                         }
865                     } catch (AsynchronousCloseException e) {
866                         // closed when blocked in write
867                         retry = false;
868                     } catch (ClosedChannelException e) {
869                         // closed when not blocked in write, need to retry test
870                     }
871                 }
872             }
873         });
874     }
875 
876     /**
877      * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
878      */
879     @ParameterizedTest
880     @MethodSource("threadBuilders")
881     void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
882         VThreadRunner.run(builder, () -> {
883             boolean retry = true;
884             while (retry) {
885                 Pipe p = Pipe.open();
886                 try (Pipe.SinkChannel sink = p.sink();
887                      Pipe.SourceChannel source = p.source()) {
888 
889                     // interrupt current thread when it blocks in write
890                     Thread thisThread = Thread.currentThread();
891                     runAfterParkedAsync(thisThread::interrupt);
892 
893                     try {
894                         ByteBuffer bb = ByteBuffer.allocate(100*1024);
895                         for (;;) {
896                             int n = sink.write(bb);
897                             assertTrue(n > 0);
898                             bb.clear();
899                         }
900                     } catch (ClosedByInterruptException expected) {
901                         // closed when blocked in write
902                         assertTrue(Thread.interrupted());
< prev index next >