1 /*
2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on NIO channels
28 * @library /test/lib
29 * @run junit/othervm/timeout=480 BlockingChannelOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 BlockingChannelOps
37 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 BlockingChannelOps
38 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 BlockingChannelOps
39 */
40
41 /*
42 * @test id=io_uring
43 * @requires os.family == "linux"
44 * @library /test/lib
45 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingChannelOps
46 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingChannelOps
47 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingChannelOps
48 * @run junit/othervm/timeout=480 -Djdk.pollerMode=1 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
49 * @run junit/othervm/timeout=480 -Djdk.pollerMode=2 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
50 * @run junit/othervm/timeout=480 -Djdk.pollerMode=3 -Djdk.io_uring.sqpoll_idle=20 -Djdk.io_uring=true BlockingChannelOps
51 */
52
53 /*
54 * @test id=no-vmcontinuations
55 * @requires vm.continuations
56 * @library /test/lib
57 * @run junit/othervm/timeout=480 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingChannelOps
58 */
59
60 import java.io.Closeable;
61 import java.io.IOException;
62 import java.net.DatagramPacket;
63 import java.net.InetAddress;
64 import java.net.InetSocketAddress;
65 import java.net.Socket;
66 import java.net.SocketAddress;
67 import java.net.SocketException;
68 import java.nio.ByteBuffer;
69 import java.nio.channels.AsynchronousCloseException;
70 import java.nio.channels.ClosedByInterruptException;
71 import java.nio.channels.ClosedChannelException;
72 import java.nio.channels.DatagramChannel;
73 import java.nio.channels.Pipe;
74 import java.nio.channels.ReadableByteChannel;
75 import java.nio.channels.ServerSocketChannel;
76 import java.nio.channels.SocketChannel;
77 import java.nio.channels.WritableByteChannel;
78 import java.util.concurrent.ExecutorService;
79 import java.util.concurrent.Executors;
80 import java.util.concurrent.ThreadFactory;
81 import java.util.concurrent.locks.LockSupport;
82 import java.util.stream.Stream;
83
84 import jdk.test.lib.thread.VThreadRunner;
85 import jdk.test.lib.thread.VThreadScheduler;
86
87 import org.junit.jupiter.api.BeforeAll;
88 import org.junit.jupiter.params.ParameterizedTest;
89 import org.junit.jupiter.params.provider.MethodSource;
90 import static org.junit.jupiter.api.Assertions.*;
91
92 class BlockingChannelOps {
93 private static ExecutorService threadPool;
94 private static Thread.VirtualThreadScheduler customScheduler;
95
96 @BeforeAll
97 static void setup() {
98 ThreadFactory factory = Thread.ofPlatform().daemon().factory();
99 threadPool = Executors.newCachedThreadPool(factory);
100 customScheduler = Thread.VirtualThreadScheduler.adapt(threadPool);
101 }
102
103 /**
104 * Returns the Thread.Builder to create the virtual thread.
105 */
106 static Stream<Thread.Builder.OfVirtual> threadBuilders() {
107 if (VThreadScheduler.supportsCustomScheduler()) {
108 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
109 } else {
110 return Stream.of(Thread.ofVirtual());
111 }
112 }
113
114 /**
115 * SocketChannel read/write, no blocking.
116 */
117 @ParameterizedTest
118 @MethodSource("threadBuilders")
119 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
120 VThreadRunner.run(builder, () -> {
121 try (var connection = new Connection()) {
122 SocketChannel sc1 = connection.channel1();
123 SocketChannel sc2 = connection.channel2();
124
125 // write to sc1
126 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
127 int n = sc1.write(bb);
128 assertTrue(n > 0);
129
130 // read from sc2 should not block
131 bb = ByteBuffer.allocate(10);
132 n = sc2.read(bb);
133 assertTrue(n > 0);
134 assertTrue(bb.get(0) == 'X');
135 }
136 });
137 }
138
139 /**
140 * Virtual thread blocks in SocketChannel read.
141 */
142 @ParameterizedTest
143 @MethodSource("threadBuilders")
144 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
145 VThreadRunner.run(builder, () -> {
146 try (var connection = new Connection()) {
147 SocketChannel sc1 = connection.channel1();
148 SocketChannel sc2 = connection.channel2();
149
150 // write to sc1 when current thread blocks in sc2.read
151 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
152 runAfterParkedAsync(() -> sc1.write(bb1));
153
154 // read from sc2 should block
155 ByteBuffer bb2 = ByteBuffer.allocate(10);
156 int n = sc2.read(bb2);
157 assertTrue(n > 0);
158 assertTrue(bb2.get(0) == 'X');
159 }
160 });
161 }
162
163 /**
164 * Virtual thread blocks in SocketChannel write.
165 */
166 @ParameterizedTest
167 @MethodSource("threadBuilders")
168 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
169 VThreadRunner.run(builder, () -> {
170 try (var connection = new Connection()) {
171 SocketChannel sc1 = connection.channel1();
172 SocketChannel sc2 = connection.channel2();
173
174 // read from sc2 to EOF when current thread blocks in sc1.write
175 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
176
177 // write to sc1 should block
178 ByteBuffer bb = ByteBuffer.allocate(100*1024);
179 for (int i=0; i<1000; i++) {
180 int n = sc1.write(bb);
181 assertTrue(n > 0);
182 bb.clear();
183 }
184 sc1.close();
185
186 // wait for reader to finish
187 reader.join();
188 }
189 });
190 }
191
192 /**
193 * SocketChannel close while virtual thread blocked in read.
194 */
195 @ParameterizedTest
196 @MethodSource("threadBuilders")
197 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
198 VThreadRunner.run(builder, () -> {
199 try (var connection = new Connection()) {
200 SocketChannel sc = connection.channel1();
201 runAfterParkedAsync(sc::close);
202 try {
203 int n = sc.read(ByteBuffer.allocate(100));
204 fail("read returned " + n);
205 } catch (AsynchronousCloseException expected) { }
206 }
207 });
208 }
209
210 /**
211 * SocketChannel shutdownInput while virtual thread blocked in read.
212 */
213 @ParameterizedTest
214 @MethodSource("threadBuilders")
215 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
216 VThreadRunner.run(builder, () -> {
217 try (var connection = new Connection()) {
218 SocketChannel sc = connection.channel1();
219 runAfterParkedAsync(sc::shutdownInput);
220 int n = sc.read(ByteBuffer.allocate(100));
221 assertEquals(-1, n);
222 assertTrue(sc.isOpen());
223 }
224 });
225 }
226
227 /**
228 * Virtual thread interrupted while blocked in SocketChannel read.
229 */
230 @ParameterizedTest
231 @MethodSource("threadBuilders")
232 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
233 VThreadRunner.run(builder, () -> {
234 try (var connection = new Connection()) {
235 SocketChannel sc = connection.channel1();
236
237 // interrupt current thread when it blocks in read
238 Thread thisThread = Thread.currentThread();
239 runAfterParkedAsync(thisThread::interrupt);
240
241 try {
242 int n = sc.read(ByteBuffer.allocate(100));
243 fail("read returned " + n);
244 } catch (ClosedByInterruptException expected) {
245 assertTrue(Thread.interrupted());
246 }
247 }
248 });
249 }
250
251 /**
252 * SocketChannel close while virtual thread blocked in write.
253 */
254 @ParameterizedTest
255 @MethodSource("threadBuilders")
256 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
257 VThreadRunner.run(builder, () -> {
258 boolean done = false;
259 while (!done) {
260 try (var connection = new Connection()) {
261 SocketChannel sc = connection.channel1();
262
263 // close sc when current thread blocks in write
264 runAfterParkedAsync(sc::close, true);
265
266 // write until channel is closed
267 try {
268 ByteBuffer bb = ByteBuffer.allocate(100*1024);
269 for (;;) {
270 int n = sc.write(bb);
271 assertTrue(n > 0);
272 bb.clear();
273 }
274 } catch (AsynchronousCloseException expected) {
275 // closed when blocked in write
276 done = true;
277 } catch (ClosedChannelException e) {
278 // closed but not blocked in write, need to retry test
279 System.err.format("%s, need to retry!%n", e);
280 }
281 }
282 }
283 });
284 }
285
286 /**
287 * SocketChannel shutdownOutput while virtual thread blocked in write.
288 */
289 @ParameterizedTest
290 @MethodSource("threadBuilders")
291 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
292 VThreadRunner.run(builder, () -> {
293 try (var connection = new Connection()) {
294 SocketChannel sc = connection.channel1();
295
296 // shutdown output when current thread blocks in write
297 runAfterParkedAsync(sc::shutdownOutput);
298 try {
299 ByteBuffer bb = ByteBuffer.allocate(100*1024);
300 for (;;) {
301 int n = sc.write(bb);
302 assertTrue(n > 0);
303 bb.clear();
304 }
305 } catch (ClosedChannelException e) {
306 // expected
307 }
308 assertTrue(sc.isOpen());
309 }
310 });
311 }
312
313 /**
314 * Virtual thread interrupted while blocked in SocketChannel write.
315 */
316 @ParameterizedTest
317 @MethodSource("threadBuilders")
318 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
319 VThreadRunner.run(builder, () -> {
320 boolean done = false;
321 while (!done) {
322 try (var connection = new Connection()) {
323 SocketChannel sc = connection.channel1();
324
325 // interrupt current thread when it blocks in write
326 Thread thisThread = Thread.currentThread();
327 runAfterParkedAsync(thisThread::interrupt, true);
328
329 // write until channel is closed
330 try {
331 ByteBuffer bb = ByteBuffer.allocate(100*1024);
332 for (;;) {
333 int n = sc.write(bb);
334 assertTrue(n > 0);
335 bb.clear();
336 }
337 } catch (ClosedByInterruptException e) {
338 // closed when blocked in write
339 assertTrue(Thread.interrupted());
340 done = true;
341 } catch (ClosedChannelException e) {
342 // closed but not blocked in write, need to retry test
343 System.err.format("%s, need to retry!%n", e);
344 }
345 }
346 }
347 });
348 }
349
350 /**
351 * Virtual thread blocks in SocketChannel adaptor read.
352 */
353 @ParameterizedTest
354 @MethodSource("threadBuilders")
355 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
356 testSocketAdaptorRead(builder, 0);
357 }
358
359 /**
360 * Virtual thread blocks in SocketChannel adaptor read with timeout.
361 */
362 @ParameterizedTest
363 @MethodSource("threadBuilders")
364 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
365 testSocketAdaptorRead(builder, 60_000);
366 }
367
368 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
369 int timeout) throws Exception {
370 VThreadRunner.run(builder, () -> {
371 try (var connection = new Connection()) {
372 SocketChannel sc1 = connection.channel1();
373 SocketChannel sc2 = connection.channel2();
374
375 // write to sc1 when currnet thread blocks reading from sc2
376 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
377 runAfterParkedAsync(() -> sc1.write(bb));
378
379 // read from sc2 should block
380 byte[] array = new byte[100];
381 if (timeout > 0)
382 sc2.socket().setSoTimeout(timeout);
383 int n = sc2.socket().getInputStream().read(array);
384 assertTrue(n > 0);
385 assertTrue(array[0] == 'X');
386 }
387 });
388 }
389
390 /**
391 * ServerSocketChannel accept, no blocking.
392 */
393 @ParameterizedTest
394 @MethodSource("threadBuilders")
395 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
396 VThreadRunner.run(builder, () -> {
397 try (var ssc = ServerSocketChannel.open()) {
398 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
399 var sc1 = SocketChannel.open(ssc.getLocalAddress());
400 // accept should not block
401 var sc2 = ssc.accept();
402 sc1.close();
403 sc2.close();
404 }
405 });
406 }
407
408 /**
409 * Virtual thread blocks in ServerSocketChannel accept.
410 */
411 @ParameterizedTest
412 @MethodSource("threadBuilders")
413 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
414 VThreadRunner.run(builder, () -> {
415 try (var ssc = ServerSocketChannel.open()) {
416 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
417 var sc1 = SocketChannel.open();
418
419 // connect when current thread when it blocks in accept
420 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
421
422 // accept should block
423 var sc2 = ssc.accept();
424 sc1.close();
425 sc2.close();
426 }
427 });
428 }
429
430 /**
431 * SeverSocketChannel close while virtual thread blocked in accept.
432 */
433 @ParameterizedTest
434 @MethodSource("threadBuilders")
435 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
436 VThreadRunner.run(builder, () -> {
437 try (var ssc = ServerSocketChannel.open()) {
438 InetAddress lh = InetAddress.getLoopbackAddress();
439 ssc.bind(new InetSocketAddress(lh, 0));
440 runAfterParkedAsync(ssc::close);
441 try {
442 SocketChannel sc = ssc.accept();
443 sc.close();
444 fail("connection accepted???");
445 } catch (AsynchronousCloseException expected) { }
446 }
447 });
448 }
449
450 /**
451 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
452 */
453 @ParameterizedTest
454 @MethodSource("threadBuilders")
455 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
456 VThreadRunner.run(builder, () -> {
457 try (var ssc = ServerSocketChannel.open()) {
458 InetAddress lh = InetAddress.getLoopbackAddress();
459 ssc.bind(new InetSocketAddress(lh, 0));
460
461 // interrupt current thread when it blocks in accept
462 Thread thisThread = Thread.currentThread();
463 runAfterParkedAsync(thisThread::interrupt);
464
465 try {
466 SocketChannel sc = ssc.accept();
467 sc.close();
468 fail("connection accepted???");
469 } catch (ClosedByInterruptException expected) {
470 assertTrue(Thread.interrupted());
471 }
472 }
473 });
474 }
475
476 /**
477 * Virtual thread blocks in ServerSocketChannel adaptor accept.
478 */
479 @ParameterizedTest
480 @MethodSource("threadBuilders")
481 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
482 testSocketChannelAdaptorAccept(builder, 0);
483 }
484
485 /**
486 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
487 */
488 @ParameterizedTest
489 @MethodSource("threadBuilders")
490 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
491 testSocketChannelAdaptorAccept(builder, 60_000);
492 }
493
494 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
495 int timeout) throws Exception {
496 VThreadRunner.run(builder, () -> {
497 try (var ssc = ServerSocketChannel.open()) {
498 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
499 var sc = SocketChannel.open();
500
501 // interrupt current thread when it blocks in accept
502 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
503
504 // accept should block
505 if (timeout > 0)
506 ssc.socket().setSoTimeout(timeout);
507 Socket s = ssc.socket().accept();
508 sc.close();
509 s.close();
510 }
511 });
512 }
513
514 /**
515 * DatagramChannel receive/send, no blocking.
516 */
517 @ParameterizedTest
518 @MethodSource("threadBuilders")
519 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
520 VThreadRunner.run(builder, () -> {
521 try (DatagramChannel dc1 = DatagramChannel.open();
522 DatagramChannel dc2 = DatagramChannel.open()) {
523
524 InetAddress lh = InetAddress.getLoopbackAddress();
525 dc2.bind(new InetSocketAddress(lh, 0));
526
527 // send should not block
528 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
529 int n = dc1.send(bb, dc2.getLocalAddress());
530 assertTrue(n > 0);
531
532 // receive should not block
533 bb = ByteBuffer.allocate(10);
534 dc2.receive(bb);
535 assertTrue(bb.get(0) == 'X');
536 }
537 });
538 }
539
540 /**
541 * Virtual thread blocks in DatagramChannel receive.
542 */
543 @ParameterizedTest
544 @MethodSource("threadBuilders")
545 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
546 VThreadRunner.run(builder, () -> {
547 try (DatagramChannel dc1 = DatagramChannel.open();
548 DatagramChannel dc2 = DatagramChannel.open()) {
549
550 InetAddress lh = InetAddress.getLoopbackAddress();
551 dc2.bind(new InetSocketAddress(lh, 0));
552
553 // send from dc1 when current thread blocked in dc2.receive
554 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
555 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
556
557 // read from dc2 should block
558 ByteBuffer bb2 = ByteBuffer.allocate(10);
559 dc2.receive(bb2);
560 assertTrue(bb2.get(0) == 'X');
561 }
562 });
563 }
564
565 /**
566 * DatagramChannel close while virtual thread blocked in receive.
567 */
568 @ParameterizedTest
569 @MethodSource("threadBuilders")
570 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
571 VThreadRunner.run(builder, () -> {
572 try (DatagramChannel dc = DatagramChannel.open()) {
573 InetAddress lh = InetAddress.getLoopbackAddress();
574 dc.bind(new InetSocketAddress(lh, 0));
575 runAfterParkedAsync(dc::close);
576 try {
577 dc.receive(ByteBuffer.allocate(100));
578 fail("receive returned");
579 } catch (AsynchronousCloseException expected) { }
580 }
581 });
582 }
583
584 /**
585 * Virtual thread interrupted while blocked in DatagramChannel receive.
586 */
587 @ParameterizedTest
588 @MethodSource("threadBuilders")
589 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
590 VThreadRunner.run(builder, () -> {
591 try (DatagramChannel dc = DatagramChannel.open()) {
592 InetAddress lh = InetAddress.getLoopbackAddress();
593 dc.bind(new InetSocketAddress(lh, 0));
594
595 // interrupt current thread when it blocks in receive
596 Thread thisThread = Thread.currentThread();
597 runAfterParkedAsync(thisThread::interrupt);
598
599 try {
600 dc.receive(ByteBuffer.allocate(100));
601 fail("receive returned");
602 } catch (ClosedByInterruptException expected) {
603 assertTrue(Thread.interrupted());
604 }
605 }
606 });
607 }
608
609 /**
610 * Virtual thread blocks in DatagramSocket adaptor receive.
611 */
612 @ParameterizedTest
613 @MethodSource("threadBuilders")
614 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
615 testDatagramSocketAdaptorReceive(builder, 0);
616 }
617
618 /**
619 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
620 */
621 @ParameterizedTest
622 @MethodSource("threadBuilders")
623 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
624 testDatagramSocketAdaptorReceive(builder, 60_000);
625 }
626
627 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
628 int timeout) throws Exception {
629 VThreadRunner.run(builder, () -> {
630 try (DatagramChannel dc1 = DatagramChannel.open();
631 DatagramChannel dc2 = DatagramChannel.open()) {
632
633 InetAddress lh = InetAddress.getLoopbackAddress();
634 dc2.bind(new InetSocketAddress(lh, 0));
635
636 // send from dc1 when current thread blocks in dc2 receive
637 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
638 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
639
640 // receive should block
641 byte[] array = new byte[100];
642 DatagramPacket p = new DatagramPacket(array, 0, array.length);
643 if (timeout > 0)
644 dc2.socket().setSoTimeout(timeout);
645 dc2.socket().receive(p);
646 assertTrue(p.getLength() == 3 && array[0] == 'X');
647 }
648 });
649 }
650
651 /**
652 * DatagramChannel close while virtual thread blocked in adaptor receive.
653 */
654 @ParameterizedTest
655 @MethodSource("threadBuilders")
656 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
657 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
658 }
659
660 /**
661 * DatagramChannel close while virtual thread blocked in adaptor receive
662 * with timeout.
663 */
664 @ParameterizedTest
665 @MethodSource("threadBuilders")
666 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
667 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
668 }
669
670 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
671 int timeout) throws Exception {
672 VThreadRunner.run(builder, () -> {
673 try (DatagramChannel dc = DatagramChannel.open()) {
674 InetAddress lh = InetAddress.getLoopbackAddress();
675 dc.bind(new InetSocketAddress(lh, 0));
676
677 byte[] array = new byte[100];
678 DatagramPacket p = new DatagramPacket(array, 0, array.length);
679 if (timeout > 0)
680 dc.socket().setSoTimeout(timeout);
681
682 // close channel/socket when current thread blocks in receive
683 runAfterParkedAsync(dc::close);
684
685 assertThrows(SocketException.class, () -> dc.socket().receive(p));
686 }
687 });
688 }
689
690 /**
691 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
692 */
693 @ParameterizedTest
694 @MethodSource("threadBuilders")
695 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
696 testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
697 }
698
699 /**
700 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
701 * with timeout.
702 */
703 @ParameterizedTest
704 @MethodSource("threadBuilders")
705 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
706 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
707 }
708
709 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
710 int timeout) throws Exception {
711 VThreadRunner.run(builder, () -> {
712 try (DatagramChannel dc = DatagramChannel.open()) {
713 InetAddress lh = InetAddress.getLoopbackAddress();
714 dc.bind(new InetSocketAddress(lh, 0));
715
716 byte[] array = new byte[100];
717 DatagramPacket p = new DatagramPacket(array, 0, array.length);
718 if (timeout > 0)
719 dc.socket().setSoTimeout(timeout);
720
721 // interrupt current thread when it blocks in receive
722 Thread thisThread = Thread.currentThread();
723 runAfterParkedAsync(thisThread::interrupt);
724
725 try {
726 dc.socket().receive(p);
727 fail();
728 } catch (ClosedByInterruptException expected) {
729 assertTrue(Thread.interrupted());
730 }
731 }
732 });
733 }
734
735 /**
736 * Pipe read/write, no blocking.
737 */
738 @ParameterizedTest
739 @MethodSource("threadBuilders")
740 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
741 VThreadRunner.run(builder, () -> {
742 Pipe p = Pipe.open();
743 try (Pipe.SinkChannel sink = p.sink();
744 Pipe.SourceChannel source = p.source()) {
745
746 // write should not block
747 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
748 int n = sink.write(bb);
749 assertTrue(n > 0);
750
751 // read should not block
752 bb = ByteBuffer.allocate(10);
753 n = source.read(bb);
754 assertTrue(n > 0);
755 assertTrue(bb.get(0) == 'X');
756 }
757 });
758 }
759
760 /**
761 * Virtual thread blocks in Pipe.SourceChannel read.
762 */
763 @ParameterizedTest
764 @MethodSource("threadBuilders")
765 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
766 VThreadRunner.run(builder, () -> {
767 Pipe p = Pipe.open();
768 try (Pipe.SinkChannel sink = p.sink();
769 Pipe.SourceChannel source = p.source()) {
770
771 // write from sink when current thread blocks reading from source
772 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
773 runAfterParkedAsync(() -> sink.write(bb1));
774
775 // read should block
776 ByteBuffer bb2 = ByteBuffer.allocate(10);
777 int n = source.read(bb2);
778 assertTrue(n > 0);
779 assertTrue(bb2.get(0) == 'X');
780 }
781 });
782 }
783
784 /**
785 * Virtual thread blocks in Pipe.SinkChannel write.
786 */
787 @ParameterizedTest
788 @MethodSource("threadBuilders")
789 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
790 VThreadRunner.run(builder, () -> {
791 Pipe p = Pipe.open();
792 try (Pipe.SinkChannel sink = p.sink();
793 Pipe.SourceChannel source = p.source()) {
794
795 // read from source to EOF when current thread blocking in write
796 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
797
798 // write to sink should block
799 ByteBuffer bb = ByteBuffer.allocate(100*1024);
800 for (int i=0; i<1000; i++) {
801 int n = sink.write(bb);
802 assertTrue(n > 0);
803 bb.clear();
804 }
805 sink.close();
806
807 // wait for reader to finish
808 reader.join();
809 }
810 });
811 }
812
813 /**
814 * Pipe.SourceChannel close while virtual thread blocked in read.
815 */
816 @ParameterizedTest
817 @MethodSource("threadBuilders")
818 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
819 VThreadRunner.run(builder, () -> {
820 Pipe p = Pipe.open();
821 try (Pipe.SinkChannel sink = p.sink();
822 Pipe.SourceChannel source = p.source()) {
823 runAfterParkedAsync(source::close);
824 try {
825 int n = source.read(ByteBuffer.allocate(100));
826 fail("read returned " + n);
827 } catch (AsynchronousCloseException expected) { }
828 }
829 });
830 }
831
832 /**
833 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
834 */
835 @ParameterizedTest
836 @MethodSource("threadBuilders")
837 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
838 VThreadRunner.run(builder, () -> {
839 Pipe p = Pipe.open();
840 try (Pipe.SinkChannel sink = p.sink();
841 Pipe.SourceChannel source = p.source()) {
842
843 // interrupt current thread when it blocks reading from source
844 Thread thisThread = Thread.currentThread();
845 runAfterParkedAsync(thisThread::interrupt);
846
847 try {
848 int n = source.read(ByteBuffer.allocate(100));
849 fail("read returned " + n);
850 } catch (ClosedByInterruptException expected) {
851 assertTrue(Thread.interrupted());
852 }
853 }
854 });
855 }
856
857 /**
858 * Pipe.SinkChannel close while virtual thread blocked in write.
859 */
860 @ParameterizedTest
861 @MethodSource("threadBuilders")
862 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
863 VThreadRunner.run(builder, () -> {
864 boolean done = false;
865 while (!done) {
866 Pipe p = Pipe.open();
867 try (Pipe.SinkChannel sink = p.sink();
868 Pipe.SourceChannel source = p.source()) {
869
870 // close sink when current thread blocks in write
871 runAfterParkedAsync(sink::close, true);
872
873 // write until channel is closed
874 try {
875 ByteBuffer bb = ByteBuffer.allocate(100*1024);
876 for (;;) {
877 int n = sink.write(bb);
878 assertTrue(n > 0);
879 bb.clear();
880 }
881 } catch (AsynchronousCloseException e) {
882 // closed when blocked in write
883 done = true;
884 } catch (ClosedChannelException e) {
885 // closed but not blocked in write, need to retry test
886 System.err.format("%s, need to retry!%n", e);
887 }
888 }
889 }
890 });
891 }
892
893 /**
894 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
895 */
896 @ParameterizedTest
897 @MethodSource("threadBuilders")
898 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
899 VThreadRunner.run(builder, () -> {
900 boolean done = false;
901 while (!done) {
902 Pipe p = Pipe.open();
903 try (Pipe.SinkChannel sink = p.sink();
904 Pipe.SourceChannel source = p.source()) {
905
906 // interrupt current thread when it blocks in write
907 Thread thisThread = Thread.currentThread();
908 runAfterParkedAsync(thisThread::interrupt, true);
909
910 // write until channel is closed
911 try {
912 ByteBuffer bb = ByteBuffer.allocate(100*1024);
913 for (;;) {
914 int n = sink.write(bb);
915 assertTrue(n > 0);
916 bb.clear();
917 }
918 } catch (ClosedByInterruptException expected) {
919 // closed when blocked in write
920 assertTrue(Thread.interrupted());
921 done = true;
922 } catch (ClosedChannelException e) {
923 // closed but not blocked in write, need to retry test
924 System.err.format("%s, need to retry!%n", e);
925 }
926 }
927 }
928 });
929 }
930
931 /**
932 * Creates a loopback connection
933 */
934 static class Connection implements Closeable {
935 private final SocketChannel sc1;
936 private final SocketChannel sc2;
937 Connection() throws IOException {
938 var lh = InetAddress.getLoopbackAddress();
939 try (var listener = ServerSocketChannel.open()) {
940 listener.bind(new InetSocketAddress(lh, 0));
941 SocketChannel sc1 = SocketChannel.open();
942 SocketChannel sc2 = null;
943 try {
944 sc1.socket().connect(listener.getLocalAddress());
945 sc2 = listener.accept();
946 } catch (IOException ioe) {
947 sc1.close();
948 throw ioe;
949 }
950 this.sc1 = sc1;
951 this.sc2 = sc2;
952 }
953 }
954 SocketChannel channel1() {
955 return sc1;
956 }
957 SocketChannel channel2() {
958 return sc2;
959 }
960 @Override
961 public void close() throws IOException {
962 sc1.close();
963 sc2.close();
964 }
965 }
966
967 /**
968 * Read from a channel until all bytes have been read or an I/O error occurs.
969 */
970 static void readToEOF(ReadableByteChannel rbc) throws IOException {
971 ByteBuffer bb = ByteBuffer.allocate(16*1024);
972 int n;
973 while ((n = rbc.read(bb)) > 0) {
974 bb.clear();
975 }
976 }
977
978 @FunctionalInterface
979 interface ThrowingRunnable {
980 void run() throws Exception;
981 }
982
983 /**
984 * Runs the given task asynchronously after the current virtual thread parks.
985 * @param writing if the thread will block in write
986 * @return the thread started to run the task
987 */
988 private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
989 Thread target = Thread.currentThread();
990 if (!target.isVirtual())
991 throw new WrongThreadException();
992 return Thread.ofPlatform().daemon().start(() -> {
993 try {
994 // wait for target thread to park
995 while (!isWaiting(target)) {
996 Thread.sleep(20);
997 }
998
999 // if the target thread is parked in write then we nudge it a few times
1000 // to avoid wakeup with some bytes written
1001 if (writing) {
1002 for (int i = 0; i < 3; i++) {
1003 LockSupport.unpark(target);
1004 while (!isWaiting(target)) {
1005 Thread.sleep(20);
1006 }
1007 }
1008 }
1009
1010 task.run();
1011
1012 } catch (Exception e) {
1013 e.printStackTrace();
1014 }
1015 });
1016 }
1017
1018 private static Thread runAfterParkedAsync(ThrowingRunnable task) {
1019 return runAfterParkedAsync(task, false);
1020 }
1021
1022 /**
1023 * Return true if the given Thread is parked.
1024 */
1025 private static boolean isWaiting(Thread target) {
1026 Thread.State state = target.getState();
1027 assertNotEquals(Thread.State.TERMINATED, state);
1028 return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
1029 }
1030 }