1 /*
2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161 8372958
27 * @summary Test virtual threads doing blocking I/O on java.net Sockets
28 * @library /test/lib
29 * @run junit BlockingSocketOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps
37 * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps
38 * @run junit/othervm -Djdk.pollerMode=3 BlockingSocketOps
39 */
40
41 /*
42 * @test id=no-vmcontinuations
43 * @requires vm.continuations
44 * @library /test/lib
45 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps
46 */
47
48 import java.io.Closeable;
49 import java.io.IOException;
50 import java.io.InputStream;
51 import java.io.OutputStream;
52 import java.net.DatagramPacket;
53 import java.net.DatagramSocket;
54 import java.net.InetAddress;
55 import java.net.InetSocketAddress;
56 import java.net.ServerSocket;
57 import java.net.Socket;
58 import java.net.SocketAddress;
59 import java.net.SocketException;
60 import java.net.SocketTimeoutException;
61
62 import jdk.test.lib.thread.VThreadRunner;
63 import org.junit.jupiter.api.Test;
64 import org.junit.jupiter.params.ParameterizedTest;
65 import org.junit.jupiter.params.provider.ValueSource;
66 import static org.junit.jupiter.api.Assertions.*;
67
68 class BlockingSocketOps {
69
70 /**
71 * Socket read/write, no blocking.
72 */
73 @Test
74 void testSocketReadWrite1() throws Exception {
75 VThreadRunner.run(() -> {
76 try (var connection = new Connection()) {
77 Socket s1 = connection.socket1();
78 Socket s2 = connection.socket2();
79
80 // write should not block
81 byte[] ba = "XXX".getBytes("UTF-8");
82 s1.getOutputStream().write(ba);
83
84 // read should not block
85 ba = new byte[10];
86 int n = s2.getInputStream().read(ba);
87 assertTrue(n > 0);
88 assertTrue(ba[0] == 'X');
89 }
90 });
91 }
92
93 /**
94 * Virtual thread blocks in read.
95 */
96 @ParameterizedTest
97 @ValueSource(ints = { 0, 60_000 })
98 void testSocketRead(int timeout) throws Exception {
99 VThreadRunner.run(() -> {
100 try (var connection = new Connection()) {
101 Socket s1 = connection.socket1();
102 Socket s2 = connection.socket2();
103
104 // delayed write from sc1
105 byte[] ba1 = "XXX".getBytes("UTF-8");
106 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
107
108 // read from sc2 should block
109 if (timeout > 0) {
110 s2.setSoTimeout(timeout);
111 }
112 byte[] ba2 = new byte[10];
113 int n = s2.getInputStream().read(ba2);
114 assertTrue(n > 0);
115 assertTrue(ba2[0] == 'X');
116 }
117 });
118 }
119
120 /**
121 * Virtual thread blocks in write.
122 */
123 @Test
124 void testSocketWrite1() throws Exception {
125 VThreadRunner.run(() -> {
126 try (var connection = new Connection()) {
127 Socket s1 = connection.socket1();
128 Socket s2 = connection.socket2();
129
130 // delayed read from s2 to EOF
131 InputStream in = s2.getInputStream();
132 Thread reader = runAfterParkedAsync(() ->
133 in.transferTo(OutputStream.nullOutputStream()));
134
135 // write should block
136 byte[] ba = new byte[100*1024];
137 try (OutputStream out = s1.getOutputStream()) {
138 for (int i = 0; i < 1000; i++) {
139 out.write(ba);
140 }
141 }
142
143 // wait for reader to finish
144 reader.join();
145 }
146 });
147 }
148
149 /**
150 * Virtual thread blocks in read, peer closes connection gracefully.
151 */
152 @Test
153 void testSocketReadPeerClose1() throws Exception {
154 VThreadRunner.run(() -> {
155 try (var connection = new Connection()) {
156 Socket s1 = connection.socket1();
157 Socket s2 = connection.socket2();
158
159 // delayed close of s2
160 runAfterParkedAsync(s2::close);
161
162 // read from s1 should block, then read -1
163 int n = s1.getInputStream().read();
164 assertTrue(n == -1);
165 }
166 });
167 }
168
169 /**
170 * Virtual thread blocks in read, peer closes connection abruptly.
171 */
172 @Test
173 void testSocketReadPeerClose2() throws Exception {
174 VThreadRunner.run(() -> {
175 try (var connection = new Connection()) {
176 Socket s1 = connection.socket1();
177 Socket s2 = connection.socket2();
178
179 // delayed abrupt close of s2
180 s2.setSoLinger(true, 0);
181 runAfterParkedAsync(s2::close);
182
183 // read from s1 should block, then throw
184 try {
185 int n = s1.getInputStream().read();
186 fail("read " + n);
187 } catch (IOException ioe) {
188 // expected
189 }
190 }
191 });
192 }
193
194 /**
195 * Socket close while virtual thread blocked in read.
196 */
197 @ParameterizedTest
198 @ValueSource(ints = { 0, 60_000 })
199 void testSocketReadAsyncClose(int timeout) throws Exception {
200 VThreadRunner.run(() -> {
201 try (var connection = new Connection()) {
202 Socket s = connection.socket1();
203
204 // delayed close of s
205 runAfterParkedAsync(s::close);
206
207 // read from s should block, then throw
208 if (timeout > 0) {
209 s.setSoTimeout(timeout);
210 }
211 try {
212 int n = s.getInputStream().read();
213 fail("read " + n);
214 } catch (SocketException expected) {
215 log(expected);
216 }
217 }
218 });
219 }
220
221 /**
222 * Socket shutdownInput while virtual thread blocked in read.
223 */
224 @ParameterizedTest
225 @ValueSource(ints = { 0, 60_000 })
226 void testSocketReadAsyncShutdownInput(int timeout) throws Exception {
227 VThreadRunner.run(() -> {
228 try (var connection = new Connection()) {
229 Socket s = connection.socket1();
230
231 // delayed shutdown of input stream
232 InputStream in = s.getInputStream();
233 runAfterParkedAsync(s::shutdownInput);
234
235 // read should return -1
236 if (timeout > 0) {
237 s.setSoTimeout(timeout);
238 }
239 assertEquals(-1, in.read());
240 assertEquals(0, in.available());
241 assertFalse(s.isClosed());
242 }
243 });
244 }
245
246 /**
247 * Virtual thread interrupted while blocked in Socket read.
248 */
249 @ParameterizedTest
250 @ValueSource(ints = { 0, 60_000 })
251 void testSocketReadInterrupt(int timeout) throws Exception {
252 VThreadRunner.run(() -> {
253 try (var connection = new Connection()) {
254 Socket s = connection.socket1();
255
256
257 // delayed interrupt of current thread
258 Thread thisThread = Thread.currentThread();
259 runAfterParkedAsync(thisThread::interrupt);
260
261 // read from s should block, then throw
262 if (timeout > 0) {
263 s.setSoTimeout(timeout);
264 }
265 try {
266 int n = s.getInputStream().read();
267 fail("read " + n);
268 } catch (SocketException expected) {
269 log(expected);
270 assertTrue(Thread.interrupted());
271 assertTrue(s.isClosed());
272 }
273 }
274 });
275 }
276
277 /**
278 * Socket close while virtual thread blocked in write.
279 */
280 @Test
281 void testSocketWriteAsyncClose() throws Exception {
282 VThreadRunner.run(() -> {
283 try (var connection = new Connection()) {
284 Socket s = connection.socket1();
285
286 // delayed close of s
287 runAfterParkedAsync(s::close);
288
289 // write to s should block, then throw
290 try {
291 byte[] ba = new byte[100*1024];
292 OutputStream out = s.getOutputStream();
293 for (;;) {
294 out.write(ba);
295 }
296 } catch (SocketException expected) {
297 log(expected);
298 }
299 }
300 });
301 }
302
303 /**
304 * Socket shutdownOutput while virtual thread blocked in write.
305 */
306 @Test
307 void testSocketWriteAsyncShutdownOutput() throws Exception {
308 VThreadRunner.run(() -> {
309 try (var connection = new Connection()) {
310 Socket s = connection.socket1();
311
312 // delayed shutdown of output stream
313 OutputStream out = s.getOutputStream();
314 runAfterParkedAsync(s::shutdownOutput);
315
316 // write to s should block, then throw
317 try {
318 byte[] ba = new byte[100*1024];
319 for (;;) {
320 out.write(ba);
321 }
322 } catch (SocketException expected) {
323 log(expected);
324 }
325 assertFalse(s.isClosed());
326 }
327 });
328 }
329
330 /**
331 * Virtual thread interrupted while blocked in Socket write.
332 */
333 @Test
334 void testSocketWriteInterrupt() throws Exception {
335 VThreadRunner.run(() -> {
336 try (var connection = new Connection()) {
337 Socket s = connection.socket1();
338
339 // delayed interrupt of current thread
340 Thread thisThread = Thread.currentThread();
341 runAfterParkedAsync(thisThread::interrupt);
342
343 // write to s should block, then throw
344 try {
345 byte[] ba = new byte[100*1024];
346 OutputStream out = s.getOutputStream();
347 for (;;) {
348 out.write(ba);
349 }
350 } catch (SocketException expected) {
351 log(expected);
352 assertTrue(Thread.interrupted());
353 assertTrue(s.isClosed());
354 }
355 }
356 });
357 }
358
359 /**
360 * Virtual thread reading urgent data when SO_OOBINLINE is enabled.
361 */
362 @Test
363 void testSocketReadUrgentData() throws Exception {
364 VThreadRunner.run(() -> {
365 try (var connection = new Connection()) {
366 Socket s1 = connection.socket1();
367 Socket s2 = connection.socket2();
368
369 // urgent data should be received
370 runAfterParkedAsync(() -> s2.sendUrgentData('X'));
371
372 // read should block, then read the OOB byte
373 s1.setOOBInline(true);
374 byte[] ba = new byte[10];
375 int n = s1.getInputStream().read(ba);
376 assertTrue(n == 1);
377 assertTrue(ba[0] == 'X');
378
379 // urgent data should not be received
380 s1.setOOBInline(false);
381 s1.setSoTimeout(500);
382 s2.sendUrgentData('X');
383 try {
384 s1.getInputStream().read(ba);
385 fail();
386 } catch (SocketTimeoutException expected) {
387 log(expected);
388 }
389 }
390 });
391 }
392
393 /**
394 * ServerSocket accept, no blocking.
395 */
396 @Test
397 void testServerSocketAccept1() throws Exception {
398 VThreadRunner.run(() -> {
399 try (var listener = new ServerSocket()) {
400 InetAddress loopback = InetAddress.getLoopbackAddress();
401 listener.bind(new InetSocketAddress(loopback, 0));
402
403 // establish connection
404 var socket1 = new Socket(loopback, listener.getLocalPort());
405
406 // accept should not block
407 var socket2 = listener.accept();
408 socket1.close();
409 socket2.close();
410 }
411 });
412 }
413
414 /**
415 * Virtual thread blocks in accept.
416 */
417 @ParameterizedTest
418 @ValueSource(ints = { 0, 60_000 })
419 void testServerSocketAccept(int timeout) throws Exception {
420 VThreadRunner.run(() -> {
421 try (var listener = new ServerSocket()) {
422 InetAddress loopback = InetAddress.getLoopbackAddress();
423 listener.bind(new InetSocketAddress(loopback, 0));
424
425 // schedule connect
426 var socket1 = new Socket();
427 SocketAddress remote = listener.getLocalSocketAddress();
428 runAfterParkedAsync(() -> socket1.connect(remote));
429
430 // accept should block
431 if (timeout > 0) {
432 listener.setSoTimeout(timeout);
433 }
434 var socket2 = listener.accept();
435 socket1.close();
436 socket2.close();
437 }
438 });
439 }
440
441 /**
442 * ServerSocket close while virtual thread blocked in accept.
443 */
444 @ParameterizedTest
445 @ValueSource(ints = { 0, 60_000 })
446 void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
447 VThreadRunner.run(() -> {
448 try (var listener = new ServerSocket()) {
449 InetAddress loopback = InetAddress.getLoopbackAddress();
450 listener.bind(new InetSocketAddress(loopback, 0));
451
452 // delayed close of listener
453 runAfterParkedAsync(listener::close);
454
455 // accept should block, then throw
456 if (timeout > 0) {
457 listener.setSoTimeout(timeout);
458 }
459 try {
460 listener.accept().close();
461 fail("connection accepted???");
462 } catch (SocketException expected) {
463 log(expected);
464 }
465 }
466 });
467 }
468
469 /**
470 * Virtual thread interrupted while blocked in ServerSocket accept.
471 */
472 @ParameterizedTest
473 @ValueSource(ints = { 0, 60_000 })
474 void testServerSocketAcceptInterrupt(int timeout) throws Exception {
475 VThreadRunner.run(() -> {
476 try (var listener = new ServerSocket()) {
477 InetAddress loopback = InetAddress.getLoopbackAddress();
478 listener.bind(new InetSocketAddress(loopback, 0));
479
480 // delayed interrupt of current thread
481 Thread thisThread = Thread.currentThread();
482 runAfterParkedAsync(thisThread::interrupt);
483
484 // accept should block, then throw
485 if (timeout > 0) {
486 listener.setSoTimeout(timeout);
487 }
488 try {
489 listener.accept().close();
490 fail("connection accepted???");
491 } catch (SocketException expected) {
492 log(expected);
493 assertTrue(Thread.interrupted());
494 assertTrue(listener.isClosed());
495 }
496 }
497 });
498 }
499
500 /**
501 * DatagramSocket receive/send, no blocking.
502 */
503 @Test
504 void testDatagramSocketSendReceive1() throws Exception {
505 VThreadRunner.run(() -> {
506 try (DatagramSocket s1 = new DatagramSocket(null);
507 DatagramSocket s2 = new DatagramSocket(null)) {
508
509 InetAddress lh = InetAddress.getLoopbackAddress();
510 s1.bind(new InetSocketAddress(lh, 0));
511 s2.bind(new InetSocketAddress(lh, 0));
512
513 // send should not block
514 byte[] bytes = "XXX".getBytes("UTF-8");
515 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
516 p1.setSocketAddress(s2.getLocalSocketAddress());
517 s1.send(p1);
518
519 // receive should not block
520 byte[] ba = new byte[100];
521 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
522 s2.receive(p2);
523 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
524 assertTrue(ba[0] == 'X');
525 }
526 });
527 }
528
529 /**
530 * Virtual thread blocks in DatagramSocket receive.
531 */
532 @ParameterizedTest
533 @ValueSource(ints = { 0, 60_000 })
534 void testDatagramSocketSendReceive(int timeout) throws Exception {
535 VThreadRunner.run(() -> {
536 try (DatagramSocket s1 = new DatagramSocket(null);
537 DatagramSocket s2 = new DatagramSocket(null)) {
538
539 InetAddress lh = InetAddress.getLoopbackAddress();
540 s1.bind(new InetSocketAddress(lh, 0));
541 s2.bind(new InetSocketAddress(lh, 0));
542
543 // delayed send
544 byte[] bytes = "XXX".getBytes("UTF-8");
545 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
546 p1.setSocketAddress(s2.getLocalSocketAddress());
547 runAfterParkedAsync(() -> s1.send(p1));
548
549 // receive should block
550 if (timeout > 0) {
551 s2.setSoTimeout(timeout);
552 }
553 byte[] ba = new byte[100];
554 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
555 s2.receive(p2);
556 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
557 assertTrue(ba[0] == 'X');
558 }
559 });
560 }
561
562 /**
563 * Virtual thread blocks in DatagramSocket receive that times out.
564 */
565 @Test
566 void testDatagramSocketReceiveTimeout() throws Exception {
567 VThreadRunner.run(() -> {
568 try (DatagramSocket s = new DatagramSocket(null)) {
569 InetAddress lh = InetAddress.getLoopbackAddress();
570 s.bind(new InetSocketAddress(lh, 0));
571 s.setSoTimeout(500);
572 byte[] ba = new byte[100];
573 DatagramPacket p = new DatagramPacket(ba, ba.length);
574 try {
575 s.receive(p);
576 fail();
577 } catch (SocketTimeoutException expected) {
578 log(expected);
579 }
580 }
581 });
582 }
583
584 /**
585 * DatagramSocket close while virtual thread blocked in receive.
586 */
587 @ParameterizedTest
588 @ValueSource(ints = { 0, 60_000 })
589 void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
590 VThreadRunner.run(() -> {
591 try (DatagramSocket s = new DatagramSocket(null)) {
592 InetAddress lh = InetAddress.getLoopbackAddress();
593 s.bind(new InetSocketAddress(lh, 0));
594
595 // delayed close of s
596 runAfterParkedAsync(s::close);
597
598 // receive should block, then throw
599 if (timeout > 0) {
600 s.setSoTimeout(timeout);
601 }
602 try {
603 byte[] ba = new byte[100];
604 DatagramPacket p = new DatagramPacket(ba, ba.length);
605 s.receive(p);
606 fail();
607 } catch (SocketException expected) {
608 log(expected);
609 }
610 }
611 });
612 }
613
614 /**
615 * Virtual thread interrupted while blocked in DatagramSocket receive.
616 */
617 @ParameterizedTest
618 @ValueSource(ints = { 0, 60_000 })
619 void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
620 VThreadRunner.run(() -> {
621 try (DatagramSocket s = new DatagramSocket(null)) {
622 InetAddress lh = InetAddress.getLoopbackAddress();
623 s.bind(new InetSocketAddress(lh, 0));
624
625 // delayed interrupt of current thread
626 Thread thisThread = Thread.currentThread();
627 runAfterParkedAsync(thisThread::interrupt);
628
629 // receive should block, then throw
630 if (timeout > 0) {
631 s.setSoTimeout(timeout);
632 }
633 try {
634 byte[] ba = new byte[100];
635 DatagramPacket p = new DatagramPacket(ba, ba.length);
636 s.receive(p);
637 fail();
638 } catch (SocketException expected) {
639 log(expected);
640 assertTrue(Thread.interrupted());
641 assertTrue(s.isClosed());
642 }
643 }
644 });
645 }
646
647 /**
648 * Creates a loopback connection
649 */
650 static class Connection implements Closeable {
651 private final Socket s1;
652 private final Socket s2;
653 Connection() throws IOException {
654 var lh = InetAddress.getLoopbackAddress();
655 try (var listener = new ServerSocket()) {
656 listener.bind(new InetSocketAddress(lh, 0));
657 Socket s1 = new Socket();
658 Socket s2;
659 try {
660 s1.connect(listener.getLocalSocketAddress());
661 s2 = listener.accept();
662 } catch (IOException ioe) {
663 s1.close();
664 throw ioe;
665 }
666 this.s1 = s1;
667 this.s2 = s2;
668 }
669
670 }
671 Socket socket1() {
672 return s1;
673 }
674 Socket socket2() {
675 return s2;
676 }
677 @Override
678 public void close() throws IOException {
679 s1.close();
680 s2.close();
681 }
682 }
683
684 @FunctionalInterface
685 interface ThrowingRunnable {
686 void run() throws Exception;
687 }
688
689 /**
690 * Runs the given task asynchronously after the current virtual thread has parked.
691 * @return the thread started to run the task
692 */
693 static Thread runAfterParkedAsync(ThrowingRunnable task) {
694 Thread target = Thread.currentThread();
695 if (!target.isVirtual())
696 throw new WrongThreadException();
697 return Thread.ofPlatform().daemon().start(() -> {
698 try {
699 Thread.State state = target.getState();
700 while (state != Thread.State.WAITING
701 && state != Thread.State.TIMED_WAITING) {
702 Thread.sleep(20);
703 state = target.getState();
704 }
705 Thread.sleep(20); // give a bit more time to release carrier
706 task.run();
707 } catch (Exception e) {
708 e.printStackTrace();
709 }
710 });
711 }
712
713 /**
714 * Log to System.err to inline with the JUnit messages.
715 */
716 static void log(Throwable e) {
717 System.err.println(e);
718 }
719 }