9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit/timeout=480 BlockingChannelOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
38 */
39
40 /*
41 * @test id=no-vmcontinuations
42 * @requires vm.continuations
43 * @library /test/lib
44 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
45 */
46
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.net.DatagramPacket;
50 import java.net.InetAddress;
51 import java.net.InetSocketAddress;
52 import java.net.Socket;
53 import java.net.SocketAddress;
54 import java.net.SocketException;
55 import java.nio.ByteBuffer;
56 import java.nio.channels.AsynchronousCloseException;
57 import java.nio.channels.ClosedByInterruptException;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.DatagramChannel;
60 import java.nio.channels.Pipe;
61 import java.nio.channels.ReadableByteChannel;
62 import java.nio.channels.ServerSocketChannel;
63 import java.nio.channels.SocketChannel;
64 import java.nio.channels.WritableByteChannel;
65 import java.util.concurrent.locks.LockSupport;
66
67 import jdk.test.lib.thread.VThreadRunner;
68 import org.junit.jupiter.api.Test;
69 import org.junit.jupiter.params.ParameterizedTest;
70 import org.junit.jupiter.params.provider.ValueSource;
71 import static org.junit.jupiter.api.Assertions.*;
72
73 class BlockingChannelOps {
74
75 /**
76 * SocketChannel read/write, no blocking.
77 */
78 @Test
79 void testSocketChannelReadWrite1() throws Exception {
80 VThreadRunner.run(() -> {
81 try (var connection = new Connection()) {
82 SocketChannel sc1 = connection.channel1();
83 SocketChannel sc2 = connection.channel2();
84
85 // write to sc1
86 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
87 int n = sc1.write(bb);
88 assertTrue(n > 0);
89
90 // read from sc2 should not block
91 bb = ByteBuffer.allocate(10);
92 n = sc2.read(bb);
93 assertTrue(n > 0);
94 assertTrue(bb.get(0) == 'X');
95 }
96 });
97 }
98
99 /**
100 * Virtual thread blocks in SocketChannel read.
101 */
102 @Test
103 void testSocketChannelRead() throws Exception {
104 VThreadRunner.run(() -> {
105 try (var connection = new Connection()) {
106 SocketChannel sc1 = connection.channel1();
107 SocketChannel sc2 = connection.channel2();
108
109 // write to sc1 when current thread blocks in sc2.read
110 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
111 runAfterParkedAsync(() -> sc1.write(bb1));
112
113 // read from sc2 should block
114 ByteBuffer bb2 = ByteBuffer.allocate(10);
115 int n = sc2.read(bb2);
116 assertTrue(n > 0);
117 assertTrue(bb2.get(0) == 'X');
118 }
119 });
120 }
121
122 /**
123 * Virtual thread blocks in SocketChannel write.
124 */
125 @Test
126 void testSocketChannelWrite() throws Exception {
127 VThreadRunner.run(() -> {
128 try (var connection = new Connection()) {
129 SocketChannel sc1 = connection.channel1();
130 SocketChannel sc2 = connection.channel2();
131
132 // read from sc2 to EOF when current thread blocks in sc1.write
133 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
134
135 // write to sc1 should block
136 ByteBuffer bb = ByteBuffer.allocate(100*1024);
137 for (int i=0; i<1000; i++) {
138 int n = sc1.write(bb);
139 assertTrue(n > 0);
140 bb.clear();
141 }
142 sc1.close();
143
144 // wait for reader to finish
145 reader.join();
146 }
147 });
148 }
149
150 /**
151 * SocketChannel close while virtual thread blocked in read.
152 */
153 @Test
154 void testSocketChannelReadAsyncClose() throws Exception {
155 VThreadRunner.run(() -> {
156 try (var connection = new Connection()) {
157 SocketChannel sc = connection.channel1();
158 runAfterParkedAsync(sc::close);
159 try {
160 int n = sc.read(ByteBuffer.allocate(100));
161 fail("read returned " + n);
162 } catch (AsynchronousCloseException expected) { }
163 }
164 });
165 }
166
167 /**
168 * SocketChannel shutdownInput while virtual thread blocked in read.
169 */
170 @Test
171 void testSocketChannelReadAsyncShutdownInput() throws Exception {
172 VThreadRunner.run(() -> {
173 try (var connection = new Connection()) {
174 SocketChannel sc = connection.channel1();
175 runAfterParkedAsync(sc::shutdownInput);
176 int n = sc.read(ByteBuffer.allocate(100));
177 assertEquals(-1, n);
178 assertTrue(sc.isOpen());
179 }
180 });
181 }
182
183 /**
184 * Virtual thread interrupted while blocked in SocketChannel read.
185 */
186 @Test
187 void testSocketChannelReadInterrupt() throws Exception {
188 VThreadRunner.run(() -> {
189 try (var connection = new Connection()) {
190 SocketChannel sc = connection.channel1();
191
192 // interrupt current thread when it blocks in read
193 Thread thisThread = Thread.currentThread();
194 runAfterParkedAsync(thisThread::interrupt);
195
196 try {
197 int n = sc.read(ByteBuffer.allocate(100));
198 fail("read returned " + n);
199 } catch (ClosedByInterruptException expected) {
200 assertTrue(Thread.interrupted());
201 }
202 }
203 });
204 }
205
206 /**
207 * SocketChannel close while virtual thread blocked in write.
208 */
209 @Test
210 void testSocketChannelWriteAsyncClose() throws Exception {
211 VThreadRunner.run(() -> {
212 boolean done = false;
213 while (!done) {
214 try (var connection = new Connection()) {
215 SocketChannel sc = connection.channel1();
216
217 // close sc when current thread blocks in write
218 runAfterParkedAsync(sc::close, true);
219
220 // write until channel is closed
221 try {
222 ByteBuffer bb = ByteBuffer.allocate(100*1024);
223 for (;;) {
224 int n = sc.write(bb);
225 assertTrue(n > 0);
226 bb.clear();
227 }
228 } catch (AsynchronousCloseException expected) {
229 // closed when blocked in write
230 done = true;
231 } catch (ClosedChannelException e) {
232 // closed but not blocked in write, need to retry test
233 System.err.format("%s, need to retry!%n", e);
234 }
235 }
236 }
237 });
238 }
239
240
241 /**
242 * SocketChannel shutdownOutput while virtual thread blocked in write.
243 */
244 @Test
245 void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
246 VThreadRunner.run(() -> {
247 try (var connection = new Connection()) {
248 SocketChannel sc = connection.channel1();
249
250 // shutdown output when current thread blocks in write
251 runAfterParkedAsync(sc::shutdownOutput);
252 try {
253 ByteBuffer bb = ByteBuffer.allocate(100*1024);
254 for (;;) {
255 int n = sc.write(bb);
256 assertTrue(n > 0);
257 bb.clear();
258 }
259 } catch (ClosedChannelException e) {
260 // expected
261 }
262 assertTrue(sc.isOpen());
263 }
264 });
265 }
266
267 /**
268 * Virtual thread interrupted while blocked in SocketChannel write.
269 */
270 @Test
271 void testSocketChannelWriteInterrupt() throws Exception {
272 VThreadRunner.run(() -> {
273 boolean done = false;
274 while (!done) {
275 try (var connection = new Connection()) {
276 SocketChannel sc = connection.channel1();
277
278 // interrupt current thread when it blocks in write
279 Thread thisThread = Thread.currentThread();
280 runAfterParkedAsync(thisThread::interrupt, true);
281
282 // write until channel is closed
283 try {
284 ByteBuffer bb = ByteBuffer.allocate(100*1024);
285 for (;;) {
286 int n = sc.write(bb);
287 assertTrue(n > 0);
288 bb.clear();
289 }
290 } catch (ClosedByInterruptException e) {
291 // closed when blocked in write
292 assertTrue(Thread.interrupted());
293 done = true;
294 } catch (ClosedChannelException e) {
295 // closed but not blocked in write, need to retry test
296 System.err.format("%s, need to retry!%n", e);
297 }
298 }
299 }
300 });
301 }
302
303 /**
304 * Virtual thread blocks in SocketChannel adaptor read.
305 */
306 @ParameterizedTest
307 @ValueSource(ints = { 0, 60_000 })
308 void testSocketAdaptorRead(int timeout) throws Exception {
309 VThreadRunner.run(() -> {
310 try (var connection = new Connection()) {
311 SocketChannel sc1 = connection.channel1();
312 SocketChannel sc2 = connection.channel2();
313
314 // write to sc1 when currnet thread blocks reading from sc2
315 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
316 runAfterParkedAsync(() -> sc1.write(bb));
317
318 // read from sc2 should block
319 byte[] array = new byte[100];
320 if (timeout > 0)
321 sc2.socket().setSoTimeout(timeout);
322 int n = sc2.socket().getInputStream().read(array);
323 assertTrue(n > 0);
324 assertTrue(array[0] == 'X');
325 }
326 });
327 }
328
329 /**
330 * ServerSocketChannel accept, no blocking.
331 */
332 @Test
333 void testServerSocketChannelAccept1() throws Exception {
334 VThreadRunner.run(() -> {
335 try (var ssc = ServerSocketChannel.open()) {
336 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
337 var sc1 = SocketChannel.open(ssc.getLocalAddress());
338 // accept should not block
339 var sc2 = ssc.accept();
340 sc1.close();
341 sc2.close();
342 }
343 });
344 }
345
346 /**
347 * Virtual thread blocks in ServerSocketChannel accept.
348 */
349 @Test
350 void testServerSocketChannelAccept2() throws Exception {
351 VThreadRunner.run(() -> {
352 try (var ssc = ServerSocketChannel.open()) {
353 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
354 var sc1 = SocketChannel.open();
355
356 // connect when current thread when it blocks in accept
357 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
358
359 // accept should block
360 var sc2 = ssc.accept();
361 sc1.close();
362 sc2.close();
363 }
364 });
365 }
366
367 /**
368 * SeverSocketChannel close while virtual thread blocked in accept.
369 */
370 @Test
371 void testServerSocketChannelAcceptAsyncClose() throws Exception {
372 VThreadRunner.run(() -> {
373 try (var ssc = ServerSocketChannel.open()) {
374 InetAddress lh = InetAddress.getLoopbackAddress();
375 ssc.bind(new InetSocketAddress(lh, 0));
376 runAfterParkedAsync(ssc::close);
377 try {
378 SocketChannel sc = ssc.accept();
379 sc.close();
380 fail("connection accepted???");
381 } catch (AsynchronousCloseException expected) { }
382 }
383 });
384 }
385
386 /**
387 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
388 */
389 @Test
390 void testServerSocketChannelAcceptInterrupt() throws Exception {
391 VThreadRunner.run(() -> {
392 try (var ssc = ServerSocketChannel.open()) {
393 InetAddress lh = InetAddress.getLoopbackAddress();
394 ssc.bind(new InetSocketAddress(lh, 0));
395
396 // interrupt current thread when it blocks in accept
397 Thread thisThread = Thread.currentThread();
398 runAfterParkedAsync(thisThread::interrupt);
399
400 try {
401 SocketChannel sc = ssc.accept();
402 sc.close();
403 fail("connection accepted???");
404 } catch (ClosedByInterruptException expected) {
405 assertTrue(Thread.interrupted());
406 }
407 }
408 });
409 }
410
411 /**
412 * Virtual thread blocks in ServerSocketChannel adaptor accept.
413 */
414 @ParameterizedTest
415 @ValueSource(ints = { 0, 60_000 })
416 void testSocketChannelAdaptorAccept(int timeout) throws Exception {
417 VThreadRunner.run(() -> {
418 try (var ssc = ServerSocketChannel.open()) {
419 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
420 var sc = SocketChannel.open();
421
422 // interrupt current thread when it blocks in accept
423 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
424
425 // accept should block
426 if (timeout > 0)
427 ssc.socket().setSoTimeout(timeout);
428 Socket s = ssc.socket().accept();
429 sc.close();
430 s.close();
431 }
432 });
433 }
434
435 /**
436 * DatagramChannel receive/send, no blocking.
437 */
438 @Test
439 void testDatagramChannelSendReceive1() throws Exception {
440 VThreadRunner.run(() -> {
441 try (DatagramChannel dc1 = DatagramChannel.open();
442 DatagramChannel dc2 = DatagramChannel.open()) {
443
444 InetAddress lh = InetAddress.getLoopbackAddress();
445 dc2.bind(new InetSocketAddress(lh, 0));
446
447 // send should not block
448 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
449 int n = dc1.send(bb, dc2.getLocalAddress());
450 assertTrue(n > 0);
451
452 // receive should not block
453 bb = ByteBuffer.allocate(10);
454 dc2.receive(bb);
455 assertTrue(bb.get(0) == 'X');
456 }
457 });
458 }
459
460 /**
461 * Virtual thread blocks in DatagramChannel receive.
462 */
463 @Test
464 void testDatagramChannelSendReceive2() throws Exception {
465 VThreadRunner.run(() -> {
466 try (DatagramChannel dc1 = DatagramChannel.open();
467 DatagramChannel dc2 = DatagramChannel.open()) {
468
469 InetAddress lh = InetAddress.getLoopbackAddress();
470 dc2.bind(new InetSocketAddress(lh, 0));
471
472 // send from dc1 when current thread blocked in dc2.receive
473 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
474 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
475
476 // read from dc2 should block
477 ByteBuffer bb2 = ByteBuffer.allocate(10);
478 dc2.receive(bb2);
479 assertTrue(bb2.get(0) == 'X');
480 }
481 });
482 }
483
484 /**
485 * DatagramChannel close while virtual thread blocked in receive.
486 */
487 @Test
488 void testDatagramChannelReceiveAsyncClose() throws Exception {
489 VThreadRunner.run(() -> {
490 try (DatagramChannel dc = DatagramChannel.open()) {
491 InetAddress lh = InetAddress.getLoopbackAddress();
492 dc.bind(new InetSocketAddress(lh, 0));
493 runAfterParkedAsync(dc::close);
494 try {
495 dc.receive(ByteBuffer.allocate(100));
496 fail("receive returned");
497 } catch (AsynchronousCloseException expected) { }
498 }
499 });
500 }
501
502 /**
503 * Virtual thread interrupted while blocked in DatagramChannel receive.
504 */
505 @Test
506 void testDatagramChannelReceiveInterrupt() throws Exception {
507 VThreadRunner.run(() -> {
508 try (DatagramChannel dc = DatagramChannel.open()) {
509 InetAddress lh = InetAddress.getLoopbackAddress();
510 dc.bind(new InetSocketAddress(lh, 0));
511
512 // interrupt current thread when it blocks in receive
513 Thread thisThread = Thread.currentThread();
514 runAfterParkedAsync(thisThread::interrupt);
515
516 try {
517 dc.receive(ByteBuffer.allocate(100));
518 fail("receive returned");
519 } catch (ClosedByInterruptException expected) {
520 assertTrue(Thread.interrupted());
521 }
522 }
523 });
524 }
525
526 /**
527 * Virtual thread blocks in DatagramSocket adaptor receive.
528 */
529 @ParameterizedTest
530 @ValueSource(ints = { 0, 60_000 })
531 void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
532 VThreadRunner.run(() -> {
533 try (DatagramChannel dc1 = DatagramChannel.open();
534 DatagramChannel dc2 = DatagramChannel.open()) {
535
536 InetAddress lh = InetAddress.getLoopbackAddress();
537 dc2.bind(new InetSocketAddress(lh, 0));
538
539 // send from dc1 when current thread blocks in dc2 receive
540 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
541 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
542
543 // receive should block
544 byte[] array = new byte[100];
545 DatagramPacket p = new DatagramPacket(array, 0, array.length);
546 if (timeout > 0)
547 dc2.socket().setSoTimeout(timeout);
548 dc2.socket().receive(p);
549 assertTrue(p.getLength() == 3 && array[0] == 'X');
550 }
551 });
552 }
553
554 /**
555 * DatagramChannel close while virtual thread blocked in adaptor receive.
556 */
557 @ParameterizedTest
558 @ValueSource(ints = { 0, 60_000 })
559 void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
560 VThreadRunner.run(() -> {
561 try (DatagramChannel dc = DatagramChannel.open()) {
562 InetAddress lh = InetAddress.getLoopbackAddress();
563 dc.bind(new InetSocketAddress(lh, 0));
564
565 byte[] array = new byte[100];
566 DatagramPacket p = new DatagramPacket(array, 0, array.length);
567 if (timeout > 0)
568 dc.socket().setSoTimeout(timeout);
569
570 // close channel/socket when current thread blocks in receive
571 runAfterParkedAsync(dc::close);
572
573 assertThrows(SocketException.class, () -> dc.socket().receive(p));
574 }
575 });
576 }
577
578 /**
579 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
580 */
581 @ParameterizedTest
582 @ValueSource(ints = { 0, 60_000 })
583 void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
584 VThreadRunner.run(() -> {
585 try (DatagramChannel dc = DatagramChannel.open()) {
586 InetAddress lh = InetAddress.getLoopbackAddress();
587 dc.bind(new InetSocketAddress(lh, 0));
588
589 byte[] array = new byte[100];
590 DatagramPacket p = new DatagramPacket(array, 0, array.length);
591 if (timeout > 0)
592 dc.socket().setSoTimeout(timeout);
593
594 // interrupt current thread when it blocks in receive
595 Thread thisThread = Thread.currentThread();
596 runAfterParkedAsync(thisThread::interrupt);
597
598 try {
599 dc.socket().receive(p);
600 fail();
601 } catch (ClosedByInterruptException expected) {
602 assertTrue(Thread.interrupted());
603 }
604 }
605 });
606 }
607
608 /**
609 * Pipe read/write, no blocking.
610 */
611 @Test
612 void testPipeReadWrite1() throws Exception {
613 VThreadRunner.run(() -> {
614 Pipe p = Pipe.open();
615 try (Pipe.SinkChannel sink = p.sink();
616 Pipe.SourceChannel source = p.source()) {
617
618 // write should not block
619 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
620 int n = sink.write(bb);
621 assertTrue(n > 0);
622
623 // read should not block
624 bb = ByteBuffer.allocate(10);
625 n = source.read(bb);
626 assertTrue(n > 0);
627 assertTrue(bb.get(0) == 'X');
628 }
629 });
630 }
631
632 /**
633 * Virtual thread blocks in Pipe.SourceChannel read.
634 */
635 @Test
636 void testPipeReadWrite2() throws Exception {
637 VThreadRunner.run(() -> {
638 Pipe p = Pipe.open();
639 try (Pipe.SinkChannel sink = p.sink();
640 Pipe.SourceChannel source = p.source()) {
641
642 // write from sink when current thread blocks reading from source
643 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
644 runAfterParkedAsync(() -> sink.write(bb1));
645
646 // read should block
647 ByteBuffer bb2 = ByteBuffer.allocate(10);
648 int n = source.read(bb2);
649 assertTrue(n > 0);
650 assertTrue(bb2.get(0) == 'X');
651 }
652 });
653 }
654
655 /**
656 * Virtual thread blocks in Pipe.SinkChannel write.
657 */
658 @Test
659 void testPipeReadWrite3() throws Exception {
660 VThreadRunner.run(() -> {
661 Pipe p = Pipe.open();
662 try (Pipe.SinkChannel sink = p.sink();
663 Pipe.SourceChannel source = p.source()) {
664
665 // read from source to EOF when current thread blocking in write
666 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
667
668 // write to sink should block
669 ByteBuffer bb = ByteBuffer.allocate(100*1024);
670 for (int i=0; i<1000; i++) {
671 int n = sink.write(bb);
672 assertTrue(n > 0);
673 bb.clear();
674 }
675 sink.close();
676
677 // wait for reader to finish
678 reader.join();
679 }
680 });
681 }
682
683 /**
684 * Pipe.SourceChannel close while virtual thread blocked in read.
685 */
686 @Test
687 void testPipeReadAsyncClose() throws Exception {
688 VThreadRunner.run(() -> {
689 Pipe p = Pipe.open();
690 try (Pipe.SinkChannel sink = p.sink();
691 Pipe.SourceChannel source = p.source()) {
692 runAfterParkedAsync(source::close);
693 try {
694 int n = source.read(ByteBuffer.allocate(100));
695 fail("read returned " + n);
696 } catch (AsynchronousCloseException expected) { }
697 }
698 });
699 }
700
701 /**
702 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
703 */
704 @Test
705 void testPipeReadInterrupt() throws Exception {
706 VThreadRunner.run(() -> {
707 Pipe p = Pipe.open();
708 try (Pipe.SinkChannel sink = p.sink();
709 Pipe.SourceChannel source = p.source()) {
710
711 // interrupt current thread when it blocks reading from source
712 Thread thisThread = Thread.currentThread();
713 runAfterParkedAsync(thisThread::interrupt);
714
715 try {
716 int n = source.read(ByteBuffer.allocate(100));
717 fail("read returned " + n);
718 } catch (ClosedByInterruptException expected) {
719 assertTrue(Thread.interrupted());
720 }
721 }
722 });
723 }
724
725 /**
726 * Pipe.SinkChannel close while virtual thread blocked in write.
727 */
728 @Test
729 void testPipeWriteAsyncClose() throws Exception {
730 VThreadRunner.run(() -> {
731 boolean done = false;
732 while (!done) {
733 Pipe p = Pipe.open();
734 try (Pipe.SinkChannel sink = p.sink();
735 Pipe.SourceChannel source = p.source()) {
736
737 // close sink when current thread blocks in write
738 runAfterParkedAsync(sink::close, true);
739
740 // write until channel is closed
741 try {
742 ByteBuffer bb = ByteBuffer.allocate(100*1024);
743 for (;;) {
744 int n = sink.write(bb);
745 assertTrue(n > 0);
746 bb.clear();
747 }
748 } catch (AsynchronousCloseException e) {
749 // closed when blocked in write
750 done = true;
751 } catch (ClosedChannelException e) {
752 // closed but not blocked in write, need to retry test
753 System.err.format("%s, need to retry!%n", e);
754 }
755 }
756 }
757 });
758 }
759
760 /**
761 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
762 */
763 @Test
764 void testPipeWriteInterrupt() throws Exception {
765 VThreadRunner.run(() -> {
766 boolean done = false;
767 while (!done) {
768 Pipe p = Pipe.open();
769 try (Pipe.SinkChannel sink = p.sink();
770 Pipe.SourceChannel source = p.source()) {
771
772 // interrupt current thread when it blocks in write
773 Thread thisThread = Thread.currentThread();
774 runAfterParkedAsync(thisThread::interrupt, true);
775
776 // write until channel is closed
777 try {
778 ByteBuffer bb = ByteBuffer.allocate(100*1024);
779 for (;;) {
780 int n = sink.write(bb);
781 assertTrue(n > 0);
782 bb.clear();
783 }
784 } catch (ClosedByInterruptException expected) {
785 // closed when blocked in write
|
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit/othervm/timeout=480 BlockingChannelOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
38 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
39 */
40
41 /*
42 * @test id=io_uring
43 * @requires os.family == "linux"
44 * @library /test/lib
45 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
46 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
47 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
48 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
49 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
50 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
51 */
52
53 /*
54 * @test id=no-vmcontinuations
55 * @requires vm.continuations
56 * @library /test/lib
57 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
58 */
59
60 import java.io.Closeable;
61 import java.io.IOException;
62 import java.net.DatagramPacket;
63 import java.net.InetAddress;
64 import java.net.InetSocketAddress;
65 import java.net.Socket;
66 import java.net.SocketAddress;
67 import java.net.SocketException;
68 import java.nio.ByteBuffer;
69 import java.nio.channels.AsynchronousCloseException;
70 import java.nio.channels.ClosedByInterruptException;
71 import java.nio.channels.ClosedChannelException;
72 import java.nio.channels.DatagramChannel;
73 import java.nio.channels.Pipe;
74 import java.nio.channels.ReadableByteChannel;
75 import java.nio.channels.ServerSocketChannel;
76 import java.nio.channels.SocketChannel;
77 import java.nio.channels.WritableByteChannel;
78 import java.util.concurrent.ExecutorService;
79 import java.util.concurrent.Executors;
80 import java.util.concurrent.ThreadFactory;
81 import java.util.concurrent.locks.LockSupport;
82 import java.util.stream.Stream;
83
84 import jdk.test.lib.thread.VThreadRunner;
85 import jdk.test.lib.thread.VThreadScheduler;
86
87 import org.junit.jupiter.api.BeforeAll;
88 import org.junit.jupiter.params.ParameterizedTest;
89 import org.junit.jupiter.params.provider.MethodSource;
90 import static org.junit.jupiter.api.Assertions.*;
91
92 class BlockingChannelOps {
93 private static ExecutorService threadPool;
94 private static Thread.VirtualThreadScheduler customScheduler;
95
96 @BeforeAll
97 static void setup() {
98 ThreadFactory factory = Thread.ofPlatform().daemon().factory();
99 threadPool = Executors.newCachedThreadPool(factory);
100 customScheduler = new Thread.VirtualThreadScheduler() {
101 @Override
102 public void onStart(Thread.VirtualThreadTask task) {
103 threadPool.execute(task);
104 }
105 @Override
106 public void onContinue(Thread.VirtualThreadTask task) {
107 threadPool.execute(task);
108 }
109 };
110 }
111
112 /**
113 * Returns the Thread.Builder to create the virtual thread.
114 */
115 static Stream<Thread.Builder.OfVirtual> threadBuilders() {
116 if (VThreadScheduler.supportsCustomScheduler()) {
117 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
118 } else {
119 return Stream.of(Thread.ofVirtual());
120 }
121 }
122
123 /**
124 * SocketChannel read/write, no blocking.
125 */
126 @ParameterizedTest
127 @MethodSource("threadBuilders")
128 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
129 VThreadRunner.run(builder, () -> {
130 try (var connection = new Connection()) {
131 SocketChannel sc1 = connection.channel1();
132 SocketChannel sc2 = connection.channel2();
133
134 // write to sc1
135 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
136 int n = sc1.write(bb);
137 assertTrue(n > 0);
138
139 // read from sc2 should not block
140 bb = ByteBuffer.allocate(10);
141 n = sc2.read(bb);
142 assertTrue(n > 0);
143 assertTrue(bb.get(0) == 'X');
144 }
145 });
146 }
147
148 /**
149 * Virtual thread blocks in SocketChannel read.
150 */
151 @ParameterizedTest
152 @MethodSource("threadBuilders")
153 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
154 VThreadRunner.run(builder, () -> {
155 try (var connection = new Connection()) {
156 SocketChannel sc1 = connection.channel1();
157 SocketChannel sc2 = connection.channel2();
158
159 // write to sc1 when current thread blocks in sc2.read
160 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
161 runAfterParkedAsync(() -> sc1.write(bb1));
162
163 // read from sc2 should block
164 ByteBuffer bb2 = ByteBuffer.allocate(10);
165 int n = sc2.read(bb2);
166 assertTrue(n > 0);
167 assertTrue(bb2.get(0) == 'X');
168 }
169 });
170 }
171
172 /**
173 * Virtual thread blocks in SocketChannel write.
174 */
175 @ParameterizedTest
176 @MethodSource("threadBuilders")
177 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
178 VThreadRunner.run(builder, () -> {
179 try (var connection = new Connection()) {
180 SocketChannel sc1 = connection.channel1();
181 SocketChannel sc2 = connection.channel2();
182
183 // read from sc2 to EOF when current thread blocks in sc1.write
184 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
185
186 // write to sc1 should block
187 ByteBuffer bb = ByteBuffer.allocate(100*1024);
188 for (int i=0; i<1000; i++) {
189 int n = sc1.write(bb);
190 assertTrue(n > 0);
191 bb.clear();
192 }
193 sc1.close();
194
195 // wait for reader to finish
196 reader.join();
197 }
198 });
199 }
200
201 /**
202 * SocketChannel close while virtual thread blocked in read.
203 */
204 @ParameterizedTest
205 @MethodSource("threadBuilders")
206 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
207 VThreadRunner.run(builder, () -> {
208 try (var connection = new Connection()) {
209 SocketChannel sc = connection.channel1();
210 runAfterParkedAsync(sc::close);
211 try {
212 int n = sc.read(ByteBuffer.allocate(100));
213 fail("read returned " + n);
214 } catch (AsynchronousCloseException expected) { }
215 }
216 });
217 }
218
219 /**
220 * SocketChannel shutdownInput while virtual thread blocked in read.
221 */
222 @ParameterizedTest
223 @MethodSource("threadBuilders")
224 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
225 VThreadRunner.run(builder, () -> {
226 try (var connection = new Connection()) {
227 SocketChannel sc = connection.channel1();
228 runAfterParkedAsync(sc::shutdownInput);
229 int n = sc.read(ByteBuffer.allocate(100));
230 assertEquals(-1, n);
231 assertTrue(sc.isOpen());
232 }
233 });
234 }
235
236 /**
237 * Virtual thread interrupted while blocked in SocketChannel read.
238 */
239 @ParameterizedTest
240 @MethodSource("threadBuilders")
241 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
242 VThreadRunner.run(builder, () -> {
243 try (var connection = new Connection()) {
244 SocketChannel sc = connection.channel1();
245
246 // interrupt current thread when it blocks in read
247 Thread thisThread = Thread.currentThread();
248 runAfterParkedAsync(thisThread::interrupt);
249
250 try {
251 int n = sc.read(ByteBuffer.allocate(100));
252 fail("read returned " + n);
253 } catch (ClosedByInterruptException expected) {
254 assertTrue(Thread.interrupted());
255 }
256 }
257 });
258 }
259
260 /**
261 * SocketChannel close while virtual thread blocked in write.
262 */
263 @ParameterizedTest
264 @MethodSource("threadBuilders")
265 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
266 VThreadRunner.run(builder, () -> {
267 boolean done = false;
268 while (!done) {
269 try (var connection = new Connection()) {
270 SocketChannel sc = connection.channel1();
271
272 // close sc when current thread blocks in write
273 runAfterParkedAsync(sc::close, true);
274
275 // write until channel is closed
276 try {
277 ByteBuffer bb = ByteBuffer.allocate(100*1024);
278 for (;;) {
279 int n = sc.write(bb);
280 assertTrue(n > 0);
281 bb.clear();
282 }
283 } catch (AsynchronousCloseException expected) {
284 // closed when blocked in write
285 done = true;
286 } catch (ClosedChannelException e) {
287 // closed but not blocked in write, need to retry test
288 System.err.format("%s, need to retry!%n", e);
289 }
290 }
291 }
292 });
293 }
294
295 /**
296 * SocketChannel shutdownOutput while virtual thread blocked in write.
297 */
298 @ParameterizedTest
299 @MethodSource("threadBuilders")
300 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
301 VThreadRunner.run(builder, () -> {
302 try (var connection = new Connection()) {
303 SocketChannel sc = connection.channel1();
304
305 // shutdown output when current thread blocks in write
306 runAfterParkedAsync(sc::shutdownOutput);
307 try {
308 ByteBuffer bb = ByteBuffer.allocate(100*1024);
309 for (;;) {
310 int n = sc.write(bb);
311 assertTrue(n > 0);
312 bb.clear();
313 }
314 } catch (ClosedChannelException e) {
315 // expected
316 }
317 assertTrue(sc.isOpen());
318 }
319 });
320 }
321
322 /**
323 * Virtual thread interrupted while blocked in SocketChannel write.
324 */
325 @ParameterizedTest
326 @MethodSource("threadBuilders")
327 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
328 VThreadRunner.run(builder, () -> {
329 boolean done = false;
330 while (!done) {
331 try (var connection = new Connection()) {
332 SocketChannel sc = connection.channel1();
333
334 // interrupt current thread when it blocks in write
335 Thread thisThread = Thread.currentThread();
336 runAfterParkedAsync(thisThread::interrupt, true);
337
338 // write until channel is closed
339 try {
340 ByteBuffer bb = ByteBuffer.allocate(100*1024);
341 for (;;) {
342 int n = sc.write(bb);
343 assertTrue(n > 0);
344 bb.clear();
345 }
346 } catch (ClosedByInterruptException e) {
347 // closed when blocked in write
348 assertTrue(Thread.interrupted());
349 done = true;
350 } catch (ClosedChannelException e) {
351 // closed but not blocked in write, need to retry test
352 System.err.format("%s, need to retry!%n", e);
353 }
354 }
355 }
356 });
357 }
358
359 /**
360 * Virtual thread blocks in SocketChannel adaptor read.
361 */
362 @ParameterizedTest
363 @MethodSource("threadBuilders")
364 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
365 testSocketAdaptorRead(builder, 0);
366 }
367
368 /**
369 * Virtual thread blocks in SocketChannel adaptor read with timeout.
370 */
371 @ParameterizedTest
372 @MethodSource("threadBuilders")
373 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
374 testSocketAdaptorRead(builder, 60_000);
375 }
376
377 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
378 int timeout) throws Exception {
379 VThreadRunner.run(builder, () -> {
380 try (var connection = new Connection()) {
381 SocketChannel sc1 = connection.channel1();
382 SocketChannel sc2 = connection.channel2();
383
384 // write to sc1 when currnet thread blocks reading from sc2
385 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
386 runAfterParkedAsync(() -> sc1.write(bb));
387
388 // read from sc2 should block
389 byte[] array = new byte[100];
390 if (timeout > 0)
391 sc2.socket().setSoTimeout(timeout);
392 int n = sc2.socket().getInputStream().read(array);
393 assertTrue(n > 0);
394 assertTrue(array[0] == 'X');
395 }
396 });
397 }
398
399 /**
400 * ServerSocketChannel accept, no blocking.
401 */
402 @ParameterizedTest
403 @MethodSource("threadBuilders")
404 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
405 VThreadRunner.run(builder, () -> {
406 try (var ssc = ServerSocketChannel.open()) {
407 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
408 var sc1 = SocketChannel.open(ssc.getLocalAddress());
409 // accept should not block
410 var sc2 = ssc.accept();
411 sc1.close();
412 sc2.close();
413 }
414 });
415 }
416
417 /**
418 * Virtual thread blocks in ServerSocketChannel accept.
419 */
420 @ParameterizedTest
421 @MethodSource("threadBuilders")
422 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
423 VThreadRunner.run(builder, () -> {
424 try (var ssc = ServerSocketChannel.open()) {
425 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
426 var sc1 = SocketChannel.open();
427
428 // connect when current thread when it blocks in accept
429 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
430
431 // accept should block
432 var sc2 = ssc.accept();
433 sc1.close();
434 sc2.close();
435 }
436 });
437 }
438
439 /**
440 * SeverSocketChannel close while virtual thread blocked in accept.
441 */
442 @ParameterizedTest
443 @MethodSource("threadBuilders")
444 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
445 VThreadRunner.run(builder, () -> {
446 try (var ssc = ServerSocketChannel.open()) {
447 InetAddress lh = InetAddress.getLoopbackAddress();
448 ssc.bind(new InetSocketAddress(lh, 0));
449 runAfterParkedAsync(ssc::close);
450 try {
451 SocketChannel sc = ssc.accept();
452 sc.close();
453 fail("connection accepted???");
454 } catch (AsynchronousCloseException expected) { }
455 }
456 });
457 }
458
459 /**
460 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
461 */
462 @ParameterizedTest
463 @MethodSource("threadBuilders")
464 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
465 VThreadRunner.run(builder, () -> {
466 try (var ssc = ServerSocketChannel.open()) {
467 InetAddress lh = InetAddress.getLoopbackAddress();
468 ssc.bind(new InetSocketAddress(lh, 0));
469
470 // interrupt current thread when it blocks in accept
471 Thread thisThread = Thread.currentThread();
472 runAfterParkedAsync(thisThread::interrupt);
473
474 try {
475 SocketChannel sc = ssc.accept();
476 sc.close();
477 fail("connection accepted???");
478 } catch (ClosedByInterruptException expected) {
479 assertTrue(Thread.interrupted());
480 }
481 }
482 });
483 }
484
485 /**
486 * Virtual thread blocks in ServerSocketChannel adaptor accept.
487 */
488 @ParameterizedTest
489 @MethodSource("threadBuilders")
490 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
491 testSocketChannelAdaptorAccept(builder, 0);
492 }
493
494 /**
495 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
496 */
497 @ParameterizedTest
498 @MethodSource("threadBuilders")
499 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
500 testSocketChannelAdaptorAccept(builder, 60_000);
501 }
502
503 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
504 int timeout) throws Exception {
505 VThreadRunner.run(builder, () -> {
506 try (var ssc = ServerSocketChannel.open()) {
507 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
508 var sc = SocketChannel.open();
509
510 // interrupt current thread when it blocks in accept
511 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
512
513 // accept should block
514 if (timeout > 0)
515 ssc.socket().setSoTimeout(timeout);
516 Socket s = ssc.socket().accept();
517 sc.close();
518 s.close();
519 }
520 });
521 }
522
523 /**
524 * DatagramChannel receive/send, no blocking.
525 */
526 @ParameterizedTest
527 @MethodSource("threadBuilders")
528 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
529 VThreadRunner.run(builder, () -> {
530 try (DatagramChannel dc1 = DatagramChannel.open();
531 DatagramChannel dc2 = DatagramChannel.open()) {
532
533 InetAddress lh = InetAddress.getLoopbackAddress();
534 dc2.bind(new InetSocketAddress(lh, 0));
535
536 // send should not block
537 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
538 int n = dc1.send(bb, dc2.getLocalAddress());
539 assertTrue(n > 0);
540
541 // receive should not block
542 bb = ByteBuffer.allocate(10);
543 dc2.receive(bb);
544 assertTrue(bb.get(0) == 'X');
545 }
546 });
547 }
548
549 /**
550 * Virtual thread blocks in DatagramChannel receive.
551 */
552 @ParameterizedTest
553 @MethodSource("threadBuilders")
554 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
555 VThreadRunner.run(builder, () -> {
556 try (DatagramChannel dc1 = DatagramChannel.open();
557 DatagramChannel dc2 = DatagramChannel.open()) {
558
559 InetAddress lh = InetAddress.getLoopbackAddress();
560 dc2.bind(new InetSocketAddress(lh, 0));
561
562 // send from dc1 when current thread blocked in dc2.receive
563 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
564 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
565
566 // read from dc2 should block
567 ByteBuffer bb2 = ByteBuffer.allocate(10);
568 dc2.receive(bb2);
569 assertTrue(bb2.get(0) == 'X');
570 }
571 });
572 }
573
574 /**
575 * DatagramChannel close while virtual thread blocked in receive.
576 */
577 @ParameterizedTest
578 @MethodSource("threadBuilders")
579 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
580 VThreadRunner.run(builder, () -> {
581 try (DatagramChannel dc = DatagramChannel.open()) {
582 InetAddress lh = InetAddress.getLoopbackAddress();
583 dc.bind(new InetSocketAddress(lh, 0));
584 runAfterParkedAsync(dc::close);
585 try {
586 dc.receive(ByteBuffer.allocate(100));
587 fail("receive returned");
588 } catch (AsynchronousCloseException expected) { }
589 }
590 });
591 }
592
593 /**
594 * Virtual thread interrupted while blocked in DatagramChannel receive.
595 */
596 @ParameterizedTest
597 @MethodSource("threadBuilders")
598 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
599 VThreadRunner.run(builder, () -> {
600 try (DatagramChannel dc = DatagramChannel.open()) {
601 InetAddress lh = InetAddress.getLoopbackAddress();
602 dc.bind(new InetSocketAddress(lh, 0));
603
604 // interrupt current thread when it blocks in receive
605 Thread thisThread = Thread.currentThread();
606 runAfterParkedAsync(thisThread::interrupt);
607
608 try {
609 dc.receive(ByteBuffer.allocate(100));
610 fail("receive returned");
611 } catch (ClosedByInterruptException expected) {
612 assertTrue(Thread.interrupted());
613 }
614 }
615 });
616 }
617
618 /**
619 * Virtual thread blocks in DatagramSocket adaptor receive.
620 */
621 @ParameterizedTest
622 @MethodSource("threadBuilders")
623 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
624 testDatagramSocketAdaptorReceive(builder, 0);
625 }
626
627 /**
628 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
629 */
630 @ParameterizedTest
631 @MethodSource("threadBuilders")
632 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
633 testDatagramSocketAdaptorReceive(builder, 60_000);
634 }
635
636 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
637 int timeout) throws Exception {
638 VThreadRunner.run(builder, () -> {
639 try (DatagramChannel dc1 = DatagramChannel.open();
640 DatagramChannel dc2 = DatagramChannel.open()) {
641
642 InetAddress lh = InetAddress.getLoopbackAddress();
643 dc2.bind(new InetSocketAddress(lh, 0));
644
645 // send from dc1 when current thread blocks in dc2 receive
646 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
647 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
648
649 // receive should block
650 byte[] array = new byte[100];
651 DatagramPacket p = new DatagramPacket(array, 0, array.length);
652 if (timeout > 0)
653 dc2.socket().setSoTimeout(timeout);
654 dc2.socket().receive(p);
655 assertTrue(p.getLength() == 3 && array[0] == 'X');
656 }
657 });
658 }
659
660 /**
661 * DatagramChannel close while virtual thread blocked in adaptor receive.
662 */
663 @ParameterizedTest
664 @MethodSource("threadBuilders")
665 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
666 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
667 }
668
669 /**
670 * DatagramChannel close while virtual thread blocked in adaptor receive
671 * with timeout.
672 */
673 @ParameterizedTest
674 @MethodSource("threadBuilders")
675 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
676 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_000);
677 }
678
679 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
680 int timeout) throws Exception {
681 VThreadRunner.run(builder, () -> {
682 try (DatagramChannel dc = DatagramChannel.open()) {
683 InetAddress lh = InetAddress.getLoopbackAddress();
684 dc.bind(new InetSocketAddress(lh, 0));
685
686 byte[] array = new byte[100];
687 DatagramPacket p = new DatagramPacket(array, 0, array.length);
688 if (timeout > 0)
689 dc.socket().setSoTimeout(timeout);
690
691 // close channel/socket when current thread blocks in receive
692 runAfterParkedAsync(dc::close);
693
694 assertThrows(SocketException.class, () -> dc.socket().receive(p));
695 }
696 });
697 }
698
699 /**
700 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
701 */
702 @ParameterizedTest
703 @MethodSource("threadBuilders")
704 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
705 testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
706 }
707
708 /**
709 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
710 * with timeout.
711 */
712 @ParameterizedTest
713 @MethodSource("threadBuilders")
714 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
715 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_000);
716 }
717
718 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
719 int timeout) throws Exception {
720 VThreadRunner.run(builder, () -> {
721 try (DatagramChannel dc = DatagramChannel.open()) {
722 InetAddress lh = InetAddress.getLoopbackAddress();
723 dc.bind(new InetSocketAddress(lh, 0));
724
725 byte[] array = new byte[100];
726 DatagramPacket p = new DatagramPacket(array, 0, array.length);
727 if (timeout > 0)
728 dc.socket().setSoTimeout(timeout);
729
730 // interrupt current thread when it blocks in receive
731 Thread thisThread = Thread.currentThread();
732 runAfterParkedAsync(thisThread::interrupt);
733
734 try {
735 dc.socket().receive(p);
736 fail();
737 } catch (ClosedByInterruptException expected) {
738 assertTrue(Thread.interrupted());
739 }
740 }
741 });
742 }
743
744 /**
745 * Pipe read/write, no blocking.
746 */
747 @ParameterizedTest
748 @MethodSource("threadBuilders")
749 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
750 VThreadRunner.run(builder, () -> {
751 Pipe p = Pipe.open();
752 try (Pipe.SinkChannel sink = p.sink();
753 Pipe.SourceChannel source = p.source()) {
754
755 // write should not block
756 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
757 int n = sink.write(bb);
758 assertTrue(n > 0);
759
760 // read should not block
761 bb = ByteBuffer.allocate(10);
762 n = source.read(bb);
763 assertTrue(n > 0);
764 assertTrue(bb.get(0) == 'X');
765 }
766 });
767 }
768
769 /**
770 * Virtual thread blocks in Pipe.SourceChannel read.
771 */
772 @ParameterizedTest
773 @MethodSource("threadBuilders")
774 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
775 VThreadRunner.run(builder, () -> {
776 Pipe p = Pipe.open();
777 try (Pipe.SinkChannel sink = p.sink();
778 Pipe.SourceChannel source = p.source()) {
779
780 // write from sink when current thread blocks reading from source
781 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
782 runAfterParkedAsync(() -> sink.write(bb1));
783
784 // read should block
785 ByteBuffer bb2 = ByteBuffer.allocate(10);
786 int n = source.read(bb2);
787 assertTrue(n > 0);
788 assertTrue(bb2.get(0) == 'X');
789 }
790 });
791 }
792
793 /**
794 * Virtual thread blocks in Pipe.SinkChannel write.
795 */
796 @ParameterizedTest
797 @MethodSource("threadBuilders")
798 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
799 VThreadRunner.run(builder, () -> {
800 Pipe p = Pipe.open();
801 try (Pipe.SinkChannel sink = p.sink();
802 Pipe.SourceChannel source = p.source()) {
803
804 // read from source to EOF when current thread blocking in write
805 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
806
807 // write to sink should block
808 ByteBuffer bb = ByteBuffer.allocate(100*1024);
809 for (int i=0; i<1000; i++) {
810 int n = sink.write(bb);
811 assertTrue(n > 0);
812 bb.clear();
813 }
814 sink.close();
815
816 // wait for reader to finish
817 reader.join();
818 }
819 });
820 }
821
822 /**
823 * Pipe.SourceChannel close while virtual thread blocked in read.
824 */
825 @ParameterizedTest
826 @MethodSource("threadBuilders")
827 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
828 VThreadRunner.run(builder, () -> {
829 Pipe p = Pipe.open();
830 try (Pipe.SinkChannel sink = p.sink();
831 Pipe.SourceChannel source = p.source()) {
832 runAfterParkedAsync(source::close);
833 try {
834 int n = source.read(ByteBuffer.allocate(100));
835 fail("read returned " + n);
836 } catch (AsynchronousCloseException expected) { }
837 }
838 });
839 }
840
841 /**
842 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
843 */
844 @ParameterizedTest
845 @MethodSource("threadBuilders")
846 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
847 VThreadRunner.run(builder, () -> {
848 Pipe p = Pipe.open();
849 try (Pipe.SinkChannel sink = p.sink();
850 Pipe.SourceChannel source = p.source()) {
851
852 // interrupt current thread when it blocks reading from source
853 Thread thisThread = Thread.currentThread();
854 runAfterParkedAsync(thisThread::interrupt);
855
856 try {
857 int n = source.read(ByteBuffer.allocate(100));
858 fail("read returned " + n);
859 } catch (ClosedByInterruptException expected) {
860 assertTrue(Thread.interrupted());
861 }
862 }
863 });
864 }
865
866 /**
867 * Pipe.SinkChannel close while virtual thread blocked in write.
868 */
869 @ParameterizedTest
870 @MethodSource("threadBuilders")
871 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
872 VThreadRunner.run(builder, () -> {
873 boolean done = false;
874 while (!done) {
875 Pipe p = Pipe.open();
876 try (Pipe.SinkChannel sink = p.sink();
877 Pipe.SourceChannel source = p.source()) {
878
879 // close sink when current thread blocks in write
880 runAfterParkedAsync(sink::close, true);
881
882 // write until channel is closed
883 try {
884 ByteBuffer bb = ByteBuffer.allocate(100*1024);
885 for (;;) {
886 int n = sink.write(bb);
887 assertTrue(n > 0);
888 bb.clear();
889 }
890 } catch (AsynchronousCloseException e) {
891 // closed when blocked in write
892 done = true;
893 } catch (ClosedChannelException e) {
894 // closed but not blocked in write, need to retry test
895 System.err.format("%s, need to retry!%n", e);
896 }
897 }
898 }
899 });
900 }
901
902 /**
903 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
904 */
905 @ParameterizedTest
906 @MethodSource("threadBuilders")
907 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
908 VThreadRunner.run(builder, () -> {
909 boolean done = false;
910 while (!done) {
911 Pipe p = Pipe.open();
912 try (Pipe.SinkChannel sink = p.sink();
913 Pipe.SourceChannel source = p.source()) {
914
915 // interrupt current thread when it blocks in write
916 Thread thisThread = Thread.currentThread();
917 runAfterParkedAsync(thisThread::interrupt, true);
918
919 // write until channel is closed
920 try {
921 ByteBuffer bb = ByteBuffer.allocate(100*1024);
922 for (;;) {
923 int n = sink.write(bb);
924 assertTrue(n > 0);
925 bb.clear();
926 }
927 } catch (ClosedByInterruptException expected) {
928 // closed when blocked in write
|