1 /*
2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit/timeout=480 BlockingChannelOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
38 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
39 */
40
41 /*
42 * @test id=no-vmcontinuations
43 * @requires vm.continuations
44 * @library /test/lib
45 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
46 */
47
48 import java.io.Closeable;
49 import java.io.IOException;
50 import java.net.DatagramPacket;
51 import java.net.InetAddress;
52 import java.net.InetSocketAddress;
53 import java.net.Socket;
54 import java.net.SocketAddress;
55 import java.net.SocketException;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.AsynchronousCloseException;
58 import java.nio.channels.ClosedByInterruptException;
59 import java.nio.channels.ClosedChannelException;
60 import java.nio.channels.DatagramChannel;
61 import java.nio.channels.Pipe;
62 import java.nio.channels.ReadableByteChannel;
63 import java.nio.channels.ServerSocketChannel;
64 import java.nio.channels.SocketChannel;
65 import java.nio.channels.WritableByteChannel;
66 import java.util.concurrent.locks.LockSupport;
67
68 import jdk.test.lib.thread.VThreadRunner;
69 import org.junit.jupiter.api.Test;
70 import org.junit.jupiter.params.ParameterizedTest;
71 import org.junit.jupiter.params.provider.ValueSource;
72 import static org.junit.jupiter.api.Assertions.*;
73
74 class BlockingChannelOps {
75
76 /**
77 * SocketChannel read/write, no blocking.
78 */
79 @Test
80 void testSocketChannelReadWrite1() throws Exception {
81 VThreadRunner.run(() -> {
82 try (var connection = new Connection()) {
83 SocketChannel sc1 = connection.channel1();
84 SocketChannel sc2 = connection.channel2();
85
86 // write to sc1
87 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
88 int n = sc1.write(bb);
89 assertTrue(n > 0);
90
91 // read from sc2 should not block
92 bb = ByteBuffer.allocate(10);
93 n = sc2.read(bb);
94 assertTrue(n > 0);
95 assertTrue(bb.get(0) == 'X');
96 }
97 });
98 }
99
100 /**
101 * Virtual thread blocks in SocketChannel read.
102 */
103 @Test
104 void testSocketChannelRead() throws Exception {
105 VThreadRunner.run(() -> {
106 try (var connection = new Connection()) {
107 SocketChannel sc1 = connection.channel1();
108 SocketChannel sc2 = connection.channel2();
109
110 // write to sc1 when current thread blocks in sc2.read
111 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
112 runAfterParkedAsync(() -> sc1.write(bb1));
113
114 // read from sc2 should block
115 ByteBuffer bb2 = ByteBuffer.allocate(10);
116 int n = sc2.read(bb2);
117 assertTrue(n > 0);
118 assertTrue(bb2.get(0) == 'X');
119 }
120 });
121 }
122
123 /**
124 * Virtual thread blocks in SocketChannel write.
125 */
126 @Test
127 void testSocketChannelWrite() throws Exception {
128 VThreadRunner.run(() -> {
129 try (var connection = new Connection()) {
130 SocketChannel sc1 = connection.channel1();
131 SocketChannel sc2 = connection.channel2();
132
133 // read from sc2 to EOF when current thread blocks in sc1.write
134 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
135
136 // write to sc1 should block
137 ByteBuffer bb = ByteBuffer.allocate(100*1024);
138 for (int i=0; i<1000; i++) {
139 int n = sc1.write(bb);
140 assertTrue(n > 0);
141 bb.clear();
142 }
143 sc1.close();
144
145 // wait for reader to finish
146 reader.join();
147 }
148 });
149 }
150
151 /**
152 * SocketChannel close while virtual thread blocked in read.
153 */
154 @Test
155 void testSocketChannelReadAsyncClose() throws Exception {
156 VThreadRunner.run(() -> {
157 try (var connection = new Connection()) {
158 SocketChannel sc = connection.channel1();
159 runAfterParkedAsync(sc::close);
160 try {
161 int n = sc.read(ByteBuffer.allocate(100));
162 fail("read returned " + n);
163 } catch (AsynchronousCloseException expected) { }
164 }
165 });
166 }
167
168 /**
169 * SocketChannel shutdownInput while virtual thread blocked in read.
170 */
171 @Test
172 void testSocketChannelReadAsyncShutdownInput() throws Exception {
173 VThreadRunner.run(() -> {
174 try (var connection = new Connection()) {
175 SocketChannel sc = connection.channel1();
176 runAfterParkedAsync(sc::shutdownInput);
177 int n = sc.read(ByteBuffer.allocate(100));
178 assertEquals(-1, n);
179 assertTrue(sc.isOpen());
180 }
181 });
182 }
183
184 /**
185 * Virtual thread interrupted while blocked in SocketChannel read.
186 */
187 @Test
188 void testSocketChannelReadInterrupt() throws Exception {
189 VThreadRunner.run(() -> {
190 try (var connection = new Connection()) {
191 SocketChannel sc = connection.channel1();
192
193 // interrupt current thread when it blocks in read
194 Thread thisThread = Thread.currentThread();
195 runAfterParkedAsync(thisThread::interrupt);
196
197 try {
198 int n = sc.read(ByteBuffer.allocate(100));
199 fail("read returned " + n);
200 } catch (ClosedByInterruptException expected) {
201 assertTrue(Thread.interrupted());
202 }
203 }
204 });
205 }
206
207 /**
208 * SocketChannel close while virtual thread blocked in write.
209 */
210 @Test
211 void testSocketChannelWriteAsyncClose() throws Exception {
212 VThreadRunner.run(() -> {
213 boolean done = false;
214 while (!done) {
215 try (var connection = new Connection()) {
216 SocketChannel sc = connection.channel1();
217
218 // close sc when current thread blocks in write
219 runAfterParkedAsync(sc::close, true);
220
221 // write until channel is closed
222 try {
223 ByteBuffer bb = ByteBuffer.allocate(100*1024);
224 for (;;) {
225 int n = sc.write(bb);
226 assertTrue(n > 0);
227 bb.clear();
228 }
229 } catch (AsynchronousCloseException expected) {
230 // closed when blocked in write
231 done = true;
232 } catch (ClosedChannelException e) {
233 // closed but not blocked in write, need to retry test
234 System.err.format("%s, need to retry!%n", e);
235 }
236 }
237 }
238 });
239 }
240
241
242 /**
243 * SocketChannel shutdownOutput while virtual thread blocked in write.
244 */
245 @Test
246 void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
247 VThreadRunner.run(() -> {
248 try (var connection = new Connection()) {
249 SocketChannel sc = connection.channel1();
250
251 // shutdown output when current thread blocks in write
252 runAfterParkedAsync(sc::shutdownOutput);
253 try {
254 ByteBuffer bb = ByteBuffer.allocate(100*1024);
255 for (;;) {
256 int n = sc.write(bb);
257 assertTrue(n > 0);
258 bb.clear();
259 }
260 } catch (ClosedChannelException e) {
261 // expected
262 }
263 assertTrue(sc.isOpen());
264 }
265 });
266 }
267
268 /**
269 * Virtual thread interrupted while blocked in SocketChannel write.
270 */
271 @Test
272 void testSocketChannelWriteInterrupt() throws Exception {
273 VThreadRunner.run(() -> {
274 boolean done = false;
275 while (!done) {
276 try (var connection = new Connection()) {
277 SocketChannel sc = connection.channel1();
278
279 // interrupt current thread when it blocks in write
280 Thread thisThread = Thread.currentThread();
281 runAfterParkedAsync(thisThread::interrupt, true);
282
283 // write until channel is closed
284 try {
285 ByteBuffer bb = ByteBuffer.allocate(100*1024);
286 for (;;) {
287 int n = sc.write(bb);
288 assertTrue(n > 0);
289 bb.clear();
290 }
291 } catch (ClosedByInterruptException e) {
292 // closed when blocked in write
293 assertTrue(Thread.interrupted());
294 done = true;
295 } catch (ClosedChannelException e) {
296 // closed but not blocked in write, need to retry test
297 System.err.format("%s, need to retry!%n", e);
298 }
299 }
300 }
301 });
302 }
303
304 /**
305 * Virtual thread blocks in SocketChannel adaptor read.
306 */
307 @ParameterizedTest
308 @ValueSource(ints = { 0, 60_000 })
309 void testSocketAdaptorRead(int timeout) throws Exception {
310 VThreadRunner.run(() -> {
311 try (var connection = new Connection()) {
312 SocketChannel sc1 = connection.channel1();
313 SocketChannel sc2 = connection.channel2();
314
315 // write to sc1 when currnet thread blocks reading from sc2
316 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
317 runAfterParkedAsync(() -> sc1.write(bb));
318
319 // read from sc2 should block
320 byte[] array = new byte[100];
321 if (timeout > 0)
322 sc2.socket().setSoTimeout(timeout);
323 int n = sc2.socket().getInputStream().read(array);
324 assertTrue(n > 0);
325 assertTrue(array[0] == 'X');
326 }
327 });
328 }
329
330 /**
331 * ServerSocketChannel accept, no blocking.
332 */
333 @Test
334 void testServerSocketChannelAccept1() throws Exception {
335 VThreadRunner.run(() -> {
336 try (var ssc = ServerSocketChannel.open()) {
337 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
338 var sc1 = SocketChannel.open(ssc.getLocalAddress());
339 // accept should not block
340 var sc2 = ssc.accept();
341 sc1.close();
342 sc2.close();
343 }
344 });
345 }
346
347 /**
348 * Virtual thread blocks in ServerSocketChannel accept.
349 */
350 @Test
351 void testServerSocketChannelAccept2() throws Exception {
352 VThreadRunner.run(() -> {
353 try (var ssc = ServerSocketChannel.open()) {
354 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
355 var sc1 = SocketChannel.open();
356
357 // connect when current thread when it blocks in accept
358 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
359
360 // accept should block
361 var sc2 = ssc.accept();
362 sc1.close();
363 sc2.close();
364 }
365 });
366 }
367
368 /**
369 * SeverSocketChannel close while virtual thread blocked in accept.
370 */
371 @Test
372 void testServerSocketChannelAcceptAsyncClose() throws Exception {
373 VThreadRunner.run(() -> {
374 try (var ssc = ServerSocketChannel.open()) {
375 InetAddress lh = InetAddress.getLoopbackAddress();
376 ssc.bind(new InetSocketAddress(lh, 0));
377 runAfterParkedAsync(ssc::close);
378 try {
379 SocketChannel sc = ssc.accept();
380 sc.close();
381 fail("connection accepted???");
382 } catch (AsynchronousCloseException expected) { }
383 }
384 });
385 }
386
387 /**
388 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
389 */
390 @Test
391 void testServerSocketChannelAcceptInterrupt() throws Exception {
392 VThreadRunner.run(() -> {
393 try (var ssc = ServerSocketChannel.open()) {
394 InetAddress lh = InetAddress.getLoopbackAddress();
395 ssc.bind(new InetSocketAddress(lh, 0));
396
397 // interrupt current thread when it blocks in accept
398 Thread thisThread = Thread.currentThread();
399 runAfterParkedAsync(thisThread::interrupt);
400
401 try {
402 SocketChannel sc = ssc.accept();
403 sc.close();
404 fail("connection accepted???");
405 } catch (ClosedByInterruptException expected) {
406 assertTrue(Thread.interrupted());
407 }
408 }
409 });
410 }
411
412 /**
413 * Virtual thread blocks in ServerSocketChannel adaptor accept.
414 */
415 @ParameterizedTest
416 @ValueSource(ints = { 0, 60_000 })
417 void testSocketChannelAdaptorAccept(int timeout) throws Exception {
418 VThreadRunner.run(() -> {
419 try (var ssc = ServerSocketChannel.open()) {
420 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
421 var sc = SocketChannel.open();
422
423 // interrupt current thread when it blocks in accept
424 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
425
426 // accept should block
427 if (timeout > 0)
428 ssc.socket().setSoTimeout(timeout);
429 Socket s = ssc.socket().accept();
430 sc.close();
431 s.close();
432 }
433 });
434 }
435
436 /**
437 * DatagramChannel receive/send, no blocking.
438 */
439 @Test
440 void testDatagramChannelSendReceive1() throws Exception {
441 VThreadRunner.run(() -> {
442 try (DatagramChannel dc1 = DatagramChannel.open();
443 DatagramChannel dc2 = DatagramChannel.open()) {
444
445 InetAddress lh = InetAddress.getLoopbackAddress();
446 dc2.bind(new InetSocketAddress(lh, 0));
447
448 // send should not block
449 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
450 int n = dc1.send(bb, dc2.getLocalAddress());
451 assertTrue(n > 0);
452
453 // receive should not block
454 bb = ByteBuffer.allocate(10);
455 dc2.receive(bb);
456 assertTrue(bb.get(0) == 'X');
457 }
458 });
459 }
460
461 /**
462 * Virtual thread blocks in DatagramChannel receive.
463 */
464 @Test
465 void testDatagramChannelSendReceive2() throws Exception {
466 VThreadRunner.run(() -> {
467 try (DatagramChannel dc1 = DatagramChannel.open();
468 DatagramChannel dc2 = DatagramChannel.open()) {
469
470 InetAddress lh = InetAddress.getLoopbackAddress();
471 dc2.bind(new InetSocketAddress(lh, 0));
472
473 // send from dc1 when current thread blocked in dc2.receive
474 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
475 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
476
477 // read from dc2 should block
478 ByteBuffer bb2 = ByteBuffer.allocate(10);
479 dc2.receive(bb2);
480 assertTrue(bb2.get(0) == 'X');
481 }
482 });
483 }
484
485 /**
486 * DatagramChannel close while virtual thread blocked in receive.
487 */
488 @Test
489 void testDatagramChannelReceiveAsyncClose() throws Exception {
490 VThreadRunner.run(() -> {
491 try (DatagramChannel dc = DatagramChannel.open()) {
492 InetAddress lh = InetAddress.getLoopbackAddress();
493 dc.bind(new InetSocketAddress(lh, 0));
494 runAfterParkedAsync(dc::close);
495 try {
496 dc.receive(ByteBuffer.allocate(100));
497 fail("receive returned");
498 } catch (AsynchronousCloseException expected) { }
499 }
500 });
501 }
502
503 /**
504 * Virtual thread interrupted while blocked in DatagramChannel receive.
505 */
506 @Test
507 void testDatagramChannelReceiveInterrupt() throws Exception {
508 VThreadRunner.run(() -> {
509 try (DatagramChannel dc = DatagramChannel.open()) {
510 InetAddress lh = InetAddress.getLoopbackAddress();
511 dc.bind(new InetSocketAddress(lh, 0));
512
513 // interrupt current thread when it blocks in receive
514 Thread thisThread = Thread.currentThread();
515 runAfterParkedAsync(thisThread::interrupt);
516
517 try {
518 dc.receive(ByteBuffer.allocate(100));
519 fail("receive returned");
520 } catch (ClosedByInterruptException expected) {
521 assertTrue(Thread.interrupted());
522 }
523 }
524 });
525 }
526
527 /**
528 * Virtual thread blocks in DatagramSocket adaptor receive.
529 */
530 @ParameterizedTest
531 @ValueSource(ints = { 0, 60_000 })
532 void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
533 VThreadRunner.run(() -> {
534 try (DatagramChannel dc1 = DatagramChannel.open();
535 DatagramChannel dc2 = DatagramChannel.open()) {
536
537 InetAddress lh = InetAddress.getLoopbackAddress();
538 dc2.bind(new InetSocketAddress(lh, 0));
539
540 // send from dc1 when current thread blocks in dc2 receive
541 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
542 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
543
544 // receive should block
545 byte[] array = new byte[100];
546 DatagramPacket p = new DatagramPacket(array, 0, array.length);
547 if (timeout > 0)
548 dc2.socket().setSoTimeout(timeout);
549 dc2.socket().receive(p);
550 assertTrue(p.getLength() == 3 && array[0] == 'X');
551 }
552 });
553 }
554
555 /**
556 * DatagramChannel close while virtual thread blocked in adaptor receive.
557 */
558 @ParameterizedTest
559 @ValueSource(ints = { 0, 60_000 })
560 void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
561 VThreadRunner.run(() -> {
562 try (DatagramChannel dc = DatagramChannel.open()) {
563 InetAddress lh = InetAddress.getLoopbackAddress();
564 dc.bind(new InetSocketAddress(lh, 0));
565
566 byte[] array = new byte[100];
567 DatagramPacket p = new DatagramPacket(array, 0, array.length);
568 if (timeout > 0)
569 dc.socket().setSoTimeout(timeout);
570
571 // close channel/socket when current thread blocks in receive
572 runAfterParkedAsync(dc::close);
573
574 assertThrows(SocketException.class, () -> dc.socket().receive(p));
575 }
576 });
577 }
578
579 /**
580 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
581 */
582 @ParameterizedTest
583 @ValueSource(ints = { 0, 60_000 })
584 void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
585 VThreadRunner.run(() -> {
586 try (DatagramChannel dc = DatagramChannel.open()) {
587 InetAddress lh = InetAddress.getLoopbackAddress();
588 dc.bind(new InetSocketAddress(lh, 0));
589
590 byte[] array = new byte[100];
591 DatagramPacket p = new DatagramPacket(array, 0, array.length);
592 if (timeout > 0)
593 dc.socket().setSoTimeout(timeout);
594
595 // interrupt current thread when it blocks in receive
596 Thread thisThread = Thread.currentThread();
597 runAfterParkedAsync(thisThread::interrupt);
598
599 try {
600 dc.socket().receive(p);
601 fail();
602 } catch (ClosedByInterruptException expected) {
603 assertTrue(Thread.interrupted());
604 }
605 }
606 });
607 }
608
609 /**
610 * Pipe read/write, no blocking.
611 */
612 @Test
613 void testPipeReadWrite1() throws Exception {
614 VThreadRunner.run(() -> {
615 Pipe p = Pipe.open();
616 try (Pipe.SinkChannel sink = p.sink();
617 Pipe.SourceChannel source = p.source()) {
618
619 // write should not block
620 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
621 int n = sink.write(bb);
622 assertTrue(n > 0);
623
624 // read should not block
625 bb = ByteBuffer.allocate(10);
626 n = source.read(bb);
627 assertTrue(n > 0);
628 assertTrue(bb.get(0) == 'X');
629 }
630 });
631 }
632
633 /**
634 * Virtual thread blocks in Pipe.SourceChannel read.
635 */
636 @Test
637 void testPipeReadWrite2() throws Exception {
638 VThreadRunner.run(() -> {
639 Pipe p = Pipe.open();
640 try (Pipe.SinkChannel sink = p.sink();
641 Pipe.SourceChannel source = p.source()) {
642
643 // write from sink when current thread blocks reading from source
644 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
645 runAfterParkedAsync(() -> sink.write(bb1));
646
647 // read should block
648 ByteBuffer bb2 = ByteBuffer.allocate(10);
649 int n = source.read(bb2);
650 assertTrue(n > 0);
651 assertTrue(bb2.get(0) == 'X');
652 }
653 });
654 }
655
656 /**
657 * Virtual thread blocks in Pipe.SinkChannel write.
658 */
659 @Test
660 void testPipeReadWrite3() throws Exception {
661 VThreadRunner.run(() -> {
662 Pipe p = Pipe.open();
663 try (Pipe.SinkChannel sink = p.sink();
664 Pipe.SourceChannel source = p.source()) {
665
666 // read from source to EOF when current thread blocking in write
667 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
668
669 // write to sink should block
670 ByteBuffer bb = ByteBuffer.allocate(100*1024);
671 for (int i=0; i<1000; i++) {
672 int n = sink.write(bb);
673 assertTrue(n > 0);
674 bb.clear();
675 }
676 sink.close();
677
678 // wait for reader to finish
679 reader.join();
680 }
681 });
682 }
683
684 /**
685 * Pipe.SourceChannel close while virtual thread blocked in read.
686 */
687 @Test
688 void testPipeReadAsyncClose() throws Exception {
689 VThreadRunner.run(() -> {
690 Pipe p = Pipe.open();
691 try (Pipe.SinkChannel sink = p.sink();
692 Pipe.SourceChannel source = p.source()) {
693 runAfterParkedAsync(source::close);
694 try {
695 int n = source.read(ByteBuffer.allocate(100));
696 fail("read returned " + n);
697 } catch (AsynchronousCloseException expected) { }
698 }
699 });
700 }
701
702 /**
703 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
704 */
705 @Test
706 void testPipeReadInterrupt() throws Exception {
707 VThreadRunner.run(() -> {
708 Pipe p = Pipe.open();
709 try (Pipe.SinkChannel sink = p.sink();
710 Pipe.SourceChannel source = p.source()) {
711
712 // interrupt current thread when it blocks reading from source
713 Thread thisThread = Thread.currentThread();
714 runAfterParkedAsync(thisThread::interrupt);
715
716 try {
717 int n = source.read(ByteBuffer.allocate(100));
718 fail("read returned " + n);
719 } catch (ClosedByInterruptException expected) {
720 assertTrue(Thread.interrupted());
721 }
722 }
723 });
724 }
725
726 /**
727 * Pipe.SinkChannel close while virtual thread blocked in write.
728 */
729 @Test
730 void testPipeWriteAsyncClose() throws Exception {
731 VThreadRunner.run(() -> {
732 boolean done = false;
733 while (!done) {
734 Pipe p = Pipe.open();
735 try (Pipe.SinkChannel sink = p.sink();
736 Pipe.SourceChannel source = p.source()) {
737
738 // close sink when current thread blocks in write
739 runAfterParkedAsync(sink::close, true);
740
741 // write until channel is closed
742 try {
743 ByteBuffer bb = ByteBuffer.allocate(100*1024);
744 for (;;) {
745 int n = sink.write(bb);
746 assertTrue(n > 0);
747 bb.clear();
748 }
749 } catch (AsynchronousCloseException e) {
750 // closed when blocked in write
751 done = true;
752 } catch (ClosedChannelException e) {
753 // closed but not blocked in write, need to retry test
754 System.err.format("%s, need to retry!%n", e);
755 }
756 }
757 }
758 });
759 }
760
761 /**
762 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
763 */
764 @Test
765 void testPipeWriteInterrupt() throws Exception {
766 VThreadRunner.run(() -> {
767 boolean done = false;
768 while (!done) {
769 Pipe p = Pipe.open();
770 try (Pipe.SinkChannel sink = p.sink();
771 Pipe.SourceChannel source = p.source()) {
772
773 // interrupt current thread when it blocks in write
774 Thread thisThread = Thread.currentThread();
775 runAfterParkedAsync(thisThread::interrupt, true);
776
777 // write until channel is closed
778 try {
779 ByteBuffer bb = ByteBuffer.allocate(100*1024);
780 for (;;) {
781 int n = sink.write(bb);
782 assertTrue(n > 0);
783 bb.clear();
784 }
785 } catch (ClosedByInterruptException expected) {
786 // closed when blocked in write
787 assertTrue(Thread.interrupted());
788 done = true;
789 } catch (ClosedChannelException e) {
790 // closed but not blocked in write, need to retry test
791 System.err.format("%s, need to retry!%n", e);
792 }
793 }
794 }
795 });
796 }
797
798 /**
799 * Creates a loopback connection
800 */
801 static class Connection implements Closeable {
802 private final SocketChannel sc1;
803 private final SocketChannel sc2;
804 Connection() throws IOException {
805 var lh = InetAddress.getLoopbackAddress();
806 try (var listener = ServerSocketChannel.open()) {
807 listener.bind(new InetSocketAddress(lh, 0));
808 SocketChannel sc1 = SocketChannel.open();
809 SocketChannel sc2 = null;
810 try {
811 sc1.socket().connect(listener.getLocalAddress());
812 sc2 = listener.accept();
813 } catch (IOException ioe) {
814 sc1.close();
815 throw ioe;
816 }
817 this.sc1 = sc1;
818 this.sc2 = sc2;
819 }
820 }
821 SocketChannel channel1() {
822 return sc1;
823 }
824 SocketChannel channel2() {
825 return sc2;
826 }
827 @Override
828 public void close() throws IOException {
829 sc1.close();
830 sc2.close();
831 }
832 }
833
834 /**
835 * Read from a channel until all bytes have been read or an I/O error occurs.
836 */
837 static void readToEOF(ReadableByteChannel rbc) throws IOException {
838 ByteBuffer bb = ByteBuffer.allocate(16*1024);
839 int n;
840 while ((n = rbc.read(bb)) > 0) {
841 bb.clear();
842 }
843 }
844
/**
 * A task that returns no result and is allowed to throw any exception,
 * including checked exceptions.
 */
@FunctionalInterface
interface ThrowingRunnable {
    /**
     * Runs the task.
     *
     * @throws Exception if the task fails
     */
    void run() throws Exception;
}
849
850 /**
851 * Runs the given task asynchronously after the current virtual thread parks.
852 * @param writing if the thread will block in write
853 * @return the thread started to run the task
854 */
855 private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
856 Thread target = Thread.currentThread();
857 if (!target.isVirtual())
858 throw new WrongThreadException();
859 return Thread.ofPlatform().daemon().start(() -> {
860 try {
861 // wait for target thread to park
862 while (!isWaiting(target)) {
863 Thread.sleep(20);
864 }
865
866 // if the target thread is parked in write then we nudge it a few times
867 // to avoid wakeup with some bytes written
868 if (writing) {
869 for (int i = 0; i < 3; i++) {
870 LockSupport.unpark(target);
871 while (!isWaiting(target)) {
872 Thread.sleep(20);
873 }
874 }
875 }
876
877 task.run();
878
879 } catch (Exception e) {
880 e.printStackTrace();
881 }
882 });
883 }
884
885 private static Thread runAfterParkedAsync(ThrowingRunnable task) {
886 return runAfterParkedAsync(task, false);
887 }
888
889 /**
890 * Return true if the given Thread is parked.
891 */
892 private static boolean isWaiting(Thread target) {
893 Thread.State state = target.getState();
894 assertNotEquals(Thread.State.TERMINATED, state);
895 return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
896 }
897 }