1 /*
2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161 8372958
27 * @summary Test virtual threads doing blocking I/O on java.net Sockets
28 * @library /test/lib
29 * @run junit BlockingSocketOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps
37 * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps
38 * @run junit/othervm -Djdk.pollerMode=3 BlockingSocketOps
39 */
40
41 /*
42 * @test id=io_uring
43 * @requires os.family == "linux"
44 * @library /test/lib
45 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingSocketOps
46 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingSocketOps
47 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingSocketOps
48 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps
49 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps
50 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps
51 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true BlockingSocketOps
52 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true BlockingSocketOps
53 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true BlockingSocketOps
54 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps
55 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps
56 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps
57 */
58
59 /*
60 * @test id=no-vmcontinuations
61 * @requires vm.continuations
62 * @library /test/lib
63 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps
64 */
65
66 import java.io.Closeable;
67 import java.io.IOException;
68 import java.io.InputStream;
69 import java.io.OutputStream;
70 import java.net.DatagramPacket;
71 import java.net.DatagramSocket;
72 import java.net.InetAddress;
73 import java.net.InetSocketAddress;
74 import java.net.ServerSocket;
75 import java.net.Socket;
76 import java.net.SocketAddress;
77 import java.net.SocketException;
78 import java.net.SocketTimeoutException;
79
80 import jdk.test.lib.thread.VThreadRunner;
81 import org.junit.jupiter.api.Test;
82 import org.junit.jupiter.params.ParameterizedTest;
83 import org.junit.jupiter.params.provider.ValueSource;
84 import static org.junit.jupiter.api.Assertions.*;
85
86 class BlockingSocketOps {
87
88 /**
89 * Socket read/write, no blocking.
90 */
91 @Test
92 void testSocketReadWrite1() throws Exception {
93 VThreadRunner.run(() -> {
94 try (var connection = new Connection()) {
95 Socket s1 = connection.socket1();
96 Socket s2 = connection.socket2();
97
98 // write should not block
99 byte[] ba = "XXX".getBytes("UTF-8");
100 s1.getOutputStream().write(ba);
101
102 // read should not block
103 ba = new byte[10];
104 int n = s2.getInputStream().read(ba);
105 assertTrue(n > 0);
106 assertTrue(ba[0] == 'X');
107 }
108 });
109 }
110
111 /**
112 * Virtual thread blocks in read.
113 */
114 @ParameterizedTest
115 @ValueSource(ints = { 0, 60_000 })
116 void testSocketRead(int timeout) throws Exception {
117 VThreadRunner.run(() -> {
118 try (var connection = new Connection()) {
119 Socket s1 = connection.socket1();
120 Socket s2 = connection.socket2();
121
122 // delayed write from sc1
123 byte[] ba1 = "XXX".getBytes("UTF-8");
124 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
125
126 // read from sc2 should block
127 if (timeout > 0) {
128 s2.setSoTimeout(timeout);
129 }
130 byte[] ba2 = new byte[10];
131 int n = s2.getInputStream().read(ba2);
132 assertTrue(n > 0);
133 assertTrue(ba2[0] == 'X');
134 }
135 });
136 }
137
138 /**
139 * Virtual thread blocks in write.
140 */
141 @Test
142 void testSocketWrite1() throws Exception {
143 VThreadRunner.run(() -> {
144 try (var connection = new Connection()) {
145 Socket s1 = connection.socket1();
146 Socket s2 = connection.socket2();
147
148 // delayed read from s2 to EOF
149 InputStream in = s2.getInputStream();
150 Thread reader = runAfterParkedAsync(() ->
151 in.transferTo(OutputStream.nullOutputStream()));
152
153 // write should block
154 byte[] ba = new byte[100*1024];
155 try (OutputStream out = s1.getOutputStream()) {
156 for (int i = 0; i < 1000; i++) {
157 out.write(ba);
158 }
159 }
160
161 // wait for reader to finish
162 reader.join();
163 }
164 });
165 }
166
167 /**
168 * Virtual thread blocks in read, peer closes connection gracefully.
169 */
170 @Test
171 void testSocketReadPeerClose1() throws Exception {
172 VThreadRunner.run(() -> {
173 try (var connection = new Connection()) {
174 Socket s1 = connection.socket1();
175 Socket s2 = connection.socket2();
176
177 // delayed close of s2
178 runAfterParkedAsync(s2::close);
179
180 // read from s1 should block, then read -1
181 int n = s1.getInputStream().read();
182 assertTrue(n == -1);
183 }
184 });
185 }
186
187 /**
188 * Virtual thread blocks in read, peer closes connection abruptly.
189 */
190 @Test
191 void testSocketReadPeerClose2() throws Exception {
192 VThreadRunner.run(() -> {
193 try (var connection = new Connection()) {
194 Socket s1 = connection.socket1();
195 Socket s2 = connection.socket2();
196
197 // delayed abrupt close of s2
198 s2.setSoLinger(true, 0);
199 runAfterParkedAsync(s2::close);
200
201 // read from s1 should block, then throw
202 try {
203 int n = s1.getInputStream().read();
204 fail("read " + n);
205 } catch (IOException ioe) {
206 // expected
207 }
208 }
209 });
210 }
211
212 /**
213 * Socket close while virtual thread blocked in read.
214 */
215 @ParameterizedTest
216 @ValueSource(ints = { 0, 60_000 })
217 void testSocketReadAsyncClose(int timeout) throws Exception {
218 VThreadRunner.run(() -> {
219 try (var connection = new Connection()) {
220 Socket s = connection.socket1();
221
222 // delayed close of s
223 runAfterParkedAsync(s::close);
224
225 // read from s should block, then throw
226 if (timeout > 0) {
227 s.setSoTimeout(timeout);
228 }
229 try {
230 int n = s.getInputStream().read();
231 fail("read " + n);
232 } catch (SocketException expected) {
233 log(expected);
234 }
235 }
236 });
237 }
238
239 /**
240 * Socket shutdownInput while virtual thread blocked in read.
241 */
242 @ParameterizedTest
243 @ValueSource(ints = { 0, 60_000 })
244 void testSocketReadAsyncShutdownInput(int timeout) throws Exception {
245 VThreadRunner.run(() -> {
246 try (var connection = new Connection()) {
247 Socket s = connection.socket1();
248
249 // delayed shutdown of input stream
250 InputStream in = s.getInputStream();
251 runAfterParkedAsync(s::shutdownInput);
252
253 // read should return -1
254 if (timeout > 0) {
255 s.setSoTimeout(timeout);
256 }
257 assertEquals(-1, in.read());
258 assertEquals(0, in.available());
259 assertFalse(s.isClosed());
260 }
261 });
262 }
263
264 /**
265 * Virtual thread interrupted while blocked in Socket read.
266 */
267 @ParameterizedTest
268 @ValueSource(ints = { 0, 60_000 })
269 void testSocketReadInterrupt(int timeout) throws Exception {
270 VThreadRunner.run(() -> {
271 try (var connection = new Connection()) {
272 Socket s = connection.socket1();
273
274
275 // delayed interrupt of current thread
276 Thread thisThread = Thread.currentThread();
277 runAfterParkedAsync(thisThread::interrupt);
278
279 // read from s should block, then throw
280 if (timeout > 0) {
281 s.setSoTimeout(timeout);
282 }
283 try {
284 int n = s.getInputStream().read();
285 fail("read " + n);
286 } catch (SocketException expected) {
287 log(expected);
288 assertTrue(Thread.interrupted());
289 assertTrue(s.isClosed());
290 }
291 }
292 });
293 }
294
295 /**
296 * Socket close while virtual thread blocked in write.
297 */
298 @Test
299 void testSocketWriteAsyncClose() throws Exception {
300 VThreadRunner.run(() -> {
301 try (var connection = new Connection()) {
302 Socket s = connection.socket1();
303
304 // delayed close of s
305 runAfterParkedAsync(s::close);
306
307 // write to s should block, then throw
308 try {
309 byte[] ba = new byte[100*1024];
310 OutputStream out = s.getOutputStream();
311 for (;;) {
312 out.write(ba);
313 }
314 } catch (SocketException expected) {
315 log(expected);
316 }
317 }
318 });
319 }
320
321 /**
322 * Socket shutdownOutput while virtual thread blocked in write.
323 */
324 @Test
325 void testSocketWriteAsyncShutdownOutput() throws Exception {
326 VThreadRunner.run(() -> {
327 try (var connection = new Connection()) {
328 Socket s = connection.socket1();
329
330 // delayed shutdown of output stream
331 OutputStream out = s.getOutputStream();
332 runAfterParkedAsync(s::shutdownOutput);
333
334 // write to s should block, then throw
335 try {
336 byte[] ba = new byte[100*1024];
337 for (;;) {
338 out.write(ba);
339 }
340 } catch (SocketException expected) {
341 log(expected);
342 }
343 assertFalse(s.isClosed());
344 }
345 });
346 }
347
348 /**
349 * Virtual thread interrupted while blocked in Socket write.
350 */
351 @Test
352 void testSocketWriteInterrupt() throws Exception {
353 VThreadRunner.run(() -> {
354 try (var connection = new Connection()) {
355 Socket s = connection.socket1();
356
357 // delayed interrupt of current thread
358 Thread thisThread = Thread.currentThread();
359 runAfterParkedAsync(thisThread::interrupt);
360
361 // write to s should block, then throw
362 try {
363 byte[] ba = new byte[100*1024];
364 OutputStream out = s.getOutputStream();
365 for (;;) {
366 out.write(ba);
367 }
368 } catch (SocketException expected) {
369 log(expected);
370 assertTrue(Thread.interrupted());
371 assertTrue(s.isClosed());
372 }
373 }
374 });
375 }
376
377 /**
378 * Virtual thread reading urgent data when SO_OOBINLINE is enabled.
379 */
380 @Test
381 void testSocketReadUrgentData() throws Exception {
382 VThreadRunner.run(() -> {
383 try (var connection = new Connection()) {
384 Socket s1 = connection.socket1();
385 Socket s2 = connection.socket2();
386
387 // urgent data should be received
388 runAfterParkedAsync(() -> s2.sendUrgentData('X'));
389
390 // read should block, then read the OOB byte
391 s1.setOOBInline(true);
392 byte[] ba = new byte[10];
393 int n = s1.getInputStream().read(ba);
394 assertTrue(n == 1);
395 assertTrue(ba[0] == 'X');
396
397 // urgent data should not be received
398 s1.setOOBInline(false);
399 s1.setSoTimeout(500);
400 s2.sendUrgentData('X');
401 try {
402 s1.getInputStream().read(ba);
403 fail();
404 } catch (SocketTimeoutException expected) {
405 log(expected);
406 }
407 }
408 });
409 }
410
411 /**
412 * ServerSocket accept, no blocking.
413 */
414 @Test
415 void testServerSocketAccept1() throws Exception {
416 VThreadRunner.run(() -> {
417 try (var listener = new ServerSocket()) {
418 InetAddress loopback = InetAddress.getLoopbackAddress();
419 listener.bind(new InetSocketAddress(loopback, 0));
420
421 // establish connection
422 var socket1 = new Socket(loopback, listener.getLocalPort());
423
424 // accept should not block
425 var socket2 = listener.accept();
426 socket1.close();
427 socket2.close();
428 }
429 });
430 }
431
432 /**
433 * Virtual thread blocks in accept.
434 */
435 @ParameterizedTest
436 @ValueSource(ints = { 0, 60_000 })
437 void testServerSocketAccept(int timeout) throws Exception {
438 VThreadRunner.run(() -> {
439 try (var listener = new ServerSocket()) {
440 InetAddress loopback = InetAddress.getLoopbackAddress();
441 listener.bind(new InetSocketAddress(loopback, 0));
442
443 // schedule connect
444 var socket1 = new Socket();
445 SocketAddress remote = listener.getLocalSocketAddress();
446 runAfterParkedAsync(() -> socket1.connect(remote));
447
448 // accept should block
449 if (timeout > 0) {
450 listener.setSoTimeout(timeout);
451 }
452 var socket2 = listener.accept();
453 socket1.close();
454 socket2.close();
455 }
456 });
457 }
458
459 /**
460 * ServerSocket close while virtual thread blocked in accept.
461 */
462 @ParameterizedTest
463 @ValueSource(ints = { 0, 60_000 })
464 void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
465 VThreadRunner.run(() -> {
466 try (var listener = new ServerSocket()) {
467 InetAddress loopback = InetAddress.getLoopbackAddress();
468 listener.bind(new InetSocketAddress(loopback, 0));
469
470 // delayed close of listener
471 runAfterParkedAsync(listener::close);
472
473 // accept should block, then throw
474 if (timeout > 0) {
475 listener.setSoTimeout(timeout);
476 }
477 try {
478 listener.accept().close();
479 fail("connection accepted???");
480 } catch (SocketException expected) {
481 log(expected);
482 }
483 }
484 });
485 }
486
487 /**
488 * Virtual thread interrupted while blocked in ServerSocket accept.
489 */
490 @ParameterizedTest
491 @ValueSource(ints = { 0, 60_000 })
492 void testServerSocketAcceptInterrupt(int timeout) throws Exception {
493 VThreadRunner.run(() -> {
494 try (var listener = new ServerSocket()) {
495 InetAddress loopback = InetAddress.getLoopbackAddress();
496 listener.bind(new InetSocketAddress(loopback, 0));
497
498 // delayed interrupt of current thread
499 Thread thisThread = Thread.currentThread();
500 runAfterParkedAsync(thisThread::interrupt);
501
502 // accept should block, then throw
503 if (timeout > 0) {
504 listener.setSoTimeout(timeout);
505 }
506 try {
507 listener.accept().close();
508 fail("connection accepted???");
509 } catch (SocketException expected) {
510 log(expected);
511 assertTrue(Thread.interrupted());
512 assertTrue(listener.isClosed());
513 }
514 }
515 });
516 }
517
518 /**
519 * DatagramSocket receive/send, no blocking.
520 */
521 @Test
522 void testDatagramSocketSendReceive1() throws Exception {
523 VThreadRunner.run(() -> {
524 try (DatagramSocket s1 = new DatagramSocket(null);
525 DatagramSocket s2 = new DatagramSocket(null)) {
526
527 InetAddress lh = InetAddress.getLoopbackAddress();
528 s1.bind(new InetSocketAddress(lh, 0));
529 s2.bind(new InetSocketAddress(lh, 0));
530
531 // send should not block
532 byte[] bytes = "XXX".getBytes("UTF-8");
533 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
534 p1.setSocketAddress(s2.getLocalSocketAddress());
535 s1.send(p1);
536
537 // receive should not block
538 byte[] ba = new byte[100];
539 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
540 s2.receive(p2);
541 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
542 assertTrue(ba[0] == 'X');
543 }
544 });
545 }
546
547 /**
548 * Virtual thread blocks in DatagramSocket receive.
549 */
550 @ParameterizedTest
551 @ValueSource(ints = { 0, 60_000 })
552 void testDatagramSocketSendReceive(int timeout) throws Exception {
553 VThreadRunner.run(() -> {
554 try (DatagramSocket s1 = new DatagramSocket(null);
555 DatagramSocket s2 = new DatagramSocket(null)) {
556
557 InetAddress lh = InetAddress.getLoopbackAddress();
558 s1.bind(new InetSocketAddress(lh, 0));
559 s2.bind(new InetSocketAddress(lh, 0));
560
561 // delayed send
562 byte[] bytes = "XXX".getBytes("UTF-8");
563 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
564 p1.setSocketAddress(s2.getLocalSocketAddress());
565 runAfterParkedAsync(() -> s1.send(p1));
566
567 // receive should block
568 if (timeout > 0) {
569 s2.setSoTimeout(timeout);
570 }
571 byte[] ba = new byte[100];
572 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
573 s2.receive(p2);
574 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
575 assertTrue(ba[0] == 'X');
576 }
577 });
578 }
579
580 /**
581 * Virtual thread blocks in DatagramSocket receive that times out.
582 */
583 @Test
584 void testDatagramSocketReceiveTimeout() throws Exception {
585 VThreadRunner.run(() -> {
586 try (DatagramSocket s = new DatagramSocket(null)) {
587 InetAddress lh = InetAddress.getLoopbackAddress();
588 s.bind(new InetSocketAddress(lh, 0));
589 s.setSoTimeout(500);
590 byte[] ba = new byte[100];
591 DatagramPacket p = new DatagramPacket(ba, ba.length);
592 try {
593 s.receive(p);
594 fail();
595 } catch (SocketTimeoutException expected) {
596 log(expected);
597 }
598 }
599 });
600 }
601
602 /**
603 * DatagramSocket close while virtual thread blocked in receive.
604 */
605 @ParameterizedTest
606 @ValueSource(ints = { 0, 60_000 })
607 void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
608 VThreadRunner.run(() -> {
609 try (DatagramSocket s = new DatagramSocket(null)) {
610 InetAddress lh = InetAddress.getLoopbackAddress();
611 s.bind(new InetSocketAddress(lh, 0));
612
613 // delayed close of s
614 runAfterParkedAsync(s::close);
615
616 // receive should block, then throw
617 if (timeout > 0) {
618 s.setSoTimeout(timeout);
619 }
620 try {
621 byte[] ba = new byte[100];
622 DatagramPacket p = new DatagramPacket(ba, ba.length);
623 s.receive(p);
624 fail();
625 } catch (SocketException expected) {
626 log(expected);
627 }
628 }
629 });
630 }
631
632 /**
633 * Virtual thread interrupted while blocked in DatagramSocket receive.
634 */
635 @ParameterizedTest
636 @ValueSource(ints = { 0, 60_000 })
637 void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
638 VThreadRunner.run(() -> {
639 try (DatagramSocket s = new DatagramSocket(null)) {
640 InetAddress lh = InetAddress.getLoopbackAddress();
641 s.bind(new InetSocketAddress(lh, 0));
642
643 // delayed interrupt of current thread
644 Thread thisThread = Thread.currentThread();
645 runAfterParkedAsync(thisThread::interrupt);
646
647 // receive should block, then throw
648 if (timeout > 0) {
649 s.setSoTimeout(timeout);
650 }
651 try {
652 byte[] ba = new byte[100];
653 DatagramPacket p = new DatagramPacket(ba, ba.length);
654 s.receive(p);
655 fail();
656 } catch (SocketException expected) {
657 log(expected);
658 assertTrue(Thread.interrupted());
659 assertTrue(s.isClosed());
660 }
661 }
662 });
663 }
664
665 /**
666 * Creates a loopback connection
667 */
668 static class Connection implements Closeable {
669 private final Socket s1;
670 private final Socket s2;
671 Connection() throws IOException {
672 var lh = InetAddress.getLoopbackAddress();
673 try (var listener = new ServerSocket()) {
674 listener.bind(new InetSocketAddress(lh, 0));
675 Socket s1 = new Socket();
676 Socket s2;
677 try {
678 s1.connect(listener.getLocalSocketAddress());
679 s2 = listener.accept();
680 } catch (IOException ioe) {
681 s1.close();
682 throw ioe;
683 }
684 this.s1 = s1;
685 this.s2 = s2;
686 }
687
688 }
689 Socket socket1() {
690 return s1;
691 }
692 Socket socket2() {
693 return s2;
694 }
695 @Override
696 public void close() throws IOException {
697 s1.close();
698 s2.close();
699 }
700 }
701
/**
 * A task that is allowed to throw any exception, including checked exceptions,
 * when run. It is the parameter type of runAfterParkedAsync.
 */
@FunctionalInterface
interface ThrowingRunnable {
    /**
     * Runs the task.
     *
     * @throws Exception if the task fails
     */
    void run() throws Exception;
}
706
707 /**
708 * Runs the given task asynchronously after the current virtual thread has parked.
709 * @return the thread started to run the task
710 */
711 static Thread runAfterParkedAsync(ThrowingRunnable task) {
712 Thread target = Thread.currentThread();
713 if (!target.isVirtual())
714 throw new WrongThreadException();
715 return Thread.ofPlatform().daemon().start(() -> {
716 try {
717 Thread.State state = target.getState();
718 while (state != Thread.State.WAITING
719 && state != Thread.State.TIMED_WAITING) {
720 Thread.sleep(20);
721 state = target.getState();
722 }
723 Thread.sleep(20); // give a bit more time to release carrier
724 task.run();
725 } catch (Exception e) {
726 e.printStackTrace();
727 }
728 });
729 }
730
731 /**
732 * Log to System.err to inline with the JUnit messages.
733 */
734 static void log(Throwable e) {
735 System.err.println(e);
736 }
737 }