1 /*
2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit/timeout=480 BlockingChannelOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
38 */
39
40 /*
41 * @test id=no-vmcontinuations
42 * @requires vm.continuations
43 * @library /test/lib
44 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
45 */
46
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.net.DatagramPacket;
50 import java.net.InetAddress;
51 import java.net.InetSocketAddress;
52 import java.net.Socket;
53 import java.net.SocketAddress;
54 import java.net.SocketException;
55 import java.nio.ByteBuffer;
56 import java.nio.channels.AsynchronousCloseException;
57 import java.nio.channels.ClosedByInterruptException;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.DatagramChannel;
60 import java.nio.channels.Pipe;
61 import java.nio.channels.ReadableByteChannel;
62 import java.nio.channels.ServerSocketChannel;
63 import java.nio.channels.SocketChannel;
64 import java.nio.channels.WritableByteChannel;
65 import java.util.concurrent.locks.LockSupport;
66
67 import jdk.test.lib.thread.VThreadRunner;
68 import org.junit.jupiter.api.Test;
69 import static org.junit.jupiter.api.Assertions.*;
70
71 class BlockingChannelOps {
72
73 /**
74 * SocketChannel read/write, no blocking.
75 */
76 @Test
77 void testSocketChannelReadWrite1() throws Exception {
78 VThreadRunner.run(() -> {
79 try (var connection = new Connection()) {
80 SocketChannel sc1 = connection.channel1();
81 SocketChannel sc2 = connection.channel2();
82
83 // write to sc1
84 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
85 int n = sc1.write(bb);
86 assertTrue(n > 0);
87
88 // read from sc2 should not block
89 bb = ByteBuffer.allocate(10);
90 n = sc2.read(bb);
91 assertTrue(n > 0);
92 assertTrue(bb.get(0) == 'X');
93 }
94 });
95 }
96
97 /**
98 * Virtual thread blocks in SocketChannel read.
99 */
100 @Test
101 void testSocketChannelRead() throws Exception {
102 VThreadRunner.run(() -> {
103 try (var connection = new Connection()) {
104 SocketChannel sc1 = connection.channel1();
105 SocketChannel sc2 = connection.channel2();
106
107 // write to sc1 when current thread blocks in sc2.read
108 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
109 runAfterParkedAsync(() -> sc1.write(bb1));
110
111 // read from sc2 should block
112 ByteBuffer bb2 = ByteBuffer.allocate(10);
113 int n = sc2.read(bb2);
114 assertTrue(n > 0);
115 assertTrue(bb2.get(0) == 'X');
116 }
117 });
118 }
119
120 /**
121 * Virtual thread blocks in SocketChannel write.
122 */
123 @Test
124 void testSocketChannelWrite() throws Exception {
125 VThreadRunner.run(() -> {
126 try (var connection = new Connection()) {
127 SocketChannel sc1 = connection.channel1();
128 SocketChannel sc2 = connection.channel2();
129
130 // read from sc2 to EOF when current thread blocks in sc1.write
131 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
132
133 // write to sc1 should block
134 ByteBuffer bb = ByteBuffer.allocate(100*1024);
135 for (int i=0; i<1000; i++) {
136 int n = sc1.write(bb);
137 assertTrue(n > 0);
138 bb.clear();
139 }
140 sc1.close();
141
142 // wait for reader to finish
143 reader.join();
144 }
145 });
146 }
147
148 /**
149 * SocketChannel close while virtual thread blocked in read.
150 */
151 @Test
152 void testSocketChannelReadAsyncClose() throws Exception {
153 VThreadRunner.run(() -> {
154 try (var connection = new Connection()) {
155 SocketChannel sc = connection.channel1();
156 runAfterParkedAsync(sc::close);
157 try {
158 int n = sc.read(ByteBuffer.allocate(100));
159 fail("read returned " + n);
160 } catch (AsynchronousCloseException expected) { }
161 }
162 });
163 }
164
165 /**
166 * SocketChannel shutdownInput while virtual thread blocked in read.
167 */
168 @Test
169 void testSocketChannelReadAsyncShutdownInput() throws Exception {
170 VThreadRunner.run(() -> {
171 try (var connection = new Connection()) {
172 SocketChannel sc = connection.channel1();
173 runAfterParkedAsync(sc::shutdownInput);
174 int n = sc.read(ByteBuffer.allocate(100));
175 assertEquals(-1, n);
176 assertTrue(sc.isOpen());
177 }
178 });
179 }
180
181 /**
182 * Virtual thread interrupted while blocked in SocketChannel read.
183 */
184 @Test
185 void testSocketChannelReadInterrupt() throws Exception {
186 VThreadRunner.run(() -> {
187 try (var connection = new Connection()) {
188 SocketChannel sc = connection.channel1();
189
190 // interrupt current thread when it blocks in read
191 Thread thisThread = Thread.currentThread();
192 runAfterParkedAsync(thisThread::interrupt);
193
194 try {
195 int n = sc.read(ByteBuffer.allocate(100));
196 fail("read returned " + n);
197 } catch (ClosedByInterruptException expected) {
198 assertTrue(Thread.interrupted());
199 }
200 }
201 });
202 }
203
204 /**
205 * SocketChannel close while virtual thread blocked in write.
206 */
207 @Test
208 void testSocketChannelWriteAsyncClose() throws Exception {
209 VThreadRunner.run(() -> {
210 boolean done = false;
211 while (!done) {
212 try (var connection = new Connection()) {
213 SocketChannel sc = connection.channel1();
214
215 // close sc when current thread blocks in write
216 runAfterParkedAsync(sc::close, true);
217
218 // write until channel is closed
219 try {
220 ByteBuffer bb = ByteBuffer.allocate(100*1024);
221 for (;;) {
222 int n = sc.write(bb);
223 assertTrue(n > 0);
224 bb.clear();
225 }
226 } catch (AsynchronousCloseException expected) {
227 // closed when blocked in write
228 done = true;
229 } catch (ClosedChannelException e) {
230 // closed but not blocked in write, need to retry test
231 System.err.format("%s, need to retry!%n", e);
232 }
233 }
234 }
235 });
236 }
237
238
239 /**
240 * SocketChannel shutdownOutput while virtual thread blocked in write.
241 */
242 @Test
243 void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
244 VThreadRunner.run(() -> {
245 try (var connection = new Connection()) {
246 SocketChannel sc = connection.channel1();
247
248 // shutdown output when current thread blocks in write
249 runAfterParkedAsync(sc::shutdownOutput);
250 try {
251 ByteBuffer bb = ByteBuffer.allocate(100*1024);
252 for (;;) {
253 int n = sc.write(bb);
254 assertTrue(n > 0);
255 bb.clear();
256 }
257 } catch (ClosedChannelException e) {
258 // expected
259 }
260 assertTrue(sc.isOpen());
261 }
262 });
263 }
264
265 /**
266 * Virtual thread interrupted while blocked in SocketChannel write.
267 */
268 @Test
269 void testSocketChannelWriteInterrupt() throws Exception {
270 VThreadRunner.run(() -> {
271 boolean done = false;
272 while (!done) {
273 try (var connection = new Connection()) {
274 SocketChannel sc = connection.channel1();
275
276 // interrupt current thread when it blocks in write
277 Thread thisThread = Thread.currentThread();
278 runAfterParkedAsync(thisThread::interrupt, true);
279
280 // write until channel is closed
281 try {
282 ByteBuffer bb = ByteBuffer.allocate(100*1024);
283 for (;;) {
284 int n = sc.write(bb);
285 assertTrue(n > 0);
286 bb.clear();
287 }
288 } catch (ClosedByInterruptException e) {
289 // closed when blocked in write
290 assertTrue(Thread.interrupted());
291 done = true;
292 } catch (ClosedChannelException e) {
293 // closed but not blocked in write, need to retry test
294 System.err.format("%s, need to retry!%n", e);
295 }
296 }
297 }
298 });
299 }
300
301 /**
302 * Virtual thread blocks in SocketChannel adaptor read.
303 */
304 @Test
305 void testSocketAdaptorRead1() throws Exception {
306 testSocketAdaptorRead(0);
307 }
308
309 /**
310 * Virtual thread blocks in SocketChannel adaptor read with timeout.
311 */
312 @Test
313 void testSocketAdaptorRead2() throws Exception {
314 testSocketAdaptorRead(60_000);
315 }
316
317 private void testSocketAdaptorRead(int timeout) throws Exception {
318 VThreadRunner.run(() -> {
319 try (var connection = new Connection()) {
320 SocketChannel sc1 = connection.channel1();
321 SocketChannel sc2 = connection.channel2();
322
323 // write to sc1 when currnet thread blocks reading from sc2
324 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
325 runAfterParkedAsync(() -> sc1.write(bb));
326
327 // read from sc2 should block
328 byte[] array = new byte[100];
329 if (timeout > 0)
330 sc2.socket().setSoTimeout(timeout);
331 int n = sc2.socket().getInputStream().read(array);
332 assertTrue(n > 0);
333 assertTrue(array[0] == 'X');
334 }
335 });
336 }
337
338 /**
339 * ServerSocketChannel accept, no blocking.
340 */
341 @Test
342 void testServerSocketChannelAccept1() throws Exception {
343 VThreadRunner.run(() -> {
344 try (var ssc = ServerSocketChannel.open()) {
345 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
346 var sc1 = SocketChannel.open(ssc.getLocalAddress());
347 // accept should not block
348 var sc2 = ssc.accept();
349 sc1.close();
350 sc2.close();
351 }
352 });
353 }
354
355 /**
356 * Virtual thread blocks in ServerSocketChannel accept.
357 */
358 @Test
359 void testServerSocketChannelAccept2() throws Exception {
360 VThreadRunner.run(() -> {
361 try (var ssc = ServerSocketChannel.open()) {
362 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
363 var sc1 = SocketChannel.open();
364
365 // connect when current thread when it blocks in accept
366 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
367
368 // accept should block
369 var sc2 = ssc.accept();
370 sc1.close();
371 sc2.close();
372 }
373 });
374 }
375
376 /**
377 * SeverSocketChannel close while virtual thread blocked in accept.
378 */
379 @Test
380 void testServerSocketChannelAcceptAsyncClose() throws Exception {
381 VThreadRunner.run(() -> {
382 try (var ssc = ServerSocketChannel.open()) {
383 InetAddress lh = InetAddress.getLoopbackAddress();
384 ssc.bind(new InetSocketAddress(lh, 0));
385 runAfterParkedAsync(ssc::close);
386 try {
387 SocketChannel sc = ssc.accept();
388 sc.close();
389 fail("connection accepted???");
390 } catch (AsynchronousCloseException expected) { }
391 }
392 });
393 }
394
395 /**
396 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
397 */
398 @Test
399 void testServerSocketChannelAcceptInterrupt() throws Exception {
400 VThreadRunner.run(() -> {
401 try (var ssc = ServerSocketChannel.open()) {
402 InetAddress lh = InetAddress.getLoopbackAddress();
403 ssc.bind(new InetSocketAddress(lh, 0));
404
405 // interrupt current thread when it blocks in accept
406 Thread thisThread = Thread.currentThread();
407 runAfterParkedAsync(thisThread::interrupt);
408
409 try {
410 SocketChannel sc = ssc.accept();
411 sc.close();
412 fail("connection accepted???");
413 } catch (ClosedByInterruptException expected) {
414 assertTrue(Thread.interrupted());
415 }
416 }
417 });
418 }
419
420 /**
421 * Virtual thread blocks in ServerSocketChannel adaptor accept.
422 */
423 @Test
424 void testSocketChannelAdaptorAccept1() throws Exception {
425 testSocketChannelAdaptorAccept(0);
426 }
427
428 /**
429 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
430 */
431 @Test
432 void testSocketChannelAdaptorAccept2() throws Exception {
433 testSocketChannelAdaptorAccept(60_000);
434 }
435
436 private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
437 VThreadRunner.run(() -> {
438 try (var ssc = ServerSocketChannel.open()) {
439 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
440 var sc = SocketChannel.open();
441
442 // interrupt current thread when it blocks in accept
443 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
444
445 // accept should block
446 if (timeout > 0)
447 ssc.socket().setSoTimeout(timeout);
448 Socket s = ssc.socket().accept();
449 sc.close();
450 s.close();
451 }
452 });
453 }
454
455 /**
456 * DatagramChannel receive/send, no blocking.
457 */
458 @Test
459 void testDatagramChannelSendReceive1() throws Exception {
460 VThreadRunner.run(() -> {
461 try (DatagramChannel dc1 = DatagramChannel.open();
462 DatagramChannel dc2 = DatagramChannel.open()) {
463
464 InetAddress lh = InetAddress.getLoopbackAddress();
465 dc2.bind(new InetSocketAddress(lh, 0));
466
467 // send should not block
468 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
469 int n = dc1.send(bb, dc2.getLocalAddress());
470 assertTrue(n > 0);
471
472 // receive should not block
473 bb = ByteBuffer.allocate(10);
474 dc2.receive(bb);
475 assertTrue(bb.get(0) == 'X');
476 }
477 });
478 }
479
480 /**
481 * Virtual thread blocks in DatagramChannel receive.
482 */
483 @Test
484 void testDatagramChannelSendReceive2() throws Exception {
485 VThreadRunner.run(() -> {
486 try (DatagramChannel dc1 = DatagramChannel.open();
487 DatagramChannel dc2 = DatagramChannel.open()) {
488
489 InetAddress lh = InetAddress.getLoopbackAddress();
490 dc2.bind(new InetSocketAddress(lh, 0));
491
492 // send from dc1 when current thread blocked in dc2.receive
493 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
494 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
495
496 // read from dc2 should block
497 ByteBuffer bb2 = ByteBuffer.allocate(10);
498 dc2.receive(bb2);
499 assertTrue(bb2.get(0) == 'X');
500 }
501 });
502 }
503
504 /**
505 * DatagramChannel close while virtual thread blocked in receive.
506 */
507 @Test
508 void testDatagramChannelReceiveAsyncClose() throws Exception {
509 VThreadRunner.run(() -> {
510 try (DatagramChannel dc = DatagramChannel.open()) {
511 InetAddress lh = InetAddress.getLoopbackAddress();
512 dc.bind(new InetSocketAddress(lh, 0));
513 runAfterParkedAsync(dc::close);
514 try {
515 dc.receive(ByteBuffer.allocate(100));
516 fail("receive returned");
517 } catch (AsynchronousCloseException expected) { }
518 }
519 });
520 }
521
522 /**
523 * Virtual thread interrupted while blocked in DatagramChannel receive.
524 */
525 @Test
526 void testDatagramChannelReceiveInterrupt() throws Exception {
527 VThreadRunner.run(() -> {
528 try (DatagramChannel dc = DatagramChannel.open()) {
529 InetAddress lh = InetAddress.getLoopbackAddress();
530 dc.bind(new InetSocketAddress(lh, 0));
531
532 // interrupt current thread when it blocks in receive
533 Thread thisThread = Thread.currentThread();
534 runAfterParkedAsync(thisThread::interrupt);
535
536 try {
537 dc.receive(ByteBuffer.allocate(100));
538 fail("receive returned");
539 } catch (ClosedByInterruptException expected) {
540 assertTrue(Thread.interrupted());
541 }
542 }
543 });
544 }
545
546 /**
547 * Virtual thread blocks in DatagramSocket adaptor receive.
548 */
549 @Test
550 void testDatagramSocketAdaptorReceive1() throws Exception {
551 testDatagramSocketAdaptorReceive(0);
552 }
553
554 /**
555 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
556 */
557 @Test
558 void testDatagramSocketAdaptorReceive2() throws Exception {
559 testDatagramSocketAdaptorReceive(60_000);
560 }
561
562 private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
563 VThreadRunner.run(() -> {
564 try (DatagramChannel dc1 = DatagramChannel.open();
565 DatagramChannel dc2 = DatagramChannel.open()) {
566
567 InetAddress lh = InetAddress.getLoopbackAddress();
568 dc2.bind(new InetSocketAddress(lh, 0));
569
570 // send from dc1 when current thread blocks in dc2 receive
571 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
572 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
573
574 // receive should block
575 byte[] array = new byte[100];
576 DatagramPacket p = new DatagramPacket(array, 0, array.length);
577 if (timeout > 0)
578 dc2.socket().setSoTimeout(timeout);
579 dc2.socket().receive(p);
580 assertTrue(p.getLength() == 3 && array[0] == 'X');
581 }
582 });
583 }
584
585 /**
586 * DatagramChannel close while virtual thread blocked in adaptor receive.
587 */
588 @Test
589 void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
590 testDatagramSocketAdaptorReceiveAsyncClose(0);
591 }
592
593 /**
594 * DatagramChannel close while virtual thread blocked in adaptor receive
595 * with timeout.
596 */
597 @Test
598 void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
599 testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
600 }
601
602 private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
603 VThreadRunner.run(() -> {
604 try (DatagramChannel dc = DatagramChannel.open()) {
605 InetAddress lh = InetAddress.getLoopbackAddress();
606 dc.bind(new InetSocketAddress(lh, 0));
607
608 byte[] array = new byte[100];
609 DatagramPacket p = new DatagramPacket(array, 0, array.length);
610 if (timeout > 0)
611 dc.socket().setSoTimeout(timeout);
612
613 // close channel/socket when current thread blocks in receive
614 runAfterParkedAsync(dc::close);
615
616 assertThrows(SocketException.class, () -> dc.socket().receive(p));
617 }
618 });
619 }
620
621 /**
622 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
623 */
624 @Test
625 void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
626 testDatagramSocketAdaptorReceiveInterrupt(0);
627 }
628
629 /**
630 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
631 * with timeout.
632 */
633 @Test
634 void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
635 testDatagramSocketAdaptorReceiveInterrupt(60_1000);
636 }
637
638 private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
639 VThreadRunner.run(() -> {
640 try (DatagramChannel dc = DatagramChannel.open()) {
641 InetAddress lh = InetAddress.getLoopbackAddress();
642 dc.bind(new InetSocketAddress(lh, 0));
643
644 byte[] array = new byte[100];
645 DatagramPacket p = new DatagramPacket(array, 0, array.length);
646 if (timeout > 0)
647 dc.socket().setSoTimeout(timeout);
648
649 // interrupt current thread when it blocks in receive
650 Thread thisThread = Thread.currentThread();
651 runAfterParkedAsync(thisThread::interrupt);
652
653 try {
654 dc.socket().receive(p);
655 fail();
656 } catch (ClosedByInterruptException expected) {
657 assertTrue(Thread.interrupted());
658 }
659 }
660 });
661 }
662
663 /**
664 * Pipe read/write, no blocking.
665 */
666 @Test
667 void testPipeReadWrite1() throws Exception {
668 VThreadRunner.run(() -> {
669 Pipe p = Pipe.open();
670 try (Pipe.SinkChannel sink = p.sink();
671 Pipe.SourceChannel source = p.source()) {
672
673 // write should not block
674 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
675 int n = sink.write(bb);
676 assertTrue(n > 0);
677
678 // read should not block
679 bb = ByteBuffer.allocate(10);
680 n = source.read(bb);
681 assertTrue(n > 0);
682 assertTrue(bb.get(0) == 'X');
683 }
684 });
685 }
686
687 /**
688 * Virtual thread blocks in Pipe.SourceChannel read.
689 */
690 @Test
691 void testPipeReadWrite2() throws Exception {
692 VThreadRunner.run(() -> {
693 Pipe p = Pipe.open();
694 try (Pipe.SinkChannel sink = p.sink();
695 Pipe.SourceChannel source = p.source()) {
696
697 // write from sink when current thread blocks reading from source
698 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
699 runAfterParkedAsync(() -> sink.write(bb1));
700
701 // read should block
702 ByteBuffer bb2 = ByteBuffer.allocate(10);
703 int n = source.read(bb2);
704 assertTrue(n > 0);
705 assertTrue(bb2.get(0) == 'X');
706 }
707 });
708 }
709
710 /**
711 * Virtual thread blocks in Pipe.SinkChannel write.
712 */
713 @Test
714 void testPipeReadWrite3() throws Exception {
715 VThreadRunner.run(() -> {
716 Pipe p = Pipe.open();
717 try (Pipe.SinkChannel sink = p.sink();
718 Pipe.SourceChannel source = p.source()) {
719
720 // read from source to EOF when current thread blocking in write
721 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
722
723 // write to sink should block
724 ByteBuffer bb = ByteBuffer.allocate(100*1024);
725 for (int i=0; i<1000; i++) {
726 int n = sink.write(bb);
727 assertTrue(n > 0);
728 bb.clear();
729 }
730 sink.close();
731
732 // wait for reader to finish
733 reader.join();
734 }
735 });
736 }
737
738 /**
739 * Pipe.SourceChannel close while virtual thread blocked in read.
740 */
741 @Test
742 void testPipeReadAsyncClose() throws Exception {
743 VThreadRunner.run(() -> {
744 Pipe p = Pipe.open();
745 try (Pipe.SinkChannel sink = p.sink();
746 Pipe.SourceChannel source = p.source()) {
747 runAfterParkedAsync(source::close);
748 try {
749 int n = source.read(ByteBuffer.allocate(100));
750 fail("read returned " + n);
751 } catch (AsynchronousCloseException expected) { }
752 }
753 });
754 }
755
756 /**
757 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
758 */
759 @Test
760 void testPipeReadInterrupt() throws Exception {
761 VThreadRunner.run(() -> {
762 Pipe p = Pipe.open();
763 try (Pipe.SinkChannel sink = p.sink();
764 Pipe.SourceChannel source = p.source()) {
765
766 // interrupt current thread when it blocks reading from source
767 Thread thisThread = Thread.currentThread();
768 runAfterParkedAsync(thisThread::interrupt);
769
770 try {
771 int n = source.read(ByteBuffer.allocate(100));
772 fail("read returned " + n);
773 } catch (ClosedByInterruptException expected) {
774 assertTrue(Thread.interrupted());
775 }
776 }
777 });
778 }
779
780 /**
781 * Pipe.SinkChannel close while virtual thread blocked in write.
782 */
783 @Test
784 void testPipeWriteAsyncClose() throws Exception {
785 VThreadRunner.run(() -> {
786 boolean done = false;
787 while (!done) {
788 Pipe p = Pipe.open();
789 try (Pipe.SinkChannel sink = p.sink();
790 Pipe.SourceChannel source = p.source()) {
791
792 // close sink when current thread blocks in write
793 runAfterParkedAsync(sink::close, true);
794
795 // write until channel is closed
796 try {
797 ByteBuffer bb = ByteBuffer.allocate(100*1024);
798 for (;;) {
799 int n = sink.write(bb);
800 assertTrue(n > 0);
801 bb.clear();
802 }
803 } catch (AsynchronousCloseException e) {
804 // closed when blocked in write
805 done = true;
806 } catch (ClosedChannelException e) {
807 // closed but not blocked in write, need to retry test
808 System.err.format("%s, need to retry!%n", e);
809 }
810 }
811 }
812 });
813 }
814
815 /**
816 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
817 */
818 @Test
819 void testPipeWriteInterrupt() throws Exception {
820 VThreadRunner.run(() -> {
821 boolean done = false;
822 while (!done) {
823 Pipe p = Pipe.open();
824 try (Pipe.SinkChannel sink = p.sink();
825 Pipe.SourceChannel source = p.source()) {
826
827 // interrupt current thread when it blocks in write
828 Thread thisThread = Thread.currentThread();
829 runAfterParkedAsync(thisThread::interrupt, true);
830
831 // write until channel is closed
832 try {
833 ByteBuffer bb = ByteBuffer.allocate(100*1024);
834 for (;;) {
835 int n = sink.write(bb);
836 assertTrue(n > 0);
837 bb.clear();
838 }
839 } catch (ClosedByInterruptException expected) {
840 // closed when blocked in write
841 assertTrue(Thread.interrupted());
842 done = true;
843 } catch (ClosedChannelException e) {
844 // closed but not blocked in write, need to retry test
845 System.err.format("%s, need to retry!%n", e);
846 }
847 }
848 }
849 });
850 }
851
852 /**
853 * Creates a loopback connection
854 */
855 static class Connection implements Closeable {
856 private final SocketChannel sc1;
857 private final SocketChannel sc2;
858 Connection() throws IOException {
859 var lh = InetAddress.getLoopbackAddress();
860 try (var listener = ServerSocketChannel.open()) {
861 listener.bind(new InetSocketAddress(lh, 0));
862 SocketChannel sc1 = SocketChannel.open();
863 SocketChannel sc2 = null;
864 try {
865 sc1.socket().connect(listener.getLocalAddress());
866 sc2 = listener.accept();
867 } catch (IOException ioe) {
868 sc1.close();
869 throw ioe;
870 }
871 this.sc1 = sc1;
872 this.sc2 = sc2;
873 }
874 }
875 SocketChannel channel1() {
876 return sc1;
877 }
878 SocketChannel channel2() {
879 return sc2;
880 }
881 @Override
882 public void close() throws IOException {
883 sc1.close();
884 sc2.close();
885 }
886 }
887
888 /**
889 * Read from a channel until all bytes have been read or an I/O error occurs.
890 */
891 static void readToEOF(ReadableByteChannel rbc) throws IOException {
892 ByteBuffer bb = ByteBuffer.allocate(16*1024);
893 int n;
894 while ((n = rbc.read(bb)) > 0) {
895 bb.clear();
896 }
897 }
898
/**
 * A task that produces no result and may throw any exception, including
 * checked exceptions.
 */
@FunctionalInterface
interface ThrowingRunnable {
    /**
     * Runs the task.
     *
     * @throws Exception if the task fails
     */
    void run() throws Exception;
}
903
904 /**
905 * Runs the given task asynchronously after the current virtual thread parks.
906 * @param writing if the thread will block in write
907 * @return the thread started to run the task
908 */
909 private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
910 Thread target = Thread.currentThread();
911 if (!target.isVirtual())
912 throw new WrongThreadException();
913 return Thread.ofPlatform().daemon().start(() -> {
914 try {
915 // wait for target thread to park
916 while (!isWaiting(target)) {
917 Thread.sleep(20);
918 }
919
920 // if the target thread is parked in write then we nudge it a few times
921 // to avoid wakeup with some bytes written
922 if (writing) {
923 for (int i = 0; i < 3; i++) {
924 LockSupport.unpark(target);
925 while (!isWaiting(target)) {
926 Thread.sleep(20);
927 }
928 }
929 }
930
931 task.run();
932
933 } catch (Exception e) {
934 e.printStackTrace();
935 }
936 });
937 }
938
939 private static Thread runAfterParkedAsync(ThrowingRunnable task) {
940 return runAfterParkedAsync(task, false);
941 }
942
943 /**
944 * Return true if the given Thread is parked.
945 */
946 private static boolean isWaiting(Thread target) {
947 Thread.State state = target.getState();
948 assertNotEquals(Thread.State.TERMINATED, state);
949 return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
950 }
951 }