9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit/timeout=480 BlockingChannelOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
38 */
39
40 /*
41 * @test id=no-vmcontinuations
42 * @requires vm.continuations
43 * @library /test/lib
44 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
45 */
46
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.net.DatagramPacket;
50 import java.net.InetAddress;
51 import java.net.InetSocketAddress;
52 import java.net.Socket;
53 import java.net.SocketAddress;
54 import java.net.SocketException;
55 import java.nio.ByteBuffer;
56 import java.nio.channels.AsynchronousCloseException;
57 import java.nio.channels.ClosedByInterruptException;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.DatagramChannel;
60 import java.nio.channels.Pipe;
61 import java.nio.channels.ReadableByteChannel;
62 import java.nio.channels.ServerSocketChannel;
63 import java.nio.channels.SocketChannel;
64 import java.nio.channels.WritableByteChannel;
65 import java.util.concurrent.locks.LockSupport;
66
67 import jdk.test.lib.thread.VThreadRunner;
68 import org.junit.jupiter.api.Test;
69 import static org.junit.jupiter.api.Assertions.*;
70
71 class BlockingChannelOps {
72
73 /**
74 * SocketChannel read/write, no blocking.
75 */
76 @Test
77 void testSocketChannelReadWrite1() throws Exception {
78 VThreadRunner.run(() -> {
79 try (var connection = new Connection()) {
80 SocketChannel sc1 = connection.channel1();
81 SocketChannel sc2 = connection.channel2();
82
83 // write to sc1
84 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
85 int n = sc1.write(bb);
86 assertTrue(n > 0);
87
88 // read from sc2 should not block
89 bb = ByteBuffer.allocate(10);
90 n = sc2.read(bb);
91 assertTrue(n > 0);
92 assertTrue(bb.get(0) == 'X');
93 }
94 });
95 }
96
97 /**
98 * Virtual thread blocks in SocketChannel read.
99 */
100 @Test
101 void testSocketChannelRead() throws Exception {
102 VThreadRunner.run(() -> {
103 try (var connection = new Connection()) {
104 SocketChannel sc1 = connection.channel1();
105 SocketChannel sc2 = connection.channel2();
106
107 // write to sc1 when current thread blocks in sc2.read
108 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
109 runAfterParkedAsync(() -> sc1.write(bb1));
110
111 // read from sc2 should block
112 ByteBuffer bb2 = ByteBuffer.allocate(10);
113 int n = sc2.read(bb2);
114 assertTrue(n > 0);
115 assertTrue(bb2.get(0) == 'X');
116 }
117 });
118 }
119
120 /**
121 * Virtual thread blocks in SocketChannel write.
122 */
123 @Test
124 void testSocketChannelWrite() throws Exception {
125 VThreadRunner.run(() -> {
126 try (var connection = new Connection()) {
127 SocketChannel sc1 = connection.channel1();
128 SocketChannel sc2 = connection.channel2();
129
130 // read from sc2 to EOF when current thread blocks in sc1.write
131 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
132
133 // write to sc1 should block
134 ByteBuffer bb = ByteBuffer.allocate(100*1024);
135 for (int i=0; i<1000; i++) {
136 int n = sc1.write(bb);
137 assertTrue(n > 0);
138 bb.clear();
139 }
140 sc1.close();
141
142 // wait for reader to finish
143 reader.join();
144 }
145 });
146 }
147
148 /**
149 * SocketChannel close while virtual thread blocked in read.
150 */
151 @Test
152 void testSocketChannelReadAsyncClose() throws Exception {
153 VThreadRunner.run(() -> {
154 try (var connection = new Connection()) {
155 SocketChannel sc = connection.channel1();
156 runAfterParkedAsync(sc::close);
157 try {
158 int n = sc.read(ByteBuffer.allocate(100));
159 fail("read returned " + n);
160 } catch (AsynchronousCloseException expected) { }
161 }
162 });
163 }
164
165 /**
166 * SocketChannel shutdownInput while virtual thread blocked in read.
167 */
168 @Test
169 void testSocketChannelReadAsyncShutdownInput() throws Exception {
170 VThreadRunner.run(() -> {
171 try (var connection = new Connection()) {
172 SocketChannel sc = connection.channel1();
173 runAfterParkedAsync(sc::shutdownInput);
174 int n = sc.read(ByteBuffer.allocate(100));
175 assertEquals(-1, n);
176 assertTrue(sc.isOpen());
177 }
178 });
179 }
180
181 /**
182 * Virtual thread interrupted while blocked in SocketChannel read.
183 */
184 @Test
185 void testSocketChannelReadInterrupt() throws Exception {
186 VThreadRunner.run(() -> {
187 try (var connection = new Connection()) {
188 SocketChannel sc = connection.channel1();
189
190 // interrupt current thread when it blocks in read
191 Thread thisThread = Thread.currentThread();
192 runAfterParkedAsync(thisThread::interrupt);
193
194 try {
195 int n = sc.read(ByteBuffer.allocate(100));
196 fail("read returned " + n);
197 } catch (ClosedByInterruptException expected) {
198 assertTrue(Thread.interrupted());
199 }
200 }
201 });
202 }
203
204 /**
205 * SocketChannel close while virtual thread blocked in write.
206 */
207 @Test
208 void testSocketChannelWriteAsyncClose() throws Exception {
209 VThreadRunner.run(() -> {
210 boolean done = false;
211 while (!done) {
212 try (var connection = new Connection()) {
213 SocketChannel sc = connection.channel1();
214
215 // close sc when current thread blocks in write
216 runAfterParkedAsync(sc::close, true);
217
218 // write until channel is closed
219 try {
220 ByteBuffer bb = ByteBuffer.allocate(100*1024);
221 for (;;) {
222 int n = sc.write(bb);
223 assertTrue(n > 0);
224 bb.clear();
225 }
226 } catch (AsynchronousCloseException expected) {
227 // closed when blocked in write
228 done = true;
229 } catch (ClosedChannelException e) {
230 // closed but not blocked in write, need to retry test
231 System.err.format("%s, need to retry!%n", e);
232 }
233 }
234 }
235 });
236 }
237
238
239 /**
240 * SocketChannel shutdownOutput while virtual thread blocked in write.
241 */
242 @Test
243 void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
244 VThreadRunner.run(() -> {
245 try (var connection = new Connection()) {
246 SocketChannel sc = connection.channel1();
247
248 // shutdown output when current thread blocks in write
249 runAfterParkedAsync(sc::shutdownOutput);
250 try {
251 ByteBuffer bb = ByteBuffer.allocate(100*1024);
252 for (;;) {
253 int n = sc.write(bb);
254 assertTrue(n > 0);
255 bb.clear();
256 }
257 } catch (ClosedChannelException e) {
258 // expected
259 }
260 assertTrue(sc.isOpen());
261 }
262 });
263 }
264
265 /**
266 * Virtual thread interrupted while blocked in SocketChannel write.
267 */
268 @Test
269 void testSocketChannelWriteInterrupt() throws Exception {
270 VThreadRunner.run(() -> {
271 boolean done = false;
272 while (!done) {
273 try (var connection = new Connection()) {
274 SocketChannel sc = connection.channel1();
275
276 // interrupt current thread when it blocks in write
277 Thread thisThread = Thread.currentThread();
278 runAfterParkedAsync(thisThread::interrupt, true);
279
280 // write until channel is closed
281 try {
282 ByteBuffer bb = ByteBuffer.allocate(100*1024);
283 for (;;) {
284 int n = sc.write(bb);
285 assertTrue(n > 0);
286 bb.clear();
287 }
288 } catch (ClosedByInterruptException e) {
289 // closed when blocked in write
290 assertTrue(Thread.interrupted());
291 done = true;
292 } catch (ClosedChannelException e) {
293 // closed but not blocked in write, need to retry test
294 System.err.format("%s, need to retry!%n", e);
295 }
296 }
297 }
298 });
299 }
300
301 /**
302 * Virtual thread blocks in SocketChannel adaptor read.
303 */
304 @Test
305 void testSocketAdaptorRead1() throws Exception {
306 testSocketAdaptorRead(0);
307 }
308
309 /**
310 * Virtual thread blocks in SocketChannel adaptor read with timeout.
311 */
312 @Test
313 void testSocketAdaptorRead2() throws Exception {
314 testSocketAdaptorRead(60_000);
315 }
316
317 private void testSocketAdaptorRead(int timeout) throws Exception {
318 VThreadRunner.run(() -> {
319 try (var connection = new Connection()) {
320 SocketChannel sc1 = connection.channel1();
321 SocketChannel sc2 = connection.channel2();
322
323 // write to sc1 when currnet thread blocks reading from sc2
324 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
325 runAfterParkedAsync(() -> sc1.write(bb));
326
327 // read from sc2 should block
328 byte[] array = new byte[100];
329 if (timeout > 0)
330 sc2.socket().setSoTimeout(timeout);
331 int n = sc2.socket().getInputStream().read(array);
332 assertTrue(n > 0);
333 assertTrue(array[0] == 'X');
334 }
335 });
336 }
337
338 /**
339 * ServerSocketChannel accept, no blocking.
340 */
341 @Test
342 void testServerSocketChannelAccept1() throws Exception {
343 VThreadRunner.run(() -> {
344 try (var ssc = ServerSocketChannel.open()) {
345 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
346 var sc1 = SocketChannel.open(ssc.getLocalAddress());
347 // accept should not block
348 var sc2 = ssc.accept();
349 sc1.close();
350 sc2.close();
351 }
352 });
353 }
354
355 /**
356 * Virtual thread blocks in ServerSocketChannel accept.
357 */
358 @Test
359 void testServerSocketChannelAccept2() throws Exception {
360 VThreadRunner.run(() -> {
361 try (var ssc = ServerSocketChannel.open()) {
362 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
363 var sc1 = SocketChannel.open();
364
365 // connect when current thread when it blocks in accept
366 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
367
368 // accept should block
369 var sc2 = ssc.accept();
370 sc1.close();
371 sc2.close();
372 }
373 });
374 }
375
376 /**
377 * SeverSocketChannel close while virtual thread blocked in accept.
378 */
379 @Test
380 void testServerSocketChannelAcceptAsyncClose() throws Exception {
381 VThreadRunner.run(() -> {
382 try (var ssc = ServerSocketChannel.open()) {
383 InetAddress lh = InetAddress.getLoopbackAddress();
384 ssc.bind(new InetSocketAddress(lh, 0));
385 runAfterParkedAsync(ssc::close);
386 try {
387 SocketChannel sc = ssc.accept();
388 sc.close();
389 fail("connection accepted???");
390 } catch (AsynchronousCloseException expected) { }
391 }
392 });
393 }
394
395 /**
396 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
397 */
398 @Test
399 void testServerSocketChannelAcceptInterrupt() throws Exception {
400 VThreadRunner.run(() -> {
401 try (var ssc = ServerSocketChannel.open()) {
402 InetAddress lh = InetAddress.getLoopbackAddress();
403 ssc.bind(new InetSocketAddress(lh, 0));
404
405 // interrupt current thread when it blocks in accept
406 Thread thisThread = Thread.currentThread();
407 runAfterParkedAsync(thisThread::interrupt);
408
409 try {
410 SocketChannel sc = ssc.accept();
411 sc.close();
412 fail("connection accepted???");
413 } catch (ClosedByInterruptException expected) {
414 assertTrue(Thread.interrupted());
415 }
416 }
417 });
418 }
419
420 /**
421 * Virtual thread blocks in ServerSocketChannel adaptor accept.
422 */
423 @Test
424 void testSocketChannelAdaptorAccept1() throws Exception {
425 testSocketChannelAdaptorAccept(0);
426 }
427
428 /**
429 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
430 */
431 @Test
432 void testSocketChannelAdaptorAccept2() throws Exception {
433 testSocketChannelAdaptorAccept(60_000);
434 }
435
436 private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
437 VThreadRunner.run(() -> {
438 try (var ssc = ServerSocketChannel.open()) {
439 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
440 var sc = SocketChannel.open();
441
442 // interrupt current thread when it blocks in accept
443 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
444
445 // accept should block
446 if (timeout > 0)
447 ssc.socket().setSoTimeout(timeout);
448 Socket s = ssc.socket().accept();
449 sc.close();
450 s.close();
451 }
452 });
453 }
454
455 /**
456 * DatagramChannel receive/send, no blocking.
457 */
458 @Test
459 void testDatagramChannelSendReceive1() throws Exception {
460 VThreadRunner.run(() -> {
461 try (DatagramChannel dc1 = DatagramChannel.open();
462 DatagramChannel dc2 = DatagramChannel.open()) {
463
464 InetAddress lh = InetAddress.getLoopbackAddress();
465 dc2.bind(new InetSocketAddress(lh, 0));
466
467 // send should not block
468 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
469 int n = dc1.send(bb, dc2.getLocalAddress());
470 assertTrue(n > 0);
471
472 // receive should not block
473 bb = ByteBuffer.allocate(10);
474 dc2.receive(bb);
475 assertTrue(bb.get(0) == 'X');
476 }
477 });
478 }
479
480 /**
481 * Virtual thread blocks in DatagramChannel receive.
482 */
483 @Test
484 void testDatagramChannelSendReceive2() throws Exception {
485 VThreadRunner.run(() -> {
486 try (DatagramChannel dc1 = DatagramChannel.open();
487 DatagramChannel dc2 = DatagramChannel.open()) {
488
489 InetAddress lh = InetAddress.getLoopbackAddress();
490 dc2.bind(new InetSocketAddress(lh, 0));
491
492 // send from dc1 when current thread blocked in dc2.receive
493 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
494 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
495
496 // read from dc2 should block
497 ByteBuffer bb2 = ByteBuffer.allocate(10);
498 dc2.receive(bb2);
499 assertTrue(bb2.get(0) == 'X');
500 }
501 });
502 }
503
504 /**
505 * DatagramChannel close while virtual thread blocked in receive.
506 */
507 @Test
508 void testDatagramChannelReceiveAsyncClose() throws Exception {
509 VThreadRunner.run(() -> {
510 try (DatagramChannel dc = DatagramChannel.open()) {
511 InetAddress lh = InetAddress.getLoopbackAddress();
512 dc.bind(new InetSocketAddress(lh, 0));
513 runAfterParkedAsync(dc::close);
514 try {
515 dc.receive(ByteBuffer.allocate(100));
516 fail("receive returned");
517 } catch (AsynchronousCloseException expected) { }
518 }
519 });
520 }
521
522 /**
523 * Virtual thread interrupted while blocked in DatagramChannel receive.
524 */
525 @Test
526 void testDatagramChannelReceiveInterrupt() throws Exception {
527 VThreadRunner.run(() -> {
528 try (DatagramChannel dc = DatagramChannel.open()) {
529 InetAddress lh = InetAddress.getLoopbackAddress();
530 dc.bind(new InetSocketAddress(lh, 0));
531
532 // interrupt current thread when it blocks in receive
533 Thread thisThread = Thread.currentThread();
534 runAfterParkedAsync(thisThread::interrupt);
535
536 try {
537 dc.receive(ByteBuffer.allocate(100));
538 fail("receive returned");
539 } catch (ClosedByInterruptException expected) {
540 assertTrue(Thread.interrupted());
541 }
542 }
543 });
544 }
545
546 /**
547 * Virtual thread blocks in DatagramSocket adaptor receive.
548 */
549 @Test
550 void testDatagramSocketAdaptorReceive1() throws Exception {
551 testDatagramSocketAdaptorReceive(0);
552 }
553
554 /**
555 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
556 */
557 @Test
558 void testDatagramSocketAdaptorReceive2() throws Exception {
559 testDatagramSocketAdaptorReceive(60_000);
560 }
561
562 private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
563 VThreadRunner.run(() -> {
564 try (DatagramChannel dc1 = DatagramChannel.open();
565 DatagramChannel dc2 = DatagramChannel.open()) {
566
567 InetAddress lh = InetAddress.getLoopbackAddress();
568 dc2.bind(new InetSocketAddress(lh, 0));
569
570 // send from dc1 when current thread blocks in dc2 receive
571 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
572 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
573
574 // receive should block
575 byte[] array = new byte[100];
576 DatagramPacket p = new DatagramPacket(array, 0, array.length);
577 if (timeout > 0)
578 dc2.socket().setSoTimeout(timeout);
579 dc2.socket().receive(p);
580 assertTrue(p.getLength() == 3 && array[0] == 'X');
581 }
582 });
583 }
584
585 /**
586 * DatagramChannel close while virtual thread blocked in adaptor receive.
587 */
588 @Test
589 void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
590 testDatagramSocketAdaptorReceiveAsyncClose(0);
591 }
592
593 /**
594 * DatagramChannel close while virtual thread blocked in adaptor receive
595 * with timeout.
596 */
597 @Test
598 void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
599 testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
600 }
601
602 private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
603 VThreadRunner.run(() -> {
604 try (DatagramChannel dc = DatagramChannel.open()) {
605 InetAddress lh = InetAddress.getLoopbackAddress();
606 dc.bind(new InetSocketAddress(lh, 0));
607
608 byte[] array = new byte[100];
609 DatagramPacket p = new DatagramPacket(array, 0, array.length);
610 if (timeout > 0)
611 dc.socket().setSoTimeout(timeout);
612
613 // close channel/socket when current thread blocks in receive
614 runAfterParkedAsync(dc::close);
615
616 assertThrows(SocketException.class, () -> dc.socket().receive(p));
617 }
618 });
619 }
620
621 /**
622 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
623 */
624 @Test
625 void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
626 testDatagramSocketAdaptorReceiveInterrupt(0);
627 }
628
629 /**
630 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
631 * with timeout.
632 */
633 @Test
634 void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
635 testDatagramSocketAdaptorReceiveInterrupt(60_1000);
636 }
637
638 private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
639 VThreadRunner.run(() -> {
640 try (DatagramChannel dc = DatagramChannel.open()) {
641 InetAddress lh = InetAddress.getLoopbackAddress();
642 dc.bind(new InetSocketAddress(lh, 0));
643
644 byte[] array = new byte[100];
645 DatagramPacket p = new DatagramPacket(array, 0, array.length);
646 if (timeout > 0)
647 dc.socket().setSoTimeout(timeout);
648
649 // interrupt current thread when it blocks in receive
650 Thread thisThread = Thread.currentThread();
651 runAfterParkedAsync(thisThread::interrupt);
652
653 try {
654 dc.socket().receive(p);
655 fail();
656 } catch (ClosedByInterruptException expected) {
657 assertTrue(Thread.interrupted());
658 }
659 }
660 });
661 }
662
663 /**
664 * Pipe read/write, no blocking.
665 */
666 @Test
667 void testPipeReadWrite1() throws Exception {
668 VThreadRunner.run(() -> {
669 Pipe p = Pipe.open();
670 try (Pipe.SinkChannel sink = p.sink();
671 Pipe.SourceChannel source = p.source()) {
672
673 // write should not block
674 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
675 int n = sink.write(bb);
676 assertTrue(n > 0);
677
678 // read should not block
679 bb = ByteBuffer.allocate(10);
680 n = source.read(bb);
681 assertTrue(n > 0);
682 assertTrue(bb.get(0) == 'X');
683 }
684 });
685 }
686
687 /**
688 * Virtual thread blocks in Pipe.SourceChannel read.
689 */
690 @Test
691 void testPipeReadWrite2() throws Exception {
692 VThreadRunner.run(() -> {
693 Pipe p = Pipe.open();
694 try (Pipe.SinkChannel sink = p.sink();
695 Pipe.SourceChannel source = p.source()) {
696
697 // write from sink when current thread blocks reading from source
698 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
699 runAfterParkedAsync(() -> sink.write(bb1));
700
701 // read should block
702 ByteBuffer bb2 = ByteBuffer.allocate(10);
703 int n = source.read(bb2);
704 assertTrue(n > 0);
705 assertTrue(bb2.get(0) == 'X');
706 }
707 });
708 }
709
710 /**
711 * Virtual thread blocks in Pipe.SinkChannel write.
712 */
713 @Test
714 void testPipeReadWrite3() throws Exception {
715 VThreadRunner.run(() -> {
716 Pipe p = Pipe.open();
717 try (Pipe.SinkChannel sink = p.sink();
718 Pipe.SourceChannel source = p.source()) {
719
720 // read from source to EOF when current thread blocking in write
721 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
722
723 // write to sink should block
724 ByteBuffer bb = ByteBuffer.allocate(100*1024);
725 for (int i=0; i<1000; i++) {
726 int n = sink.write(bb);
727 assertTrue(n > 0);
728 bb.clear();
729 }
730 sink.close();
731
732 // wait for reader to finish
733 reader.join();
734 }
735 });
736 }
737
738 /**
739 * Pipe.SourceChannel close while virtual thread blocked in read.
740 */
741 @Test
742 void testPipeReadAsyncClose() throws Exception {
743 VThreadRunner.run(() -> {
744 Pipe p = Pipe.open();
745 try (Pipe.SinkChannel sink = p.sink();
746 Pipe.SourceChannel source = p.source()) {
747 runAfterParkedAsync(source::close);
748 try {
749 int n = source.read(ByteBuffer.allocate(100));
750 fail("read returned " + n);
751 } catch (AsynchronousCloseException expected) { }
752 }
753 });
754 }
755
756 /**
757 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
758 */
759 @Test
760 void testPipeReadInterrupt() throws Exception {
761 VThreadRunner.run(() -> {
762 Pipe p = Pipe.open();
763 try (Pipe.SinkChannel sink = p.sink();
764 Pipe.SourceChannel source = p.source()) {
765
766 // interrupt current thread when it blocks reading from source
767 Thread thisThread = Thread.currentThread();
768 runAfterParkedAsync(thisThread::interrupt);
769
770 try {
771 int n = source.read(ByteBuffer.allocate(100));
772 fail("read returned " + n);
773 } catch (ClosedByInterruptException expected) {
774 assertTrue(Thread.interrupted());
775 }
776 }
777 });
778 }
779
780 /**
781 * Pipe.SinkChannel close while virtual thread blocked in write.
782 */
783 @Test
784 void testPipeWriteAsyncClose() throws Exception {
785 VThreadRunner.run(() -> {
786 boolean done = false;
787 while (!done) {
788 Pipe p = Pipe.open();
789 try (Pipe.SinkChannel sink = p.sink();
790 Pipe.SourceChannel source = p.source()) {
791
792 // close sink when current thread blocks in write
793 runAfterParkedAsync(sink::close, true);
794
795 // write until channel is closed
796 try {
797 ByteBuffer bb = ByteBuffer.allocate(100*1024);
798 for (;;) {
799 int n = sink.write(bb);
800 assertTrue(n > 0);
801 bb.clear();
802 }
803 } catch (AsynchronousCloseException e) {
804 // closed when blocked in write
805 done = true;
806 } catch (ClosedChannelException e) {
807 // closed but not blocked in write, need to retry test
808 System.err.format("%s, need to retry!%n", e);
809 }
810 }
811 }
812 });
813 }
814
815 /**
816 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
817 */
818 @Test
819 void testPipeWriteInterrupt() throws Exception {
820 VThreadRunner.run(() -> {
821 boolean done = false;
822 while (!done) {
823 Pipe p = Pipe.open();
824 try (Pipe.SinkChannel sink = p.sink();
825 Pipe.SourceChannel source = p.source()) {
826
827 // interrupt current thread when it blocks in write
828 Thread thisThread = Thread.currentThread();
829 runAfterParkedAsync(thisThread::interrupt, true);
830
831 // write until channel is closed
832 try {
833 ByteBuffer bb = ByteBuffer.allocate(100*1024);
834 for (;;) {
835 int n = sink.write(bb);
836 assertTrue(n > 0);
837 bb.clear();
838 }
839 } catch (ClosedByInterruptException expected) {
840 // closed when blocked in write
|
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit/othervm/timeout=480 BlockingChannelOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
38 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
39 */
40
41 /*
42 * @test id=io_uring
43 * @requires os.family == "linux"
44 * @library /test/lib
45 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
46 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
47 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
48 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
49 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
50 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
51 */
52
53 /*
54 * @test id=no-vmcontinuations
55 * @requires vm.continuations
56 * @library /test/lib
57 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
58 */
59
60 import java.io.Closeable;
61 import java.io.IOException;
62 import java.net.DatagramPacket;
63 import java.net.InetAddress;
64 import java.net.InetSocketAddress;
65 import java.net.Socket;
66 import java.net.SocketAddress;
67 import java.net.SocketException;
68 import java.nio.ByteBuffer;
69 import java.nio.channels.AsynchronousCloseException;
70 import java.nio.channels.ClosedByInterruptException;
71 import java.nio.channels.ClosedChannelException;
72 import java.nio.channels.DatagramChannel;
73 import java.nio.channels.Pipe;
74 import java.nio.channels.ReadableByteChannel;
75 import java.nio.channels.ServerSocketChannel;
76 import java.nio.channels.SocketChannel;
77 import java.nio.channels.WritableByteChannel;
78 import java.util.concurrent.ExecutorService;
79 import java.util.concurrent.Executors;
80 import java.util.concurrent.ThreadFactory;
81 import java.util.concurrent.locks.LockSupport;
82 import java.util.stream.Stream;
83
84 import jdk.test.lib.thread.VThreadRunner;
85 import jdk.test.lib.thread.VThreadScheduler;
86
87 import org.junit.jupiter.api.BeforeAll;
88 import org.junit.jupiter.params.ParameterizedTest;
89 import org.junit.jupiter.params.provider.MethodSource;
90 import static org.junit.jupiter.api.Assertions.*;
91
92 class BlockingChannelOps {
93 private static ExecutorService threadPool;
94 private static Thread.VirtualThreadScheduler customScheduler;
95
96 @BeforeAll
97 static void setup() {
98 ThreadFactory factory = Thread.ofPlatform().daemon().factory();
99 threadPool = Executors.newCachedThreadPool(factory);
100 customScheduler = (_, task) -> threadPool.execute(task);
101 }
102
103 /**
104 * Returns the Thread.Builder to create the virtual thread.
105 */
106 static Stream<Thread.Builder.OfVirtual> threadBuilders() {
107 if (VThreadScheduler.supportsCustomScheduler()) {
108 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
109 } else {
110 return Stream.of(Thread.ofVirtual());
111 }
112 }
113
114 /**
115 * SocketChannel read/write, no blocking.
116 */
117 @ParameterizedTest
118 @MethodSource("threadBuilders")
119 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
120 VThreadRunner.run(builder, () -> {
121 try (var connection = new Connection()) {
122 SocketChannel sc1 = connection.channel1();
123 SocketChannel sc2 = connection.channel2();
124
125 // write to sc1
126 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
127 int n = sc1.write(bb);
128 assertTrue(n > 0);
129
130 // read from sc2 should not block
131 bb = ByteBuffer.allocate(10);
132 n = sc2.read(bb);
133 assertTrue(n > 0);
134 assertTrue(bb.get(0) == 'X');
135 }
136 });
137 }
138
139 /**
140 * Virtual thread blocks in SocketChannel read.
141 */
142 @ParameterizedTest
143 @MethodSource("threadBuilders")
144 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
145 VThreadRunner.run(builder, () -> {
146 try (var connection = new Connection()) {
147 SocketChannel sc1 = connection.channel1();
148 SocketChannel sc2 = connection.channel2();
149
150 // write to sc1 when current thread blocks in sc2.read
151 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
152 runAfterParkedAsync(() -> sc1.write(bb1));
153
154 // read from sc2 should block
155 ByteBuffer bb2 = ByteBuffer.allocate(10);
156 int n = sc2.read(bb2);
157 assertTrue(n > 0);
158 assertTrue(bb2.get(0) == 'X');
159 }
160 });
161 }
162
163 /**
164 * Virtual thread blocks in SocketChannel write.
165 */
166 @ParameterizedTest
167 @MethodSource("threadBuilders")
168 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
169 VThreadRunner.run(builder, () -> {
170 try (var connection = new Connection()) {
171 SocketChannel sc1 = connection.channel1();
172 SocketChannel sc2 = connection.channel2();
173
174 // read from sc2 to EOF when current thread blocks in sc1.write
175 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
176
177 // write to sc1 should block
178 ByteBuffer bb = ByteBuffer.allocate(100*1024);
179 for (int i=0; i<1000; i++) {
180 int n = sc1.write(bb);
181 assertTrue(n > 0);
182 bb.clear();
183 }
184 sc1.close();
185
186 // wait for reader to finish
187 reader.join();
188 }
189 });
190 }
191
192 /**
193 * SocketChannel close while virtual thread blocked in read.
194 */
195 @ParameterizedTest
196 @MethodSource("threadBuilders")
197 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
198 VThreadRunner.run(builder, () -> {
199 try (var connection = new Connection()) {
200 SocketChannel sc = connection.channel1();
201 runAfterParkedAsync(sc::close);
202 try {
203 int n = sc.read(ByteBuffer.allocate(100));
204 fail("read returned " + n);
205 } catch (AsynchronousCloseException expected) { }
206 }
207 });
208 }
209
210 /**
211 * SocketChannel shutdownInput while virtual thread blocked in read.
212 */
213 @ParameterizedTest
214 @MethodSource("threadBuilders")
215 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
216 VThreadRunner.run(builder, () -> {
217 try (var connection = new Connection()) {
218 SocketChannel sc = connection.channel1();
219 runAfterParkedAsync(sc::shutdownInput);
220 int n = sc.read(ByteBuffer.allocate(100));
221 assertEquals(-1, n);
222 assertTrue(sc.isOpen());
223 }
224 });
225 }
226
227 /**
228 * Virtual thread interrupted while blocked in SocketChannel read.
229 */
230 @ParameterizedTest
231 @MethodSource("threadBuilders")
232 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
233 VThreadRunner.run(builder, () -> {
234 try (var connection = new Connection()) {
235 SocketChannel sc = connection.channel1();
236
237 // interrupt current thread when it blocks in read
238 Thread thisThread = Thread.currentThread();
239 runAfterParkedAsync(thisThread::interrupt);
240
241 try {
242 int n = sc.read(ByteBuffer.allocate(100));
243 fail("read returned " + n);
244 } catch (ClosedByInterruptException expected) {
245 assertTrue(Thread.interrupted());
246 }
247 }
248 });
249 }
250
251 /**
252 * SocketChannel close while virtual thread blocked in write.
253 */
254 @ParameterizedTest
255 @MethodSource("threadBuilders")
256 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
257 VThreadRunner.run(builder, () -> {
258 boolean done = false;
259 while (!done) {
260 try (var connection = new Connection()) {
261 SocketChannel sc = connection.channel1();
262
263 // close sc when current thread blocks in write
264 runAfterParkedAsync(sc::close, true);
265
266 // write until channel is closed
267 try {
268 ByteBuffer bb = ByteBuffer.allocate(100*1024);
269 for (;;) {
270 int n = sc.write(bb);
271 assertTrue(n > 0);
272 bb.clear();
273 }
274 } catch (AsynchronousCloseException expected) {
275 // closed when blocked in write
276 done = true;
277 } catch (ClosedChannelException e) {
278 // closed but not blocked in write, need to retry test
279 System.err.format("%s, need to retry!%n", e);
280 }
281 }
282 }
283 });
284 }
285
286 /**
287 * SocketChannel shutdownOutput while virtual thread blocked in write.
288 */
289 @ParameterizedTest
290 @MethodSource("threadBuilders")
291 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
292 VThreadRunner.run(builder, () -> {
293 try (var connection = new Connection()) {
294 SocketChannel sc = connection.channel1();
295
296 // shutdown output when current thread blocks in write
297 runAfterParkedAsync(sc::shutdownOutput);
298 try {
299 ByteBuffer bb = ByteBuffer.allocate(100*1024);
300 for (;;) {
301 int n = sc.write(bb);
302 assertTrue(n > 0);
303 bb.clear();
304 }
305 } catch (ClosedChannelException e) {
306 // expected
307 }
308 assertTrue(sc.isOpen());
309 }
310 });
311 }
312
313 /**
314 * Virtual thread interrupted while blocked in SocketChannel write.
315 */
316 @ParameterizedTest
317 @MethodSource("threadBuilders")
318 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
319 VThreadRunner.run(builder, () -> {
320 boolean done = false;
321 while (!done) {
322 try (var connection = new Connection()) {
323 SocketChannel sc = connection.channel1();
324
325 // interrupt current thread when it blocks in write
326 Thread thisThread = Thread.currentThread();
327 runAfterParkedAsync(thisThread::interrupt, true);
328
329 // write until channel is closed
330 try {
331 ByteBuffer bb = ByteBuffer.allocate(100*1024);
332 for (;;) {
333 int n = sc.write(bb);
334 assertTrue(n > 0);
335 bb.clear();
336 }
337 } catch (ClosedByInterruptException e) {
338 // closed when blocked in write
339 assertTrue(Thread.interrupted());
340 done = true;
341 } catch (ClosedChannelException e) {
342 // closed but not blocked in write, need to retry test
343 System.err.format("%s, need to retry!%n", e);
344 }
345 }
346 }
347 });
348 }
349
350 /**
351 * Virtual thread blocks in SocketChannel adaptor read.
352 */
353 @ParameterizedTest
354 @MethodSource("threadBuilders")
355 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
356 testSocketAdaptorRead(builder, 0);
357 }
358
359 /**
360 * Virtual thread blocks in SocketChannel adaptor read with timeout.
361 */
362 @ParameterizedTest
363 @MethodSource("threadBuilders")
364 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
365 testSocketAdaptorRead(builder, 60_000);
366 }
367
368 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
369 int timeout) throws Exception {
370 VThreadRunner.run(builder, () -> {
371 try (var connection = new Connection()) {
372 SocketChannel sc1 = connection.channel1();
373 SocketChannel sc2 = connection.channel2();
374
375 // write to sc1 when currnet thread blocks reading from sc2
376 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
377 runAfterParkedAsync(() -> sc1.write(bb));
378
379 // read from sc2 should block
380 byte[] array = new byte[100];
381 if (timeout > 0)
382 sc2.socket().setSoTimeout(timeout);
383 int n = sc2.socket().getInputStream().read(array);
384 assertTrue(n > 0);
385 assertTrue(array[0] == 'X');
386 }
387 });
388 }
389
390 /**
391 * ServerSocketChannel accept, no blocking.
392 */
393 @ParameterizedTest
394 @MethodSource("threadBuilders")
395 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
396 VThreadRunner.run(builder, () -> {
397 try (var ssc = ServerSocketChannel.open()) {
398 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
399 var sc1 = SocketChannel.open(ssc.getLocalAddress());
400 // accept should not block
401 var sc2 = ssc.accept();
402 sc1.close();
403 sc2.close();
404 }
405 });
406 }
407
408 /**
409 * Virtual thread blocks in ServerSocketChannel accept.
410 */
411 @ParameterizedTest
412 @MethodSource("threadBuilders")
413 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
414 VThreadRunner.run(builder, () -> {
415 try (var ssc = ServerSocketChannel.open()) {
416 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
417 var sc1 = SocketChannel.open();
418
419 // connect when current thread when it blocks in accept
420 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
421
422 // accept should block
423 var sc2 = ssc.accept();
424 sc1.close();
425 sc2.close();
426 }
427 });
428 }
429
430 /**
431 * SeverSocketChannel close while virtual thread blocked in accept.
432 */
433 @ParameterizedTest
434 @MethodSource("threadBuilders")
435 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
436 VThreadRunner.run(builder, () -> {
437 try (var ssc = ServerSocketChannel.open()) {
438 InetAddress lh = InetAddress.getLoopbackAddress();
439 ssc.bind(new InetSocketAddress(lh, 0));
440 runAfterParkedAsync(ssc::close);
441 try {
442 SocketChannel sc = ssc.accept();
443 sc.close();
444 fail("connection accepted???");
445 } catch (AsynchronousCloseException expected) { }
446 }
447 });
448 }
449
450 /**
451 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
452 */
453 @ParameterizedTest
454 @MethodSource("threadBuilders")
455 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
456 VThreadRunner.run(builder, () -> {
457 try (var ssc = ServerSocketChannel.open()) {
458 InetAddress lh = InetAddress.getLoopbackAddress();
459 ssc.bind(new InetSocketAddress(lh, 0));
460
461 // interrupt current thread when it blocks in accept
462 Thread thisThread = Thread.currentThread();
463 runAfterParkedAsync(thisThread::interrupt);
464
465 try {
466 SocketChannel sc = ssc.accept();
467 sc.close();
468 fail("connection accepted???");
469 } catch (ClosedByInterruptException expected) {
470 assertTrue(Thread.interrupted());
471 }
472 }
473 });
474 }
475
476 /**
477 * Virtual thread blocks in ServerSocketChannel adaptor accept.
478 */
479 @ParameterizedTest
480 @MethodSource("threadBuilders")
481 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
482 testSocketChannelAdaptorAccept(builder, 0);
483 }
484
485 /**
486 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
487 */
488 @ParameterizedTest
489 @MethodSource("threadBuilders")
490 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
491 testSocketChannelAdaptorAccept(builder, 60_000);
492 }
493
494 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
495 int timeout) throws Exception {
496 VThreadRunner.run(builder, () -> {
497 try (var ssc = ServerSocketChannel.open()) {
498 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
499 var sc = SocketChannel.open();
500
501 // interrupt current thread when it blocks in accept
502 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
503
504 // accept should block
505 if (timeout > 0)
506 ssc.socket().setSoTimeout(timeout);
507 Socket s = ssc.socket().accept();
508 sc.close();
509 s.close();
510 }
511 });
512 }
513
514 /**
515 * DatagramChannel receive/send, no blocking.
516 */
517 @ParameterizedTest
518 @MethodSource("threadBuilders")
519 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
520 VThreadRunner.run(builder, () -> {
521 try (DatagramChannel dc1 = DatagramChannel.open();
522 DatagramChannel dc2 = DatagramChannel.open()) {
523
524 InetAddress lh = InetAddress.getLoopbackAddress();
525 dc2.bind(new InetSocketAddress(lh, 0));
526
527 // send should not block
528 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
529 int n = dc1.send(bb, dc2.getLocalAddress());
530 assertTrue(n > 0);
531
532 // receive should not block
533 bb = ByteBuffer.allocate(10);
534 dc2.receive(bb);
535 assertTrue(bb.get(0) == 'X');
536 }
537 });
538 }
539
540 /**
541 * Virtual thread blocks in DatagramChannel receive.
542 */
543 @ParameterizedTest
544 @MethodSource("threadBuilders")
545 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
546 VThreadRunner.run(builder, () -> {
547 try (DatagramChannel dc1 = DatagramChannel.open();
548 DatagramChannel dc2 = DatagramChannel.open()) {
549
550 InetAddress lh = InetAddress.getLoopbackAddress();
551 dc2.bind(new InetSocketAddress(lh, 0));
552
553 // send from dc1 when current thread blocked in dc2.receive
554 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
555 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
556
557 // read from dc2 should block
558 ByteBuffer bb2 = ByteBuffer.allocate(10);
559 dc2.receive(bb2);
560 assertTrue(bb2.get(0) == 'X');
561 }
562 });
563 }
564
565 /**
566 * DatagramChannel close while virtual thread blocked in receive.
567 */
568 @ParameterizedTest
569 @MethodSource("threadBuilders")
570 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
571 VThreadRunner.run(builder, () -> {
572 try (DatagramChannel dc = DatagramChannel.open()) {
573 InetAddress lh = InetAddress.getLoopbackAddress();
574 dc.bind(new InetSocketAddress(lh, 0));
575 runAfterParkedAsync(dc::close);
576 try {
577 dc.receive(ByteBuffer.allocate(100));
578 fail("receive returned");
579 } catch (AsynchronousCloseException expected) { }
580 }
581 });
582 }
583
584 /**
585 * Virtual thread interrupted while blocked in DatagramChannel receive.
586 */
587 @ParameterizedTest
588 @MethodSource("threadBuilders")
589 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
590 VThreadRunner.run(builder, () -> {
591 try (DatagramChannel dc = DatagramChannel.open()) {
592 InetAddress lh = InetAddress.getLoopbackAddress();
593 dc.bind(new InetSocketAddress(lh, 0));
594
595 // interrupt current thread when it blocks in receive
596 Thread thisThread = Thread.currentThread();
597 runAfterParkedAsync(thisThread::interrupt);
598
599 try {
600 dc.receive(ByteBuffer.allocate(100));
601 fail("receive returned");
602 } catch (ClosedByInterruptException expected) {
603 assertTrue(Thread.interrupted());
604 }
605 }
606 });
607 }
608
609 /**
610 * Virtual thread blocks in DatagramSocket adaptor receive.
611 */
612 @ParameterizedTest
613 @MethodSource("threadBuilders")
614 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
615 testDatagramSocketAdaptorReceive(builder, 0);
616 }
617
618 /**
619 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
620 */
621 @ParameterizedTest
622 @MethodSource("threadBuilders")
623 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
624 testDatagramSocketAdaptorReceive(builder, 60_000);
625 }
626
627 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
628 int timeout) throws Exception {
629 VThreadRunner.run(builder, () -> {
630 try (DatagramChannel dc1 = DatagramChannel.open();
631 DatagramChannel dc2 = DatagramChannel.open()) {
632
633 InetAddress lh = InetAddress.getLoopbackAddress();
634 dc2.bind(new InetSocketAddress(lh, 0));
635
636 // send from dc1 when current thread blocks in dc2 receive
637 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
638 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
639
640 // receive should block
641 byte[] array = new byte[100];
642 DatagramPacket p = new DatagramPacket(array, 0, array.length);
643 if (timeout > 0)
644 dc2.socket().setSoTimeout(timeout);
645 dc2.socket().receive(p);
646 assertTrue(p.getLength() == 3 && array[0] == 'X');
647 }
648 });
649 }
650
651 /**
652 * DatagramChannel close while virtual thread blocked in adaptor receive.
653 */
654 @ParameterizedTest
655 @MethodSource("threadBuilders")
656 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
657 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
658 }
659
660 /**
661 * DatagramChannel close while virtual thread blocked in adaptor receive
662 * with timeout.
663 */
664 @ParameterizedTest
665 @MethodSource("threadBuilders")
666 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
667 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
668 }
669
670 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
671 int timeout) throws Exception {
672 VThreadRunner.run(builder, () -> {
673 try (DatagramChannel dc = DatagramChannel.open()) {
674 InetAddress lh = InetAddress.getLoopbackAddress();
675 dc.bind(new InetSocketAddress(lh, 0));
676
677 byte[] array = new byte[100];
678 DatagramPacket p = new DatagramPacket(array, 0, array.length);
679 if (timeout > 0)
680 dc.socket().setSoTimeout(timeout);
681
682 // close channel/socket when current thread blocks in receive
683 runAfterParkedAsync(dc::close);
684
685 assertThrows(SocketException.class, () -> dc.socket().receive(p));
686 }
687 });
688 }
689
690 /**
691 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
692 */
693 @ParameterizedTest
694 @MethodSource("threadBuilders")
695 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
696 testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
697 }
698
699 /**
700 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
701 * with timeout.
702 */
703 @ParameterizedTest
704 @MethodSource("threadBuilders")
705 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
706 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
707 }
708
709 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
710 int timeout) throws Exception {
711 VThreadRunner.run(builder, () -> {
712 try (DatagramChannel dc = DatagramChannel.open()) {
713 InetAddress lh = InetAddress.getLoopbackAddress();
714 dc.bind(new InetSocketAddress(lh, 0));
715
716 byte[] array = new byte[100];
717 DatagramPacket p = new DatagramPacket(array, 0, array.length);
718 if (timeout > 0)
719 dc.socket().setSoTimeout(timeout);
720
721 // interrupt current thread when it blocks in receive
722 Thread thisThread = Thread.currentThread();
723 runAfterParkedAsync(thisThread::interrupt);
724
725 try {
726 dc.socket().receive(p);
727 fail();
728 } catch (ClosedByInterruptException expected) {
729 assertTrue(Thread.interrupted());
730 }
731 }
732 });
733 }
734
735 /**
736 * Pipe read/write, no blocking.
737 */
738 @ParameterizedTest
739 @MethodSource("threadBuilders")
740 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
741 VThreadRunner.run(builder, () -> {
742 Pipe p = Pipe.open();
743 try (Pipe.SinkChannel sink = p.sink();
744 Pipe.SourceChannel source = p.source()) {
745
746 // write should not block
747 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
748 int n = sink.write(bb);
749 assertTrue(n > 0);
750
751 // read should not block
752 bb = ByteBuffer.allocate(10);
753 n = source.read(bb);
754 assertTrue(n > 0);
755 assertTrue(bb.get(0) == 'X');
756 }
757 });
758 }
759
760 /**
761 * Virtual thread blocks in Pipe.SourceChannel read.
762 */
763 @ParameterizedTest
764 @MethodSource("threadBuilders")
765 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
766 VThreadRunner.run(builder, () -> {
767 Pipe p = Pipe.open();
768 try (Pipe.SinkChannel sink = p.sink();
769 Pipe.SourceChannel source = p.source()) {
770
771 // write from sink when current thread blocks reading from source
772 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
773 runAfterParkedAsync(() -> sink.write(bb1));
774
775 // read should block
776 ByteBuffer bb2 = ByteBuffer.allocate(10);
777 int n = source.read(bb2);
778 assertTrue(n > 0);
779 assertTrue(bb2.get(0) == 'X');
780 }
781 });
782 }
783
784 /**
785 * Virtual thread blocks in Pipe.SinkChannel write.
786 */
787 @ParameterizedTest
788 @MethodSource("threadBuilders")
789 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
790 VThreadRunner.run(builder, () -> {
791 Pipe p = Pipe.open();
792 try (Pipe.SinkChannel sink = p.sink();
793 Pipe.SourceChannel source = p.source()) {
794
795 // read from source to EOF when current thread blocking in write
796 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
797
798 // write to sink should block
799 ByteBuffer bb = ByteBuffer.allocate(100*1024);
800 for (int i=0; i<1000; i++) {
801 int n = sink.write(bb);
802 assertTrue(n > 0);
803 bb.clear();
804 }
805 sink.close();
806
807 // wait for reader to finish
808 reader.join();
809 }
810 });
811 }
812
813 /**
814 * Pipe.SourceChannel close while virtual thread blocked in read.
815 */
816 @ParameterizedTest
817 @MethodSource("threadBuilders")
818 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
819 VThreadRunner.run(builder, () -> {
820 Pipe p = Pipe.open();
821 try (Pipe.SinkChannel sink = p.sink();
822 Pipe.SourceChannel source = p.source()) {
823 runAfterParkedAsync(source::close);
824 try {
825 int n = source.read(ByteBuffer.allocate(100));
826 fail("read returned " + n);
827 } catch (AsynchronousCloseException expected) { }
828 }
829 });
830 }
831
832 /**
833 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
834 */
835 @ParameterizedTest
836 @MethodSource("threadBuilders")
837 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
838 VThreadRunner.run(builder, () -> {
839 Pipe p = Pipe.open();
840 try (Pipe.SinkChannel sink = p.sink();
841 Pipe.SourceChannel source = p.source()) {
842
843 // interrupt current thread when it blocks reading from source
844 Thread thisThread = Thread.currentThread();
845 runAfterParkedAsync(thisThread::interrupt);
846
847 try {
848 int n = source.read(ByteBuffer.allocate(100));
849 fail("read returned " + n);
850 } catch (ClosedByInterruptException expected) {
851 assertTrue(Thread.interrupted());
852 }
853 }
854 });
855 }
856
857 /**
858 * Pipe.SinkChannel close while virtual thread blocked in write.
859 */
860 @ParameterizedTest
861 @MethodSource("threadBuilders")
862 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
863 VThreadRunner.run(builder, () -> {
864 boolean done = false;
865 while (!done) {
866 Pipe p = Pipe.open();
867 try (Pipe.SinkChannel sink = p.sink();
868 Pipe.SourceChannel source = p.source()) {
869
870 // close sink when current thread blocks in write
871 runAfterParkedAsync(sink::close, true);
872
873 // write until channel is closed
874 try {
875 ByteBuffer bb = ByteBuffer.allocate(100*1024);
876 for (;;) {
877 int n = sink.write(bb);
878 assertTrue(n > 0);
879 bb.clear();
880 }
881 } catch (AsynchronousCloseException e) {
882 // closed when blocked in write
883 done = true;
884 } catch (ClosedChannelException e) {
885 // closed but not blocked in write, need to retry test
886 System.err.format("%s, need to retry!%n", e);
887 }
888 }
889 }
890 });
891 }
892
893 /**
894 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
895 */
896 @ParameterizedTest
897 @MethodSource("threadBuilders")
898 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
899 VThreadRunner.run(builder, () -> {
900 boolean done = false;
901 while (!done) {
902 Pipe p = Pipe.open();
903 try (Pipe.SinkChannel sink = p.sink();
904 Pipe.SourceChannel source = p.source()) {
905
906 // interrupt current thread when it blocks in write
907 Thread thisThread = Thread.currentThread();
908 runAfterParkedAsync(thisThread::interrupt, true);
909
910 // write until channel is closed
911 try {
912 ByteBuffer bb = ByteBuffer.allocate(100*1024);
913 for (;;) {
914 int n = sink.write(bb);
915 assertTrue(n > 0);
916 bb.clear();
917 }
918 } catch (ClosedByInterruptException expected) {
919 // closed when blocked in write
|