1 /*
2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit/othervm/timeout=480 BlockingChannelOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
38 */
39
40 /*
41 * @test id=io_uring
42 * @requires os.family == "linux"
43 * @library /test/lib
44 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
45 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
46 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
47 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
48 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
49 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
50 */
51
52 /*
53 * @test id=no-vmcontinuations
54 * @requires vm.continuations
55 * @library /test/lib
56 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
57 */
58
59 import java.io.Closeable;
60 import java.io.IOException;
61 import java.net.DatagramPacket;
62 import java.net.InetAddress;
63 import java.net.InetSocketAddress;
64 import java.net.Socket;
65 import java.net.SocketAddress;
66 import java.net.SocketException;
67 import java.nio.ByteBuffer;
68 import java.nio.channels.AsynchronousCloseException;
69 import java.nio.channels.ClosedByInterruptException;
70 import java.nio.channels.ClosedChannelException;
71 import java.nio.channels.DatagramChannel;
72 import java.nio.channels.Pipe;
73 import java.nio.channels.ReadableByteChannel;
74 import java.nio.channels.ServerSocketChannel;
75 import java.nio.channels.SocketChannel;
76 import java.nio.channels.WritableByteChannel;
77 import java.util.concurrent.locks.LockSupport;
78
79 import jdk.test.lib.thread.VThreadRunner;
80 import org.junit.jupiter.api.Test;
81 import org.junit.jupiter.params.ParameterizedTest;
82 import org.junit.jupiter.params.provider.ValueSource;
83 import static org.junit.jupiter.api.Assertions.*;
84
85 class BlockingChannelOps {
86
87 /**
88 * SocketChannel read/write, no blocking.
89 */
90 @Test
91 void testSocketChannelReadWrite1() throws Exception {
92 VThreadRunner.run(() -> {
93 try (var connection = new Connection()) {
94 SocketChannel sc1 = connection.channel1();
95 SocketChannel sc2 = connection.channel2();
96
97 // write to sc1
98 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
99 int n = sc1.write(bb);
100 assertTrue(n > 0);
101
102 // read from sc2 should not block
103 bb = ByteBuffer.allocate(10);
104 n = sc2.read(bb);
105 assertTrue(n > 0);
106 assertTrue(bb.get(0) == 'X');
107 }
108 });
109 }
110
111 /**
112 * Virtual thread blocks in SocketChannel read.
113 */
114 @Test
115 void testSocketChannelRead() throws Exception {
116 VThreadRunner.run(() -> {
117 try (var connection = new Connection()) {
118 SocketChannel sc1 = connection.channel1();
119 SocketChannel sc2 = connection.channel2();
120
121 // write to sc1 when current thread blocks in sc2.read
122 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
123 runAfterParkedAsync(() -> sc1.write(bb1));
124
125 // read from sc2 should block
126 ByteBuffer bb2 = ByteBuffer.allocate(10);
127 int n = sc2.read(bb2);
128 assertTrue(n > 0);
129 assertTrue(bb2.get(0) == 'X');
130 }
131 });
132 }
133
134 /**
135 * Virtual thread blocks in SocketChannel write.
136 */
137 @Test
138 void testSocketChannelWrite() throws Exception {
139 VThreadRunner.run(() -> {
140 try (var connection = new Connection()) {
141 SocketChannel sc1 = connection.channel1();
142 SocketChannel sc2 = connection.channel2();
143
144 // read from sc2 to EOF when current thread blocks in sc1.write
145 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
146
147 // write to sc1 should block
148 ByteBuffer bb = ByteBuffer.allocate(100*1024);
149 for (int i=0; i<1000; i++) {
150 int n = sc1.write(bb);
151 assertTrue(n > 0);
152 bb.clear();
153 }
154 sc1.close();
155
156 // wait for reader to finish
157 reader.join();
158 }
159 });
160 }
161
162 /**
163 * SocketChannel close while virtual thread blocked in read.
164 */
165 @Test
166 void testSocketChannelReadAsyncClose() throws Exception {
167 VThreadRunner.run(() -> {
168 try (var connection = new Connection()) {
169 SocketChannel sc = connection.channel1();
170 runAfterParkedAsync(sc::close);
171 try {
172 int n = sc.read(ByteBuffer.allocate(100));
173 fail("read returned " + n);
174 } catch (AsynchronousCloseException expected) { }
175 }
176 });
177 }
178
179 /**
180 * SocketChannel shutdownInput while virtual thread blocked in read.
181 */
182 @Test
183 void testSocketChannelReadAsyncShutdownInput() throws Exception {
184 VThreadRunner.run(() -> {
185 try (var connection = new Connection()) {
186 SocketChannel sc = connection.channel1();
187 runAfterParkedAsync(sc::shutdownInput);
188 int n = sc.read(ByteBuffer.allocate(100));
189 assertEquals(-1, n);
190 assertTrue(sc.isOpen());
191 }
192 });
193 }
194
195 /**
196 * Virtual thread interrupted while blocked in SocketChannel read.
197 */
198 @Test
199 void testSocketChannelReadInterrupt() throws Exception {
200 VThreadRunner.run(() -> {
201 try (var connection = new Connection()) {
202 SocketChannel sc = connection.channel1();
203
204 // interrupt current thread when it blocks in read
205 Thread thisThread = Thread.currentThread();
206 runAfterParkedAsync(thisThread::interrupt);
207
208 try {
209 int n = sc.read(ByteBuffer.allocate(100));
210 fail("read returned " + n);
211 } catch (ClosedByInterruptException expected) {
212 assertTrue(Thread.interrupted());
213 }
214 }
215 });
216 }
217
218 /**
219 * SocketChannel close while virtual thread blocked in write.
220 */
221 @Test
222 void testSocketChannelWriteAsyncClose() throws Exception {
223 VThreadRunner.run(() -> {
224 boolean done = false;
225 while (!done) {
226 try (var connection = new Connection()) {
227 SocketChannel sc = connection.channel1();
228
229 // close sc when current thread blocks in write
230 runAfterParkedAsync(sc::close, true);
231
232 // write until channel is closed
233 try {
234 ByteBuffer bb = ByteBuffer.allocate(100*1024);
235 for (;;) {
236 int n = sc.write(bb);
237 assertTrue(n > 0);
238 bb.clear();
239 }
240 } catch (AsynchronousCloseException expected) {
241 // closed when blocked in write
242 done = true;
243 } catch (ClosedChannelException e) {
244 // closed but not blocked in write, need to retry test
245 System.err.format("%s, need to retry!%n", e);
246 }
247 }
248 }
249 });
250 }
251
252
253 /**
254 * SocketChannel shutdownOutput while virtual thread blocked in write.
255 */
256 @Test
257 void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
258 VThreadRunner.run(() -> {
259 try (var connection = new Connection()) {
260 SocketChannel sc = connection.channel1();
261
262 // shutdown output when current thread blocks in write
263 runAfterParkedAsync(sc::shutdownOutput);
264 try {
265 ByteBuffer bb = ByteBuffer.allocate(100*1024);
266 for (;;) {
267 int n = sc.write(bb);
268 assertTrue(n > 0);
269 bb.clear();
270 }
271 } catch (ClosedChannelException e) {
272 // expected
273 }
274 assertTrue(sc.isOpen());
275 }
276 });
277 }
278
279 /**
280 * Virtual thread interrupted while blocked in SocketChannel write.
281 */
282 @Test
283 void testSocketChannelWriteInterrupt() throws Exception {
284 VThreadRunner.run(() -> {
285 boolean done = false;
286 while (!done) {
287 try (var connection = new Connection()) {
288 SocketChannel sc = connection.channel1();
289
290 // interrupt current thread when it blocks in write
291 Thread thisThread = Thread.currentThread();
292 runAfterParkedAsync(thisThread::interrupt, true);
293
294 // write until channel is closed
295 try {
296 ByteBuffer bb = ByteBuffer.allocate(100*1024);
297 for (;;) {
298 int n = sc.write(bb);
299 assertTrue(n > 0);
300 bb.clear();
301 }
302 } catch (ClosedByInterruptException e) {
303 // closed when blocked in write
304 assertTrue(Thread.interrupted());
305 done = true;
306 } catch (ClosedChannelException e) {
307 // closed but not blocked in write, need to retry test
308 System.err.format("%s, need to retry!%n", e);
309 }
310 }
311 }
312 });
313 }
314
315 /**
316 * Virtual thread blocks in SocketChannel adaptor read.
317 */
318 @ParameterizedTest
319 @ValueSource(ints = { 0, 60_000 })
320 void testSocketAdaptorRead(int timeout) throws Exception {
321 VThreadRunner.run(() -> {
322 try (var connection = new Connection()) {
323 SocketChannel sc1 = connection.channel1();
324 SocketChannel sc2 = connection.channel2();
325
326 // write to sc1 when currnet thread blocks reading from sc2
327 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
328 runAfterParkedAsync(() -> sc1.write(bb));
329
330 // read from sc2 should block
331 byte[] array = new byte[100];
332 if (timeout > 0)
333 sc2.socket().setSoTimeout(timeout);
334 int n = sc2.socket().getInputStream().read(array);
335 assertTrue(n > 0);
336 assertTrue(array[0] == 'X');
337 }
338 });
339 }
340
341 /**
342 * ServerSocketChannel accept, no blocking.
343 */
344 @Test
345 void testServerSocketChannelAccept1() throws Exception {
346 VThreadRunner.run(() -> {
347 try (var ssc = ServerSocketChannel.open()) {
348 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
349 var sc1 = SocketChannel.open(ssc.getLocalAddress());
350 // accept should not block
351 var sc2 = ssc.accept();
352 sc1.close();
353 sc2.close();
354 }
355 });
356 }
357
358 /**
359 * Virtual thread blocks in ServerSocketChannel accept.
360 */
361 @Test
362 void testServerSocketChannelAccept2() throws Exception {
363 VThreadRunner.run(() -> {
364 try (var ssc = ServerSocketChannel.open()) {
365 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
366 var sc1 = SocketChannel.open();
367
368 // connect when current thread when it blocks in accept
369 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
370
371 // accept should block
372 var sc2 = ssc.accept();
373 sc1.close();
374 sc2.close();
375 }
376 });
377 }
378
379 /**
380 * SeverSocketChannel close while virtual thread blocked in accept.
381 */
382 @Test
383 void testServerSocketChannelAcceptAsyncClose() throws Exception {
384 VThreadRunner.run(() -> {
385 try (var ssc = ServerSocketChannel.open()) {
386 InetAddress lh = InetAddress.getLoopbackAddress();
387 ssc.bind(new InetSocketAddress(lh, 0));
388 runAfterParkedAsync(ssc::close);
389 try {
390 SocketChannel sc = ssc.accept();
391 sc.close();
392 fail("connection accepted???");
393 } catch (AsynchronousCloseException expected) { }
394 }
395 });
396 }
397
398 /**
399 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
400 */
401 @Test
402 void testServerSocketChannelAcceptInterrupt() throws Exception {
403 VThreadRunner.run(() -> {
404 try (var ssc = ServerSocketChannel.open()) {
405 InetAddress lh = InetAddress.getLoopbackAddress();
406 ssc.bind(new InetSocketAddress(lh, 0));
407
408 // interrupt current thread when it blocks in accept
409 Thread thisThread = Thread.currentThread();
410 runAfterParkedAsync(thisThread::interrupt);
411
412 try {
413 SocketChannel sc = ssc.accept();
414 sc.close();
415 fail("connection accepted???");
416 } catch (ClosedByInterruptException expected) {
417 assertTrue(Thread.interrupted());
418 }
419 }
420 });
421 }
422
423 /**
424 * Virtual thread blocks in ServerSocketChannel adaptor accept.
425 */
426 @ParameterizedTest
427 @ValueSource(ints = { 0, 60_000 })
428 void testSocketChannelAdaptorAccept(int timeout) throws Exception {
429 VThreadRunner.run(() -> {
430 try (var ssc = ServerSocketChannel.open()) {
431 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
432 var sc = SocketChannel.open();
433
434 // interrupt current thread when it blocks in accept
435 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
436
437 // accept should block
438 if (timeout > 0)
439 ssc.socket().setSoTimeout(timeout);
440 Socket s = ssc.socket().accept();
441 sc.close();
442 s.close();
443 }
444 });
445 }
446
447 /**
448 * DatagramChannel receive/send, no blocking.
449 */
450 @Test
451 void testDatagramChannelSendReceive1() throws Exception {
452 VThreadRunner.run(() -> {
453 try (DatagramChannel dc1 = DatagramChannel.open();
454 DatagramChannel dc2 = DatagramChannel.open()) {
455
456 InetAddress lh = InetAddress.getLoopbackAddress();
457 dc2.bind(new InetSocketAddress(lh, 0));
458
459 // send should not block
460 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
461 int n = dc1.send(bb, dc2.getLocalAddress());
462 assertTrue(n > 0);
463
464 // receive should not block
465 bb = ByteBuffer.allocate(10);
466 dc2.receive(bb);
467 assertTrue(bb.get(0) == 'X');
468 }
469 });
470 }
471
472 /**
473 * Virtual thread blocks in DatagramChannel receive.
474 */
475 @Test
476 void testDatagramChannelSendReceive2() throws Exception {
477 VThreadRunner.run(() -> {
478 try (DatagramChannel dc1 = DatagramChannel.open();
479 DatagramChannel dc2 = DatagramChannel.open()) {
480
481 InetAddress lh = InetAddress.getLoopbackAddress();
482 dc2.bind(new InetSocketAddress(lh, 0));
483
484 // send from dc1 when current thread blocked in dc2.receive
485 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
486 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
487
488 // read from dc2 should block
489 ByteBuffer bb2 = ByteBuffer.allocate(10);
490 dc2.receive(bb2);
491 assertTrue(bb2.get(0) == 'X');
492 }
493 });
494 }
495
496 /**
497 * DatagramChannel close while virtual thread blocked in receive.
498 */
499 @Test
500 void testDatagramChannelReceiveAsyncClose() throws Exception {
501 VThreadRunner.run(() -> {
502 try (DatagramChannel dc = DatagramChannel.open()) {
503 InetAddress lh = InetAddress.getLoopbackAddress();
504 dc.bind(new InetSocketAddress(lh, 0));
505 runAfterParkedAsync(dc::close);
506 try {
507 dc.receive(ByteBuffer.allocate(100));
508 fail("receive returned");
509 } catch (AsynchronousCloseException expected) { }
510 }
511 });
512 }
513
514 /**
515 * Virtual thread interrupted while blocked in DatagramChannel receive.
516 */
517 @Test
518 void testDatagramChannelReceiveInterrupt() throws Exception {
519 VThreadRunner.run(() -> {
520 try (DatagramChannel dc = DatagramChannel.open()) {
521 InetAddress lh = InetAddress.getLoopbackAddress();
522 dc.bind(new InetSocketAddress(lh, 0));
523
524 // interrupt current thread when it blocks in receive
525 Thread thisThread = Thread.currentThread();
526 runAfterParkedAsync(thisThread::interrupt);
527
528 try {
529 dc.receive(ByteBuffer.allocate(100));
530 fail("receive returned");
531 } catch (ClosedByInterruptException expected) {
532 assertTrue(Thread.interrupted());
533 }
534 }
535 });
536 }
537
538 /**
539 * Virtual thread blocks in DatagramSocket adaptor receive.
540 */
541 @ParameterizedTest
542 @ValueSource(ints = { 0, 60_000 })
543 void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
544 VThreadRunner.run(() -> {
545 try (DatagramChannel dc1 = DatagramChannel.open();
546 DatagramChannel dc2 = DatagramChannel.open()) {
547
548 InetAddress lh = InetAddress.getLoopbackAddress();
549 dc2.bind(new InetSocketAddress(lh, 0));
550
551 // send from dc1 when current thread blocks in dc2 receive
552 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
553 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
554
555 // receive should block
556 byte[] array = new byte[100];
557 DatagramPacket p = new DatagramPacket(array, 0, array.length);
558 if (timeout > 0)
559 dc2.socket().setSoTimeout(timeout);
560 dc2.socket().receive(p);
561 assertTrue(p.getLength() == 3 && array[0] == 'X');
562 }
563 });
564 }
565
566 /**
567 * DatagramChannel close while virtual thread blocked in adaptor receive.
568 */
569 @ParameterizedTest
570 @ValueSource(ints = { 0, 60_000 })
571 void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
572 VThreadRunner.run(() -> {
573 try (DatagramChannel dc = DatagramChannel.open()) {
574 InetAddress lh = InetAddress.getLoopbackAddress();
575 dc.bind(new InetSocketAddress(lh, 0));
576
577 byte[] array = new byte[100];
578 DatagramPacket p = new DatagramPacket(array, 0, array.length);
579 if (timeout > 0)
580 dc.socket().setSoTimeout(timeout);
581
582 // close channel/socket when current thread blocks in receive
583 runAfterParkedAsync(dc::close);
584
585 assertThrows(SocketException.class, () -> dc.socket().receive(p));
586 }
587 });
588 }
589
590 /**
591 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
592 */
593 @ParameterizedTest
594 @ValueSource(ints = { 0, 60_000 })
595 void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
596 VThreadRunner.run(() -> {
597 try (DatagramChannel dc = DatagramChannel.open()) {
598 InetAddress lh = InetAddress.getLoopbackAddress();
599 dc.bind(new InetSocketAddress(lh, 0));
600
601 byte[] array = new byte[100];
602 DatagramPacket p = new DatagramPacket(array, 0, array.length);
603 if (timeout > 0)
604 dc.socket().setSoTimeout(timeout);
605
606 // interrupt current thread when it blocks in receive
607 Thread thisThread = Thread.currentThread();
608 runAfterParkedAsync(thisThread::interrupt);
609
610 try {
611 dc.socket().receive(p);
612 fail();
613 } catch (ClosedByInterruptException expected) {
614 assertTrue(Thread.interrupted());
615 }
616 }
617 });
618 }
619
620 /**
621 * Pipe read/write, no blocking.
622 */
623 @Test
624 void testPipeReadWrite1() throws Exception {
625 VThreadRunner.run(() -> {
626 Pipe p = Pipe.open();
627 try (Pipe.SinkChannel sink = p.sink();
628 Pipe.SourceChannel source = p.source()) {
629
630 // write should not block
631 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
632 int n = sink.write(bb);
633 assertTrue(n > 0);
634
635 // read should not block
636 bb = ByteBuffer.allocate(10);
637 n = source.read(bb);
638 assertTrue(n > 0);
639 assertTrue(bb.get(0) == 'X');
640 }
641 });
642 }
643
644 /**
645 * Virtual thread blocks in Pipe.SourceChannel read.
646 */
647 @Test
648 void testPipeReadWrite2() throws Exception {
649 VThreadRunner.run(() -> {
650 Pipe p = Pipe.open();
651 try (Pipe.SinkChannel sink = p.sink();
652 Pipe.SourceChannel source = p.source()) {
653
654 // write from sink when current thread blocks reading from source
655 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
656 runAfterParkedAsync(() -> sink.write(bb1));
657
658 // read should block
659 ByteBuffer bb2 = ByteBuffer.allocate(10);
660 int n = source.read(bb2);
661 assertTrue(n > 0);
662 assertTrue(bb2.get(0) == 'X');
663 }
664 });
665 }
666
667 /**
668 * Virtual thread blocks in Pipe.SinkChannel write.
669 */
670 @Test
671 void testPipeReadWrite3() throws Exception {
672 VThreadRunner.run(() -> {
673 Pipe p = Pipe.open();
674 try (Pipe.SinkChannel sink = p.sink();
675 Pipe.SourceChannel source = p.source()) {
676
677 // read from source to EOF when current thread blocking in write
678 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
679
680 // write to sink should block
681 ByteBuffer bb = ByteBuffer.allocate(100*1024);
682 for (int i=0; i<1000; i++) {
683 int n = sink.write(bb);
684 assertTrue(n > 0);
685 bb.clear();
686 }
687 sink.close();
688
689 // wait for reader to finish
690 reader.join();
691 }
692 });
693 }
694
695 /**
696 * Pipe.SourceChannel close while virtual thread blocked in read.
697 */
698 @Test
699 void testPipeReadAsyncClose() throws Exception {
700 VThreadRunner.run(() -> {
701 Pipe p = Pipe.open();
702 try (Pipe.SinkChannel sink = p.sink();
703 Pipe.SourceChannel source = p.source()) {
704 runAfterParkedAsync(source::close);
705 try {
706 int n = source.read(ByteBuffer.allocate(100));
707 fail("read returned " + n);
708 } catch (AsynchronousCloseException expected) { }
709 }
710 });
711 }
712
713 /**
714 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
715 */
716 @Test
717 void testPipeReadInterrupt() throws Exception {
718 VThreadRunner.run(() -> {
719 Pipe p = Pipe.open();
720 try (Pipe.SinkChannel sink = p.sink();
721 Pipe.SourceChannel source = p.source()) {
722
723 // interrupt current thread when it blocks reading from source
724 Thread thisThread = Thread.currentThread();
725 runAfterParkedAsync(thisThread::interrupt);
726
727 try {
728 int n = source.read(ByteBuffer.allocate(100));
729 fail("read returned " + n);
730 } catch (ClosedByInterruptException expected) {
731 assertTrue(Thread.interrupted());
732 }
733 }
734 });
735 }
736
737 /**
738 * Pipe.SinkChannel close while virtual thread blocked in write.
739 */
740 @Test
741 void testPipeWriteAsyncClose() throws Exception {
742 VThreadRunner.run(() -> {
743 boolean done = false;
744 while (!done) {
745 Pipe p = Pipe.open();
746 try (Pipe.SinkChannel sink = p.sink();
747 Pipe.SourceChannel source = p.source()) {
748
749 // close sink when current thread blocks in write
750 runAfterParkedAsync(sink::close, true);
751
752 // write until channel is closed
753 try {
754 ByteBuffer bb = ByteBuffer.allocate(100*1024);
755 for (;;) {
756 int n = sink.write(bb);
757 assertTrue(n > 0);
758 bb.clear();
759 }
760 } catch (AsynchronousCloseException e) {
761 // closed when blocked in write
762 done = true;
763 } catch (ClosedChannelException e) {
764 // closed but not blocked in write, need to retry test
765 System.err.format("%s, need to retry!%n", e);
766 }
767 }
768 }
769 });
770 }
771
772 /**
773 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
774 */
775 @Test
776 void testPipeWriteInterrupt() throws Exception {
777 VThreadRunner.run(() -> {
778 boolean done = false;
779 while (!done) {
780 Pipe p = Pipe.open();
781 try (Pipe.SinkChannel sink = p.sink();
782 Pipe.SourceChannel source = p.source()) {
783
784 // interrupt current thread when it blocks in write
785 Thread thisThread = Thread.currentThread();
786 runAfterParkedAsync(thisThread::interrupt, true);
787
788 // write until channel is closed
789 try {
790 ByteBuffer bb = ByteBuffer.allocate(100*1024);
791 for (;;) {
792 int n = sink.write(bb);
793 assertTrue(n > 0);
794 bb.clear();
795 }
796 } catch (ClosedByInterruptException expected) {
797 // closed when blocked in write
798 assertTrue(Thread.interrupted());
799 done = true;
800 } catch (ClosedChannelException e) {
801 // closed but not blocked in write, need to retry test
802 System.err.format("%s, need to retry!%n", e);
803 }
804 }
805 }
806 });
807 }
808
809 /**
810 * Creates a loopback connection
811 */
812 static class Connection implements Closeable {
813 private final SocketChannel sc1;
814 private final SocketChannel sc2;
815 Connection() throws IOException {
816 var lh = InetAddress.getLoopbackAddress();
817 try (var listener = ServerSocketChannel.open()) {
818 listener.bind(new InetSocketAddress(lh, 0));
819 SocketChannel sc1 = SocketChannel.open();
820 SocketChannel sc2 = null;
821 try {
822 sc1.socket().connect(listener.getLocalAddress());
823 sc2 = listener.accept();
824 } catch (IOException ioe) {
825 sc1.close();
826 throw ioe;
827 }
828 this.sc1 = sc1;
829 this.sc2 = sc2;
830 }
831 }
832 SocketChannel channel1() {
833 return sc1;
834 }
835 SocketChannel channel2() {
836 return sc2;
837 }
838 @Override
839 public void close() throws IOException {
840 sc1.close();
841 sc2.close();
842 }
843 }
844
845 /**
846 * Read from a channel until all bytes have been read or an I/O error occurs.
847 */
848 static void readToEOF(ReadableByteChannel rbc) throws IOException {
849 ByteBuffer bb = ByteBuffer.allocate(16*1024);
850 int n;
851 while ((n = rbc.read(bb)) > 0) {
852 bb.clear();
853 }
854 }
855
856 @FunctionalInterface
857 interface ThrowingRunnable {
858 void run() throws Exception;
859 }
860
861 /**
862 * Runs the given task asynchronously after the current virtual thread parks.
863 * @param writing if the thread will block in write
864 * @return the thread started to run the task
865 */
866 private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
867 Thread target = Thread.currentThread();
868 if (!target.isVirtual())
869 throw new WrongThreadException();
870 return Thread.ofPlatform().daemon().start(() -> {
871 try {
872 // wait for target thread to park
873 while (!isWaiting(target)) {
874 Thread.sleep(20);
875 }
876
877 // if the target thread is parked in write then we nudge it a few times
878 // to avoid wakeup with some bytes written
879 if (writing) {
880 for (int i = 0; i < 3; i++) {
881 LockSupport.unpark(target);
882 while (!isWaiting(target)) {
883 Thread.sleep(20);
884 }
885 }
886 }
887
888 task.run();
889
890 } catch (Exception e) {
891 e.printStackTrace();
892 }
893 });
894 }
895
896 private static Thread runAfterParkedAsync(ThrowingRunnable task) {
897 return runAfterParkedAsync(task, false);
898 }
899
900 /**
901 * Return true if the given Thread is parked.
902 */
903 private static boolean isWaiting(Thread target) {
904 Thread.State state = target.getState();
905 assertNotEquals(Thread.State.TERMINATED, state);
906 return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
907 }
908 }