4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /**
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit BlockingChannelOps
30 */
31
32 /**
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps
38 */
39
40 /**
41 * @test id=no-vmcontinuations
42 * @requires vm.continuations
43 * @library /test/lib
44 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
45 */
46
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.net.DatagramPacket;
50 import java.net.InetAddress;
51 import java.net.InetSocketAddress;
52 import java.net.Socket;
53 import java.net.SocketAddress;
54 import java.net.SocketException;
55 import java.nio.ByteBuffer;
56 import java.nio.channels.AsynchronousCloseException;
57 import java.nio.channels.ClosedByInterruptException;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.DatagramChannel;
60 import java.nio.channels.Pipe;
61 import java.nio.channels.ReadableByteChannel;
62 import java.nio.channels.ServerSocketChannel;
63 import java.nio.channels.SocketChannel;
64 import java.nio.channels.WritableByteChannel;
65
66 import jdk.test.lib.thread.VThreadRunner;
67 import org.junit.jupiter.api.Test;
68 import static org.junit.jupiter.api.Assertions.*;
69
70 class BlockingChannelOps {
71
72 /**
73 * SocketChannel read/write, no blocking.
74 */
75 @Test
76 void testSocketChannelReadWrite1() throws Exception {
77 VThreadRunner.run(() -> {
78 try (var connection = new Connection()) {
79 SocketChannel sc1 = connection.channel1();
80 SocketChannel sc2 = connection.channel2();
81
82 // write to sc1
83 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
84 int n = sc1.write(bb);
85 assertTrue(n > 0);
86
87 // read from sc2 should not block
88 bb = ByteBuffer.allocate(10);
89 n = sc2.read(bb);
90 assertTrue(n > 0);
91 assertTrue(bb.get(0) == 'X');
92 }
93 });
94 }
95
96 /**
97 * Virtual thread blocks in SocketChannel read.
98 */
99 @Test
100 void testSocketChannelRead() throws Exception {
101 VThreadRunner.run(() -> {
102 try (var connection = new Connection()) {
103 SocketChannel sc1 = connection.channel1();
104 SocketChannel sc2 = connection.channel2();
105
106 // write to sc1 when current thread blocks in sc2.read
107 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
108 runAfterParkedAsync(() -> sc1.write(bb1));
109
110 // read from sc2 should block
111 ByteBuffer bb2 = ByteBuffer.allocate(10);
112 int n = sc2.read(bb2);
113 assertTrue(n > 0);
114 assertTrue(bb2.get(0) == 'X');
115 }
116 });
117 }
118
119 /**
120 * Virtual thread blocks in SocketChannel write.
121 */
122 @Test
123 void testSocketChannelWrite() throws Exception {
124 VThreadRunner.run(() -> {
125 try (var connection = new Connection()) {
126 SocketChannel sc1 = connection.channel1();
127 SocketChannel sc2 = connection.channel2();
128
129 // read from sc2 to EOF when current thread blocks in sc1.write
130 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
131
132 // write to sc1 should block
133 ByteBuffer bb = ByteBuffer.allocate(100*1024);
134 for (int i=0; i<1000; i++) {
135 int n = sc1.write(bb);
136 assertTrue(n > 0);
137 bb.clear();
138 }
139 sc1.close();
140
141 // wait for reader to finish
142 reader.join();
143 }
144 });
145 }
146
147 /**
148 * SocketChannel close while virtual thread blocked in read.
149 */
150 @Test
151 void testSocketChannelReadAsyncClose() throws Exception {
152 VThreadRunner.run(() -> {
153 try (var connection = new Connection()) {
154 SocketChannel sc = connection.channel1();
155 runAfterParkedAsync(sc::close);
156 try {
157 int n = sc.read(ByteBuffer.allocate(100));
158 fail("read returned " + n);
159 } catch (AsynchronousCloseException expected) { }
160 }
161 });
162 }
163
164 /**
165 * Virtual thread interrupted while blocked in SocketChannel read.
166 */
167 @Test
168 void testSocketChannelReadInterrupt() throws Exception {
169 VThreadRunner.run(() -> {
170 try (var connection = new Connection()) {
171 SocketChannel sc = connection.channel1();
172
173 // interrupt current thread when it blocks in read
174 Thread thisThread = Thread.currentThread();
175 runAfterParkedAsync(thisThread::interrupt);
176
177 try {
178 int n = sc.read(ByteBuffer.allocate(100));
179 fail("read returned " + n);
180 } catch (ClosedByInterruptException expected) {
181 assertTrue(Thread.interrupted());
182 }
183 }
184 });
185 }
186
187 /**
188 * SocketChannel close while virtual thread blocked in write.
189 */
190 @Test
191 void testSocketChannelWriteAsyncClose() throws Exception {
192 VThreadRunner.run(() -> {
193 boolean retry = true;
194 while (retry) {
195 try (var connection = new Connection()) {
196 SocketChannel sc = connection.channel1();
197
198 // close sc when current thread blocks in write
199 runAfterParkedAsync(sc::close);
200 try {
201 ByteBuffer bb = ByteBuffer.allocate(100*1024);
202 for (;;) {
203 int n = sc.write(bb);
204 assertTrue(n > 0);
205 bb.clear();
206 }
207 } catch (AsynchronousCloseException expected) {
208 // closed when blocked in write
209 retry = false;
210 } catch (ClosedChannelException e) {
211 // closed when not blocked in write, need to retry test
212 }
213 }
214 }
215 });
216 }
217
218 /**
219 * Virtual thread interrupted while blocked in SocketChannel write.
220 */
221 @Test
222 void testSocketChannelWriteInterrupt() throws Exception {
223 VThreadRunner.run(() -> {
224 boolean retry = true;
225 while (retry) {
226 try (var connection = new Connection()) {
227 SocketChannel sc = connection.channel1();
228
229 // interrupt current thread when it blocks in write
230 Thread thisThread = Thread.currentThread();
231 runAfterParkedAsync(thisThread::interrupt);
232
233 try {
234 ByteBuffer bb = ByteBuffer.allocate(100*1024);
235 for (;;) {
236 int n = sc.write(bb);
237 assertTrue(n > 0);
238 bb.clear();
239 }
240 } catch (ClosedByInterruptException e) {
241 // closed when blocked in write
242 assertTrue(Thread.interrupted());
243 retry = false;
244 } catch (ClosedChannelException e) {
245 // closed when not blocked in write, need to retry test
246 }
247 }
248 }
249 });
250 }
251
252 /**
253 * Virtual thread blocks in SocketChannel adaptor read.
254 */
255 @Test
256 void testSocketAdaptorRead1() throws Exception {
257 testSocketAdaptorRead(0);
258 }
259
260 /**
261 * Virtual thread blocks in SocketChannel adaptor read with timeout.
262 */
263 @Test
264 void testSocketAdaptorRead2() throws Exception {
265 testSocketAdaptorRead(60_000);
266 }
267
268 private void testSocketAdaptorRead(int timeout) throws Exception {
269 VThreadRunner.run(() -> {
270 try (var connection = new Connection()) {
271 SocketChannel sc1 = connection.channel1();
272 SocketChannel sc2 = connection.channel2();
273
274 // write to sc1 when currnet thread blocks reading from sc2
275 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
276 runAfterParkedAsync(() -> sc1.write(bb));
277
278 // read from sc2 should block
279 byte[] array = new byte[100];
280 if (timeout > 0)
281 sc2.socket().setSoTimeout(timeout);
282 int n = sc2.socket().getInputStream().read(array);
283 assertTrue(n > 0);
284 assertTrue(array[0] == 'X');
285 }
286 });
287 }
288
289 /**
290 * ServerSocketChannel accept, no blocking.
291 */
292 @Test
293 void testServerSocketChannelAccept1() throws Exception {
294 VThreadRunner.run(() -> {
295 try (var ssc = ServerSocketChannel.open()) {
296 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
297 var sc1 = SocketChannel.open(ssc.getLocalAddress());
298 // accept should not block
299 var sc2 = ssc.accept();
300 sc1.close();
301 sc2.close();
302 }
303 });
304 }
305
306 /**
307 * Virtual thread blocks in ServerSocketChannel accept.
308 */
309 @Test
310 void testServerSocketChannelAccept2() throws Exception {
311 VThreadRunner.run(() -> {
312 try (var ssc = ServerSocketChannel.open()) {
313 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
314 var sc1 = SocketChannel.open();
315
316 // connect when current thread when it blocks in accept
317 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
318
319 // accept should block
320 var sc2 = ssc.accept();
321 sc1.close();
322 sc2.close();
323 }
324 });
325 }
326
327 /**
328 * SeverSocketChannel close while virtual thread blocked in accept.
329 */
330 @Test
331 void testServerSocketChannelAcceptAsyncClose() throws Exception {
332 VThreadRunner.run(() -> {
333 try (var ssc = ServerSocketChannel.open()) {
334 InetAddress lh = InetAddress.getLoopbackAddress();
335 ssc.bind(new InetSocketAddress(lh, 0));
336 runAfterParkedAsync(ssc::close);
337 try {
338 SocketChannel sc = ssc.accept();
339 sc.close();
340 fail("connection accepted???");
341 } catch (AsynchronousCloseException expected) { }
342 }
343 });
344 }
345
346 /**
347 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
348 */
349 @Test
350 void testServerSocketChannelAcceptInterrupt() throws Exception {
351 VThreadRunner.run(() -> {
352 try (var ssc = ServerSocketChannel.open()) {
353 InetAddress lh = InetAddress.getLoopbackAddress();
354 ssc.bind(new InetSocketAddress(lh, 0));
355
356 // interrupt current thread when it blocks in accept
357 Thread thisThread = Thread.currentThread();
358 runAfterParkedAsync(thisThread::interrupt);
359
360 try {
361 SocketChannel sc = ssc.accept();
362 sc.close();
363 fail("connection accepted???");
364 } catch (ClosedByInterruptException expected) {
365 assertTrue(Thread.interrupted());
366 }
367 }
368 });
369 }
370
371 /**
372 * Virtual thread blocks in ServerSocketChannel adaptor accept.
373 */
374 @Test
375 void testSocketChannelAdaptorAccept1() throws Exception {
376 testSocketChannelAdaptorAccept(0);
377 }
378
379 /**
380 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
381 */
382 @Test
383 void testSocketChannelAdaptorAccept2() throws Exception {
384 testSocketChannelAdaptorAccept(60_000);
385 }
386
387 private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
388 VThreadRunner.run(() -> {
389 try (var ssc = ServerSocketChannel.open()) {
390 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
391 var sc = SocketChannel.open();
392
393 // interrupt current thread when it blocks in accept
394 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
395
396 // accept should block
397 if (timeout > 0)
398 ssc.socket().setSoTimeout(timeout);
399 Socket s = ssc.socket().accept();
400 sc.close();
401 s.close();
402 }
403 });
404 }
405
406 /**
407 * DatagramChannel receive/send, no blocking.
408 */
409 @Test
410 void testDatagramChannelSendReceive1() throws Exception {
411 VThreadRunner.run(() -> {
412 try (DatagramChannel dc1 = DatagramChannel.open();
413 DatagramChannel dc2 = DatagramChannel.open()) {
414
415 InetAddress lh = InetAddress.getLoopbackAddress();
416 dc2.bind(new InetSocketAddress(lh, 0));
417
418 // send should not block
419 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
420 int n = dc1.send(bb, dc2.getLocalAddress());
421 assertTrue(n > 0);
422
423 // receive should not block
424 bb = ByteBuffer.allocate(10);
425 dc2.receive(bb);
426 assertTrue(bb.get(0) == 'X');
427 }
428 });
429 }
430
431 /**
432 * Virtual thread blocks in DatagramChannel receive.
433 */
434 @Test
435 void testDatagramChannelSendReceive2() throws Exception {
436 VThreadRunner.run(() -> {
437 try (DatagramChannel dc1 = DatagramChannel.open();
438 DatagramChannel dc2 = DatagramChannel.open()) {
439
440 InetAddress lh = InetAddress.getLoopbackAddress();
441 dc2.bind(new InetSocketAddress(lh, 0));
442
443 // send from dc1 when current thread blocked in dc2.receive
444 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
445 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
446
447 // read from dc2 should block
448 ByteBuffer bb2 = ByteBuffer.allocate(10);
449 dc2.receive(bb2);
450 assertTrue(bb2.get(0) == 'X');
451 }
452 });
453 }
454
455 /**
456 * DatagramChannel close while virtual thread blocked in receive.
457 */
458 @Test
459 void testDatagramChannelReceiveAsyncClose() throws Exception {
460 VThreadRunner.run(() -> {
461 try (DatagramChannel dc = DatagramChannel.open()) {
462 InetAddress lh = InetAddress.getLoopbackAddress();
463 dc.bind(new InetSocketAddress(lh, 0));
464 runAfterParkedAsync(dc::close);
465 try {
466 dc.receive(ByteBuffer.allocate(100));
467 fail("receive returned");
468 } catch (AsynchronousCloseException expected) { }
469 }
470 });
471 }
472
473 /**
474 * Virtual thread interrupted while blocked in DatagramChannel receive.
475 */
476 @Test
477 void testDatagramChannelReceiveInterrupt() throws Exception {
478 VThreadRunner.run(() -> {
479 try (DatagramChannel dc = DatagramChannel.open()) {
480 InetAddress lh = InetAddress.getLoopbackAddress();
481 dc.bind(new InetSocketAddress(lh, 0));
482
483 // interrupt current thread when it blocks in receive
484 Thread thisThread = Thread.currentThread();
485 runAfterParkedAsync(thisThread::interrupt);
486
487 try {
488 dc.receive(ByteBuffer.allocate(100));
489 fail("receive returned");
490 } catch (ClosedByInterruptException expected) {
491 assertTrue(Thread.interrupted());
492 }
493 }
494 });
495 }
496
497 /**
498 * Virtual thread blocks in DatagramSocket adaptor receive.
499 */
500 @Test
501 void testDatagramSocketAdaptorReceive1() throws Exception {
502 testDatagramSocketAdaptorReceive(0);
503 }
504
505 /**
506 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
507 */
508 @Test
509 void testDatagramSocketAdaptorReceive2() throws Exception {
510 testDatagramSocketAdaptorReceive(60_000);
511 }
512
513 private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
514 VThreadRunner.run(() -> {
515 try (DatagramChannel dc1 = DatagramChannel.open();
516 DatagramChannel dc2 = DatagramChannel.open()) {
517
518 InetAddress lh = InetAddress.getLoopbackAddress();
519 dc2.bind(new InetSocketAddress(lh, 0));
520
521 // send from dc1 when current thread blocks in dc2 receive
522 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
523 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
524
525 // receive should block
526 byte[] array = new byte[100];
527 DatagramPacket p = new DatagramPacket(array, 0, array.length);
528 if (timeout > 0)
529 dc2.socket().setSoTimeout(timeout);
530 dc2.socket().receive(p);
531 assertTrue(p.getLength() == 3 && array[0] == 'X');
532 }
533 });
534 }
535
536 /**
537 * DatagramChannel close while virtual thread blocked in adaptor receive.
538 */
539 @Test
540 void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
541 testDatagramSocketAdaptorReceiveAsyncClose(0);
542 }
543
544 /**
545 * DatagramChannel close while virtual thread blocked in adaptor receive
546 * with timeout.
547 */
548 @Test
549 void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
550 testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
551 }
552
553 private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
554 VThreadRunner.run(() -> {
555 try (DatagramChannel dc = DatagramChannel.open()) {
556 InetAddress lh = InetAddress.getLoopbackAddress();
557 dc.bind(new InetSocketAddress(lh, 0));
558
559 byte[] array = new byte[100];
560 DatagramPacket p = new DatagramPacket(array, 0, array.length);
561 if (timeout > 0)
562 dc.socket().setSoTimeout(timeout);
563
564 // close channel/socket when current thread blocks in receive
565 runAfterParkedAsync(dc::close);
566
567 assertThrows(SocketException.class, () -> dc.socket().receive(p));
568 }
569 });
570 }
571
572 /**
573 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
574 */
575 @Test
576 void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
577 testDatagramSocketAdaptorReceiveInterrupt(0);
578 }
579
580 /**
581 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
582 * with timeout.
583 */
584 @Test
585 void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
586 testDatagramSocketAdaptorReceiveInterrupt(60_1000);
587 }
588
589 private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
590 VThreadRunner.run(() -> {
591 try (DatagramChannel dc = DatagramChannel.open()) {
592 InetAddress lh = InetAddress.getLoopbackAddress();
593 dc.bind(new InetSocketAddress(lh, 0));
594
595 byte[] array = new byte[100];
596 DatagramPacket p = new DatagramPacket(array, 0, array.length);
597 if (timeout > 0)
598 dc.socket().setSoTimeout(timeout);
599
600 // interrupt current thread when it blocks in receive
601 Thread thisThread = Thread.currentThread();
602 runAfterParkedAsync(thisThread::interrupt);
603
604 try {
605 dc.socket().receive(p);
606 fail();
607 } catch (ClosedByInterruptException expected) {
608 assertTrue(Thread.interrupted());
609 }
610 }
611 });
612 }
613
614 /**
615 * Pipe read/write, no blocking.
616 */
617 @Test
618 void testPipeReadWrite1() throws Exception {
619 VThreadRunner.run(() -> {
620 Pipe p = Pipe.open();
621 try (Pipe.SinkChannel sink = p.sink();
622 Pipe.SourceChannel source = p.source()) {
623
624 // write should not block
625 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
626 int n = sink.write(bb);
627 assertTrue(n > 0);
628
629 // read should not block
630 bb = ByteBuffer.allocate(10);
631 n = source.read(bb);
632 assertTrue(n > 0);
633 assertTrue(bb.get(0) == 'X');
634 }
635 });
636 }
637
638 /**
639 * Virtual thread blocks in Pipe.SourceChannel read.
640 */
641 @Test
642 void testPipeReadWrite2() throws Exception {
643 VThreadRunner.run(() -> {
644 Pipe p = Pipe.open();
645 try (Pipe.SinkChannel sink = p.sink();
646 Pipe.SourceChannel source = p.source()) {
647
648 // write from sink when current thread blocks reading from source
649 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
650 runAfterParkedAsync(() -> sink.write(bb1));
651
652 // read should block
653 ByteBuffer bb2 = ByteBuffer.allocate(10);
654 int n = source.read(bb2);
655 assertTrue(n > 0);
656 assertTrue(bb2.get(0) == 'X');
657 }
658 });
659 }
660
661 /**
662 * Virtual thread blocks in Pipe.SinkChannel write.
663 */
664 @Test
665 void testPipeReadWrite3() throws Exception {
666 VThreadRunner.run(() -> {
667 Pipe p = Pipe.open();
668 try (Pipe.SinkChannel sink = p.sink();
669 Pipe.SourceChannel source = p.source()) {
670
671 // read from source to EOF when current thread blocking in write
672 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
673
674 // write to sink should block
675 ByteBuffer bb = ByteBuffer.allocate(100*1024);
676 for (int i=0; i<1000; i++) {
677 int n = sink.write(bb);
678 assertTrue(n > 0);
679 bb.clear();
680 }
681 sink.close();
682
683 // wait for reader to finish
684 reader.join();
685 }
686 });
687 }
688
689 /**
690 * Pipe.SourceChannel close while virtual thread blocked in read.
691 */
692 @Test
693 void testPipeReadAsyncClose() throws Exception {
694 VThreadRunner.run(() -> {
695 Pipe p = Pipe.open();
696 try (Pipe.SinkChannel sink = p.sink();
697 Pipe.SourceChannel source = p.source()) {
698 runAfterParkedAsync(source::close);
699 try {
700 int n = source.read(ByteBuffer.allocate(100));
701 fail("read returned " + n);
702 } catch (AsynchronousCloseException expected) { }
703 }
704 });
705 }
706
707 /**
708 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
709 */
710 @Test
711 void testPipeReadInterrupt() throws Exception {
712 VThreadRunner.run(() -> {
713 Pipe p = Pipe.open();
714 try (Pipe.SinkChannel sink = p.sink();
715 Pipe.SourceChannel source = p.source()) {
716
717 // interrupt current thread when it blocks reading from source
718 Thread thisThread = Thread.currentThread();
719 runAfterParkedAsync(thisThread::interrupt);
720
721 try {
722 int n = source.read(ByteBuffer.allocate(100));
723 fail("read returned " + n);
724 } catch (ClosedByInterruptException expected) {
725 assertTrue(Thread.interrupted());
726 }
727 }
728 });
729 }
730
731 /**
732 * Pipe.SinkChannel close while virtual thread blocked in write.
733 */
734 @Test
735 void testPipeWriteAsyncClose() throws Exception {
736 VThreadRunner.run(() -> {
737 boolean retry = true;
738 while (retry) {
739 Pipe p = Pipe.open();
740 try (Pipe.SinkChannel sink = p.sink();
741 Pipe.SourceChannel source = p.source()) {
742
743 // close sink when current thread blocks in write
744 runAfterParkedAsync(sink::close);
745 try {
746 ByteBuffer bb = ByteBuffer.allocate(100*1024);
747 for (;;) {
748 int n = sink.write(bb);
749 assertTrue(n > 0);
750 bb.clear();
751 }
752 } catch (AsynchronousCloseException e) {
753 // closed when blocked in write
754 retry = false;
755 } catch (ClosedChannelException e) {
756 // closed when not blocked in write, need to retry test
757 }
758 }
759 }
760 });
761 }
762
763 /**
764 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
765 */
766 @Test
767 void testPipeWriteInterrupt() throws Exception {
768 VThreadRunner.run(() -> {
769 boolean retry = true;
770 while (retry) {
771 Pipe p = Pipe.open();
772 try (Pipe.SinkChannel sink = p.sink();
773 Pipe.SourceChannel source = p.source()) {
774
775 // interrupt current thread when it blocks in write
776 Thread thisThread = Thread.currentThread();
777 runAfterParkedAsync(thisThread::interrupt);
778
779 try {
780 ByteBuffer bb = ByteBuffer.allocate(100*1024);
781 for (;;) {
782 int n = sink.write(bb);
783 assertTrue(n > 0);
784 bb.clear();
785 }
786 } catch (ClosedByInterruptException expected) {
787 // closed when blocked in write
788 assertTrue(Thread.interrupted());
|
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit/othervm BlockingChannelOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps
38 * @run junit/othervm -Djdk.pollerMode=3 BlockingChannelOps
39 */
40
41 /*
42 * @test id=io_uring
43 * @requires os.family == "linux"
44 * @library /test/lib
45 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
46 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
47 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
48 */
49
50 /*
51 * @test id=no-vmcontinuations
52 * @requires vm.continuations
53 * @library /test/lib
54 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
55 */
56
57 import java.io.Closeable;
58 import java.io.IOException;
59 import java.net.DatagramPacket;
60 import java.net.InetAddress;
61 import java.net.InetSocketAddress;
62 import java.net.Socket;
63 import java.net.SocketAddress;
64 import java.net.SocketException;
65 import java.nio.ByteBuffer;
66 import java.nio.channels.AsynchronousCloseException;
67 import java.nio.channels.ClosedByInterruptException;
68 import java.nio.channels.ClosedChannelException;
69 import java.nio.channels.DatagramChannel;
70 import java.nio.channels.Pipe;
71 import java.nio.channels.ReadableByteChannel;
72 import java.nio.channels.ServerSocketChannel;
73 import java.nio.channels.SocketChannel;
74 import java.nio.channels.WritableByteChannel;
75 import java.util.concurrent.ExecutorService;
76 import java.util.concurrent.Executors;
77 import java.util.concurrent.ThreadFactory;
78 import java.util.stream.Stream;
79
80 import jdk.test.lib.thread.VThreadRunner;
81 import jdk.test.lib.thread.VThreadScheduler;
82
83 import org.junit.jupiter.api.BeforeAll;
84 import org.junit.jupiter.params.ParameterizedTest;
85 import org.junit.jupiter.params.provider.MethodSource;
86 import static org.junit.jupiter.api.Assertions.*;
87
88 class BlockingChannelOps {
89 private static ExecutorService threadPool;
90 private static Thread.VirtualThreadScheduler customScheduler;
91
92 @BeforeAll
93 static void setup() {
94 ThreadFactory factory = Thread.ofPlatform().daemon().factory();
95 threadPool = Executors.newCachedThreadPool(factory);
96 customScheduler = (_, task) -> threadPool.execute(task);
97 }
98
99 /**
100 * Returns the Thread.Builder to create the virtual thread.
101 */
102 static Stream<Thread.Builder.OfVirtual> threadBuilders() {
103 if (VThreadScheduler.supportsCustomScheduler()) {
104 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
105 } else {
106 return Stream.of(Thread.ofVirtual());
107 }
108 }
109
110 /**
111 * SocketChannel read/write, no blocking.
112 */
113 @ParameterizedTest
114 @MethodSource("threadBuilders")
115 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
116 VThreadRunner.run(builder, () -> {
117 try (var connection = new Connection()) {
118 SocketChannel sc1 = connection.channel1();
119 SocketChannel sc2 = connection.channel2();
120
121 // write to sc1
122 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
123 int n = sc1.write(bb);
124 assertTrue(n > 0);
125
126 // read from sc2 should not block
127 bb = ByteBuffer.allocate(10);
128 n = sc2.read(bb);
129 assertTrue(n > 0);
130 assertTrue(bb.get(0) == 'X');
131 }
132 });
133 }
134
135 /**
136 * Virtual thread blocks in SocketChannel read.
137 */
138 @ParameterizedTest
139 @MethodSource("threadBuilders")
140 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
141 VThreadRunner.run(builder, () -> {
142 try (var connection = new Connection()) {
143 SocketChannel sc1 = connection.channel1();
144 SocketChannel sc2 = connection.channel2();
145
146 // write to sc1 when current thread blocks in sc2.read
147 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
148 runAfterParkedAsync(() -> sc1.write(bb1));
149
150 // read from sc2 should block
151 ByteBuffer bb2 = ByteBuffer.allocate(10);
152 int n = sc2.read(bb2);
153 assertTrue(n > 0);
154 assertTrue(bb2.get(0) == 'X');
155 }
156 });
157 }
158
159 /**
160 * Virtual thread blocks in SocketChannel write.
161 */
162 @ParameterizedTest
163 @MethodSource("threadBuilders")
164 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
165 VThreadRunner.run(builder, () -> {
166 try (var connection = new Connection()) {
167 SocketChannel sc1 = connection.channel1();
168 SocketChannel sc2 = connection.channel2();
169
170 // read from sc2 to EOF when current thread blocks in sc1.write
171 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
172
173 // write to sc1 should block
174 ByteBuffer bb = ByteBuffer.allocate(100*1024);
175 for (int i=0; i<1000; i++) {
176 int n = sc1.write(bb);
177 assertTrue(n > 0);
178 bb.clear();
179 }
180 sc1.close();
181
182 // wait for reader to finish
183 reader.join();
184 }
185 });
186 }
187
188 /**
189 * SocketChannel close while virtual thread blocked in read.
190 */
191 @ParameterizedTest
192 @MethodSource("threadBuilders")
193 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
194 VThreadRunner.run(builder, () -> {
195 try (var connection = new Connection()) {
196 SocketChannel sc = connection.channel1();
197 runAfterParkedAsync(sc::close);
198 try {
199 int n = sc.read(ByteBuffer.allocate(100));
200 fail("read returned " + n);
201 } catch (AsynchronousCloseException expected) { }
202 }
203 });
204 }
205
206 /**
207 * SocketChannel shutdownInput while virtual thread blocked in read.
208 */
209 @ParameterizedTest
210 @MethodSource("threadBuilders")
211 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
212 VThreadRunner.run(builder, () -> {
213 try (var connection = new Connection()) {
214 SocketChannel sc = connection.channel1();
215 runAfterParkedAsync(sc::shutdownInput);
216 int n = sc.read(ByteBuffer.allocate(100));
217 assertEquals(-1, n);
218 assertTrue(sc.isOpen());
219 }
220 });
221 }
222
223 /**
224 * Virtual thread interrupted while blocked in SocketChannel read.
225 */
226 @ParameterizedTest
227 @MethodSource("threadBuilders")
228 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
229 VThreadRunner.run(builder, () -> {
230 try (var connection = new Connection()) {
231 SocketChannel sc = connection.channel1();
232
233 // interrupt current thread when it blocks in read
234 Thread thisThread = Thread.currentThread();
235 runAfterParkedAsync(thisThread::interrupt);
236
237 try {
238 int n = sc.read(ByteBuffer.allocate(100));
239 fail("read returned " + n);
240 } catch (ClosedByInterruptException expected) {
241 assertTrue(Thread.interrupted());
242 }
243 }
244 });
245 }
246
247 /**
248 * SocketChannel close while virtual thread blocked in write.
249 */
250 @ParameterizedTest
251 @MethodSource("threadBuilders")
252 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
253 VThreadRunner.run(builder, () -> {
254 boolean retry = true;
255 while (retry) {
256 try (var connection = new Connection()) {
257 SocketChannel sc = connection.channel1();
258
259 // close sc when current thread blocks in write
260 runAfterParkedAsync(sc::close);
261 try {
262 ByteBuffer bb = ByteBuffer.allocate(100*1024);
263 for (;;) {
264 int n = sc.write(bb);
265 assertTrue(n > 0);
266 bb.clear();
267 }
268 } catch (AsynchronousCloseException expected) {
269 // closed when blocked in write
270 retry = false;
271 } catch (ClosedChannelException e) {
272 // closed when not blocked in write, need to retry test
273 }
274 }
275 }
276 });
277 }
278
279
280 /**
281 * SocketChannel shutdownOutput while virtual thread blocked in write.
282 */
283 @ParameterizedTest
284 @MethodSource("threadBuilders")
285 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
286 VThreadRunner.run(builder, () -> {
287 try (var connection = new Connection()) {
288 SocketChannel sc = connection.channel1();
289
290 // shutdown output when current thread blocks in write
291 runAfterParkedAsync(sc::shutdownOutput);
292 try {
293 ByteBuffer bb = ByteBuffer.allocate(100*1024);
294 for (;;) {
295 int n = sc.write(bb);
296 assertTrue(n > 0);
297 bb.clear();
298 }
299 } catch (ClosedChannelException e) {
300 // expected
301 }
302 assertTrue(sc.isOpen());
303 }
304 });
305 }
306
307 /**
308 * Virtual thread interrupted while blocked in SocketChannel write.
309 */
310 @ParameterizedTest
311 @MethodSource("threadBuilders")
312 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
313 VThreadRunner.run(builder, () -> {
314 boolean retry = true;
315 while (retry) {
316 try (var connection = new Connection()) {
317 SocketChannel sc = connection.channel1();
318
319 // interrupt current thread when it blocks in write
320 Thread thisThread = Thread.currentThread();
321 runAfterParkedAsync(thisThread::interrupt);
322
323 try {
324 ByteBuffer bb = ByteBuffer.allocate(100*1024);
325 for (;;) {
326 int n = sc.write(bb);
327 assertTrue(n > 0);
328 bb.clear();
329 }
330 } catch (ClosedByInterruptException e) {
331 // closed when blocked in write
332 assertTrue(Thread.interrupted());
333 retry = false;
334 } catch (ClosedChannelException e) {
335 // closed when not blocked in write, need to retry test
336 }
337 }
338 }
339 });
340 }
341
342 /**
343 * Virtual thread blocks in SocketChannel adaptor read.
344 */
345 @ParameterizedTest
346 @MethodSource("threadBuilders")
347 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
348 testSocketAdaptorRead(builder, 0);
349 }
350
351 /**
352 * Virtual thread blocks in SocketChannel adaptor read with timeout.
353 */
354 @ParameterizedTest
355 @MethodSource("threadBuilders")
356 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
357 testSocketAdaptorRead(builder, 60_000);
358 }
359
360 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
361 int timeout) throws Exception {
362 VThreadRunner.run(builder, () -> {
363 try (var connection = new Connection()) {
364 SocketChannel sc1 = connection.channel1();
365 SocketChannel sc2 = connection.channel2();
366
367 // write to sc1 when currnet thread blocks reading from sc2
368 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
369 runAfterParkedAsync(() -> sc1.write(bb));
370
371 // read from sc2 should block
372 byte[] array = new byte[100];
373 if (timeout > 0)
374 sc2.socket().setSoTimeout(timeout);
375 int n = sc2.socket().getInputStream().read(array);
376 assertTrue(n > 0);
377 assertTrue(array[0] == 'X');
378 }
379 });
380 }
381
382 /**
383 * ServerSocketChannel accept, no blocking.
384 */
385 @ParameterizedTest
386 @MethodSource("threadBuilders")
387 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
388 VThreadRunner.run(builder, () -> {
389 try (var ssc = ServerSocketChannel.open()) {
390 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
391 var sc1 = SocketChannel.open(ssc.getLocalAddress());
392 // accept should not block
393 var sc2 = ssc.accept();
394 sc1.close();
395 sc2.close();
396 }
397 });
398 }
399
400 /**
401 * Virtual thread blocks in ServerSocketChannel accept.
402 */
403 @ParameterizedTest
404 @MethodSource("threadBuilders")
405 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
406 VThreadRunner.run(builder, () -> {
407 try (var ssc = ServerSocketChannel.open()) {
408 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
409 var sc1 = SocketChannel.open();
410
411 // connect when current thread when it blocks in accept
412 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
413
414 // accept should block
415 var sc2 = ssc.accept();
416 sc1.close();
417 sc2.close();
418 }
419 });
420 }
421
422 /**
423 * SeverSocketChannel close while virtual thread blocked in accept.
424 */
425 @ParameterizedTest
426 @MethodSource("threadBuilders")
427 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
428 VThreadRunner.run(builder, () -> {
429 try (var ssc = ServerSocketChannel.open()) {
430 InetAddress lh = InetAddress.getLoopbackAddress();
431 ssc.bind(new InetSocketAddress(lh, 0));
432 runAfterParkedAsync(ssc::close);
433 try {
434 SocketChannel sc = ssc.accept();
435 sc.close();
436 fail("connection accepted???");
437 } catch (AsynchronousCloseException expected) { }
438 }
439 });
440 }
441
442 /**
443 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
444 */
445 @ParameterizedTest
446 @MethodSource("threadBuilders")
447 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
448 VThreadRunner.run(builder, () -> {
449 try (var ssc = ServerSocketChannel.open()) {
450 InetAddress lh = InetAddress.getLoopbackAddress();
451 ssc.bind(new InetSocketAddress(lh, 0));
452
453 // interrupt current thread when it blocks in accept
454 Thread thisThread = Thread.currentThread();
455 runAfterParkedAsync(thisThread::interrupt);
456
457 try {
458 SocketChannel sc = ssc.accept();
459 sc.close();
460 fail("connection accepted???");
461 } catch (ClosedByInterruptException expected) {
462 assertTrue(Thread.interrupted());
463 }
464 }
465 });
466 }
467
468 /**
469 * Virtual thread blocks in ServerSocketChannel adaptor accept.
470 */
471 @ParameterizedTest
472 @MethodSource("threadBuilders")
473 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
474 testSocketChannelAdaptorAccept(builder, 0);
475 }
476
477 /**
478 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
479 */
480 @ParameterizedTest
481 @MethodSource("threadBuilders")
482 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
483 testSocketChannelAdaptorAccept(builder, 60_000);
484 }
485
486 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
487 int timeout) throws Exception {
488 VThreadRunner.run(builder, () -> {
489 try (var ssc = ServerSocketChannel.open()) {
490 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
491 var sc = SocketChannel.open();
492
493 // interrupt current thread when it blocks in accept
494 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
495
496 // accept should block
497 if (timeout > 0)
498 ssc.socket().setSoTimeout(timeout);
499 Socket s = ssc.socket().accept();
500 sc.close();
501 s.close();
502 }
503 });
504 }
505
506 /**
507 * DatagramChannel receive/send, no blocking.
508 */
509 @ParameterizedTest
510 @MethodSource("threadBuilders")
511 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
512 VThreadRunner.run(builder, () -> {
513 try (DatagramChannel dc1 = DatagramChannel.open();
514 DatagramChannel dc2 = DatagramChannel.open()) {
515
516 InetAddress lh = InetAddress.getLoopbackAddress();
517 dc2.bind(new InetSocketAddress(lh, 0));
518
519 // send should not block
520 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
521 int n = dc1.send(bb, dc2.getLocalAddress());
522 assertTrue(n > 0);
523
524 // receive should not block
525 bb = ByteBuffer.allocate(10);
526 dc2.receive(bb);
527 assertTrue(bb.get(0) == 'X');
528 }
529 });
530 }
531
532 /**
533 * Virtual thread blocks in DatagramChannel receive.
534 */
535 @ParameterizedTest
536 @MethodSource("threadBuilders")
537 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
538 VThreadRunner.run(builder, () -> {
539 try (DatagramChannel dc1 = DatagramChannel.open();
540 DatagramChannel dc2 = DatagramChannel.open()) {
541
542 InetAddress lh = InetAddress.getLoopbackAddress();
543 dc2.bind(new InetSocketAddress(lh, 0));
544
545 // send from dc1 when current thread blocked in dc2.receive
546 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
547 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
548
549 // read from dc2 should block
550 ByteBuffer bb2 = ByteBuffer.allocate(10);
551 dc2.receive(bb2);
552 assertTrue(bb2.get(0) == 'X');
553 }
554 });
555 }
556
557 /**
558 * DatagramChannel close while virtual thread blocked in receive.
559 */
560 @ParameterizedTest
561 @MethodSource("threadBuilders")
562 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
563 VThreadRunner.run(builder, () -> {
564 try (DatagramChannel dc = DatagramChannel.open()) {
565 InetAddress lh = InetAddress.getLoopbackAddress();
566 dc.bind(new InetSocketAddress(lh, 0));
567 runAfterParkedAsync(dc::close);
568 try {
569 dc.receive(ByteBuffer.allocate(100));
570 fail("receive returned");
571 } catch (AsynchronousCloseException expected) { }
572 }
573 });
574 }
575
576 /**
577 * Virtual thread interrupted while blocked in DatagramChannel receive.
578 */
579 @ParameterizedTest
580 @MethodSource("threadBuilders")
581 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
582 VThreadRunner.run(builder, () -> {
583 try (DatagramChannel dc = DatagramChannel.open()) {
584 InetAddress lh = InetAddress.getLoopbackAddress();
585 dc.bind(new InetSocketAddress(lh, 0));
586
587 // interrupt current thread when it blocks in receive
588 Thread thisThread = Thread.currentThread();
589 runAfterParkedAsync(thisThread::interrupt);
590
591 try {
592 dc.receive(ByteBuffer.allocate(100));
593 fail("receive returned");
594 } catch (ClosedByInterruptException expected) {
595 assertTrue(Thread.interrupted());
596 }
597 }
598 });
599 }
600
601 /**
602 * Virtual thread blocks in DatagramSocket adaptor receive.
603 */
604 @ParameterizedTest
605 @MethodSource("threadBuilders")
606 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
607 testDatagramSocketAdaptorReceive(builder, 0);
608 }
609
610 /**
611 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
612 */
613 @ParameterizedTest
614 @MethodSource("threadBuilders")
615 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
616 testDatagramSocketAdaptorReceive(builder, 60_000);
617 }
618
619 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
620 int timeout) throws Exception {
621 VThreadRunner.run(builder, () -> {
622 try (DatagramChannel dc1 = DatagramChannel.open();
623 DatagramChannel dc2 = DatagramChannel.open()) {
624
625 InetAddress lh = InetAddress.getLoopbackAddress();
626 dc2.bind(new InetSocketAddress(lh, 0));
627
628 // send from dc1 when current thread blocks in dc2 receive
629 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
630 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
631
632 // receive should block
633 byte[] array = new byte[100];
634 DatagramPacket p = new DatagramPacket(array, 0, array.length);
635 if (timeout > 0)
636 dc2.socket().setSoTimeout(timeout);
637 dc2.socket().receive(p);
638 assertTrue(p.getLength() == 3 && array[0] == 'X');
639 }
640 });
641 }
642
643 /**
644 * DatagramChannel close while virtual thread blocked in adaptor receive.
645 */
646 @ParameterizedTest
647 @MethodSource("threadBuilders")
648 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
649 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
650 }
651
652 /**
653 * DatagramChannel close while virtual thread blocked in adaptor receive
654 * with timeout.
655 */
656 @ParameterizedTest
657 @MethodSource("threadBuilders")
658 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
659 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
660 }
661
662 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
663 int timeout) throws Exception {
664 VThreadRunner.run(builder, () -> {
665 try (DatagramChannel dc = DatagramChannel.open()) {
666 InetAddress lh = InetAddress.getLoopbackAddress();
667 dc.bind(new InetSocketAddress(lh, 0));
668
669 byte[] array = new byte[100];
670 DatagramPacket p = new DatagramPacket(array, 0, array.length);
671 if (timeout > 0)
672 dc.socket().setSoTimeout(timeout);
673
674 // close channel/socket when current thread blocks in receive
675 runAfterParkedAsync(dc::close);
676
677 assertThrows(SocketException.class, () -> dc.socket().receive(p));
678 }
679 });
680 }
681
682 /**
683 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
684 */
685 @ParameterizedTest
686 @MethodSource("threadBuilders")
687 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
688 testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
689 }
690
691 /**
692 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
693 * with timeout.
694 */
695 @ParameterizedTest
696 @MethodSource("threadBuilders")
697 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
698 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
699 }
700
701 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
702 int timeout) throws Exception {
703 VThreadRunner.run(builder, () -> {
704 try (DatagramChannel dc = DatagramChannel.open()) {
705 InetAddress lh = InetAddress.getLoopbackAddress();
706 dc.bind(new InetSocketAddress(lh, 0));
707
708 byte[] array = new byte[100];
709 DatagramPacket p = new DatagramPacket(array, 0, array.length);
710 if (timeout > 0)
711 dc.socket().setSoTimeout(timeout);
712
713 // interrupt current thread when it blocks in receive
714 Thread thisThread = Thread.currentThread();
715 runAfterParkedAsync(thisThread::interrupt);
716
717 try {
718 dc.socket().receive(p);
719 fail();
720 } catch (ClosedByInterruptException expected) {
721 assertTrue(Thread.interrupted());
722 }
723 }
724 });
725 }
726
727 /**
728 * Pipe read/write, no blocking.
729 */
730 @ParameterizedTest
731 @MethodSource("threadBuilders")
732 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
733 VThreadRunner.run(builder, () -> {
734 Pipe p = Pipe.open();
735 try (Pipe.SinkChannel sink = p.sink();
736 Pipe.SourceChannel source = p.source()) {
737
738 // write should not block
739 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
740 int n = sink.write(bb);
741 assertTrue(n > 0);
742
743 // read should not block
744 bb = ByteBuffer.allocate(10);
745 n = source.read(bb);
746 assertTrue(n > 0);
747 assertTrue(bb.get(0) == 'X');
748 }
749 });
750 }
751
752 /**
753 * Virtual thread blocks in Pipe.SourceChannel read.
754 */
755 @ParameterizedTest
756 @MethodSource("threadBuilders")
757 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
758 VThreadRunner.run(builder, () -> {
759 Pipe p = Pipe.open();
760 try (Pipe.SinkChannel sink = p.sink();
761 Pipe.SourceChannel source = p.source()) {
762
763 // write from sink when current thread blocks reading from source
764 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
765 runAfterParkedAsync(() -> sink.write(bb1));
766
767 // read should block
768 ByteBuffer bb2 = ByteBuffer.allocate(10);
769 int n = source.read(bb2);
770 assertTrue(n > 0);
771 assertTrue(bb2.get(0) == 'X');
772 }
773 });
774 }
775
776 /**
777 * Virtual thread blocks in Pipe.SinkChannel write.
778 */
779 @ParameterizedTest
780 @MethodSource("threadBuilders")
781 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
782 VThreadRunner.run(builder, () -> {
783 Pipe p = Pipe.open();
784 try (Pipe.SinkChannel sink = p.sink();
785 Pipe.SourceChannel source = p.source()) {
786
787 // read from source to EOF when current thread blocking in write
788 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
789
790 // write to sink should block
791 ByteBuffer bb = ByteBuffer.allocate(100*1024);
792 for (int i=0; i<1000; i++) {
793 int n = sink.write(bb);
794 assertTrue(n > 0);
795 bb.clear();
796 }
797 sink.close();
798
799 // wait for reader to finish
800 reader.join();
801 }
802 });
803 }
804
805 /**
806 * Pipe.SourceChannel close while virtual thread blocked in read.
807 */
808 @ParameterizedTest
809 @MethodSource("threadBuilders")
810 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
811 VThreadRunner.run(builder, () -> {
812 Pipe p = Pipe.open();
813 try (Pipe.SinkChannel sink = p.sink();
814 Pipe.SourceChannel source = p.source()) {
815 runAfterParkedAsync(source::close);
816 try {
817 int n = source.read(ByteBuffer.allocate(100));
818 fail("read returned " + n);
819 } catch (AsynchronousCloseException expected) { }
820 }
821 });
822 }
823
824 /**
825 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
826 */
827 @ParameterizedTest
828 @MethodSource("threadBuilders")
829 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
830 VThreadRunner.run(builder, () -> {
831 Pipe p = Pipe.open();
832 try (Pipe.SinkChannel sink = p.sink();
833 Pipe.SourceChannel source = p.source()) {
834
835 // interrupt current thread when it blocks reading from source
836 Thread thisThread = Thread.currentThread();
837 runAfterParkedAsync(thisThread::interrupt);
838
839 try {
840 int n = source.read(ByteBuffer.allocate(100));
841 fail("read returned " + n);
842 } catch (ClosedByInterruptException expected) {
843 assertTrue(Thread.interrupted());
844 }
845 }
846 });
847 }
848
849 /**
850 * Pipe.SinkChannel close while virtual thread blocked in write.
851 */
852 @ParameterizedTest
853 @MethodSource("threadBuilders")
854 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
855 VThreadRunner.run(builder, () -> {
856 boolean retry = true;
857 while (retry) {
858 Pipe p = Pipe.open();
859 try (Pipe.SinkChannel sink = p.sink();
860 Pipe.SourceChannel source = p.source()) {
861
862 // close sink when current thread blocks in write
863 runAfterParkedAsync(sink::close);
864 try {
865 ByteBuffer bb = ByteBuffer.allocate(100*1024);
866 for (;;) {
867 int n = sink.write(bb);
868 assertTrue(n > 0);
869 bb.clear();
870 }
871 } catch (AsynchronousCloseException e) {
872 // closed when blocked in write
873 retry = false;
874 } catch (ClosedChannelException e) {
875 // closed when not blocked in write, need to retry test
876 }
877 }
878 }
879 });
880 }
881
882 /**
883 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
884 */
885 @ParameterizedTest
886 @MethodSource("threadBuilders")
887 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
888 VThreadRunner.run(builder, () -> {
889 boolean retry = true;
890 while (retry) {
891 Pipe p = Pipe.open();
892 try (Pipe.SinkChannel sink = p.sink();
893 Pipe.SourceChannel source = p.source()) {
894
895 // interrupt current thread when it blocks in write
896 Thread thisThread = Thread.currentThread();
897 runAfterParkedAsync(thisThread::interrupt);
898
899 try {
900 ByteBuffer bb = ByteBuffer.allocate(100*1024);
901 for (;;) {
902 int n = sink.write(bb);
903 assertTrue(n > 0);
904 bb.clear();
905 }
906 } catch (ClosedByInterruptException expected) {
907 // closed when blocked in write
908 assertTrue(Thread.interrupted());
|