9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit/timeout=480 BlockingChannelOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
38 */
39
40 /*
41 * @test id=no-vmcontinuations
42 * @requires vm.continuations
43 * @library /test/lib
44 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
45 */
46
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.net.DatagramPacket;
50 import java.net.InetAddress;
51 import java.net.InetSocketAddress;
52 import java.net.Socket;
53 import java.net.SocketAddress;
54 import java.net.SocketException;
55 import java.nio.ByteBuffer;
56 import java.nio.channels.AsynchronousCloseException;
57 import java.nio.channels.ClosedByInterruptException;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.DatagramChannel;
60 import java.nio.channels.Pipe;
61 import java.nio.channels.ReadableByteChannel;
62 import java.nio.channels.ServerSocketChannel;
63 import java.nio.channels.SocketChannel;
64 import java.nio.channels.WritableByteChannel;
65 import java.util.concurrent.locks.LockSupport;
66
67 import jdk.test.lib.thread.VThreadRunner;
68 import org.junit.jupiter.api.Test;
69 import static org.junit.jupiter.api.Assertions.*;
70
71 class BlockingChannelOps {
72
73 /**
74 * SocketChannel read/write, no blocking.
75 */
76 @Test
77 void testSocketChannelReadWrite1() throws Exception {
78 VThreadRunner.run(() -> {
79 try (var connection = new Connection()) {
80 SocketChannel sc1 = connection.channel1();
81 SocketChannel sc2 = connection.channel2();
82
83 // write to sc1
84 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
85 int n = sc1.write(bb);
86 assertTrue(n > 0);
87
88 // read from sc2 should not block
89 bb = ByteBuffer.allocate(10);
90 n = sc2.read(bb);
91 assertTrue(n > 0);
92 assertTrue(bb.get(0) == 'X');
93 }
94 });
95 }
96
97 /**
98 * Virtual thread blocks in SocketChannel read.
99 */
100 @Test
101 void testSocketChannelRead() throws Exception {
102 VThreadRunner.run(() -> {
103 try (var connection = new Connection()) {
104 SocketChannel sc1 = connection.channel1();
105 SocketChannel sc2 = connection.channel2();
106
107 // write to sc1 when current thread blocks in sc2.read
108 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
109 runAfterParkedAsync(() -> sc1.write(bb1));
110
111 // read from sc2 should block
112 ByteBuffer bb2 = ByteBuffer.allocate(10);
113 int n = sc2.read(bb2);
114 assertTrue(n > 0);
115 assertTrue(bb2.get(0) == 'X');
116 }
117 });
118 }
119
120 /**
121 * Virtual thread blocks in SocketChannel write.
122 */
123 @Test
124 void testSocketChannelWrite() throws Exception {
125 VThreadRunner.run(() -> {
126 try (var connection = new Connection()) {
127 SocketChannel sc1 = connection.channel1();
128 SocketChannel sc2 = connection.channel2();
129
130 // read from sc2 to EOF when current thread blocks in sc1.write
131 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
132
133 // write to sc1 should block
134 ByteBuffer bb = ByteBuffer.allocate(100*1024);
135 for (int i=0; i<1000; i++) {
136 int n = sc1.write(bb);
137 assertTrue(n > 0);
138 bb.clear();
139 }
140 sc1.close();
141
142 // wait for reader to finish
143 reader.join();
144 }
145 });
146 }
147
148 /**
149 * SocketChannel close while virtual thread blocked in read.
150 */
151 @Test
152 void testSocketChannelReadAsyncClose() throws Exception {
153 VThreadRunner.run(() -> {
154 try (var connection = new Connection()) {
155 SocketChannel sc = connection.channel1();
156 runAfterParkedAsync(sc::close);
157 try {
158 int n = sc.read(ByteBuffer.allocate(100));
159 fail("read returned " + n);
160 } catch (AsynchronousCloseException expected) { }
161 }
162 });
163 }
164
165 /**
166 * SocketChannel shutdownInput while virtual thread blocked in read.
167 */
168 @Test
169 void testSocketChannelReadAsyncShutdownInput() throws Exception {
170 VThreadRunner.run(() -> {
171 try (var connection = new Connection()) {
172 SocketChannel sc = connection.channel1();
173 runAfterParkedAsync(sc::shutdownInput);
174 int n = sc.read(ByteBuffer.allocate(100));
175 assertEquals(-1, n);
176 assertTrue(sc.isOpen());
177 }
178 });
179 }
180
181 /**
182 * Virtual thread interrupted while blocked in SocketChannel read.
183 */
184 @Test
185 void testSocketChannelReadInterrupt() throws Exception {
186 VThreadRunner.run(() -> {
187 try (var connection = new Connection()) {
188 SocketChannel sc = connection.channel1();
189
190 // interrupt current thread when it blocks in read
191 Thread thisThread = Thread.currentThread();
192 runAfterParkedAsync(thisThread::interrupt);
193
194 try {
195 int n = sc.read(ByteBuffer.allocate(100));
196 fail("read returned " + n);
197 } catch (ClosedByInterruptException expected) {
198 assertTrue(Thread.interrupted());
199 }
200 }
201 });
202 }
203
204 /**
205 * SocketChannel close while virtual thread blocked in write.
206 */
207 @Test
208 void testSocketChannelWriteAsyncClose() throws Exception {
209 VThreadRunner.run(() -> {
210 boolean done = false;
211 while (!done) {
212 try (var connection = new Connection()) {
213 SocketChannel sc = connection.channel1();
214
215 // close sc when current thread blocks in write
216 runAfterParkedAsync(sc::close, true);
217
218 // write until channel is closed
219 try {
220 ByteBuffer bb = ByteBuffer.allocate(100*1024);
221 for (;;) {
222 int n = sc.write(bb);
223 assertTrue(n > 0);
224 bb.clear();
225 }
226 } catch (AsynchronousCloseException expected) {
227 // closed when blocked in write
228 done = true;
229 } catch (ClosedChannelException e) {
230 // closed but not blocked in write, need to retry test
231 System.err.format("%s, need to retry!%n", e);
232 }
233 }
234 }
235 });
236 }
237
238
239 /**
240 * SocketChannel shutdownOutput while virtual thread blocked in write.
241 */
242 @Test
243 void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
244 VThreadRunner.run(() -> {
245 try (var connection = new Connection()) {
246 SocketChannel sc = connection.channel1();
247
248 // shutdown output when current thread blocks in write
249 runAfterParkedAsync(sc::shutdownOutput);
250 try {
251 ByteBuffer bb = ByteBuffer.allocate(100*1024);
252 for (;;) {
253 int n = sc.write(bb);
254 assertTrue(n > 0);
255 bb.clear();
256 }
257 } catch (ClosedChannelException e) {
258 // expected
259 }
260 assertTrue(sc.isOpen());
261 }
262 });
263 }
264
265 /**
266 * Virtual thread interrupted while blocked in SocketChannel write.
267 */
268 @Test
269 void testSocketChannelWriteInterrupt() throws Exception {
270 VThreadRunner.run(() -> {
271 boolean done = false;
272 while (!done) {
273 try (var connection = new Connection()) {
274 SocketChannel sc = connection.channel1();
275
276 // interrupt current thread when it blocks in write
277 Thread thisThread = Thread.currentThread();
278 runAfterParkedAsync(thisThread::interrupt, true);
279
280 // write until channel is closed
281 try {
282 ByteBuffer bb = ByteBuffer.allocate(100*1024);
283 for (;;) {
284 int n = sc.write(bb);
285 assertTrue(n > 0);
286 bb.clear();
287 }
288 } catch (ClosedByInterruptException e) {
289 // closed when blocked in write
290 assertTrue(Thread.interrupted());
291 done = true;
292 } catch (ClosedChannelException e) {
293 // closed but not blocked in write, need to retry test
294 System.err.format("%s, need to retry!%n", e);
295 }
296 }
297 }
298 });
299 }
300
301 /**
302 * Virtual thread blocks in SocketChannel adaptor read.
303 */
304 @Test
305 void testSocketAdaptorRead1() throws Exception {
306 testSocketAdaptorRead(0);
307 }
308
309 /**
310 * Virtual thread blocks in SocketChannel adaptor read with timeout.
311 */
312 @Test
313 void testSocketAdaptorRead2() throws Exception {
314 testSocketAdaptorRead(60_000);
315 }
316
317 private void testSocketAdaptorRead(int timeout) throws Exception {
318 VThreadRunner.run(() -> {
319 try (var connection = new Connection()) {
320 SocketChannel sc1 = connection.channel1();
321 SocketChannel sc2 = connection.channel2();
322
323 // write to sc1 when currnet thread blocks reading from sc2
324 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
325 runAfterParkedAsync(() -> sc1.write(bb));
326
327 // read from sc2 should block
328 byte[] array = new byte[100];
329 if (timeout > 0)
330 sc2.socket().setSoTimeout(timeout);
331 int n = sc2.socket().getInputStream().read(array);
332 assertTrue(n > 0);
333 assertTrue(array[0] == 'X');
334 }
335 });
336 }
337
338 /**
339 * ServerSocketChannel accept, no blocking.
340 */
341 @Test
342 void testServerSocketChannelAccept1() throws Exception {
343 VThreadRunner.run(() -> {
344 try (var ssc = ServerSocketChannel.open()) {
345 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
346 var sc1 = SocketChannel.open(ssc.getLocalAddress());
347 // accept should not block
348 var sc2 = ssc.accept();
349 sc1.close();
350 sc2.close();
351 }
352 });
353 }
354
355 /**
356 * Virtual thread blocks in ServerSocketChannel accept.
357 */
358 @Test
359 void testServerSocketChannelAccept2() throws Exception {
360 VThreadRunner.run(() -> {
361 try (var ssc = ServerSocketChannel.open()) {
362 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
363 var sc1 = SocketChannel.open();
364
365 // connect when current thread when it blocks in accept
366 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
367
368 // accept should block
369 var sc2 = ssc.accept();
370 sc1.close();
371 sc2.close();
372 }
373 });
374 }
375
376 /**
377 * SeverSocketChannel close while virtual thread blocked in accept.
378 */
379 @Test
380 void testServerSocketChannelAcceptAsyncClose() throws Exception {
381 VThreadRunner.run(() -> {
382 try (var ssc = ServerSocketChannel.open()) {
383 InetAddress lh = InetAddress.getLoopbackAddress();
384 ssc.bind(new InetSocketAddress(lh, 0));
385 runAfterParkedAsync(ssc::close);
386 try {
387 SocketChannel sc = ssc.accept();
388 sc.close();
389 fail("connection accepted???");
390 } catch (AsynchronousCloseException expected) { }
391 }
392 });
393 }
394
395 /**
396 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
397 */
398 @Test
399 void testServerSocketChannelAcceptInterrupt() throws Exception {
400 VThreadRunner.run(() -> {
401 try (var ssc = ServerSocketChannel.open()) {
402 InetAddress lh = InetAddress.getLoopbackAddress();
403 ssc.bind(new InetSocketAddress(lh, 0));
404
405 // interrupt current thread when it blocks in accept
406 Thread thisThread = Thread.currentThread();
407 runAfterParkedAsync(thisThread::interrupt);
408
409 try {
410 SocketChannel sc = ssc.accept();
411 sc.close();
412 fail("connection accepted???");
413 } catch (ClosedByInterruptException expected) {
414 assertTrue(Thread.interrupted());
415 }
416 }
417 });
418 }
419
420 /**
421 * Virtual thread blocks in ServerSocketChannel adaptor accept.
422 */
423 @Test
424 void testSocketChannelAdaptorAccept1() throws Exception {
425 testSocketChannelAdaptorAccept(0);
426 }
427
428 /**
429 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
430 */
431 @Test
432 void testSocketChannelAdaptorAccept2() throws Exception {
433 testSocketChannelAdaptorAccept(60_000);
434 }
435
436 private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
437 VThreadRunner.run(() -> {
438 try (var ssc = ServerSocketChannel.open()) {
439 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
440 var sc = SocketChannel.open();
441
442 // interrupt current thread when it blocks in accept
443 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
444
445 // accept should block
446 if (timeout > 0)
447 ssc.socket().setSoTimeout(timeout);
448 Socket s = ssc.socket().accept();
449 sc.close();
450 s.close();
451 }
452 });
453 }
454
455 /**
456 * DatagramChannel receive/send, no blocking.
457 */
458 @Test
459 void testDatagramChannelSendReceive1() throws Exception {
460 VThreadRunner.run(() -> {
461 try (DatagramChannel dc1 = DatagramChannel.open();
462 DatagramChannel dc2 = DatagramChannel.open()) {
463
464 InetAddress lh = InetAddress.getLoopbackAddress();
465 dc2.bind(new InetSocketAddress(lh, 0));
466
467 // send should not block
468 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
469 int n = dc1.send(bb, dc2.getLocalAddress());
470 assertTrue(n > 0);
471
472 // receive should not block
473 bb = ByteBuffer.allocate(10);
474 dc2.receive(bb);
475 assertTrue(bb.get(0) == 'X');
476 }
477 });
478 }
479
480 /**
481 * Virtual thread blocks in DatagramChannel receive.
482 */
483 @Test
484 void testDatagramChannelSendReceive2() throws Exception {
485 VThreadRunner.run(() -> {
486 try (DatagramChannel dc1 = DatagramChannel.open();
487 DatagramChannel dc2 = DatagramChannel.open()) {
488
489 InetAddress lh = InetAddress.getLoopbackAddress();
490 dc2.bind(new InetSocketAddress(lh, 0));
491
492 // send from dc1 when current thread blocked in dc2.receive
493 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
494 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
495
496 // read from dc2 should block
497 ByteBuffer bb2 = ByteBuffer.allocate(10);
498 dc2.receive(bb2);
499 assertTrue(bb2.get(0) == 'X');
500 }
501 });
502 }
503
504 /**
505 * DatagramChannel close while virtual thread blocked in receive.
506 */
507 @Test
508 void testDatagramChannelReceiveAsyncClose() throws Exception {
509 VThreadRunner.run(() -> {
510 try (DatagramChannel dc = DatagramChannel.open()) {
511 InetAddress lh = InetAddress.getLoopbackAddress();
512 dc.bind(new InetSocketAddress(lh, 0));
513 runAfterParkedAsync(dc::close);
514 try {
515 dc.receive(ByteBuffer.allocate(100));
516 fail("receive returned");
517 } catch (AsynchronousCloseException expected) { }
518 }
519 });
520 }
521
522 /**
523 * Virtual thread interrupted while blocked in DatagramChannel receive.
524 */
525 @Test
526 void testDatagramChannelReceiveInterrupt() throws Exception {
527 VThreadRunner.run(() -> {
528 try (DatagramChannel dc = DatagramChannel.open()) {
529 InetAddress lh = InetAddress.getLoopbackAddress();
530 dc.bind(new InetSocketAddress(lh, 0));
531
532 // interrupt current thread when it blocks in receive
533 Thread thisThread = Thread.currentThread();
534 runAfterParkedAsync(thisThread::interrupt);
535
536 try {
537 dc.receive(ByteBuffer.allocate(100));
538 fail("receive returned");
539 } catch (ClosedByInterruptException expected) {
540 assertTrue(Thread.interrupted());
541 }
542 }
543 });
544 }
545
546 /**
547 * Virtual thread blocks in DatagramSocket adaptor receive.
548 */
549 @Test
550 void testDatagramSocketAdaptorReceive1() throws Exception {
551 testDatagramSocketAdaptorReceive(0);
552 }
553
554 /**
555 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
556 */
557 @Test
558 void testDatagramSocketAdaptorReceive2() throws Exception {
559 testDatagramSocketAdaptorReceive(60_000);
560 }
561
562 private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
563 VThreadRunner.run(() -> {
564 try (DatagramChannel dc1 = DatagramChannel.open();
565 DatagramChannel dc2 = DatagramChannel.open()) {
566
567 InetAddress lh = InetAddress.getLoopbackAddress();
568 dc2.bind(new InetSocketAddress(lh, 0));
569
570 // send from dc1 when current thread blocks in dc2 receive
571 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
572 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
573
574 // receive should block
575 byte[] array = new byte[100];
576 DatagramPacket p = new DatagramPacket(array, 0, array.length);
577 if (timeout > 0)
578 dc2.socket().setSoTimeout(timeout);
579 dc2.socket().receive(p);
580 assertTrue(p.getLength() == 3 && array[0] == 'X');
581 }
582 });
583 }
584
585 /**
586 * DatagramChannel close while virtual thread blocked in adaptor receive.
587 */
588 @Test
589 void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
590 testDatagramSocketAdaptorReceiveAsyncClose(0);
591 }
592
593 /**
594 * DatagramChannel close while virtual thread blocked in adaptor receive
595 * with timeout.
596 */
597 @Test
598 void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
599 testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
600 }
601
602 private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
603 VThreadRunner.run(() -> {
604 try (DatagramChannel dc = DatagramChannel.open()) {
605 InetAddress lh = InetAddress.getLoopbackAddress();
606 dc.bind(new InetSocketAddress(lh, 0));
607
608 byte[] array = new byte[100];
609 DatagramPacket p = new DatagramPacket(array, 0, array.length);
610 if (timeout > 0)
611 dc.socket().setSoTimeout(timeout);
612
613 // close channel/socket when current thread blocks in receive
614 runAfterParkedAsync(dc::close);
615
616 assertThrows(SocketException.class, () -> dc.socket().receive(p));
617 }
618 });
619 }
620
621 /**
622 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
623 */
624 @Test
625 void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
626 testDatagramSocketAdaptorReceiveInterrupt(0);
627 }
628
629 /**
630 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
631 * with timeout.
632 */
633 @Test
634 void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
635 testDatagramSocketAdaptorReceiveInterrupt(60_1000);
636 }
637
638 private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
639 VThreadRunner.run(() -> {
640 try (DatagramChannel dc = DatagramChannel.open()) {
641 InetAddress lh = InetAddress.getLoopbackAddress();
642 dc.bind(new InetSocketAddress(lh, 0));
643
644 byte[] array = new byte[100];
645 DatagramPacket p = new DatagramPacket(array, 0, array.length);
646 if (timeout > 0)
647 dc.socket().setSoTimeout(timeout);
648
649 // interrupt current thread when it blocks in receive
650 Thread thisThread = Thread.currentThread();
651 runAfterParkedAsync(thisThread::interrupt);
652
653 try {
654 dc.socket().receive(p);
655 fail();
656 } catch (ClosedByInterruptException expected) {
657 assertTrue(Thread.interrupted());
658 }
659 }
660 });
661 }
662
663 /**
664 * Pipe read/write, no blocking.
665 */
666 @Test
667 void testPipeReadWrite1() throws Exception {
668 VThreadRunner.run(() -> {
669 Pipe p = Pipe.open();
670 try (Pipe.SinkChannel sink = p.sink();
671 Pipe.SourceChannel source = p.source()) {
672
673 // write should not block
674 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
675 int n = sink.write(bb);
676 assertTrue(n > 0);
677
678 // read should not block
679 bb = ByteBuffer.allocate(10);
680 n = source.read(bb);
681 assertTrue(n > 0);
682 assertTrue(bb.get(0) == 'X');
683 }
684 });
685 }
686
687 /**
688 * Virtual thread blocks in Pipe.SourceChannel read.
689 */
690 @Test
691 void testPipeReadWrite2() throws Exception {
692 VThreadRunner.run(() -> {
693 Pipe p = Pipe.open();
694 try (Pipe.SinkChannel sink = p.sink();
695 Pipe.SourceChannel source = p.source()) {
696
697 // write from sink when current thread blocks reading from source
698 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
699 runAfterParkedAsync(() -> sink.write(bb1));
700
701 // read should block
702 ByteBuffer bb2 = ByteBuffer.allocate(10);
703 int n = source.read(bb2);
704 assertTrue(n > 0);
705 assertTrue(bb2.get(0) == 'X');
706 }
707 });
708 }
709
710 /**
711 * Virtual thread blocks in Pipe.SinkChannel write.
712 */
713 @Test
714 void testPipeReadWrite3() throws Exception {
715 VThreadRunner.run(() -> {
716 Pipe p = Pipe.open();
717 try (Pipe.SinkChannel sink = p.sink();
718 Pipe.SourceChannel source = p.source()) {
719
720 // read from source to EOF when current thread blocking in write
721 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
722
723 // write to sink should block
724 ByteBuffer bb = ByteBuffer.allocate(100*1024);
725 for (int i=0; i<1000; i++) {
726 int n = sink.write(bb);
727 assertTrue(n > 0);
728 bb.clear();
729 }
730 sink.close();
731
732 // wait for reader to finish
733 reader.join();
734 }
735 });
736 }
737
738 /**
739 * Pipe.SourceChannel close while virtual thread blocked in read.
740 */
741 @Test
742 void testPipeReadAsyncClose() throws Exception {
743 VThreadRunner.run(() -> {
744 Pipe p = Pipe.open();
745 try (Pipe.SinkChannel sink = p.sink();
746 Pipe.SourceChannel source = p.source()) {
747 runAfterParkedAsync(source::close);
748 try {
749 int n = source.read(ByteBuffer.allocate(100));
750 fail("read returned " + n);
751 } catch (AsynchronousCloseException expected) { }
752 }
753 });
754 }
755
756 /**
757 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
758 */
759 @Test
760 void testPipeReadInterrupt() throws Exception {
761 VThreadRunner.run(() -> {
762 Pipe p = Pipe.open();
763 try (Pipe.SinkChannel sink = p.sink();
764 Pipe.SourceChannel source = p.source()) {
765
766 // interrupt current thread when it blocks reading from source
767 Thread thisThread = Thread.currentThread();
768 runAfterParkedAsync(thisThread::interrupt);
769
770 try {
771 int n = source.read(ByteBuffer.allocate(100));
772 fail("read returned " + n);
773 } catch (ClosedByInterruptException expected) {
774 assertTrue(Thread.interrupted());
775 }
776 }
777 });
778 }
779
780 /**
781 * Pipe.SinkChannel close while virtual thread blocked in write.
782 */
783 @Test
784 void testPipeWriteAsyncClose() throws Exception {
785 VThreadRunner.run(() -> {
786 boolean done = false;
787 while (!done) {
788 Pipe p = Pipe.open();
789 try (Pipe.SinkChannel sink = p.sink();
790 Pipe.SourceChannel source = p.source()) {
791
792 // close sink when current thread blocks in write
793 runAfterParkedAsync(sink::close, true);
794
795 // write until channel is closed
796 try {
797 ByteBuffer bb = ByteBuffer.allocate(100*1024);
798 for (;;) {
799 int n = sink.write(bb);
800 assertTrue(n > 0);
801 bb.clear();
802 }
803 } catch (AsynchronousCloseException e) {
804 // closed when blocked in write
805 done = true;
806 } catch (ClosedChannelException e) {
807 // closed but not blocked in write, need to retry test
808 System.err.format("%s, need to retry!%n", e);
809 }
810 }
811 }
812 });
813 }
814
815 /**
816 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
817 */
818 @Test
819 void testPipeWriteInterrupt() throws Exception {
820 VThreadRunner.run(() -> {
821 boolean done = false;
822 while (!done) {
823 Pipe p = Pipe.open();
824 try (Pipe.SinkChannel sink = p.sink();
825 Pipe.SourceChannel source = p.source()) {
826
827 // interrupt current thread when it blocks in write
828 Thread thisThread = Thread.currentThread();
829 runAfterParkedAsync(thisThread::interrupt, true);
830
831 // write until channel is closed
832 try {
833 ByteBuffer bb = ByteBuffer.allocate(100*1024);
834 for (;;) {
835 int n = sink.write(bb);
836 assertTrue(n > 0);
837 bb.clear();
838 }
839 } catch (ClosedByInterruptException expected) {
840 // closed when blocked in write
|
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit/othervm/timeout=480 BlockingChannelOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
38 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
39 */
40
41 /*
42 * @test id=io_uring
43 * @requires os.family == "linux"
44 * @library /test/lib
45 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
46 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
47 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
48 */
49
50 /*
51 * @test id=no-vmcontinuations
52 * @requires vm.continuations
53 * @library /test/lib
54 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
55 */
56
57 import java.io.Closeable;
58 import java.io.IOException;
59 import java.net.DatagramPacket;
60 import java.net.InetAddress;
61 import java.net.InetSocketAddress;
62 import java.net.Socket;
63 import java.net.SocketAddress;
64 import java.net.SocketException;
65 import java.nio.ByteBuffer;
66 import java.nio.channels.AsynchronousCloseException;
67 import java.nio.channels.ClosedByInterruptException;
68 import java.nio.channels.ClosedChannelException;
69 import java.nio.channels.DatagramChannel;
70 import java.nio.channels.Pipe;
71 import java.nio.channels.ReadableByteChannel;
72 import java.nio.channels.ServerSocketChannel;
73 import java.nio.channels.SocketChannel;
74 import java.nio.channels.WritableByteChannel;
75 import java.util.concurrent.ExecutorService;
76 import java.util.concurrent.Executors;
77 import java.util.concurrent.ThreadFactory;
78 import java.util.concurrent.locks.LockSupport;
79 import java.util.stream.Stream;
80
81 import jdk.test.lib.thread.VThreadRunner;
82 import jdk.test.lib.thread.VThreadScheduler;
83
84 import org.junit.jupiter.api.BeforeAll;
85 import org.junit.jupiter.params.ParameterizedTest;
86 import org.junit.jupiter.params.provider.MethodSource;
87 import static org.junit.jupiter.api.Assertions.*;
88
89 class BlockingChannelOps {
90 private static ExecutorService threadPool;
91 private static Thread.VirtualThreadScheduler customScheduler;
92
93 @BeforeAll
94 static void setup() {
95 ThreadFactory factory = Thread.ofPlatform().daemon().factory();
96 threadPool = Executors.newCachedThreadPool(factory);
97 customScheduler = (_, task) -> threadPool.execute(task);
98 }
99
100 /**
101 * Returns the Thread.Builder to create the virtual thread.
102 */
103 static Stream<Thread.Builder.OfVirtual> threadBuilders() {
104 if (VThreadScheduler.supportsCustomScheduler()) {
105 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
106 } else {
107 return Stream.of(Thread.ofVirtual());
108 }
109 }
110
111 /**
112 * SocketChannel read/write, no blocking.
113 */
114 @ParameterizedTest
115 @MethodSource("threadBuilders")
116 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
117 VThreadRunner.run(builder, () -> {
118 try (var connection = new Connection()) {
119 SocketChannel sc1 = connection.channel1();
120 SocketChannel sc2 = connection.channel2();
121
122 // write to sc1
123 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
124 int n = sc1.write(bb);
125 assertTrue(n > 0);
126
127 // read from sc2 should not block
128 bb = ByteBuffer.allocate(10);
129 n = sc2.read(bb);
130 assertTrue(n > 0);
131 assertTrue(bb.get(0) == 'X');
132 }
133 });
134 }
135
136 /**
137 * Virtual thread blocks in SocketChannel read.
138 */
139 @ParameterizedTest
140 @MethodSource("threadBuilders")
141 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
142 VThreadRunner.run(builder, () -> {
143 try (var connection = new Connection()) {
144 SocketChannel sc1 = connection.channel1();
145 SocketChannel sc2 = connection.channel2();
146
147 // write to sc1 when current thread blocks in sc2.read
148 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
149 runAfterParkedAsync(() -> sc1.write(bb1));
150
151 // read from sc2 should block
152 ByteBuffer bb2 = ByteBuffer.allocate(10);
153 int n = sc2.read(bb2);
154 assertTrue(n > 0);
155 assertTrue(bb2.get(0) == 'X');
156 }
157 });
158 }
159
160 /**
161 * Virtual thread blocks in SocketChannel write.
162 */
163 @ParameterizedTest
164 @MethodSource("threadBuilders")
165 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
166 VThreadRunner.run(builder, () -> {
167 try (var connection = new Connection()) {
168 SocketChannel sc1 = connection.channel1();
169 SocketChannel sc2 = connection.channel2();
170
171 // read from sc2 to EOF when current thread blocks in sc1.write
172 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
173
174 // write to sc1 should block
175 ByteBuffer bb = ByteBuffer.allocate(100*1024);
176 for (int i=0; i<1000; i++) {
177 int n = sc1.write(bb);
178 assertTrue(n > 0);
179 bb.clear();
180 }
181 sc1.close();
182
183 // wait for reader to finish
184 reader.join();
185 }
186 });
187 }
188
189 /**
190 * SocketChannel close while virtual thread blocked in read.
191 */
192 @ParameterizedTest
193 @MethodSource("threadBuilders")
194 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
195 VThreadRunner.run(builder, () -> {
196 try (var connection = new Connection()) {
197 SocketChannel sc = connection.channel1();
198 runAfterParkedAsync(sc::close);
199 try {
200 int n = sc.read(ByteBuffer.allocate(100));
201 fail("read returned " + n);
202 } catch (AsynchronousCloseException expected) { }
203 }
204 });
205 }
206
207 /**
208 * SocketChannel shutdownInput while virtual thread blocked in read.
209 */
210 @ParameterizedTest
211 @MethodSource("threadBuilders")
212 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
213 VThreadRunner.run(builder, () -> {
214 try (var connection = new Connection()) {
215 SocketChannel sc = connection.channel1();
216 runAfterParkedAsync(sc::shutdownInput);
217 int n = sc.read(ByteBuffer.allocate(100));
218 assertEquals(-1, n);
219 assertTrue(sc.isOpen());
220 }
221 });
222 }
223
224 /**
225 * Virtual thread interrupted while blocked in SocketChannel read.
226 */
227 @ParameterizedTest
228 @MethodSource("threadBuilders")
229 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
230 VThreadRunner.run(builder, () -> {
231 try (var connection = new Connection()) {
232 SocketChannel sc = connection.channel1();
233
234 // interrupt current thread when it blocks in read
235 Thread thisThread = Thread.currentThread();
236 runAfterParkedAsync(thisThread::interrupt);
237
238 try {
239 int n = sc.read(ByteBuffer.allocate(100));
240 fail("read returned " + n);
241 } catch (ClosedByInterruptException expected) {
242 assertTrue(Thread.interrupted());
243 }
244 }
245 });
246 }
247
248 /**
249 * SocketChannel close while virtual thread blocked in write.
250 */
251 @ParameterizedTest
252 @MethodSource("threadBuilders")
253 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
254 VThreadRunner.run(builder, () -> {
255 boolean done = false;
256 while (!done) {
257 try (var connection = new Connection()) {
258 SocketChannel sc = connection.channel1();
259
260 // close sc when current thread blocks in write
261 runAfterParkedAsync(sc::close, true);
262
263 // write until channel is closed
264 try {
265 ByteBuffer bb = ByteBuffer.allocate(100*1024);
266 for (;;) {
267 int n = sc.write(bb);
268 assertTrue(n > 0);
269 bb.clear();
270 }
271 } catch (AsynchronousCloseException expected) {
272 // closed when blocked in write
273 done = true;
274 } catch (ClosedChannelException e) {
275 // closed but not blocked in write, need to retry test
276 System.err.format("%s, need to retry!%n", e);
277 }
278 }
279 }
280 });
281 }
282
283 /**
284 * SocketChannel shutdownOutput while virtual thread blocked in write.
285 */
286 @ParameterizedTest
287 @MethodSource("threadBuilders")
288 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
289 VThreadRunner.run(builder, () -> {
290 try (var connection = new Connection()) {
291 SocketChannel sc = connection.channel1();
292
293 // shutdown output when current thread blocks in write
294 runAfterParkedAsync(sc::shutdownOutput);
295 try {
296 ByteBuffer bb = ByteBuffer.allocate(100*1024);
297 for (;;) {
298 int n = sc.write(bb);
299 assertTrue(n > 0);
300 bb.clear();
301 }
302 } catch (ClosedChannelException e) {
303 // expected
304 }
305 assertTrue(sc.isOpen());
306 }
307 });
308 }
309
310 /**
311 * Virtual thread interrupted while blocked in SocketChannel write.
312 */
313 @ParameterizedTest
314 @MethodSource("threadBuilders")
315 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
316 VThreadRunner.run(builder, () -> {
317 boolean done = false;
318 while (!done) {
319 try (var connection = new Connection()) {
320 SocketChannel sc = connection.channel1();
321
322 // interrupt current thread when it blocks in write
323 Thread thisThread = Thread.currentThread();
324 runAfterParkedAsync(thisThread::interrupt, true);
325
326 // write until channel is closed
327 try {
328 ByteBuffer bb = ByteBuffer.allocate(100*1024);
329 for (;;) {
330 int n = sc.write(bb);
331 assertTrue(n > 0);
332 bb.clear();
333 }
334 } catch (ClosedByInterruptException e) {
335 // closed when blocked in write
336 assertTrue(Thread.interrupted());
337 done = true;
338 } catch (ClosedChannelException e) {
339 // closed but not blocked in write, need to retry test
340 System.err.format("%s, need to retry!%n", e);
341 }
342 }
343 }
344 });
345 }
346
347 /**
348 * Virtual thread blocks in SocketChannel adaptor read.
349 */
350 @ParameterizedTest
351 @MethodSource("threadBuilders")
352 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
353 testSocketAdaptorRead(builder, 0);
354 }
355
356 /**
357 * Virtual thread blocks in SocketChannel adaptor read with timeout.
358 */
359 @ParameterizedTest
360 @MethodSource("threadBuilders")
361 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
362 testSocketAdaptorRead(builder, 60_000);
363 }
364
365 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
366 int timeout) throws Exception {
367 VThreadRunner.run(builder, () -> {
368 try (var connection = new Connection()) {
369 SocketChannel sc1 = connection.channel1();
370 SocketChannel sc2 = connection.channel2();
371
372 // write to sc1 when currnet thread blocks reading from sc2
373 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
374 runAfterParkedAsync(() -> sc1.write(bb));
375
376 // read from sc2 should block
377 byte[] array = new byte[100];
378 if (timeout > 0)
379 sc2.socket().setSoTimeout(timeout);
380 int n = sc2.socket().getInputStream().read(array);
381 assertTrue(n > 0);
382 assertTrue(array[0] == 'X');
383 }
384 });
385 }
386
387 /**
388 * ServerSocketChannel accept, no blocking.
389 */
390 @ParameterizedTest
391 @MethodSource("threadBuilders")
392 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
393 VThreadRunner.run(builder, () -> {
394 try (var ssc = ServerSocketChannel.open()) {
395 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
396 var sc1 = SocketChannel.open(ssc.getLocalAddress());
397 // accept should not block
398 var sc2 = ssc.accept();
399 sc1.close();
400 sc2.close();
401 }
402 });
403 }
404
405 /**
406 * Virtual thread blocks in ServerSocketChannel accept.
407 */
408 @ParameterizedTest
409 @MethodSource("threadBuilders")
410 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
411 VThreadRunner.run(builder, () -> {
412 try (var ssc = ServerSocketChannel.open()) {
413 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
414 var sc1 = SocketChannel.open();
415
416 // connect when current thread when it blocks in accept
417 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
418
419 // accept should block
420 var sc2 = ssc.accept();
421 sc1.close();
422 sc2.close();
423 }
424 });
425 }
426
427 /**
428 * SeverSocketChannel close while virtual thread blocked in accept.
429 */
430 @ParameterizedTest
431 @MethodSource("threadBuilders")
432 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
433 VThreadRunner.run(builder, () -> {
434 try (var ssc = ServerSocketChannel.open()) {
435 InetAddress lh = InetAddress.getLoopbackAddress();
436 ssc.bind(new InetSocketAddress(lh, 0));
437 runAfterParkedAsync(ssc::close);
438 try {
439 SocketChannel sc = ssc.accept();
440 sc.close();
441 fail("connection accepted???");
442 } catch (AsynchronousCloseException expected) { }
443 }
444 });
445 }
446
447 /**
448 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
449 */
450 @ParameterizedTest
451 @MethodSource("threadBuilders")
452 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
453 VThreadRunner.run(builder, () -> {
454 try (var ssc = ServerSocketChannel.open()) {
455 InetAddress lh = InetAddress.getLoopbackAddress();
456 ssc.bind(new InetSocketAddress(lh, 0));
457
458 // interrupt current thread when it blocks in accept
459 Thread thisThread = Thread.currentThread();
460 runAfterParkedAsync(thisThread::interrupt);
461
462 try {
463 SocketChannel sc = ssc.accept();
464 sc.close();
465 fail("connection accepted???");
466 } catch (ClosedByInterruptException expected) {
467 assertTrue(Thread.interrupted());
468 }
469 }
470 });
471 }
472
473 /**
474 * Virtual thread blocks in ServerSocketChannel adaptor accept.
475 */
476 @ParameterizedTest
477 @MethodSource("threadBuilders")
478 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
479 testSocketChannelAdaptorAccept(builder, 0);
480 }
481
482 /**
483 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
484 */
485 @ParameterizedTest
486 @MethodSource("threadBuilders")
487 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
488 testSocketChannelAdaptorAccept(builder, 60_000);
489 }
490
491 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
492 int timeout) throws Exception {
493 VThreadRunner.run(builder, () -> {
494 try (var ssc = ServerSocketChannel.open()) {
495 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
496 var sc = SocketChannel.open();
497
498 // interrupt current thread when it blocks in accept
499 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
500
501 // accept should block
502 if (timeout > 0)
503 ssc.socket().setSoTimeout(timeout);
504 Socket s = ssc.socket().accept();
505 sc.close();
506 s.close();
507 }
508 });
509 }
510
511 /**
512 * DatagramChannel receive/send, no blocking.
513 */
514 @ParameterizedTest
515 @MethodSource("threadBuilders")
516 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
517 VThreadRunner.run(builder, () -> {
518 try (DatagramChannel dc1 = DatagramChannel.open();
519 DatagramChannel dc2 = DatagramChannel.open()) {
520
521 InetAddress lh = InetAddress.getLoopbackAddress();
522 dc2.bind(new InetSocketAddress(lh, 0));
523
524 // send should not block
525 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
526 int n = dc1.send(bb, dc2.getLocalAddress());
527 assertTrue(n > 0);
528
529 // receive should not block
530 bb = ByteBuffer.allocate(10);
531 dc2.receive(bb);
532 assertTrue(bb.get(0) == 'X');
533 }
534 });
535 }
536
537 /**
538 * Virtual thread blocks in DatagramChannel receive.
539 */
540 @ParameterizedTest
541 @MethodSource("threadBuilders")
542 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
543 VThreadRunner.run(builder, () -> {
544 try (DatagramChannel dc1 = DatagramChannel.open();
545 DatagramChannel dc2 = DatagramChannel.open()) {
546
547 InetAddress lh = InetAddress.getLoopbackAddress();
548 dc2.bind(new InetSocketAddress(lh, 0));
549
550 // send from dc1 when current thread blocked in dc2.receive
551 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
552 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
553
554 // read from dc2 should block
555 ByteBuffer bb2 = ByteBuffer.allocate(10);
556 dc2.receive(bb2);
557 assertTrue(bb2.get(0) == 'X');
558 }
559 });
560 }
561
562 /**
563 * DatagramChannel close while virtual thread blocked in receive.
564 */
565 @ParameterizedTest
566 @MethodSource("threadBuilders")
567 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
568 VThreadRunner.run(builder, () -> {
569 try (DatagramChannel dc = DatagramChannel.open()) {
570 InetAddress lh = InetAddress.getLoopbackAddress();
571 dc.bind(new InetSocketAddress(lh, 0));
572 runAfterParkedAsync(dc::close);
573 try {
574 dc.receive(ByteBuffer.allocate(100));
575 fail("receive returned");
576 } catch (AsynchronousCloseException expected) { }
577 }
578 });
579 }
580
581 /**
582 * Virtual thread interrupted while blocked in DatagramChannel receive.
583 */
584 @ParameterizedTest
585 @MethodSource("threadBuilders")
586 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
587 VThreadRunner.run(builder, () -> {
588 try (DatagramChannel dc = DatagramChannel.open()) {
589 InetAddress lh = InetAddress.getLoopbackAddress();
590 dc.bind(new InetSocketAddress(lh, 0));
591
592 // interrupt current thread when it blocks in receive
593 Thread thisThread = Thread.currentThread();
594 runAfterParkedAsync(thisThread::interrupt);
595
596 try {
597 dc.receive(ByteBuffer.allocate(100));
598 fail("receive returned");
599 } catch (ClosedByInterruptException expected) {
600 assertTrue(Thread.interrupted());
601 }
602 }
603 });
604 }
605
606 /**
607 * Virtual thread blocks in DatagramSocket adaptor receive.
608 */
609 @ParameterizedTest
610 @MethodSource("threadBuilders")
611 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
612 testDatagramSocketAdaptorReceive(builder, 0);
613 }
614
615 /**
616 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
617 */
618 @ParameterizedTest
619 @MethodSource("threadBuilders")
620 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
621 testDatagramSocketAdaptorReceive(builder, 60_000);
622 }
623
624 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
625 int timeout) throws Exception {
626 VThreadRunner.run(builder, () -> {
627 try (DatagramChannel dc1 = DatagramChannel.open();
628 DatagramChannel dc2 = DatagramChannel.open()) {
629
630 InetAddress lh = InetAddress.getLoopbackAddress();
631 dc2.bind(new InetSocketAddress(lh, 0));
632
633 // send from dc1 when current thread blocks in dc2 receive
634 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
635 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
636
637 // receive should block
638 byte[] array = new byte[100];
639 DatagramPacket p = new DatagramPacket(array, 0, array.length);
640 if (timeout > 0)
641 dc2.socket().setSoTimeout(timeout);
642 dc2.socket().receive(p);
643 assertTrue(p.getLength() == 3 && array[0] == 'X');
644 }
645 });
646 }
647
648 /**
649 * DatagramChannel close while virtual thread blocked in adaptor receive.
650 */
651 @ParameterizedTest
652 @MethodSource("threadBuilders")
653 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
654 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
655 }
656
657 /**
658 * DatagramChannel close while virtual thread blocked in adaptor receive
659 * with timeout.
660 */
661 @ParameterizedTest
662 @MethodSource("threadBuilders")
663 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
664 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
665 }
666
667 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
668 int timeout) throws Exception {
669 VThreadRunner.run(builder, () -> {
670 try (DatagramChannel dc = DatagramChannel.open()) {
671 InetAddress lh = InetAddress.getLoopbackAddress();
672 dc.bind(new InetSocketAddress(lh, 0));
673
674 byte[] array = new byte[100];
675 DatagramPacket p = new DatagramPacket(array, 0, array.length);
676 if (timeout > 0)
677 dc.socket().setSoTimeout(timeout);
678
679 // close channel/socket when current thread blocks in receive
680 runAfterParkedAsync(dc::close);
681
682 assertThrows(SocketException.class, () -> dc.socket().receive(p));
683 }
684 });
685 }
686
687 /**
688 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
689 */
690 @ParameterizedTest
691 @MethodSource("threadBuilders")
692 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
693 testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
694 }
695
696 /**
697 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
698 * with timeout.
699 */
700 @ParameterizedTest
701 @MethodSource("threadBuilders")
702 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
703 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
704 }
705
706 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
707 int timeout) throws Exception {
708 VThreadRunner.run(builder, () -> {
709 try (DatagramChannel dc = DatagramChannel.open()) {
710 InetAddress lh = InetAddress.getLoopbackAddress();
711 dc.bind(new InetSocketAddress(lh, 0));
712
713 byte[] array = new byte[100];
714 DatagramPacket p = new DatagramPacket(array, 0, array.length);
715 if (timeout > 0)
716 dc.socket().setSoTimeout(timeout);
717
718 // interrupt current thread when it blocks in receive
719 Thread thisThread = Thread.currentThread();
720 runAfterParkedAsync(thisThread::interrupt);
721
722 try {
723 dc.socket().receive(p);
724 fail();
725 } catch (ClosedByInterruptException expected) {
726 assertTrue(Thread.interrupted());
727 }
728 }
729 });
730 }
731
732 /**
733 * Pipe read/write, no blocking.
734 */
735 @ParameterizedTest
736 @MethodSource("threadBuilders")
737 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
738 VThreadRunner.run(builder, () -> {
739 Pipe p = Pipe.open();
740 try (Pipe.SinkChannel sink = p.sink();
741 Pipe.SourceChannel source = p.source()) {
742
743 // write should not block
744 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
745 int n = sink.write(bb);
746 assertTrue(n > 0);
747
748 // read should not block
749 bb = ByteBuffer.allocate(10);
750 n = source.read(bb);
751 assertTrue(n > 0);
752 assertTrue(bb.get(0) == 'X');
753 }
754 });
755 }
756
757 /**
758 * Virtual thread blocks in Pipe.SourceChannel read.
759 */
760 @ParameterizedTest
761 @MethodSource("threadBuilders")
762 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
763 VThreadRunner.run(builder, () -> {
764 Pipe p = Pipe.open();
765 try (Pipe.SinkChannel sink = p.sink();
766 Pipe.SourceChannel source = p.source()) {
767
768 // write from sink when current thread blocks reading from source
769 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
770 runAfterParkedAsync(() -> sink.write(bb1));
771
772 // read should block
773 ByteBuffer bb2 = ByteBuffer.allocate(10);
774 int n = source.read(bb2);
775 assertTrue(n > 0);
776 assertTrue(bb2.get(0) == 'X');
777 }
778 });
779 }
780
781 /**
782 * Virtual thread blocks in Pipe.SinkChannel write.
783 */
784 @ParameterizedTest
785 @MethodSource("threadBuilders")
786 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
787 VThreadRunner.run(builder, () -> {
788 Pipe p = Pipe.open();
789 try (Pipe.SinkChannel sink = p.sink();
790 Pipe.SourceChannel source = p.source()) {
791
792 // read from source to EOF when current thread blocking in write
793 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
794
795 // write to sink should block
796 ByteBuffer bb = ByteBuffer.allocate(100*1024);
797 for (int i=0; i<1000; i++) {
798 int n = sink.write(bb);
799 assertTrue(n > 0);
800 bb.clear();
801 }
802 sink.close();
803
804 // wait for reader to finish
805 reader.join();
806 }
807 });
808 }
809
810 /**
811 * Pipe.SourceChannel close while virtual thread blocked in read.
812 */
813 @ParameterizedTest
814 @MethodSource("threadBuilders")
815 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
816 VThreadRunner.run(builder, () -> {
817 Pipe p = Pipe.open();
818 try (Pipe.SinkChannel sink = p.sink();
819 Pipe.SourceChannel source = p.source()) {
820 runAfterParkedAsync(source::close);
821 try {
822 int n = source.read(ByteBuffer.allocate(100));
823 fail("read returned " + n);
824 } catch (AsynchronousCloseException expected) { }
825 }
826 });
827 }
828
829 /**
830 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
831 */
832 @ParameterizedTest
833 @MethodSource("threadBuilders")
834 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
835 VThreadRunner.run(builder, () -> {
836 Pipe p = Pipe.open();
837 try (Pipe.SinkChannel sink = p.sink();
838 Pipe.SourceChannel source = p.source()) {
839
840 // interrupt current thread when it blocks reading from source
841 Thread thisThread = Thread.currentThread();
842 runAfterParkedAsync(thisThread::interrupt);
843
844 try {
845 int n = source.read(ByteBuffer.allocate(100));
846 fail("read returned " + n);
847 } catch (ClosedByInterruptException expected) {
848 assertTrue(Thread.interrupted());
849 }
850 }
851 });
852 }
853
854 /**
855 * Pipe.SinkChannel close while virtual thread blocked in write.
856 */
857 @ParameterizedTest
858 @MethodSource("threadBuilders")
859 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
860 VThreadRunner.run(builder, () -> {
861 boolean done = false;
862 while (!done) {
863 Pipe p = Pipe.open();
864 try (Pipe.SinkChannel sink = p.sink();
865 Pipe.SourceChannel source = p.source()) {
866
867 // close sink when current thread blocks in write
868 runAfterParkedAsync(sink::close, true);
869
870 // write until channel is closed
871 try {
872 ByteBuffer bb = ByteBuffer.allocate(100*1024);
873 for (;;) {
874 int n = sink.write(bb);
875 assertTrue(n > 0);
876 bb.clear();
877 }
878 } catch (AsynchronousCloseException e) {
879 // closed when blocked in write
880 done = true;
881 } catch (ClosedChannelException e) {
882 // closed but not blocked in write, need to retry test
883 System.err.format("%s, need to retry!%n", e);
884 }
885 }
886 }
887 });
888 }
889
890 /**
891 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
892 */
893 @ParameterizedTest
894 @MethodSource("threadBuilders")
895 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
896 VThreadRunner.run(builder, () -> {
897 boolean done = false;
898 while (!done) {
899 Pipe p = Pipe.open();
900 try (Pipe.SinkChannel sink = p.sink();
901 Pipe.SourceChannel source = p.source()) {
902
903 // interrupt current thread when it blocks in write
904 Thread thisThread = Thread.currentThread();
905 runAfterParkedAsync(thisThread::interrupt, true);
906
907 // write until channel is closed
908 try {
909 ByteBuffer bb = ByteBuffer.allocate(100*1024);
910 for (;;) {
911 int n = sink.write(bb);
912 assertTrue(n > 0);
913 bb.clear();
914 }
915 } catch (ClosedByInterruptException expected) {
916 // closed when blocked in write
|