1 /*
2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit/othervm/timeout=480 BlockingChannelOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
38 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
39 */
40
41 /*
42 * @test id=io_uring
43 * @requires os.family == "linux"
44 * @library /test/lib
45 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
46 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
47 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
48 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
49 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
50 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
51 */
52
53 /*
54 * @test id=no-vmcontinuations
55 * @requires vm.continuations
56 * @library /test/lib
57 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
58 */
59
60 import java.io.Closeable;
61 import java.io.IOException;
62 import java.net.DatagramPacket;
63 import java.net.InetAddress;
64 import java.net.InetSocketAddress;
65 import java.net.Socket;
66 import java.net.SocketAddress;
67 import java.net.SocketException;
68 import java.nio.ByteBuffer;
69 import java.nio.channels.AsynchronousCloseException;
70 import java.nio.channels.ClosedByInterruptException;
71 import java.nio.channels.ClosedChannelException;
72 import java.nio.channels.DatagramChannel;
73 import java.nio.channels.Pipe;
74 import java.nio.channels.ReadableByteChannel;
75 import java.nio.channels.ServerSocketChannel;
76 import java.nio.channels.SocketChannel;
77 import java.nio.channels.WritableByteChannel;
78 import java.util.concurrent.locks.LockSupport;
79
80 import jdk.test.lib.thread.VThreadRunner;
81 import org.junit.jupiter.api.Test;
82 import org.junit.jupiter.params.ParameterizedTest;
83 import org.junit.jupiter.params.provider.ValueSource;
84 import static org.junit.jupiter.api.Assertions.*;
85
86 class BlockingChannelOps {
87
88 /**
89 * SocketChannel read/write, no blocking.
90 */
91 @Test
92 void testSocketChannelReadWrite1() throws Exception {
93 VThreadRunner.run(() -> {
94 try (var connection = new Connection()) {
95 SocketChannel sc1 = connection.channel1();
96 SocketChannel sc2 = connection.channel2();
97
98 // write to sc1
99 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
100 int n = sc1.write(bb);
101 assertTrue(n > 0);
102
103 // read from sc2 should not block
104 bb = ByteBuffer.allocate(10);
105 n = sc2.read(bb);
106 assertTrue(n > 0);
107 assertTrue(bb.get(0) == 'X');
108 }
109 });
110 }
111
112 /**
113 * Virtual thread blocks in SocketChannel read.
114 */
115 @Test
116 void testSocketChannelRead() throws Exception {
117 VThreadRunner.run(() -> {
118 try (var connection = new Connection()) {
119 SocketChannel sc1 = connection.channel1();
120 SocketChannel sc2 = connection.channel2();
121
122 // write to sc1 when current thread blocks in sc2.read
123 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
124 runAfterParkedAsync(() -> sc1.write(bb1));
125
126 // read from sc2 should block
127 ByteBuffer bb2 = ByteBuffer.allocate(10);
128 int n = sc2.read(bb2);
129 assertTrue(n > 0);
130 assertTrue(bb2.get(0) == 'X');
131 }
132 });
133 }
134
135 /**
136 * Virtual thread blocks in SocketChannel write.
137 */
138 @Test
139 void testSocketChannelWrite() throws Exception {
140 VThreadRunner.run(() -> {
141 try (var connection = new Connection()) {
142 SocketChannel sc1 = connection.channel1();
143 SocketChannel sc2 = connection.channel2();
144
145 // read from sc2 to EOF when current thread blocks in sc1.write
146 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
147
148 // write to sc1 should block
149 ByteBuffer bb = ByteBuffer.allocate(100*1024);
150 for (int i=0; i<1000; i++) {
151 int n = sc1.write(bb);
152 assertTrue(n > 0);
153 bb.clear();
154 }
155 sc1.close();
156
157 // wait for reader to finish
158 reader.join();
159 }
160 });
161 }
162
163 /**
164 * SocketChannel close while virtual thread blocked in read.
165 */
166 @Test
167 void testSocketChannelReadAsyncClose() throws Exception {
168 VThreadRunner.run(() -> {
169 try (var connection = new Connection()) {
170 SocketChannel sc = connection.channel1();
171 runAfterParkedAsync(sc::close);
172 try {
173 int n = sc.read(ByteBuffer.allocate(100));
174 fail("read returned " + n);
175 } catch (AsynchronousCloseException expected) { }
176 }
177 });
178 }
179
180 /**
181 * SocketChannel shutdownInput while virtual thread blocked in read.
182 */
183 @Test
184 void testSocketChannelReadAsyncShutdownInput() throws Exception {
185 VThreadRunner.run(() -> {
186 try (var connection = new Connection()) {
187 SocketChannel sc = connection.channel1();
188 runAfterParkedAsync(sc::shutdownInput);
189 int n = sc.read(ByteBuffer.allocate(100));
190 assertEquals(-1, n);
191 assertTrue(sc.isOpen());
192 }
193 });
194 }
195
196 /**
197 * Virtual thread interrupted while blocked in SocketChannel read.
198 */
199 @Test
200 void testSocketChannelReadInterrupt() throws Exception {
201 VThreadRunner.run(() -> {
202 try (var connection = new Connection()) {
203 SocketChannel sc = connection.channel1();
204
205 // interrupt current thread when it blocks in read
206 Thread thisThread = Thread.currentThread();
207 runAfterParkedAsync(thisThread::interrupt);
208
209 try {
210 int n = sc.read(ByteBuffer.allocate(100));
211 fail("read returned " + n);
212 } catch (ClosedByInterruptException expected) {
213 assertTrue(Thread.interrupted());
214 }
215 }
216 });
217 }
218
219 /**
220 * SocketChannel close while virtual thread blocked in write.
221 */
222 @Test
223 void testSocketChannelWriteAsyncClose() throws Exception {
224 VThreadRunner.run(() -> {
225 boolean done = false;
226 while (!done) {
227 try (var connection = new Connection()) {
228 SocketChannel sc = connection.channel1();
229
230 // close sc when current thread blocks in write
231 runAfterParkedAsync(sc::close, true);
232
233 // write until channel is closed
234 try {
235 ByteBuffer bb = ByteBuffer.allocate(100*1024);
236 for (;;) {
237 int n = sc.write(bb);
238 assertTrue(n > 0);
239 bb.clear();
240 }
241 } catch (AsynchronousCloseException expected) {
242 // closed when blocked in write
243 done = true;
244 } catch (ClosedChannelException e) {
245 // closed but not blocked in write, need to retry test
246 System.err.format("%s, need to retry!%n", e);
247 }
248 }
249 }
250 });
251 }
252
253
254 /**
255 * SocketChannel shutdownOutput while virtual thread blocked in write.
256 */
257 @Test
258 void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
259 VThreadRunner.run(() -> {
260 try (var connection = new Connection()) {
261 SocketChannel sc = connection.channel1();
262
263 // shutdown output when current thread blocks in write
264 runAfterParkedAsync(sc::shutdownOutput);
265 try {
266 ByteBuffer bb = ByteBuffer.allocate(100*1024);
267 for (;;) {
268 int n = sc.write(bb);
269 assertTrue(n > 0);
270 bb.clear();
271 }
272 } catch (ClosedChannelException e) {
273 // expected
274 }
275 assertTrue(sc.isOpen());
276 }
277 });
278 }
279
280 /**
281 * Virtual thread interrupted while blocked in SocketChannel write.
282 */
283 @Test
284 void testSocketChannelWriteInterrupt() throws Exception {
285 VThreadRunner.run(() -> {
286 boolean done = false;
287 while (!done) {
288 try (var connection = new Connection()) {
289 SocketChannel sc = connection.channel1();
290
291 // interrupt current thread when it blocks in write
292 Thread thisThread = Thread.currentThread();
293 runAfterParkedAsync(thisThread::interrupt, true);
294
295 // write until channel is closed
296 try {
297 ByteBuffer bb = ByteBuffer.allocate(100*1024);
298 for (;;) {
299 int n = sc.write(bb);
300 assertTrue(n > 0);
301 bb.clear();
302 }
303 } catch (ClosedByInterruptException e) {
304 // closed when blocked in write
305 assertTrue(Thread.interrupted());
306 done = true;
307 } catch (ClosedChannelException e) {
308 // closed but not blocked in write, need to retry test
309 System.err.format("%s, need to retry!%n", e);
310 }
311 }
312 }
313 });
314 }
315
316 /**
317 * Virtual thread blocks in SocketChannel adaptor read.
318 */
319 @ParameterizedTest
320 @ValueSource(ints = { 0, 60_000 })
321 void testSocketAdaptorRead(int timeout) throws Exception {
322 VThreadRunner.run(() -> {
323 try (var connection = new Connection()) {
324 SocketChannel sc1 = connection.channel1();
325 SocketChannel sc2 = connection.channel2();
326
327 // write to sc1 when currnet thread blocks reading from sc2
328 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
329 runAfterParkedAsync(() -> sc1.write(bb));
330
331 // read from sc2 should block
332 byte[] array = new byte[100];
333 if (timeout > 0)
334 sc2.socket().setSoTimeout(timeout);
335 int n = sc2.socket().getInputStream().read(array);
336 assertTrue(n > 0);
337 assertTrue(array[0] == 'X');
338 }
339 });
340 }
341
342 /**
343 * ServerSocketChannel accept, no blocking.
344 */
345 @Test
346 void testServerSocketChannelAccept1() throws Exception {
347 VThreadRunner.run(() -> {
348 try (var ssc = ServerSocketChannel.open()) {
349 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
350 var sc1 = SocketChannel.open(ssc.getLocalAddress());
351 // accept should not block
352 var sc2 = ssc.accept();
353 sc1.close();
354 sc2.close();
355 }
356 });
357 }
358
359 /**
360 * Virtual thread blocks in ServerSocketChannel accept.
361 */
362 @Test
363 void testServerSocketChannelAccept2() throws Exception {
364 VThreadRunner.run(() -> {
365 try (var ssc = ServerSocketChannel.open()) {
366 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
367 var sc1 = SocketChannel.open();
368
369 // connect when current thread when it blocks in accept
370 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
371
372 // accept should block
373 var sc2 = ssc.accept();
374 sc1.close();
375 sc2.close();
376 }
377 });
378 }
379
380 /**
381 * SeverSocketChannel close while virtual thread blocked in accept.
382 */
383 @Test
384 void testServerSocketChannelAcceptAsyncClose() throws Exception {
385 VThreadRunner.run(() -> {
386 try (var ssc = ServerSocketChannel.open()) {
387 InetAddress lh = InetAddress.getLoopbackAddress();
388 ssc.bind(new InetSocketAddress(lh, 0));
389 runAfterParkedAsync(ssc::close);
390 try {
391 SocketChannel sc = ssc.accept();
392 sc.close();
393 fail("connection accepted???");
394 } catch (AsynchronousCloseException expected) { }
395 }
396 });
397 }
398
399 /**
400 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
401 */
402 @Test
403 void testServerSocketChannelAcceptInterrupt() throws Exception {
404 VThreadRunner.run(() -> {
405 try (var ssc = ServerSocketChannel.open()) {
406 InetAddress lh = InetAddress.getLoopbackAddress();
407 ssc.bind(new InetSocketAddress(lh, 0));
408
409 // interrupt current thread when it blocks in accept
410 Thread thisThread = Thread.currentThread();
411 runAfterParkedAsync(thisThread::interrupt);
412
413 try {
414 SocketChannel sc = ssc.accept();
415 sc.close();
416 fail("connection accepted???");
417 } catch (ClosedByInterruptException expected) {
418 assertTrue(Thread.interrupted());
419 }
420 }
421 });
422 }
423
424 /**
425 * Virtual thread blocks in ServerSocketChannel adaptor accept.
426 */
427 @ParameterizedTest
428 @ValueSource(ints = { 0, 60_000 })
429 void testSocketChannelAdaptorAccept(int timeout) throws Exception {
430 VThreadRunner.run(() -> {
431 try (var ssc = ServerSocketChannel.open()) {
432 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
433 var sc = SocketChannel.open();
434
435 // interrupt current thread when it blocks in accept
436 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
437
438 // accept should block
439 if (timeout > 0)
440 ssc.socket().setSoTimeout(timeout);
441 Socket s = ssc.socket().accept();
442 sc.close();
443 s.close();
444 }
445 });
446 }
447
448 /**
449 * DatagramChannel receive/send, no blocking.
450 */
451 @Test
452 void testDatagramChannelSendReceive1() throws Exception {
453 VThreadRunner.run(() -> {
454 try (DatagramChannel dc1 = DatagramChannel.open();
455 DatagramChannel dc2 = DatagramChannel.open()) {
456
457 InetAddress lh = InetAddress.getLoopbackAddress();
458 dc2.bind(new InetSocketAddress(lh, 0));
459
460 // send should not block
461 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
462 int n = dc1.send(bb, dc2.getLocalAddress());
463 assertTrue(n > 0);
464
465 // receive should not block
466 bb = ByteBuffer.allocate(10);
467 dc2.receive(bb);
468 assertTrue(bb.get(0) == 'X');
469 }
470 });
471 }
472
473 /**
474 * Virtual thread blocks in DatagramChannel receive.
475 */
476 @Test
477 void testDatagramChannelSendReceive2() throws Exception {
478 VThreadRunner.run(() -> {
479 try (DatagramChannel dc1 = DatagramChannel.open();
480 DatagramChannel dc2 = DatagramChannel.open()) {
481
482 InetAddress lh = InetAddress.getLoopbackAddress();
483 dc2.bind(new InetSocketAddress(lh, 0));
484
485 // send from dc1 when current thread blocked in dc2.receive
486 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
487 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
488
489 // read from dc2 should block
490 ByteBuffer bb2 = ByteBuffer.allocate(10);
491 dc2.receive(bb2);
492 assertTrue(bb2.get(0) == 'X');
493 }
494 });
495 }
496
497 /**
498 * DatagramChannel close while virtual thread blocked in receive.
499 */
500 @Test
501 void testDatagramChannelReceiveAsyncClose() throws Exception {
502 VThreadRunner.run(() -> {
503 try (DatagramChannel dc = DatagramChannel.open()) {
504 InetAddress lh = InetAddress.getLoopbackAddress();
505 dc.bind(new InetSocketAddress(lh, 0));
506 runAfterParkedAsync(dc::close);
507 try {
508 dc.receive(ByteBuffer.allocate(100));
509 fail("receive returned");
510 } catch (AsynchronousCloseException expected) { }
511 }
512 });
513 }
514
515 /**
516 * Virtual thread interrupted while blocked in DatagramChannel receive.
517 */
518 @Test
519 void testDatagramChannelReceiveInterrupt() throws Exception {
520 VThreadRunner.run(() -> {
521 try (DatagramChannel dc = DatagramChannel.open()) {
522 InetAddress lh = InetAddress.getLoopbackAddress();
523 dc.bind(new InetSocketAddress(lh, 0));
524
525 // interrupt current thread when it blocks in receive
526 Thread thisThread = Thread.currentThread();
527 runAfterParkedAsync(thisThread::interrupt);
528
529 try {
530 dc.receive(ByteBuffer.allocate(100));
531 fail("receive returned");
532 } catch (ClosedByInterruptException expected) {
533 assertTrue(Thread.interrupted());
534 }
535 }
536 });
537 }
538
539 /**
540 * Virtual thread blocks in DatagramSocket adaptor receive.
541 */
542 @ParameterizedTest
543 @ValueSource(ints = { 0, 60_000 })
544 void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
545 VThreadRunner.run(() -> {
546 try (DatagramChannel dc1 = DatagramChannel.open();
547 DatagramChannel dc2 = DatagramChannel.open()) {
548
549 InetAddress lh = InetAddress.getLoopbackAddress();
550 dc2.bind(new InetSocketAddress(lh, 0));
551
552 // send from dc1 when current thread blocks in dc2 receive
553 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
554 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
555
556 // receive should block
557 byte[] array = new byte[100];
558 DatagramPacket p = new DatagramPacket(array, 0, array.length);
559 if (timeout > 0)
560 dc2.socket().setSoTimeout(timeout);
561 dc2.socket().receive(p);
562 assertTrue(p.getLength() == 3 && array[0] == 'X');
563 }
564 });
565 }
566
567 /**
568 * DatagramChannel close while virtual thread blocked in adaptor receive.
569 */
570 @ParameterizedTest
571 @ValueSource(ints = { 0, 60_000 })
572 void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
573 VThreadRunner.run(() -> {
574 try (DatagramChannel dc = DatagramChannel.open()) {
575 InetAddress lh = InetAddress.getLoopbackAddress();
576 dc.bind(new InetSocketAddress(lh, 0));
577
578 byte[] array = new byte[100];
579 DatagramPacket p = new DatagramPacket(array, 0, array.length);
580 if (timeout > 0)
581 dc.socket().setSoTimeout(timeout);
582
583 // close channel/socket when current thread blocks in receive
584 runAfterParkedAsync(dc::close);
585
586 assertThrows(SocketException.class, () -> dc.socket().receive(p));
587 }
588 });
589 }
590
591 /**
592 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
593 */
594 @ParameterizedTest
595 @ValueSource(ints = { 0, 60_000 })
596 void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
597 VThreadRunner.run(() -> {
598 try (DatagramChannel dc = DatagramChannel.open()) {
599 InetAddress lh = InetAddress.getLoopbackAddress();
600 dc.bind(new InetSocketAddress(lh, 0));
601
602 byte[] array = new byte[100];
603 DatagramPacket p = new DatagramPacket(array, 0, array.length);
604 if (timeout > 0)
605 dc.socket().setSoTimeout(timeout);
606
607 // interrupt current thread when it blocks in receive
608 Thread thisThread = Thread.currentThread();
609 runAfterParkedAsync(thisThread::interrupt);
610
611 try {
612 dc.socket().receive(p);
613 fail();
614 } catch (ClosedByInterruptException expected) {
615 assertTrue(Thread.interrupted());
616 }
617 }
618 });
619 }
620
621 /**
622 * Pipe read/write, no blocking.
623 */
624 @Test
625 void testPipeReadWrite1() throws Exception {
626 VThreadRunner.run(() -> {
627 Pipe p = Pipe.open();
628 try (Pipe.SinkChannel sink = p.sink();
629 Pipe.SourceChannel source = p.source()) {
630
631 // write should not block
632 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
633 int n = sink.write(bb);
634 assertTrue(n > 0);
635
636 // read should not block
637 bb = ByteBuffer.allocate(10);
638 n = source.read(bb);
639 assertTrue(n > 0);
640 assertTrue(bb.get(0) == 'X');
641 }
642 });
643 }
644
645 /**
646 * Virtual thread blocks in Pipe.SourceChannel read.
647 */
648 @Test
649 void testPipeReadWrite2() throws Exception {
650 VThreadRunner.run(() -> {
651 Pipe p = Pipe.open();
652 try (Pipe.SinkChannel sink = p.sink();
653 Pipe.SourceChannel source = p.source()) {
654
655 // write from sink when current thread blocks reading from source
656 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
657 runAfterParkedAsync(() -> sink.write(bb1));
658
659 // read should block
660 ByteBuffer bb2 = ByteBuffer.allocate(10);
661 int n = source.read(bb2);
662 assertTrue(n > 0);
663 assertTrue(bb2.get(0) == 'X');
664 }
665 });
666 }
667
668 /**
669 * Virtual thread blocks in Pipe.SinkChannel write.
670 */
671 @Test
672 void testPipeReadWrite3() throws Exception {
673 VThreadRunner.run(() -> {
674 Pipe p = Pipe.open();
675 try (Pipe.SinkChannel sink = p.sink();
676 Pipe.SourceChannel source = p.source()) {
677
678 // read from source to EOF when current thread blocking in write
679 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
680
681 // write to sink should block
682 ByteBuffer bb = ByteBuffer.allocate(100*1024);
683 for (int i=0; i<1000; i++) {
684 int n = sink.write(bb);
685 assertTrue(n > 0);
686 bb.clear();
687 }
688 sink.close();
689
690 // wait for reader to finish
691 reader.join();
692 }
693 });
694 }
695
696 /**
697 * Pipe.SourceChannel close while virtual thread blocked in read.
698 */
699 @Test
700 void testPipeReadAsyncClose() throws Exception {
701 VThreadRunner.run(() -> {
702 Pipe p = Pipe.open();
703 try (Pipe.SinkChannel sink = p.sink();
704 Pipe.SourceChannel source = p.source()) {
705 runAfterParkedAsync(source::close);
706 try {
707 int n = source.read(ByteBuffer.allocate(100));
708 fail("read returned " + n);
709 } catch (AsynchronousCloseException expected) { }
710 }
711 });
712 }
713
714 /**
715 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
716 */
717 @Test
718 void testPipeReadInterrupt() throws Exception {
719 VThreadRunner.run(() -> {
720 Pipe p = Pipe.open();
721 try (Pipe.SinkChannel sink = p.sink();
722 Pipe.SourceChannel source = p.source()) {
723
724 // interrupt current thread when it blocks reading from source
725 Thread thisThread = Thread.currentThread();
726 runAfterParkedAsync(thisThread::interrupt);
727
728 try {
729 int n = source.read(ByteBuffer.allocate(100));
730 fail("read returned " + n);
731 } catch (ClosedByInterruptException expected) {
732 assertTrue(Thread.interrupted());
733 }
734 }
735 });
736 }
737
738 /**
739 * Pipe.SinkChannel close while virtual thread blocked in write.
740 */
741 @Test
742 void testPipeWriteAsyncClose() throws Exception {
743 VThreadRunner.run(() -> {
744 boolean done = false;
745 while (!done) {
746 Pipe p = Pipe.open();
747 try (Pipe.SinkChannel sink = p.sink();
748 Pipe.SourceChannel source = p.source()) {
749
750 // close sink when current thread blocks in write
751 runAfterParkedAsync(sink::close, true);
752
753 // write until channel is closed
754 try {
755 ByteBuffer bb = ByteBuffer.allocate(100*1024);
756 for (;;) {
757 int n = sink.write(bb);
758 assertTrue(n > 0);
759 bb.clear();
760 }
761 } catch (AsynchronousCloseException e) {
762 // closed when blocked in write
763 done = true;
764 } catch (ClosedChannelException e) {
765 // closed but not blocked in write, need to retry test
766 System.err.format("%s, need to retry!%n", e);
767 }
768 }
769 }
770 });
771 }
772
773 /**
774 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
775 */
776 @Test
777 void testPipeWriteInterrupt() throws Exception {
778 VThreadRunner.run(() -> {
779 boolean done = false;
780 while (!done) {
781 Pipe p = Pipe.open();
782 try (Pipe.SinkChannel sink = p.sink();
783 Pipe.SourceChannel source = p.source()) {
784
785 // interrupt current thread when it blocks in write
786 Thread thisThread = Thread.currentThread();
787 runAfterParkedAsync(thisThread::interrupt, true);
788
789 // write until channel is closed
790 try {
791 ByteBuffer bb = ByteBuffer.allocate(100*1024);
792 for (;;) {
793 int n = sink.write(bb);
794 assertTrue(n > 0);
795 bb.clear();
796 }
797 } catch (ClosedByInterruptException expected) {
798 // closed when blocked in write
799 assertTrue(Thread.interrupted());
800 done = true;
801 } catch (ClosedChannelException e) {
802 // closed but not blocked in write, need to retry test
803 System.err.format("%s, need to retry!%n", e);
804 }
805 }
806 }
807 });
808 }
809
810 /**
811 * Creates a loopback connection
812 */
813 static class Connection implements Closeable {
814 private final SocketChannel sc1;
815 private final SocketChannel sc2;
816 Connection() throws IOException {
817 var lh = InetAddress.getLoopbackAddress();
818 try (var listener = ServerSocketChannel.open()) {
819 listener.bind(new InetSocketAddress(lh, 0));
820 SocketChannel sc1 = SocketChannel.open();
821 SocketChannel sc2 = null;
822 try {
823 sc1.socket().connect(listener.getLocalAddress());
824 sc2 = listener.accept();
825 } catch (IOException ioe) {
826 sc1.close();
827 throw ioe;
828 }
829 this.sc1 = sc1;
830 this.sc2 = sc2;
831 }
832 }
833 SocketChannel channel1() {
834 return sc1;
835 }
836 SocketChannel channel2() {
837 return sc2;
838 }
839 @Override
840 public void close() throws IOException {
841 sc1.close();
842 sc2.close();
843 }
844 }
845
846 /**
847 * Read from a channel until all bytes have been read or an I/O error occurs.
848 */
849 static void readToEOF(ReadableByteChannel rbc) throws IOException {
850 ByteBuffer bb = ByteBuffer.allocate(16*1024);
851 int n;
852 while ((n = rbc.read(bb)) > 0) {
853 bb.clear();
854 }
855 }
856
857 @FunctionalInterface
858 interface ThrowingRunnable {
859 void run() throws Exception;
860 }
861
862 /**
863 * Runs the given task asynchronously after the current virtual thread parks.
864 * @param writing if the thread will block in write
865 * @return the thread started to run the task
866 */
867 private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
868 Thread target = Thread.currentThread();
869 if (!target.isVirtual())
870 throw new WrongThreadException();
871 return Thread.ofPlatform().daemon().start(() -> {
872 try {
873 // wait for target thread to park
874 while (!isWaiting(target)) {
875 Thread.sleep(20);
876 }
877
878 // if the target thread is parked in write then we nudge it a few times
879 // to avoid wakeup with some bytes written
880 if (writing) {
881 for (int i = 0; i < 3; i++) {
882 LockSupport.unpark(target);
883 while (!isWaiting(target)) {
884 Thread.sleep(20);
885 }
886 }
887 }
888
889 task.run();
890
891 } catch (Exception e) {
892 e.printStackTrace();
893 }
894 });
895 }
896
897 private static Thread runAfterParkedAsync(ThrowingRunnable task) {
898 return runAfterParkedAsync(task, false);
899 }
900
901 /**
902 * Return true if the given Thread is parked.
903 */
904 private static boolean isWaiting(Thread target) {
905 Thread.State state = target.getState();
906 assertNotEquals(Thread.State.TERMINATED, state);
907 return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
908 }
909 }