1 /*
2 * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161 8372958
27 * @summary Test virtual threads doing blocking I/O on java.net Sockets
28 * @library /test/lib
29 * @run junit BlockingSocketOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps
37 * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps
38 */
39
40 /*
41 * @test id=no-vmcontinuations
42 * @requires vm.continuations
43 * @library /test/lib
44 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps
45 */
46
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.io.InputStream;
50 import java.io.OutputStream;
51 import java.net.DatagramPacket;
52 import java.net.DatagramSocket;
53 import java.net.InetAddress;
54 import java.net.InetSocketAddress;
55 import java.net.ServerSocket;
56 import java.net.Socket;
57 import java.net.SocketAddress;
58 import java.net.SocketException;
59 import java.net.SocketTimeoutException;
60
61 import jdk.test.lib.thread.VThreadRunner;
62 import org.junit.jupiter.api.Test;
63 import org.junit.jupiter.params.ParameterizedTest;
64 import org.junit.jupiter.params.provider.ValueSource;
65 import static org.junit.jupiter.api.Assertions.*;
66
67 class BlockingSocketOps {
68
69 /**
70 * Socket read/write, no blocking.
71 */
72 @Test
73 void testSocketReadWrite1() throws Exception {
74 VThreadRunner.run(() -> {
75 try (var connection = new Connection()) {
76 Socket s1 = connection.socket1();
77 Socket s2 = connection.socket2();
78
79 // write should not block
80 byte[] ba = "XXX".getBytes("UTF-8");
81 s1.getOutputStream().write(ba);
82
83 // read should not block
84 ba = new byte[10];
85 int n = s2.getInputStream().read(ba);
86 assertTrue(n > 0);
87 assertTrue(ba[0] == 'X');
88 }
89 });
90 }
91
92 /**
93 * Virtual thread blocks in read.
94 */
95 @ParameterizedTest
96 @ValueSource(ints = { 0, 60_000 })
97 void testSocketRead(int timeout) throws Exception {
98 VThreadRunner.run(() -> {
99 try (var connection = new Connection()) {
100 Socket s1 = connection.socket1();
101 Socket s2 = connection.socket2();
102
103 // delayed write from sc1
104 byte[] ba1 = "XXX".getBytes("UTF-8");
105 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
106
107 // read from sc2 should block
108 if (timeout > 0) {
109 s2.setSoTimeout(timeout);
110 }
111 byte[] ba2 = new byte[10];
112 int n = s2.getInputStream().read(ba2);
113 assertTrue(n > 0);
114 assertTrue(ba2[0] == 'X');
115 }
116 });
117 }
118
119 /**
120 * Virtual thread blocks in write.
121 */
122 @Test
123 void testSocketWrite1() throws Exception {
124 VThreadRunner.run(() -> {
125 try (var connection = new Connection()) {
126 Socket s1 = connection.socket1();
127 Socket s2 = connection.socket2();
128
129 // delayed read from s2 to EOF
130 InputStream in = s2.getInputStream();
131 Thread reader = runAfterParkedAsync(() ->
132 in.transferTo(OutputStream.nullOutputStream()));
133
134 // write should block
135 byte[] ba = new byte[100*1024];
136 try (OutputStream out = s1.getOutputStream()) {
137 for (int i = 0; i < 1000; i++) {
138 out.write(ba);
139 }
140 }
141
142 // wait for reader to finish
143 reader.join();
144 }
145 });
146 }
147
148 /**
149 * Virtual thread blocks in read, peer closes connection gracefully.
150 */
151 @Test
152 void testSocketReadPeerClose1() throws Exception {
153 VThreadRunner.run(() -> {
154 try (var connection = new Connection()) {
155 Socket s1 = connection.socket1();
156 Socket s2 = connection.socket2();
157
158 // delayed close of s2
159 runAfterParkedAsync(s2::close);
160
161 // read from s1 should block, then read -1
162 int n = s1.getInputStream().read();
163 assertTrue(n == -1);
164 }
165 });
166 }
167
168 /**
169 * Virtual thread blocks in read, peer closes connection abruptly.
170 */
171 @Test
172 void testSocketReadPeerClose2() throws Exception {
173 VThreadRunner.run(() -> {
174 try (var connection = new Connection()) {
175 Socket s1 = connection.socket1();
176 Socket s2 = connection.socket2();
177
178 // delayed abrupt close of s2
179 s2.setSoLinger(true, 0);
180 runAfterParkedAsync(s2::close);
181
182 // read from s1 should block, then throw
183 try {
184 int n = s1.getInputStream().read();
185 fail("read " + n);
186 } catch (IOException ioe) {
187 // expected
188 }
189 }
190 });
191 }
192
193 /**
194 * Socket close while virtual thread blocked in read.
195 */
196 @ParameterizedTest
197 @ValueSource(ints = { 0, 60_000 })
198 void testSocketReadAsyncClose(int timeout) throws Exception {
199 VThreadRunner.run(() -> {
200 try (var connection = new Connection()) {
201 Socket s = connection.socket1();
202
203 // delayed close of s
204 runAfterParkedAsync(s::close);
205
206 // read from s should block, then throw
207 if (timeout > 0) {
208 s.setSoTimeout(timeout);
209 }
210 try {
211 int n = s.getInputStream().read();
212 fail("read " + n);
213 } catch (SocketException expected) {
214 log(expected);
215 }
216 }
217 });
218 }
219
220 /**
221 * Socket shutdownInput while virtual thread blocked in read.
222 */
223 @ParameterizedTest
224 @ValueSource(ints = { 0, 60_000 })
225 void testSocketReadAsyncShutdownInput(int timeout) throws Exception {
226 VThreadRunner.run(() -> {
227 try (var connection = new Connection()) {
228 Socket s = connection.socket1();
229
230 // delayed shutdown of input stream
231 InputStream in = s.getInputStream();
232 runAfterParkedAsync(s::shutdownInput);
233
234 // read should return -1
235 if (timeout > 0) {
236 s.setSoTimeout(timeout);
237 }
238 assertEquals(-1, in.read());
239 assertEquals(0, in.available());
240 assertFalse(s.isClosed());
241 }
242 });
243 }
244
245 /**
246 * Virtual thread interrupted while blocked in Socket read.
247 */
248 @ParameterizedTest
249 @ValueSource(ints = { 0, 60_000 })
250 void testSocketReadInterrupt(int timeout) throws Exception {
251 VThreadRunner.run(() -> {
252 try (var connection = new Connection()) {
253 Socket s = connection.socket1();
254
255
256 // delayed interrupt of current thread
257 Thread thisThread = Thread.currentThread();
258 runAfterParkedAsync(thisThread::interrupt);
259
260 // read from s should block, then throw
261 if (timeout > 0) {
262 s.setSoTimeout(timeout);
263 }
264 try {
265 int n = s.getInputStream().read();
266 fail("read " + n);
267 } catch (SocketException expected) {
268 log(expected);
269 assertTrue(Thread.interrupted());
270 assertTrue(s.isClosed());
271 }
272 }
273 });
274 }
275
276 /**
277 * Socket close while virtual thread blocked in write.
278 */
279 @Test
280 void testSocketWriteAsyncClose() throws Exception {
281 VThreadRunner.run(() -> {
282 try (var connection = new Connection()) {
283 Socket s = connection.socket1();
284
285 // delayed close of s
286 runAfterParkedAsync(s::close);
287
288 // write to s should block, then throw
289 try {
290 byte[] ba = new byte[100*1024];
291 OutputStream out = s.getOutputStream();
292 for (;;) {
293 out.write(ba);
294 }
295 } catch (SocketException expected) {
296 log(expected);
297 }
298 }
299 });
300 }
301
302 /**
303 * Socket shutdownOutput while virtual thread blocked in write.
304 */
305 @Test
306 void testSocketWriteAsyncShutdownOutput() throws Exception {
307 VThreadRunner.run(() -> {
308 try (var connection = new Connection()) {
309 Socket s = connection.socket1();
310
311 // delayed shutdown of output stream
312 OutputStream out = s.getOutputStream();
313 runAfterParkedAsync(s::shutdownOutput);
314
315 // write to s should block, then throw
316 try {
317 byte[] ba = new byte[100*1024];
318 for (;;) {
319 out.write(ba);
320 }
321 } catch (SocketException expected) {
322 log(expected);
323 }
324 assertFalse(s.isClosed());
325 }
326 });
327 }
328
329 /**
330 * Virtual thread interrupted while blocked in Socket write.
331 */
332 @Test
333 void testSocketWriteInterrupt() throws Exception {
334 VThreadRunner.run(() -> {
335 try (var connection = new Connection()) {
336 Socket s = connection.socket1();
337
338 // delayed interrupt of current thread
339 Thread thisThread = Thread.currentThread();
340 runAfterParkedAsync(thisThread::interrupt);
341
342 // write to s should block, then throw
343 try {
344 byte[] ba = new byte[100*1024];
345 OutputStream out = s.getOutputStream();
346 for (;;) {
347 out.write(ba);
348 }
349 } catch (SocketException expected) {
350 log(expected);
351 assertTrue(Thread.interrupted());
352 assertTrue(s.isClosed());
353 }
354 }
355 });
356 }
357
358 /**
359 * Virtual thread reading urgent data when SO_OOBINLINE is enabled.
360 */
361 @Test
362 void testSocketReadUrgentData() throws Exception {
363 VThreadRunner.run(() -> {
364 try (var connection = new Connection()) {
365 Socket s1 = connection.socket1();
366 Socket s2 = connection.socket2();
367
368 // urgent data should be received
369 runAfterParkedAsync(() -> s2.sendUrgentData('X'));
370
371 // read should block, then read the OOB byte
372 s1.setOOBInline(true);
373 byte[] ba = new byte[10];
374 int n = s1.getInputStream().read(ba);
375 assertTrue(n == 1);
376 assertTrue(ba[0] == 'X');
377
378 // urgent data should not be received
379 s1.setOOBInline(false);
380 s1.setSoTimeout(500);
381 s2.sendUrgentData('X');
382 try {
383 s1.getInputStream().read(ba);
384 fail();
385 } catch (SocketTimeoutException expected) {
386 log(expected);
387 }
388 }
389 });
390 }
391
392 /**
393 * ServerSocket accept, no blocking.
394 */
395 @Test
396 void testServerSocketAccept1() throws Exception {
397 VThreadRunner.run(() -> {
398 try (var listener = new ServerSocket()) {
399 InetAddress loopback = InetAddress.getLoopbackAddress();
400 listener.bind(new InetSocketAddress(loopback, 0));
401
402 // establish connection
403 var socket1 = new Socket(loopback, listener.getLocalPort());
404
405 // accept should not block
406 var socket2 = listener.accept();
407 socket1.close();
408 socket2.close();
409 }
410 });
411 }
412
413 /**
414 * Virtual thread blocks in accept.
415 */
416 @ParameterizedTest
417 @ValueSource(ints = { 0, 60_000 })
418 void testServerSocketAccept(int timeout) throws Exception {
419 VThreadRunner.run(() -> {
420 try (var listener = new ServerSocket()) {
421 InetAddress loopback = InetAddress.getLoopbackAddress();
422 listener.bind(new InetSocketAddress(loopback, 0));
423
424 // schedule connect
425 var socket1 = new Socket();
426 SocketAddress remote = listener.getLocalSocketAddress();
427 runAfterParkedAsync(() -> socket1.connect(remote));
428
429 // accept should block
430 if (timeout > 0) {
431 listener.setSoTimeout(timeout);
432 }
433 var socket2 = listener.accept();
434 socket1.close();
435 socket2.close();
436 }
437 });
438 }
439
440 /**
441 * ServerSocket close while virtual thread blocked in accept.
442 */
443 @ParameterizedTest
444 @ValueSource(ints = { 0, 60_000 })
445 void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
446 VThreadRunner.run(() -> {
447 try (var listener = new ServerSocket()) {
448 InetAddress loopback = InetAddress.getLoopbackAddress();
449 listener.bind(new InetSocketAddress(loopback, 0));
450
451 // delayed close of listener
452 runAfterParkedAsync(listener::close);
453
454 // accept should block, then throw
455 if (timeout > 0) {
456 listener.setSoTimeout(timeout);
457 }
458 try {
459 listener.accept().close();
460 fail("connection accepted???");
461 } catch (SocketException expected) {
462 log(expected);
463 }
464 }
465 });
466 }
467
468 /**
469 * Virtual thread interrupted while blocked in ServerSocket accept.
470 */
471 @ParameterizedTest
472 @ValueSource(ints = { 0, 60_000 })
473 void testServerSocketAcceptInterrupt(int timeout) throws Exception {
474 VThreadRunner.run(() -> {
475 try (var listener = new ServerSocket()) {
476 InetAddress loopback = InetAddress.getLoopbackAddress();
477 listener.bind(new InetSocketAddress(loopback, 0));
478
479 // delayed interrupt of current thread
480 Thread thisThread = Thread.currentThread();
481 runAfterParkedAsync(thisThread::interrupt);
482
483 // accept should block, then throw
484 if (timeout > 0) {
485 listener.setSoTimeout(timeout);
486 }
487 try {
488 listener.accept().close();
489 fail("connection accepted???");
490 } catch (SocketException expected) {
491 log(expected);
492 assertTrue(Thread.interrupted());
493 assertTrue(listener.isClosed());
494 }
495 }
496 });
497 }
498
499 /**
500 * DatagramSocket receive/send, no blocking.
501 */
502 @Test
503 void testDatagramSocketSendReceive1() throws Exception {
504 VThreadRunner.run(() -> {
505 try (DatagramSocket s1 = new DatagramSocket(null);
506 DatagramSocket s2 = new DatagramSocket(null)) {
507
508 InetAddress lh = InetAddress.getLoopbackAddress();
509 s1.bind(new InetSocketAddress(lh, 0));
510 s2.bind(new InetSocketAddress(lh, 0));
511
512 // send should not block
513 byte[] bytes = "XXX".getBytes("UTF-8");
514 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
515 p1.setSocketAddress(s2.getLocalSocketAddress());
516 s1.send(p1);
517
518 // receive should not block
519 byte[] ba = new byte[100];
520 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
521 s2.receive(p2);
522 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
523 assertTrue(ba[0] == 'X');
524 }
525 });
526 }
527
528 /**
529 * Virtual thread blocks in DatagramSocket receive.
530 */
531 @ParameterizedTest
532 @ValueSource(ints = { 0, 60_000 })
533 void testDatagramSocketSendReceive(int timeout) throws Exception {
534 VThreadRunner.run(() -> {
535 try (DatagramSocket s1 = new DatagramSocket(null);
536 DatagramSocket s2 = new DatagramSocket(null)) {
537
538 InetAddress lh = InetAddress.getLoopbackAddress();
539 s1.bind(new InetSocketAddress(lh, 0));
540 s2.bind(new InetSocketAddress(lh, 0));
541
542 // delayed send
543 byte[] bytes = "XXX".getBytes("UTF-8");
544 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
545 p1.setSocketAddress(s2.getLocalSocketAddress());
546 runAfterParkedAsync(() -> s1.send(p1));
547
548 // receive should block
549 if (timeout > 0) {
550 s2.setSoTimeout(timeout);
551 }
552 byte[] ba = new byte[100];
553 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
554 s2.receive(p2);
555 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
556 assertTrue(ba[0] == 'X');
557 }
558 });
559 }
560
561 /**
562 * Virtual thread blocks in DatagramSocket receive that times out.
563 */
564 @Test
565 void testDatagramSocketReceiveTimeout() throws Exception {
566 VThreadRunner.run(() -> {
567 try (DatagramSocket s = new DatagramSocket(null)) {
568 InetAddress lh = InetAddress.getLoopbackAddress();
569 s.bind(new InetSocketAddress(lh, 0));
570 s.setSoTimeout(500);
571 byte[] ba = new byte[100];
572 DatagramPacket p = new DatagramPacket(ba, ba.length);
573 try {
574 s.receive(p);
575 fail();
576 } catch (SocketTimeoutException expected) {
577 log(expected);
578 }
579 }
580 });
581 }
582
583 /**
584 * DatagramSocket close while virtual thread blocked in receive.
585 */
586 @ParameterizedTest
587 @ValueSource(ints = { 0, 60_000 })
588 void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
589 VThreadRunner.run(() -> {
590 try (DatagramSocket s = new DatagramSocket(null)) {
591 InetAddress lh = InetAddress.getLoopbackAddress();
592 s.bind(new InetSocketAddress(lh, 0));
593
594 // delayed close of s
595 runAfterParkedAsync(s::close);
596
597 // receive should block, then throw
598 if (timeout > 0) {
599 s.setSoTimeout(timeout);
600 }
601 try {
602 byte[] ba = new byte[100];
603 DatagramPacket p = new DatagramPacket(ba, ba.length);
604 s.receive(p);
605 fail();
606 } catch (SocketException expected) {
607 log(expected);
608 }
609 }
610 });
611 }
612
613 /**
614 * Virtual thread interrupted while blocked in DatagramSocket receive.
615 */
616 @ParameterizedTest
617 @ValueSource(ints = { 0, 60_000 })
618 void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
619 VThreadRunner.run(() -> {
620 try (DatagramSocket s = new DatagramSocket(null)) {
621 InetAddress lh = InetAddress.getLoopbackAddress();
622 s.bind(new InetSocketAddress(lh, 0));
623
624 // delayed interrupt of current thread
625 Thread thisThread = Thread.currentThread();
626 runAfterParkedAsync(thisThread::interrupt);
627
628 // receive should block, then throw
629 if (timeout > 0) {
630 s.setSoTimeout(timeout);
631 }
632 try {
633 byte[] ba = new byte[100];
634 DatagramPacket p = new DatagramPacket(ba, ba.length);
635 s.receive(p);
636 fail();
637 } catch (SocketException expected) {
638 log(expected);
639 assertTrue(Thread.interrupted());
640 assertTrue(s.isClosed());
641 }
642 }
643 });
644 }
645
646 /**
647 * Creates a loopback connection
648 */
649 static class Connection implements Closeable {
650 private final Socket s1;
651 private final Socket s2;
652 Connection() throws IOException {
653 var lh = InetAddress.getLoopbackAddress();
654 try (var listener = new ServerSocket()) {
655 listener.bind(new InetSocketAddress(lh, 0));
656 Socket s1 = new Socket();
657 Socket s2;
658 try {
659 s1.connect(listener.getLocalSocketAddress());
660 s2 = listener.accept();
661 } catch (IOException ioe) {
662 s1.close();
663 throw ioe;
664 }
665 this.s1 = s1;
666 this.s2 = s2;
667 }
668
669 }
670 Socket socket1() {
671 return s1;
672 }
673 Socket socket2() {
674 return s2;
675 }
676 @Override
677 public void close() throws IOException {
678 s1.close();
679 s2.close();
680 }
681 }
682
683 @FunctionalInterface
684 interface ThrowingRunnable {
685 void run() throws Exception;
686 }
687
688 /**
689 * Runs the given task asynchronously after the current virtual thread has parked.
690 * @return the thread started to run the task
691 */
692 static Thread runAfterParkedAsync(ThrowingRunnable task) {
693 Thread target = Thread.currentThread();
694 if (!target.isVirtual())
695 throw new WrongThreadException();
696 return Thread.ofPlatform().daemon().start(() -> {
697 try {
698 Thread.State state = target.getState();
699 while (state != Thread.State.WAITING
700 && state != Thread.State.TIMED_WAITING) {
701 Thread.sleep(20);
702 state = target.getState();
703 }
704 Thread.sleep(20); // give a bit more time to release carrier
705 task.run();
706 } catch (Exception e) {
707 e.printStackTrace();
708 }
709 });
710 }
711
712 /**
713 * Log to System.err to inline with the JUnit messages.
714 */
715 static void log(Throwable e) {
716 System.err.println(e);
717 }
718 }